实战:MySQL数据同步神器之Canal
1.概叙
场景一:数据增量实时同步
项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。
那么这个时候问题来了,怎么把MySQL中增量数据同步到ES?
场景二:缓存一致性问题
Java web应用性能分析之【高并发之缓存-多级缓存】_java多级缓存-CSDN博客
在前面文章中有提到这个场景,如何保证redis、EhCache、MySQL数据一致?
我们都知道作为数据库写操作,是不通过缓存的。假设商品服务实例 1 将 1 号商品价格调整为 80 元,这会衍生一个新问题:如何主动向应用程序推送数据变更的消息来保证它们也能同步更新缓存呢?
针对上面两种场景,可以看看canal的解决方案。
什么是Canal?
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。项目起源于阿里巴巴内部对于跨机房数据同步的需求,通过解析MySQL的二进制日志(Binary Log),Canal能够捕获并推送数据库的变更事件,满足了诸如数据库镜像、实时备份、索引实时维护等多种业务场景的需求。
GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
Home · alibaba/canal Wiki · GitHub
支持范围
Canal当前支持MySQL数据库的多个版本,包括但不限于5.1.x、5.5.x、5.6.x、5.7.x及8.0.x,同时也兼容阿里云RDS等云数据库服务,为用户提供了广泛的数据库兼容性保障。
部分支持MySQL体系数据库:Mariadb 10.x、PolarDB-X
主要特性
高性能与低延迟:Canal 1.1.x版本进行了深度优化,性能提升高达150%。
Prometheus监控:原生集成Prometheus监控,便于系统健康状况的跟踪。
消息系统集成:直接支持Kafka、RocketMQ消息投递,便于与大数据平台对接。
云数据库支持:无缝对接阿里云RDS,解决了自动主备切换及离线Binlog解析问题。
Docker部署:提供Docker镜像,简化部署流程。
WebUI管理:Canal-Admin工程引入WebUI,实现动态配置、任务管理与日志查看等功能。
2.Canal原理
MySQL主备复制原理
-
MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)
-
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
-
MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据
Canal工作原理
Canal巧妙地模拟了MySQL主从复制的机制。具体而言:
- 伪装为MySQL Slave:canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- 获取Binary Log:MySQL Master接收到请求后,开始推送Binary Log给Canal。
- 解析日志事件:Canal解析接收到的Binary Log,将数据变更信息转换为易于处理的结构化数据。canal 解析 binary log 对象(原始为 byte 流),转换为json格式。
- 数据同步:Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES、KFK、HBase、RocketMQ、Pulsar等。
优点: 可以完全和业务代码解耦,增量日志订阅。
缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。
Canal总体架构
对应这些软件包:
- deployer包:即canal server,负责从master库同步binlog,将数据通过tcp的方式同步给Adapter适配器,经过Adapter适配同步给目标库;通过MQ将数据同步给canal client或目标库,canal client也可以再同步给目标库。
- server包官方介绍:https://github.com/alibaba/canal/wiki/DevGuide
- adapter包:给目标库同步数据。目前支持:Hbase、RDB、ES。
- adapter包官方介绍:https://github.com/alibaba/canal/wiki/ClientAdapter
- admin包:canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide
- admin包官方介绍:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
- canal.client:消费server数据
- Canal设计了client-server架构,支持多种语言客户端通过protobuf 3.0协议与之交互,官方及社区提供了以下客户端:
- Java客户端:ClientExample
- C#客户端:CanalSharp
- Go客户端:canal-go
- Python客户端:canal-python
- PHP客户端:canal-php
- Rust客户端:canal-rs
- Node.js客户端:canal-nodejs
-
除了基础功能,Canal还支持丰富的进阶特性和周边生态工具,如:
Canal-Admin:提供Web界面管理Canal实例,实现配置、监控和运维的可视化操作。
canal2sql:一个工具项目,能根据Binlog生成SQL,便于数据迁移或备份。
Otter:Canal的消费端开源项目,用于数据同步与数据集成。
Canal高可用架构
整个 HA 机制的控制主要是依赖了zookeeper的两个特性:watcher、EPHEMERAL节点。canal的 HA 机制实现分为两部分,canal server 和 canal client分别有对应的实现。
canal server实现流程如下:
- 1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)。
- 2、创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态。
- 3、一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动instance。
- 4、canal client 每次进行connect时,会首先向 zookeeper 询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。
为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。
依赖:
- JDK1.8
- MySQL:用于canal-admin存储配置和节点等相关数据
- Zookeeper
3.Canal实战
准备环境
名称 | 版本 |
---|---|
MySQL | 5.7 |
elasticsearch | 7.17.9 |
canal | 1.1.7 |
jdk | 1.8 |
-
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
-
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
下载 canal
本次操作只需要下载admin、deployer两个包。
下载后解压即可用,当然,要运行起来的话,需要配置jdk1.8,这里就不再多说。
创建admin的数据库:conf/canal_manager.sql
启动admin前,先完成建库和见表(包括admin的默认登录账号密码 admin/123456),否则启动报错。主要是如下六张表。
修改配置文件
修改server的配置文件:conf/canal.properties 和conf/example/instance.properties
切记数据库和配置文件中的密码要一致。
修改admin的配置文件:conf/application.yml
分别启动admin和server
启动server:bin/startup.bat
启动admin:bin/startup.bat
登录admin:http://192.168.1.4:8089/#/login?redirect=%2Fdashboard
默认账号密码:admin/123456
配置java客户端
引入依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>
客户端代码
package com.zxx.study.base.canal;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.SneakyThrows;import java.net.InetSocketAddress;
import java.util.List;/*** @author zhouxx* @create 2024-08-01 21:59*/
public class CanalClient {@SneakyThrowspublic static void main(String[] args) {try {// 创建canal客户端,单链接模式CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.4",11111), "example", "", "");// 创建连接canalConnector.connect();while (true) {// 订阅数据库// canalConnector.subscribe("mall");// 获取数据Message message = canalConnector.get(100);// 获取Entry集合List<CanalEntry.Entry> entries = message.getEntries();// 判断集合是否为空,如果为空,则等待一会继续拉取数据if (entries.size() <= 0) {
// System.out.println("当次抓取没有数据,休息一会。。。。。。");Thread.sleep(1000);} else {// 遍历entries,单条解析for (CanalEntry.Entry entry : entries) {//1.获取表名String tableName = entry.getHeader().getTableName();//2.获取类型CanalEntry.EntryType entryType = entry.getEntryType();//3.获取序列化后的数据ByteString storeValue = entry.getStoreValue();//4.判断当前entryType类型是否为ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)) {//5.反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//6.获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();//7.获取数据集List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();//8.遍历rowDataList,并打印数据集for (CanalEntry.RowData rowData : rowDataList) {JSONObject beforeData = new JSONObject();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}JSONObject afterData = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}//数据打印System.out.println("Table:" + tableName +",EventType:" + eventType +",Before:" + beforeData +",After:" + afterData);/*** Table:test_user,EventType:UPDATE,Before:{"name":"1111221","id":"1"},After:{"name":"11tom","id":"1"}* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx","id":"17"}* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx1","id":"18"}* */}}}}}}catch (Exception e){e.printStackTrace();}}}
运行效果:在数据库里面忝删改查,均可以在客户端中打印出来
cancel框架同步mysql数据到kafka
参考:cancel框架同步mysql数据到kafka_mysql cancel-CSDN博客
利用canal进行MySQL到ES的数据实时同步
参考:利用canal进行MySQL到ES的数据实时同步_canal es-CSDN博客
结合上图,至此,场景一和场景二中的问题,均可以解决。
相关文章:
实战:MySQL数据同步神器之Canal
1.概叙 场景一:数据增量实时同步 项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数…...
5.6软件工程-运维
运维 系统转换系统维护系统评价练习题 系统转换 新老系统的转换 系统转换是指:新系统开发完毕,投入运行,取代现有系统的过程,需要考虑多方面的问题,以实现与老系统的交接,有一下三种转换计划: …...
在JavaScript中如何确保构造函数只被new调用
构造函数是一个特殊的函数,用于初始化一个新创建的对象。它是在创建对象时自动调用的。构造函数通常用于为对象的属性赋值,或者执行其他必要的设置。 使用函数名大写字母开头,这是一种命名约定,用于区分构造函数和普通函数。如何…...
【数据结构算法经典题目刨析(c语言)】反转链表(图文详解)
💓 博客主页:C-SDN花园GGbond ⏩ 文章专栏:数据结构经典题目刨析(c语言) 目录 一、题目描述 二、思路分析 三、代码实现 一、题目描述: 二、思路分析 : 通过三个指针n1,n2,n3来实现链表的反转 1.首先初始化 n1为…...
机器学习之争:Python vs R,谁更胜一筹?
一、引言 随着人工智能和大数据的迅速发展,机器学习已成为现代科技的重要组成部分。在医疗、金融、零售、制造等多个领域,机器学习技术的应用无处不在。从数据分析到预测建模,再到深度学习,机器学习正在改变我们的工作和生活方式…...
Vulnhub靶机:JANGOW_ 1.0.1
目录 前言: 一、安装虚拟机Jangow:1.0.1靶机 二、Web部分 前言: 难度:简单,本文使用VirtualBox打开,下载地址: https://download.vulnhub.com/jangow/jangow-01-1.0.1.ova 一、安装虚拟机J…...
Python脚本实现USB自动复制文件
USB驱动器作为常见的数据存储设备,经常用于数据传输和备份。 然而,我们在手动处理文件复制可能效率低下且容易出错。 因此,我们可以利用Python编写脚本来自动化这一过程,提高效率和数据安全性。 准备工作 首先,我们需…...
【C++学习第19天】最小生成树(对应无向图)
一、最小生成树 二、代码 1、Prim算法 #include <cstring> #include <iostream> #include <algorithm>using namespace std;const int N 510, INF 0x3f3f3f3f;int n, m; int g[N][N]; int dist[N]; bool st[N];int prim() {memset(dist, 0x3f, sizeof di…...
第一个 Flask 项目
第一个 Flask 项目 安装环境创建项目启动程序访问项目参数说明Flask对象的初始化参数app.run()参数 应用程序配置参数使用 Flask 的 config.from_object() 方法使用 Flask 的 config.from_pyfile() 方法使用 Flask 的 config.from_envvar() 方法步骤 1: 设置环境变量步骤 2: 编…...
利用 Angular 发挥环境的力量
一.介绍 您是否曾想过如何在不同的环境中为同一应用设置不同的颜色、标题或 API 调用?可以肯定的是,生产 API 和测试 API 是不同的,应谨慎使用。部署时,我们不会在项目的所有地方手动更改所有 API 调用。不应这样做,因…...
Vue3+TypeScript+printjs 实现标签批量打印功能
前言:临时性需求没怎么接触过前端,代码实现有问题及优化点希望大佬可以留言告知一下 开发工具:VS CODE 界面开发:Vue3TypeScriptElementPlus 打印组件:Print-JS 前端打印入口图: 标签页面: …...
微信文件如何直接打印及打印功能在哪里设置?
在数字化时代,打印需求依旧不可或缺,但传统打印店的高昂价格和不便操作常常让人头疼。幸运的是,琢贝打印作为一款集便捷、经济、高效于一体的网上打印平台,正逐渐成为众多用户的首选。特别是通过微信小程序下单,更是让…...
dataX -20240804-master分支
1、相关报错 Error: java.io.IOException: java.lang.RuntimeException: ORC split generation failed with exception: org.apache.orc.impl.SchemaEvolution$IllegalEvolutionException: ORC does not support type conversion from file type struct<nanos:int> (10)…...
【网络】传输层
传输层 一、预备知识1、端口号1、端口号范围划分2、知名端口号3、两个问题4、netstat && iostate5、pidof6、谈下面协议始终铭记两个问题 二、UDP协议(简单)1、UDP协议端格式2、UDP的特点3、面向数据报4、UDP缓冲区 三、TCP协议(重点…...
学生管理系统之更新和删除、筛选
学生管理系统之更新和删除 建立新的窗口 添加组件 进行布局 使用Widget把二个放在一块,作为一列,然后全选进行栅格布局,最后添加弹簧进行微调。 编写增加的槽函数 在主函数中调用对话框...
教您一键批量下载拼多多批发图片信息,节省时间
图片是电商的核心展示手段,高质量、吸引人的图片能显著提升商品吸引力,增强用户体验,促进购买决策。良好的视觉呈现有助于品牌形象的塑造,提高转化率和客户满意度,对电商平台的流量和销售业绩具有直接影响。 使用图快…...
基于微信小程序的微课堂笔记的设计与实现(源码+论文+部署讲解等)
博主介绍:✌全网粉丝10W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌ 技术栈介绍:我是程序员阿龙ÿ…...
去噪扩散恢复模型
去噪扩散恢复模型 Bahjat Kawar 计算机科学系 以色列海法理工学院 bahjat.kawarcs.technion.ac.il Michael Elad 计算机科学系 以色列海法理工学院 eladcs.technion.ac.il Stefano Ermon 计算机科学系 美国加利福尼亚州斯坦福大学 ermoncs.stanford.edu …...
Stable Diffusion 官方模型V1.5版本下载
模型描述 Stable Diffusion的官方模型更适合绘制偏写实的风格,如果您想绘制二次元之类的风格,可以考虑下载本站的其它模型。 安装方法 将模型下载后,将会得到一个名为****.ckpt格式的文件,将该文件剪切至你的Stable Diffusion本…...
【算法】双指针-OJ题详解1
双指针-OJ题 移动零(点击跳转)原理讲解代码实现 复写零(点击跳转)原理讲解代码实现 快乐数(点击跳转)原理讲解代码实现 盛最多水的容器(点击跳转)原理讲解代码实现 有效三角形的个数…...
29 两个任务切换(1)
1 这里涉及到进程的切换与之前的 特权级的切换还是不一样的。 2 给每个进程 在 GDT表中,分配一个 TSS, 这个TSS中 保存着这个进程 所用到的 通用寄存器段寄存器 3个可能的栈, 当进行 进程切换的时候,就是切换到 另一个 TSS表&am…...
正则表达式概述
一、正则表达式概述 正则表达式(Regular Expression,简称regex或regexp)是一种强大的文本处理工具,它使用一种特定的模式来描述和匹配一系列符合某个句法规则的字符串。在Python中,我们可以使用re模块来操作正则表达式…...
【C语言】Top K问题【建小堆】
前言 TopK问题:从n个数中,找出最大(或最小)的前k个数。 在我们生活中,经常会遇到TopK问题 比如外卖的必吃榜;成单的前K名;各种数据的最值筛选 问题分析 显然想开出40G的空间是不现实的&#…...
Rust 程序设计语言学习——并发编程
安全且高效地处理并发编程是 Rust 的另一个主要目标。并发编程(Concurrent programming),代表程序的不同部分相互独立地执行,而并行编程(parallel programming)代表程序不同部分同时执行,这两个…...
联邦学习研究综述【联邦学习】
文章目录 0 前言机器学习两大挑战: 1 什么是联邦学习?联邦学习的一次迭代过程如下:联邦学习技术具有以下几个特点: 2 联邦学习的算法原理目标函数本地目标函数联邦学习的迭代过程 3 联邦学习分类横向联邦学习纵向联邦学习联邦迁移…...
深入理解Python中的列表推导式
深入理解Python中的列表推导式 在Python编程中,列表推导式(List Comprehension)是一种简洁而强大的语法,用于创建和操作列表。它不仅提高了代码的可读性,还能显著减少代码的行数。本文将详细介绍什么是列表推导式,如何使用它,以及一些实际应用示例,帮助读者更好地理解…...
Android 实现左侧导航栏:NavigationView是什么?NavigationView和Navigation搭配使用
目录 1)左侧导航栏效果图 2)NavigationView是什么? 3)NavigationView和Navigation搭配使用 4)NavigationView的其他方法 一、实现左侧导航栏 由于Android这边没有直接提供左侧导航栏的控件,所以我尝试了…...
如何快速下载拼多多图片信息,效率高
图片是电商吸引顾客的关键因素,高质量的商品图片能提升产品吸引力,增强用户购买欲望。良好的视觉展示有助于建立品牌形象,提高转化率。同时,图片也是商品信息的主要传递媒介,对消费者决策过程至关重要。 使用图快下载器…...
windows 10下,修改ubuntu的密码
(1)在搜索框里面输入cmd,然后点击右键,选择管理员打开 Microsoft Windows [版本 10.0.22631.3880] (c) Microsoft Corporation。保留所有权利。 C:\Windows\System32>C: C:\Windows\System32>cd ../../ C:\>cd Users\ASUS\AppData\Local\Micros…...
【MySQL】慢sql优化全流程解析
定位慢sql 工具排查慢sql 调试工具:Arthas运维工具:Skywalking 通过以上工具可以看到哪个接口比较慢,并且可以分析SQL具体的执行时间,定位到哪个sql出了问题。 启用慢查询日志 慢查询日志记录了所有执行时间超过指定参数(lon…...
网站上传wordpress/营业推广策略
新建git仓库方法 mkdir r0capture cd r0capture git init touch README.md git add README.md git commit -m "first commit" git remote add origin gitgitee.com:jenny95/r0capture.git git push -u origin master...
自己学建网站/童程童美少儿编程怎样收费
linux构建DHCP及DHCP中继服务器 用虚拟机搭建,所以把ISO镜像挂载 1.挂载光盘 #/mount /dev/hdc /mnt 2.安装DHCP服务 #cd /mnt/Server/ #rpm -ivh dhcp-3.0.5-21.el5.i386.rpm 3.复制主配置文件 #cp /user/share/doc/dhcp-3.0.5/dhcpd.conf.sample /etc/dhcpd.conf…...
张家港优化网站seo/关键词排名优化如何
注:博主使用的是STM32F4探索者 一、官方资料 1、文档 (1)STM32F4数据手册:STM32F407ZGT6.pdf (2)STM32F4中文手册:STM32F4xx中文参考手册.pdf (3)开发板原理图&…...
wordpress收费主题破解/网页制作与网站建设实战教程
PhotoShop 快速选择工具及选择并遮住使用 快速选择工具 按住 ALT 加鼠标右键左右拖动,调节画笔大小在选区内按住 ALT ,默认 加号(添加选区),变成 减号(减去选区)选择并遮住 使用快速选择或者魔…...
吉安市网站制作/app投放推广
PHP基础变量命名 变量类型 引用赋值类型 范围 引号 字符串连接符 自动(强制)转换 判断变量类型运算符 自增自减 三元运算符 优先级if switch for while 四个跳出的区别形参 实参 全局变量 静态变量 each/return 有关function的函数引用…...
网站如何做社群/培训心得体会感悟
我到底值多少钱;我不止一次的这样的问自己。金钱难道真的能体现我的价值吗。人活着究竟是为了什么啊。我们活在这个世界上首先是生存。生存需要物质上的支持。所以我们去追求金钱。当我们可以添饱自己的肚子的时候。我们迷失了我们的方向。我们忘记了我们现在可以生…...