当前位置: 首页 > news >正文

springboot集成canal

目录

      • 一、打开mysql的binlog
        • 1.1 打开 MySQL 配置文件 `my.cnf`(通常位于 `/etc/mysql/my.cnf` 或 `/etc/my.cnf`)并添加或修改以下设置:
        • 1.2 重启mysql服务
        • 1.3 验证是否生效
      • 二、 部署canal 服务端(docker)
        • 2.1 下载启动脚本(可能需要梯子)
        • 2.2 启动服务
        • 2.3 验证服务启动成功
      • 三、springboot端集成canal客户端
        • 3.1 添加依赖 /配置
        • 3.2 客户端代码
        • 3.3 数据同步效果

项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求

原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。

具体步骤

一、打开mysql的binlog

1.1 打开 MySQL 配置文件 my.cnf(通常位于 /etc/mysql/my.cnf/etc/my.cnf)并添加或修改以下设置:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row

注意 :确保binlog-format是 row模式

1.2 重启mysql服务

具体命令根据你的服务器类型决定

1.3 验证是否生效
SHOW MASTER STATUS;

二、 部署canal 服务端(docker)

2.1 下载启动脚本(可能需要梯子)
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 
2.2 启动服务
# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false \-e canal.destinations=test \-e canal.instance.master.address=127.0.0.1:3306  \-e canal.instance.dbUsername=canal  \-e canal.instance.dbPassword=canal  \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false  \-e canal.instance.filter.regex=.*\\..* 

参数解释:

-e canal.auto.scan=false:关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。
-e canal.destinations=test:设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-e canal.instance.master.address=127.0.0.1:3306:指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。
-e canal.instance.dbUsername=canal:设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-e canal.instance.dbPassword=canal:设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。
-e canal.instance.connectionCharset=UTF-8:设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-e canal.instance.tsdb.enable=true:启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。
-e canal.instance.gtidon=false:关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。
-e canal.instance.filter.regex=.*\\..*:设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>

可以看到这样的打印:
image.png

三、springboot端集成canal客户端

3.1 添加依赖 /配置
<!--  canal begin-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.0</version>
</dependency>
<!--  canal end-->
canal:host: 127.0.0.1 #自己的canal服务器ipport: 11111  #canal默认端口destination: test #配置文件配置的名称username: rootpassword: 214365batch:size: 100
3.2 客户端代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.eco.db.entity.Record;
import com.eco.fishway.service.RecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
@Component
public class CanalClient implements InitializingBean, DisposableBean {@Value("${canal.host}")private String canalHost;@Value("${canal.port}")private int canalPort;@Value("${canal.destination}")private String canalDestination;@Value("${canal.username}")private String canalUsername;@Value("${canal.password}")private String canalPassword;@Value("${canal.batch.size}")private int batchSize;private final RecordService recordService;private CanalConnector canalConnector;private ExecutorService executorService;public CanalClient(RecordService recordService) {this.recordService = recordService;}@Overridepublic void afterPropertiesSet() throws Exception {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),canalDestination,canalUsername,canalPassword);this.executorService = Executors.newSingleThreadExecutor();this.executorService.execute(new Task());}@Overridepublic void destroy() throws Exception {if (executorService != null) {executorService.shutdown();}}private class Task implements Runnable {@Overridepublic void run() {while (true) {try {//连接canalConnector.connect();//订阅canalConnector.subscribe();while (true) {Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小long batchId = message.getId();//获取批量的数量int size = message.getEntries().size();try {//如果没有数据if (batchId == -1 || size == 0) {// log.info("无数据");// 线程休眠2秒Thread.sleep(2000);} else {// 如果有数据,处理数据printEntry(message.getEntries());// 确认处理完成canalConnector.ack(batchId);}} catch (Exception e) {log.error(e.getMessage());// 程序错误,也直接确认,跳过这次偏移canalConnector.ack(batchId);}} catch (Exception e) {log.error("Error occurred when running Canal Client", e);} finally {canalConnector.disconnect();}}}}private void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (isTransactionEntry(entry)){//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChange.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChange.getSql());}log.info(rowChange.getSql());//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE");//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT");//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");printColumnAndExecute(rowData.getBeforeColumnsList(), null);//变更后的数据log.info("------->; after");printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE");}}}}/*** 执行数据同步* @param columns* @param type*/private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) {if(type == null){return;}JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns) {jsonObject.put(column.getName(), column.getValue());}// 此处使用json转对象的方式进行转换Record bean = jsonObject.toBean(Record.class);if(type.equals("INSERT")){// 执行新增recordService.save(bean);log.info("新增成功->{}", jsonObject.toJSONString(0));}else if (type.equals("UPDATE")){// 执行编辑recordService.updateById(bean);log.info("编辑成功->{}", jsonObject.toJSONString(0));}else if (type.equals("DELETE")){// 执行删除recordService.removeById(bean.getRecordId());log.info("删除成功->{}", jsonObject.toJSONString(0));}}/*** 判断当前entry是否为事务日志*/private boolean isTransactionEntry(CanalEntry.Entry entry){if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else {return false;}}}
3.3 数据同步效果

image.png
有点感叹需求就是最好的老师,但是完不成需求就不好玩了

相关文章:

springboot集成canal

目录 一、打开mysql的binlog1.1 打开 MySQL 配置文件 my.cnf&#xff08;通常位于 /etc/mysql/my.cnf 或 /etc/my.cnf&#xff09;并添加或修改以下设置&#xff1a;1.2 重启mysql服务1.3 验证是否生效 二、 部署canal 服务端&#xff08;docker&#xff09;2.1 下载启动脚本(可…...

leetcode数论(2447. 最大公因数等于 K 的子数组数目)

前言 经过前期的数据结构和算法学习&#xff0c;开始以OD机考题作为练习题&#xff0c;继续加强下熟练程度。 描述 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 nums 的子数组中元素的最大公因数等于 k 的子数组数目。 子数组 是数组中一个连续的非空序列…...

实现数组扁平化的几种方式

目标: 实现数组扁平化[1,[2,[3,4,5]]] > [1,2,3,4,5] 我们有几种方法可以实现,分别为: 1、递归 function flatten(list){return list.reduce((tar, cur) > {if(Array.isArray(cur)){tar tar.concat(flatten(cur));} else {tar.push(cur);}return tar;}, []); } flatt…...

3D打印技术正悄然重塑模具工业格局

虽被誉为“工业之母”的模具在批量生产中仍占据核心地位&#xff0c;但3D打印以其“无模”直接成型的特性&#xff0c;在小批量、非标准化及复杂结构件制造领域展现出独特优势&#xff0c;随着技术和装备的不断发展&#xff0c;目前3D打印正逐渐向批量生产渗透&#xff0c;某品…...

深入解析 KMZ 文件的处理与可视化:从数据提取到地图展示项目实战

文章目录 1. KMZ 文件与 KML 文件简介1.1 KMZ 文件1.2 KML 文件 2. Python 环境配置与依赖安装3. 代码实现详解3.1 查找 KMZ 文件3.2 解压 KMZ 文件3.3 解析 KML 文件3.4 可视化 KMZ 数据 4. 项目实战4.1. 数据采集4.2. 项目完整代码 5. 项目运行与结果展示6. 总结与展望 在处理…...

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构和使用方式)

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构) 本文介绍论文原理介绍网络代码多种yaml设置网络测试及实验结果<!-- 这里放入论文图片 --> &emsp;;本文介绍 本文给大家带来的改进机制是结合MobileNetV4骨干网络,其中来自2024.5月发布的MobileNetV4…...

学习安卓开发遇到的问题

问题1&#xff1a;学习禁用与恢复按钮中&#xff1a; java代码报错&#xff1a;报错代码是 R.id.btn_enable;case R.id.btn_disable;case R.id.btn_test: 代码如下&#xff1a;&#xff08;实现功能在代码后面&#xff09; package com.example.apptest;import static java.…...

数学建模--禁忌搜索

目录 算法基本原理 关键要素 应用实例 实现细节 python代码示例 总结 禁忌搜索算法在解决哪些具体类型的组合优化问题中最有效&#xff1f; 禁忌搜索算法的邻域结构设计有哪些最佳实践或案例研究&#xff1f; 如何动态更新禁忌表以提高禁忌搜索算法的效率和性能&#…...

LeetCode 第136场双周赛个人题解

Q1. 求出胜利玩家的数目 原题链接 Q1. 求出胜利玩家的数目 思路分析 直接模拟 时间复杂度&#xff1a;O(N) AC代码 class Solution { public:int winningPlayerCount(int n, vector<vector<int>>& pick) {unordered_map<int, unordered_map<int, …...

The operation was rejected by your operating system. code CERT_HAS_EXPIRED报错解决

各种报错&#xff0c;试了清缓存&#xff0c;使用管理员权限打开命令行工具&#xff0c;更新npm&#xff0c;都不好使 最终解决&#xff1a;删除 c:/user/admin/ .npmrc...

[Git][基本操作]详细讲解

目录 1.创建本地仓库2.配置 Git3.添加文件1.添加文件2.提交文件3.其他 && 说明 4.删除文件5.跟踪修改文件6.版本回退7.撤销修改0.前言1.未add2.已add&#xff0c;未commit3.已add&#xff0c;已commit 1.创建本地仓库 创建⼀个Git本地仓库&#xff1a;git init运行该命…...

springMVC中从Excel文件中导入导出数据

目录 1. 数据库展示2. 导入依赖3. 写方法3.1 导入数据3.2 导出数据 4. 效果5. 不足6. 参考链接 1. 数据库展示 2. 导入依赖 pom.xml <!--文件上传处理--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId>&…...

C++的STL简介(三)

目录 1.vector的模拟实现 1.1begin&#xff08;&#xff09; 1.2end&#xff08;&#xff09; 1.3打印信息 1.4 reserve&#xff08;&#xff09; 1.5 size&#xff08;&#xff09; 1.6 capacity&#xff08;&#xff09; 1.7 push_back() 1.8[ ] 1.9 pop_back() 1.10 insert&…...

BERT模型

BERT模型是由谷歌团队于2019年提出的 Encoder-only 的 语言模型&#xff0c;发表于NLP顶会ACL上。原文题目为&#xff1a;《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》链接 在前大模型时代&#xff0c;BERT模型可以算是一个参数量比…...

举例说明计算机视觉(CV)技术的优势和挑战

计算机视觉&#xff08;CV&#xff09;技术是通过计算机模拟和处理图像与视频数据来模拟人类视觉的能力。它可以带来许多优势&#xff0c;也面临一些挑战。 优势&#xff1a; 自动化&#xff1a;CV技术可以自动处理大量的图像和视频数据&#xff0c;从而提高工作效率和准确性。…...

Animate软件基础:关于补间动画中的图层

Animate 文档中的每一个场景都可以包含任意数量的时间轴图层。使用图层和图层文件夹可组织动画序列的内容和分隔动画对象。在图层和文件夹中组织它们可防止它们在重叠时相互擦除、连接或分段。若要创建一次包含多个元件或文本字段的补间移动的动画&#xff0c;请将每个对象放置…...

mac|安装hashcat(压缩包密码p解)

一、安装Macports&#xff08;如果有brew就不用这一步&#xff09; 根据官网文档&#xff1a;The MacPorts Project -- Download & Installation&#xff0c;安装步骤如下 1、下载MacPorts&#xff0c;这里我用的是tar.gz &#xff0c;可以通过keka&#xff08;keka安装在…...

【保姆级系列:锐捷模拟器的下载安装使用全套教程】

保姆级系列&#xff1a;锐捷模拟器的下载安装使用全套教程 1.介绍2.下载3.安装4.实践教程5.验证 1.介绍 锐捷目前可以通过EVE-NG来模拟自己家的路由器&#xff0c;交换机&#xff0c;防火墙。实现方式是把自己家的镜像导入到EVE-ng里面来运行。下面主要就是介绍如何下载镜像和…...

virtualbox7安装centos7.9配置静态ip

1.背景 我大概在一年之前安装virtualbox7centos7.9的环境&#xff0c;但看视频说用vagrant启动的窗口可以不用第三方工具(比如xshell、secure等)连接centos7.9&#xff0c;于是尝鲜试了下还可以&#xff0c;导致系统文件格式是vmdk了&#xff08;网上有vmdk转vdi的方法&#xf…...

结构型设计模式:桥接/组合/装饰/外观/享元

结构型设计模式&#xff1a;适配器/代理 (qq.com)...

vLLM初识(一)

vLLM初识&#xff08;一&#xff09; 前言 在LLM推理优化——KV Cache篇&#xff08;百倍提速&#xff09;中&#xff0c;我们已经介绍了KV Cache技术的原理&#xff0c;从中我们可以知道&#xff0c;KV Cache本质是空间换时间的技术&#xff0c;对于大型模型和长序列&#xf…...

【Apache Doris】周FAQ集锦:第 18 期

【Apache Doris】周FAQ集锦&#xff1a;第 18 期 SQL问题数据操作问题运维常见问题其它问题关于社区 欢迎查阅本周的 Apache Doris 社区 FAQ 栏目&#xff01; 在这个栏目中&#xff0c;每周将筛选社区反馈的热门问题和话题&#xff0c;重点回答并进行深入探讨。旨在为广大用户…...

docker部署可执行的jar

1.将项目打包&#xff0c;上传到服务器的指定目录 2.在该目录下创建Dockerfile文件 3.Dockerfile写入如下指令 # 基于哪个镜像 FROM java:8 # 拷贝文件到容器&#xff0c;也可以直接写成ADD xxxxx.jar /app.jar ADD springboot-file-0.0.1.jar file.jar RUN bash -c touch /…...

OpenCV||超详细的图像处理模块

一、颜色变换cvtColor dst cv2.cvtColor(src, code[, dstCn[, dst]]) src: 输入图像&#xff0c;即要进行颜色空间转换的原始图像。code: 转换代码&#xff0c;指定要执行的颜色空间转换类型。这是一个必需的参数&#xff0c;决定了源颜色空间到目标颜色空间的转换方式。dst…...

java面向对象期末总结

子类父类方法执行顺序&#xff1f;多态中和子类打印不一样&#xff1b; 子类在实现父类方法的时候没有用super关键字进行调用也会先执行父类的构造方法吗&#xff1f; 是的&#xff0c;当子类实例化时&#xff0c;先执行父类的构造方法&#xff0c;再执行子类的构造方法。即使在…...

文件搜索 36

删除文件 文件搜索 package File;import java.io.File;public class file3 {public static void main(String[] args) {search(new File("D :/"), "qq");}/*** 去目录搜索文件* param dir 目录* param filename 要搜索的文件名称*/public static void sear…...

IO多路转接

文章目录 五种IO模型fcntl多路转接selectpollepollepoll的工作模式 五种IO模型 阻塞IO: 在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式.阻塞IO是最常见的IO模型。非阻塞IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULD…...

基于深度学习的面部表情分类识别系统

&#xff1a;温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长 QQ 名片 :) 1. 项目简介 面部表情识别是计算机视觉领域的一个重要研究方向&#xff0c; 它在人机交互、心理健康评估、安全监控等领域具有广泛的应用。近年来&#xff0c;随着深度学习技术的快速发展&#xf…...

日志远程同步实验

目录 一.实验环境 二.实验配置 1.node1发送方配置 &#xff08;1&#xff09;node1写udp协议 &#xff08;2&#xff09;重启服务并清空日志 2.node2接收方配置 &#xff08;1&#xff09;node2打开接受日志的插件&#xff0c;指定插件用的端口 &#xff08;2&#xff…...

数据结构之《二叉树》(中)

在数据结构之《二叉树》(上)中学习了树的相关概念&#xff0c;还了解的树中的二叉树的顺序结构和链式结构&#xff0c;在本篇中我们将重点学习二叉树中的堆的相关概念与性质&#xff0c;同时试着实现堆中的相关方法&#xff0c;一起加油吧&#xff01; 1.实现顺序结构二叉树 在…...

资溪做面包招聘的网站/百度知道首页登录

夜光序言&#xff1a; 与你在这最后的夏天 抹不去的思念 斜阳里的微笑 渐行渐远 你总是这样微微笑着仿佛在说&#xff1a;“我在身边……” 正文&#xff1a; Js第三天 夜光 帅气&#xff1a; 01 淘宝搜索框 夜光 1.2 判断用户输入事件&#…...

网站建设及推广枣强/新的seo网站优化排名 排名

hibernate Validator 是 Bean Validation 的参考实现 。Hibernate Validator 提供了 JSR 303 规范中所有内置 constraint 的实现&#xff0c;除此之外还有一些附加的 constraint。 在日常开发中&#xff0c;Hibernate Validator经常用来验证bean的字段&#xff0c;基于注解&…...

作文网投稿网站/百度云网页版登录入口

对mini-batch梯度下降算法的理解以及代码实现1.什么是mini-batch梯度下降2.mini-batch梯度下降算法的伪代码3.为什么要使用mini-batch梯度下降算法4.比较BGD,SGD,MBGD5.用python实现mini-batch梯度下降1.什么是mini-batch梯度下降 首先我们知道原始梯度下降算法是在整个训练集…...

深圳云网站建站公司/重庆百度竞价推广

6.7.2 设置串口通信参数串口通信参数指的是波特率、数据位、奇偶校验位和停止位。对串口实现控制的时候同样要用到termio结构体。下面将结合具体的代码说明如何设置这些参数。1&#xff0e;波特率设置获得端口波特率信息是通过cfgetispeed函数和cfgetospeed函数来实现的。cfget…...

网站开发报价清单/江苏网页设计

今天为大家带来一款常用的8位单片机NY8A051H单片机&#xff0c;这是九齐单片机NY8A051F的升级产品&#xff0c;内建比较器&#xff0c;复位脚输出高。同样是以EPROM作为记忆体的8位元微控制器&#xff0c;而且采用CMOS制程&#xff0c;显著提高性价比。NY8A051H单片机核心建立在…...

商务网站建设项目的技术可行性/今日重大财经新闻

前提&#xff0c;用vscode和微信小程序工具同时编写代码 第一步&#xff1a;下载easy less 首先打开vscode&#xff0c;搜索easy less &#xff0c;下载 如下 第二步&#xff1a;配置 点击右上角打开json设置 最后面补充输入以下代码&#xff1a; "less.compile"…...