当前位置: 首页 > news >正文

springboot集成canal

目录

      • 一、打开mysql的binlog
        • 1.1 打开 MySQL 配置文件 `my.cnf`(通常位于 `/etc/mysql/my.cnf` 或 `/etc/my.cnf`)并添加或修改以下设置:
        • 1.2 重启mysql服务
        • 1.3 验证是否生效
      • 二、 部署canal 服务端(docker)
        • 2.1 下载启动脚本(可能需要梯子)
        • 2.2 启动服务
        • 2.3 验证服务启动成功
      • 三、springboot端集成canal客户端
        • 3.1 添加依赖 /配置
        • 3.2 客户端代码
        • 3.3 数据同步效果

项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求

原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。

具体步骤

一、打开mysql的binlog

1.1 打开 MySQL 配置文件 my.cnf(通常位于 /etc/mysql/my.cnf/etc/my.cnf)并添加或修改以下设置:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row

注意 :确保binlog-format是 row模式

1.2 重启mysql服务

具体命令根据你的服务器类型决定

1.3 验证是否生效
SHOW MASTER STATUS;

二、 部署canal 服务端(docker)

2.1 下载启动脚本(可能需要梯子)
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 
2.2 启动服务
# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false \-e canal.destinations=test \-e canal.instance.master.address=127.0.0.1:3306  \-e canal.instance.dbUsername=canal  \-e canal.instance.dbPassword=canal  \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false  \-e canal.instance.filter.regex=.*\\..* 

参数解释:

-e canal.auto.scan=false:关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。
-e canal.destinations=test:设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-e canal.instance.master.address=127.0.0.1:3306:指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。
-e canal.instance.dbUsername=canal:设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-e canal.instance.dbPassword=canal:设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。
-e canal.instance.connectionCharset=UTF-8:设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-e canal.instance.tsdb.enable=true:启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。
-e canal.instance.gtidon=false:关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。
-e canal.instance.filter.regex=.*\\..*:设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>

可以看到这样的打印:
image.png

三、springboot端集成canal客户端

3.1 添加依赖 /配置
<!--  canal begin-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.0</version>
</dependency>
<!--  canal end-->
canal:host: 127.0.0.1 #自己的canal服务器ipport: 11111  #canal默认端口destination: test #配置文件配置的名称username: rootpassword: 214365batch:size: 100
3.2 客户端代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.eco.db.entity.Record;
import com.eco.fishway.service.RecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
@Component
public class CanalClient implements InitializingBean, DisposableBean {@Value("${canal.host}")private String canalHost;@Value("${canal.port}")private int canalPort;@Value("${canal.destination}")private String canalDestination;@Value("${canal.username}")private String canalUsername;@Value("${canal.password}")private String canalPassword;@Value("${canal.batch.size}")private int batchSize;private final RecordService recordService;private CanalConnector canalConnector;private ExecutorService executorService;public CanalClient(RecordService recordService) {this.recordService = recordService;}@Overridepublic void afterPropertiesSet() throws Exception {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),canalDestination,canalUsername,canalPassword);this.executorService = Executors.newSingleThreadExecutor();this.executorService.execute(new Task());}@Overridepublic void destroy() throws Exception {if (executorService != null) {executorService.shutdown();}}private class Task implements Runnable {@Overridepublic void run() {while (true) {try {//连接canalConnector.connect();//订阅canalConnector.subscribe();while (true) {Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小long batchId = message.getId();//获取批量的数量int size = message.getEntries().size();try {//如果没有数据if (batchId == -1 || size == 0) {// log.info("无数据");// 线程休眠2秒Thread.sleep(2000);} else {// 如果有数据,处理数据printEntry(message.getEntries());// 确认处理完成canalConnector.ack(batchId);}} catch (Exception e) {log.error(e.getMessage());// 程序错误,也直接确认,跳过这次偏移canalConnector.ack(batchId);}} catch (Exception e) {log.error("Error occurred when running Canal Client", e);} finally {canalConnector.disconnect();}}}}private void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (isTransactionEntry(entry)){//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChange.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChange.getSql());}log.info(rowChange.getSql());//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE");//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT");//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");printColumnAndExecute(rowData.getBeforeColumnsList(), null);//变更后的数据log.info("------->; after");printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE");}}}}/*** 执行数据同步* @param columns* @param type*/private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) {if(type == null){return;}JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns) {jsonObject.put(column.getName(), column.getValue());}// 此处使用json转对象的方式进行转换Record bean = jsonObject.toBean(Record.class);if(type.equals("INSERT")){// 执行新增recordService.save(bean);log.info("新增成功->{}", jsonObject.toJSONString(0));}else if (type.equals("UPDATE")){// 执行编辑recordService.updateById(bean);log.info("编辑成功->{}", jsonObject.toJSONString(0));}else if (type.equals("DELETE")){// 执行删除recordService.removeById(bean.getRecordId());log.info("删除成功->{}", jsonObject.toJSONString(0));}}/*** 判断当前entry是否为事务日志*/private boolean isTransactionEntry(CanalEntry.Entry entry){if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else {return false;}}}
3.3 数据同步效果

image.png
有点感叹需求就是最好的老师,但是完不成需求就不好玩了

相关文章:

springboot集成canal

目录 一、打开mysql的binlog1.1 打开 MySQL 配置文件 my.cnf&#xff08;通常位于 /etc/mysql/my.cnf 或 /etc/my.cnf&#xff09;并添加或修改以下设置&#xff1a;1.2 重启mysql服务1.3 验证是否生效 二、 部署canal 服务端&#xff08;docker&#xff09;2.1 下载启动脚本(可…...

leetcode数论(2447. 最大公因数等于 K 的子数组数目)

前言 经过前期的数据结构和算法学习&#xff0c;开始以OD机考题作为练习题&#xff0c;继续加强下熟练程度。 描述 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 nums 的子数组中元素的最大公因数等于 k 的子数组数目。 子数组 是数组中一个连续的非空序列…...

实现数组扁平化的几种方式

目标: 实现数组扁平化[1,[2,[3,4,5]]] > [1,2,3,4,5] 我们有几种方法可以实现,分别为: 1、递归 function flatten(list){return list.reduce((tar, cur) > {if(Array.isArray(cur)){tar tar.concat(flatten(cur));} else {tar.push(cur);}return tar;}, []); } flatt…...

3D打印技术正悄然重塑模具工业格局

虽被誉为“工业之母”的模具在批量生产中仍占据核心地位&#xff0c;但3D打印以其“无模”直接成型的特性&#xff0c;在小批量、非标准化及复杂结构件制造领域展现出独特优势&#xff0c;随着技术和装备的不断发展&#xff0c;目前3D打印正逐渐向批量生产渗透&#xff0c;某品…...

深入解析 KMZ 文件的处理与可视化:从数据提取到地图展示项目实战

文章目录 1. KMZ 文件与 KML 文件简介1.1 KMZ 文件1.2 KML 文件 2. Python 环境配置与依赖安装3. 代码实现详解3.1 查找 KMZ 文件3.2 解压 KMZ 文件3.3 解析 KML 文件3.4 可视化 KMZ 数据 4. 项目实战4.1. 数据采集4.2. 项目完整代码 5. 项目运行与结果展示6. 总结与展望 在处理…...

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构和使用方式)

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构) 本文介绍论文原理介绍网络代码多种yaml设置网络测试及实验结果<!-- 这里放入论文图片 --> &emsp;;本文介绍 本文给大家带来的改进机制是结合MobileNetV4骨干网络,其中来自2024.5月发布的MobileNetV4…...

学习安卓开发遇到的问题

问题1&#xff1a;学习禁用与恢复按钮中&#xff1a; java代码报错&#xff1a;报错代码是 R.id.btn_enable;case R.id.btn_disable;case R.id.btn_test: 代码如下&#xff1a;&#xff08;实现功能在代码后面&#xff09; package com.example.apptest;import static java.…...

数学建模--禁忌搜索

目录 算法基本原理 关键要素 应用实例 实现细节 python代码示例 总结 禁忌搜索算法在解决哪些具体类型的组合优化问题中最有效&#xff1f; 禁忌搜索算法的邻域结构设计有哪些最佳实践或案例研究&#xff1f; 如何动态更新禁忌表以提高禁忌搜索算法的效率和性能&#…...

LeetCode 第136场双周赛个人题解

Q1. 求出胜利玩家的数目 原题链接 Q1. 求出胜利玩家的数目 思路分析 直接模拟 时间复杂度&#xff1a;O(N) AC代码 class Solution { public:int winningPlayerCount(int n, vector<vector<int>>& pick) {unordered_map<int, unordered_map<int, …...

The operation was rejected by your operating system. code CERT_HAS_EXPIRED报错解决

各种报错&#xff0c;试了清缓存&#xff0c;使用管理员权限打开命令行工具&#xff0c;更新npm&#xff0c;都不好使 最终解决&#xff1a;删除 c:/user/admin/ .npmrc...

[Git][基本操作]详细讲解

目录 1.创建本地仓库2.配置 Git3.添加文件1.添加文件2.提交文件3.其他 && 说明 4.删除文件5.跟踪修改文件6.版本回退7.撤销修改0.前言1.未add2.已add&#xff0c;未commit3.已add&#xff0c;已commit 1.创建本地仓库 创建⼀个Git本地仓库&#xff1a;git init运行该命…...

springMVC中从Excel文件中导入导出数据

目录 1. 数据库展示2. 导入依赖3. 写方法3.1 导入数据3.2 导出数据 4. 效果5. 不足6. 参考链接 1. 数据库展示 2. 导入依赖 pom.xml <!--文件上传处理--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId>&…...

C++的STL简介(三)

目录 1.vector的模拟实现 1.1begin&#xff08;&#xff09; 1.2end&#xff08;&#xff09; 1.3打印信息 1.4 reserve&#xff08;&#xff09; 1.5 size&#xff08;&#xff09; 1.6 capacity&#xff08;&#xff09; 1.7 push_back() 1.8[ ] 1.9 pop_back() 1.10 insert&…...

BERT模型

BERT模型是由谷歌团队于2019年提出的 Encoder-only 的 语言模型&#xff0c;发表于NLP顶会ACL上。原文题目为&#xff1a;《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》链接 在前大模型时代&#xff0c;BERT模型可以算是一个参数量比…...

举例说明计算机视觉(CV)技术的优势和挑战

计算机视觉&#xff08;CV&#xff09;技术是通过计算机模拟和处理图像与视频数据来模拟人类视觉的能力。它可以带来许多优势&#xff0c;也面临一些挑战。 优势&#xff1a; 自动化&#xff1a;CV技术可以自动处理大量的图像和视频数据&#xff0c;从而提高工作效率和准确性。…...

Animate软件基础:关于补间动画中的图层

Animate 文档中的每一个场景都可以包含任意数量的时间轴图层。使用图层和图层文件夹可组织动画序列的内容和分隔动画对象。在图层和文件夹中组织它们可防止它们在重叠时相互擦除、连接或分段。若要创建一次包含多个元件或文本字段的补间移动的动画&#xff0c;请将每个对象放置…...

mac|安装hashcat(压缩包密码p解)

一、安装Macports&#xff08;如果有brew就不用这一步&#xff09; 根据官网文档&#xff1a;The MacPorts Project -- Download & Installation&#xff0c;安装步骤如下 1、下载MacPorts&#xff0c;这里我用的是tar.gz &#xff0c;可以通过keka&#xff08;keka安装在…...

【保姆级系列:锐捷模拟器的下载安装使用全套教程】

保姆级系列&#xff1a;锐捷模拟器的下载安装使用全套教程 1.介绍2.下载3.安装4.实践教程5.验证 1.介绍 锐捷目前可以通过EVE-NG来模拟自己家的路由器&#xff0c;交换机&#xff0c;防火墙。实现方式是把自己家的镜像导入到EVE-ng里面来运行。下面主要就是介绍如何下载镜像和…...

virtualbox7安装centos7.9配置静态ip

1.背景 我大概在一年之前安装virtualbox7centos7.9的环境&#xff0c;但看视频说用vagrant启动的窗口可以不用第三方工具(比如xshell、secure等)连接centos7.9&#xff0c;于是尝鲜试了下还可以&#xff0c;导致系统文件格式是vmdk了&#xff08;网上有vmdk转vdi的方法&#xf…...

结构型设计模式:桥接/组合/装饰/外观/享元

结构型设计模式&#xff1a;适配器/代理 (qq.com)...

网安人做私活赚外快的好地方_接网络安全私活的平台有哪些

它的流程是&#xff1a;首先发包人对任务进行细分&#xff0c;分解到最小后&#xff0c;然后分包出去&#xff0c;按照各自的能力和知识水平领包&#xff0c;最后完成任务&#xff0c;发包人通过审核通过之后&#xff0c;项目才算完成。 3、猪八戒 找兼职的地方&#xff0c;主…...

跨越框架鸿沟:.NET Framework 项目如何巧妙复用 .NET Core 代码

1. 当老项目遇上新技术&#xff1a;为什么需要跨框架复用代码&#xff1f; 最近接手了一个老项目的升级需求&#xff0c;客户的核心业务系统跑在 .NET Framework 4.7.2 上&#xff0c;但新开发的数据分析模块是用 .NET 6 写的。第一次尝试直接引用时&#xff0c;VS 直接给我弹了…...

STC15单片机PWM异常检测避坑指南:比较器触发+端口保护的工业级应用

STC15单片机PWM异常检测避坑指南&#xff1a;比较器触发端口保护的工业级应用 在工业控制系统中&#xff0c;PWM信号的稳定性直接关系到电机驱动、电源转换等关键环节的可靠性。STC15W4K32S4系列单片机内置的增强型PWM模块&#xff0c;通过硬件级异常检测机制为工业场景提供了坚…...

CCS平台下八路灰度传感器串行读取实战指南

1. 项目背景与传感器选型 第一次接触灰度传感器是在学校的机器人比赛中&#xff0c;当时需要让小车沿着黑线行走。市面上常见的方案是使用模拟量输出的灰度传感器&#xff0c;但需要每个传感器单独接ADC引脚&#xff0c;布线复杂还占用资源。后来发现了"感为"八路灰度…...

好消息!44.7TB北美洲倾斜摄影已全部入库

最近&#xff0c;我们已完成北美洲倾斜摄影数据的全部入库&#xff0c;该数据可用于在内网进行私有化离线部署。 01 44.7TB倾斜摄影数据已全部入库 北美洲倾斜摄影数据全部入库后&#xff0c;一共有44.7TB大小。 北美洲倾斜摄影覆盖范围 数据文件一共有13201个数据分块&…...

AD里面可能会用到的一些规则

---PlaneClearance中的间距比较大&#xff08;可能会切割负片面&#xff0c;造成铜皮不完整&#xff09;--的话&#xff0c;可以设置成8Mil左右&#xff0c;这是一个比较合理的距离---关于铜皮的连接方式考虑手工焊接的简易性的话十字连接&#xff08;下图中第一个&#xff09;…...

2026年国产算力产业指南:自主软硬件+开源生态,产业链核心标的梳理

摘要&#xff1a;本报告系统分析了国产算力在内外双驱下的崛起路径、技术突破与生态构建&#xff0c;让行业从业者与投资者深入了解自主算力的核心竞争力与产业机遇。依托Chiplet、超节点等技术突破&#xff0c;华为昇腾、寒武纪等企业实现AI芯片性能跃升&#xff0c;软件生态通…...

androidstudio历史版本

网址 Android Studio 下载档案 |安卓开发者https://developer.android.google.cn/studio/archive...

二十、kubernetes基础-30-kubernetes-ha-binary-deployment-07-dns-operations

CoreDNS 部署、集群可用性验证与节点管理全攻略 技术深度&#xff1a;⭐⭐⭐⭐⭐ | CSDN 质量评分&#xff1a;97/100 | 适用场景&#xff1a;Kubernetes 服务发现、集群运维、节点管理 作者&#xff1a;云原生架构师 | 更新时间&#xff1a;2026 年 3 月 摘要 本文深入解析 K…...

Phi-4-reasoning-vision-15B企业应用:ERP系统界面截图→业务流程反向建模

Phi-4-reasoning-vision-15B企业应用&#xff1a;ERP系统界面截图→业务流程反向建模 1. 引言&#xff1a;从截图到流程&#xff0c;企业效率的新解法 想象一下这个场景&#xff1a;你刚接手一个老旧的ERP系统&#xff0c;文档缺失&#xff0c;代码复杂&#xff0c;没人能说清…...