基于Canal的数据同步
基于Canal的数据同步
一、 系统结构
该数据同步系统由Spring Boot和Canal共同组成。
Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数据同步到其他系统中。
- canal.deployer-1.1.7-SNAPSHOT.tar.gz为Canal软件压缩包,需要安装在服务器上,并根据下文进行配置文件的修改。
- CanalClient.rar为用Spring Boot框架编写的数据库监听同步项目
二、. Canal配置
在解压Canal文件夹后,需要配置两个文件。
在配置Canal前,需要确保Mysql的Binlog已经开启,并且模式为ROW,找到当前binlog的文件名和position。
1) 配置文件路径:canal/conf/canal.properties
2) 配置文件路径:
canal/conf/example/instance.properties
三、 Spring Boot配置
1. 项目结构
2. Canal账号密码配置
进入到Config下的CanalClient类文件。
注意密码是通过MD5加密的,图中这这段字符应替换为canal。
3. 目标数据库配置
在yml中配置数据同步目标数据库即可
源代码:
package com.canal.canalclient.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.jdbc.core.JdbcTemplate;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/*** @Auther: fzl* @Date: 2020/4/20 01:21* @Description:*/
public class CanalClient {private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();public static void startCanal() {//获取canalServer连接:本机地址,端口号CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP地址", "端口号"), "example", "canal", "canal");int batchSize = 1000;try {//连接canalServerconnector.connect();//订阅Desctinstionconnector.subscribe();connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少//轮询拉取数据 上面的whereMessage message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//睡眠Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);System.out.println("aa"+size);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 10) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}public static JdbcTemplate jdbcTemplate;/*** 模拟执行队列里面的sql语句*/public static void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();jdbcTemplate.execute(sql);System.out.println("[sql]----> " + sql);}}/*** 数据处理** @param entrys*/private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == CanalEntry.EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private static void saveUpdateSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private static void saveDeleteSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private static void saveInsertSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}
}
package com.canal.canalclient;import com.canal.canalclient.config.CanalClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class StartedFunction implements ApplicationRunner {@Autowired@Qualifier("test_master_energy") //有多个数据源的,需要名称区分private static JdbcTemplate jdbcTemplate;@Overridepublic void run(ApplicationArguments args) throws Exception{log.info("开始监听同步数据库");CanalClient.jdbcTemplate = jdbcTemplate;CanalClient.startCanal();}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.canal</groupId><artifactId>CanalClient</artifactId><version>0.0.1-SNAPSHOT</version><name>CanalClient</name><description>CanalClient</description><properties><java.version>19</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.9</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.32</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><!-- 打包时跳过测试 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>true</skipTests></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources></build></project>
相关文章:
基于Canal的数据同步
基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数…...
vuetify设置页面默认主题色
前言 最近工作中接到一个任务: 项目中分light和dark两种主题色a、b页面默认为dark其他页面默认为light 项目前端环境: vue2jsyarnvuexvuetifyelement ui 解决思路 routerjs中配置路径时进行默认主题设置 在左侧aside点击菜单时,进行主题切…...
【Python入门第二十三天】Python 继承
Python 继承 继承允许我们定义继承另一个类的所有方法和属性的类。 父类是继承的类,也称为基类。 子类是从另一个类继承的类,也称为派生类。 创建父类 任何类都可以是父类,因此语法与创建任何其他类相同: 实例 创建一个名为…...
C#中,读取一个或多个文件内容的方法
读取一个或多个文件内容的方法 在C#中,可以使用File.ReadAllLines方法一次读取多个文件中的所有行内容。例如,以下代码读取了两个文件中的所有行内容,然后将它们合并在一起: string[] file1Lines File.ReadAllLines("file1…...
1 基于神经辐射场(neural Radiance Fileds, Nerf)的三维重建- 简介
Nerf简介 Nerf(neural Radiance Fileds) 为2020年ICCV上提出的一个基于隐式表达的三维重建方法,使用2D的 Posed Imageds 来生成(表达)复杂的三维场景。现在越来越多的研究人员开始关注这个潜力巨大的领域,也…...
水果FLStudio21.0.0中文版全能数字音乐工作站DAW
FL Studio 21.0.0官方中文版重磅发布纯正简体中文支持,更快捷的音频剪辑及素材管理器,多样主题随心换!Mac版新增对苹果M2/1家族芯片原生支持。编曲、剪辑、录音、混音,20余年的技术积淀和实力研发,FL Studio 已经从电音…...
【GlobalMapper精品教程】055:GM坐标转换器的巧妙使用
GM软件提供了一个简单实用的坐标转换工具,可以实现地理坐标和投影坐标之间的高斯正反算及多种转换计算。 文章目录 一、坐标转换器认识二、坐标转换案例1. 地理坐标←→地理坐标2. 地理坐标←→投影坐标三、在输出坐标上创建新的点四、其他转换工具的使用一、坐标转换器认识 …...
C语言之中rand()函数是如何实现的
rand()函数是一个C标准库中的随机数生成函数,用于生成一个范围在0到RAND_MAX之间的伪随机数。RAND_MAX是一个常量,它是随机数的最大值,通常被定义为32767。 rand()函数的实现原理可以概括为以下几个步骤: 初始化随机数生成器 在…...
winform控件PropertyGrid的应用(使运行中的程序能像vistual studio那样设置控件属性)
上周在看别人写的上位机demo代码时,发现创建的项目模板是"Windows 窗体控件库"(如下图) 生成的项目结构像自定义控件库,没有程序入口方法Main,但却很神奇能调试,最后发现原来Vistual Studio启动了一个外挂程序UserContr…...
SBUS的协议详解
SBUS 1.串口配置: 100k波特率, 8位数据位(在stm32中要选择9位), 偶校验(EVEN), 2位停止位, 无控流,25个字节, 2.协议格式: [startbyte] [data1][data2]……...
【PyTorch】教程:torch.nn.Hardshrink
torch.nn.Hardshrink CLASS torch.nn.Hardshrink(lambd0.5) 参数 lambd ([float]) – the λ\lambdaλ 默认为 0.5 定义 HardShrink(x){x,if x>λx,if x<−λ0,otherwise \text{HardShrink}(x) \begin{cases} x, & \text{ if } x > \lambda \\ x, & \text{…...
JavaScript 函数参数
JavaScript 函数对参数的值(arguments)没有进行任何的检查。JavaScript 函数参数与大多数其他语言的函数参数的区别在于:它不会关注有多少个参数被传递,不关注传递的参数的数据类型。函数显式参数与隐藏参数(arguments)在先前的教程中,我们已…...
【C】标准IO库函数
fopen/fclose #include <stdio.h>FILE *fopen(const char *path, const char *mode); 返回值:成功返回文件指针,出错返回NULL并设置errnoint fclose(FILE *fp); 返回值:成功返回0,出错返回EOF并设置errnomode参数是一个字符…...
http客户端Feign
Feign替代RestTemplate RestTemplate方式调用存在的缺陷 String url"http://userservice/user/"order.getUserId();User user restTemplate.getForObject(url, User.class); 代码可读性差,变成体验不统一; 参数复杂的时候URL难以维护。 &l…...
如何在Java中使用枚举类:从入门到进阶
枚举类是Java中一种特殊的数据类型,它允许我们将一组有限的值作为一组常量来使用,这些常量在代码中具有固定的名称和类型。在Java中,枚举类通常用于代表状态、选项和类别等具有离散值的变量。本篇博客将深入探讨Java中的枚举类,包…...
操作系统(1.2)--引论
目录 一、操作系统的基本特性 1.并发性 1.1 并行与并发 1.2 引入进程 2.共享性 2.1 互斥共享方式 2.3 同时访问方式 3.虚拟 3.1 时分复用技术 4. 异 步 二、操作系统的主要功能 1.处理机管理功能 1.1 进程控制 1.2 进程同步 1.3 进程通信 1.4 调度 2. 内…...
【Linux】 shell if的[]和[[]]区别
文章目录[]和test[]和[[]]区别总结参考[]和test Shell中的 test 命令用于检查某个条件是否成立,它可以进行数值、字符和文件三个方面的测试 test常用于 if ,作为判断条件,if test等价于 if [ ],因此,test和[] 内的内…...
利用flask解析海康摄像头视频
利用flask解析海康摄像头视频利用flask解析海康摄像头和大华摄像头的视频一、安装依赖包二、获取海康摄像头视频流三、将视频流输出到Web页面四、 创建HTML模板文件利用flask解析海康摄像头和大华摄像头的视频 作为AI智能的一种应用场景,视频监控系统已经在各个行业…...
./docker-compose.yml‘ is invalid
文章目录前言提示原因版本太低解决方法更新删除原来不能执行的/usr/local/bin/docker-compose下载安装docker-compose添加权限前言 安装ctfd过程中的一些报错 rootubuntu:/CTFd# docker-compose up -d ERROR: The Compose file ./docker-compose.yml is invalid because: net…...
Java 流程控制
条件/选择结构 if if(条件表达式){// 表达式为 true 时,执行该代码块 }if(true) {System.out.println("hello"); }if else if(条件表达式){// 表达式为 true 时,执行该代码块 } else {// 表达式为 false 时,执行该代码块 }if(1 …...
边界无限入选首届“网络安全高成长性企业”并荣获“勇创之星”
近日,由工业和信息化部、四川省人民政府主办的“2023年中国网络和数据安全产业高峰论坛网络安全产融合作分论坛”在成都举行,论坛上公布了“2022年度网络安全高成长性企业”名单。云原生安全、应用安全“灵动智御”理念创领者北京边界无限科技有限公司&a…...
SpringBoot项目的快速创建方式(包含第一个程序的运行)
目录 一、IDEA所用的版本以及插件 二、操作步骤 一、IDEA所用的版本以及插件 idea的版本: idea2022版本下载安装配置与卸载详细步骤(包含运行第一个java程序教程)_idea2022下载_云边的快乐猫的博客-CSDN博客 如果英文看不懂就点击…...
linux下设置定期执行需要root权限的sh文件
1、准备好一个shell文件 比如我这个叫clean.sh,位于/home/admin/gdhysthj/clean.sh 2、首先将shell文件赋权为可执行文件 chmod 777 clean.sh 3、切换为超级管理员 su 4、设置定时器 crontab -u root -e 5、回车后,进入一个类似vim的界面,…...
认识异或运算
1.什么是异或运算 异或运算是位运算的一种,符号为:^ 运算规则为:相同为0,不同为1 例如 性质: N ^ 0 N N ^ N 0 A ^ B B ^ A (A ^ B) ^ C A ^ (B ^ C)N ^ 0 N public class XorOperation {public static void …...
内容提供者的简单使用
内容提供者的简单使用 最近在复习ContentProvider时遇到了一些问题,几经波折,终于解决了,故写下这篇博客,希望能帮到有相同问题的兄弟。 何时使用 当我们想要一个应用的数据向外部公开时,ContentProvider是一个不错…...
Modelsim 操作结构和流程
用到的命令一般都写到.do文件中,使用脚本语言进行批量处理。Step 1: Map librariesStep 2: Compile the designStep 3: Optimize the design (OPTIONAL)Step 4: Load the design into the simulatorStep 5: Run the simulationStep 6: Debug the design Note: Desig…...
vue和react有什么不同
vue上手难度低,不过react社区活跃度更多一些,一般数据比较多的大型项目会倾向于使用react。在react官网中,官方也建议我们使用React来构建快速响应的大型 Web 应用程序。vue2.0是面向对象编程({data: {}, methods: {}, created() …...
js求解《初级算法》28. 找出字符串中第一个匹配项的下标
一、题目描述 给你两个字符串 haystack 和 needle ,请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标(下标从 0 开始)。如果 needle 不是 haystack 的一部分,则返回 -1 。 输入:haystack "sadb…...
VAE--part1
Variational Auto-Encoder, VAE__part1分布变换VAE慢谈VAE 初现分布标准化重参数技巧VAE的本质是什么?VAE的本质结构正态分布?变分在哪里参考博客仅做学习记录,侵删分布变换 VAE和GAN都是生成式模型,它们俩的目标基本一致&#x…...
备战四级!!!
目录 一、替换词 二、作文常见句型 (1)常见开头 (2)阐述观点 (3)结束语 (4)提出建议 (5)表示论证 (6)给出原因 (…...
个人空间备案网站名称/曼联vs恩波利比分
概要:why:为什么回收,见whatwhat:垃圾回收哪些内存(不可达对象的确定)when:何时执行GC(安全点、安全区域)how:如何回收(原理——垃圾回收算法、实现——垃圾收集器)1、垃圾回收哪些内存JVM运行时数据区中&a…...
网站建设客服接听术语/靠谱的广告联盟
ARKit 是 iOS 11 的主打功能之一,开发者可以利用 ARKit 轻松的为 iPhone和 iPad 打造增强现实应用。iOS 11 发布到现在已经6个月了,根据 app 数据公司 SensorTower 的统计,App Store 中集成了 ARKit 功能的应用下载数量超过 1300 万。这些应…...
文登区住房和城乡建设局网站/韶关网站seo
sftp 命令 使用sftp:服务器之间传取文件sftp root<ip>pwdcd /home/testbinget hello.shlsput world.shbye #或者 quitposted on 2017-03-12 19:26 绿Z 阅读(...) 评论(...) 编辑 收藏 转载于:https://www.cnblogs.com/greenZ/p/6538922.html...
百兆独享 做资源网站/京东关键词优化技巧
题目链接 重新排列数组 题目描述 注意 nums.length 2n 解答思路 使用另一个数组以题目的要求保存之前的数组中的元素即可 代码 class Solution {public int[] shuffle(int[] nums, int n) {int[] res new int[2 * n];for(int i 0; i < n; i) {res[i * 2] nums[i]…...
动态网站项目实训教程任务3怎么做/tool站长工具
C#字符雨 一题目描述 C# 是一个简单的、现代的、通用的、面向对象的编程语言,C# 编程是基于 C 和 C 编程语言的,推荐学习C#有用的网站可以看看咯,such as: C# Programming Guide 那么有许多强大的编程功能的C#来实现一些有趣的…...
深圳燃气招聘网最新招聘/首页优化公司
Java代码实现的计算难免会显得不够高效。而利用MATLAB写好相应的计算函数,然后打包成jar包供Java调用,在某些情况下会更加方便。或者有些时候会涉及到使用Java调用MatLab展现一些二维三维图。因此用到Java调用MatLab。一:注意事项1࿱…...