FlinkCDC 实现 MySQL 数据变更实时同步
文章目录
- 1、基本介绍
- 2、代码实战
- 2.1、数据源准备
- 2.2、代码实战
- 2.3、数据格式
1、基本介绍
Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:
- FlinkSQL
- Flink DataStream 和 Table API(本文使用该方式)

对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:

2、代码实战
2.1、数据源准备
本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:
CREATE TABLE `quick_chat_msg` (`id` bigint NOT NULL COMMENT '主键id',`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',`content` varchar(500) DEFAULT NULL COMMENT '消息内容',`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:

最后,要把数据库时区配置好,否则会出现问题,命令如下:
SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

2.2、代码实战
首先,引入Flink CDC相关依赖,内容如下:
<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency>
</dependencies>
第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}
}
最后,配置好 Flink CDC 监听进程,随着项目启动运行:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MySqlSourceExample {@PostConstructpublic void init() throws Exception {// 配置监听数据源MySqlSource<String> source = MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new MySinkHandler());env.execute();}
}
项目启动完毕后,可以通过8081端口访问Flink UI页面:

2.3、数据格式
上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:
# 新增
{"before": null,"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135279000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2452,"row": 0,"thread": null,"query": null},"op": "c","ts_ms": 1729135278633,"transaction": null
}
# 修改
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135289000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2825,"row": 0,"thread": null,"query": null},"op": "u","ts_ms": 1729135288473,"transaction": null
}
# 删除
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": null,"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135301000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 3247,"row": 0,"thread": null,"query": null},"op": "d","ts_ms": 1729135300692,"transaction": null
}
相关文章:
FlinkCDC 实现 MySQL 数据变更实时同步
文章目录 1、基本介绍2、代码实战2.1、数据源准备2.2、代码实战2.3、数据格式 1、基本介绍 Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB…...
JavaWeb——Maven(4/8):Maven坐标,idea集成-导入maven项目(两种方式)
目录 Maven坐标 导入Maven项目 第一种方式 第二种方式 Maven坐标 Maven 坐标 是 Maven 当中资源的唯一标识。通过这个坐标,我们就能够唯一定位资源的位置。 Maven 坐标主要用在两个地方。第一个地方:我们可以使用坐标来定义项目。第二个地方&#…...
实现uniapp天地图边界范围覆盖
在uniapp中,难免会遇到使用地图展示的功能,但是百度谷歌这些收费的显然对于大部分开源节流的开发者是不愿意接受的,所以天地图则是最佳选择。 此篇文章,详细的实现地图展示功能,并且可以自定义容器宽高,还可…...
思科网络设备命令
一、交换机巡检命令 接口和流量状态 show interface stats:查看所有接口当前流量。show interface summary:查看所有接口当前状态和流量。show interface status:查看接口状态及可能的错误。show interface | include errors | FastEthernet …...
Egg.js使用ejs快速自动生成resetful风格的CRUD接口
目前的插件能够自动生成egg的crud的都不太好用 我们自己写一个吧 ejs模块 也方便定制 安装依赖 npm install ejs --save ejs 是一个简单易用的模板引擎,常用于 Node.js 应用程序中 在项目根目录下创建 template/controller.ejs 模板文件 use strict;const Co…...
自动化抖音点赞取消脚本批量处理
🌟 前言 欢迎来到我的技术小宇宙!🌌 这里不仅是我记录技术点滴的后花园,也是我分享学习心得和项目经验的乐园。📚 无论你是技术小白还是资深大牛,这里总有一些内容能触动你的好奇心。🔍 &#x…...
基于YOLOv8深度学习的智能车牌检测与识别系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战
背景及意义 智能车牌检测与识别系统通过使用最新的YOLOv8与PaddleOCR算法能够迅速、准确地在多种环境下实现实时车牌的检测和识别。本文基于YOLOv8深度学习框架,通过16770张图片,训练了一个进行车牌检测模型,可以检测蓝牌与绿牌,然后对检测到的车牌使用O…...
qt QGraphicsGridLayout详解
一、概述 QGraphicsGridLayout是Qt框架中用于在QGraphicsScene中布置图形项的一个布局管理器。它类似于QWidget中的QGridLayout,但主要处理的是QGraphicsItem和QGraphicsWidget等图形项。通过合理设置网格位置、伸缩因子和尺寸,可以实现复杂而灵活的布局…...
数字处理系列
(1)将数字转化成中文的过滤器 <template><div><p>数字转中文:{{ 110 | numberToChinese }}</p></div></template><script>export default {filters: {numberToChinese(num) {const chineseNums …...
基于开源Jetlinks物联网平台协议包-MQTT自定义主题数据的编解码
目录 前言 1.下载官方协议包 2.解压 3.自定义主题 4.重写解码方法 5.以下是我解析后接收到的数据 前言 最近这段时间,一直在用开源的Jetlinks物联网平台在学习,偶尔有一次机会接触到物联网设备对接,在协议对接的时候,遇到了…...
【Python】Python2.7升级Python3
需求背景 服务是跑在docker的容器里的,因此要新建image依赖环境是Ubuntu,老的是16.4。 步骤 先准备环境,因为只有你的环境上去了,运行代码的时候才会报错,这样才会把需要改的代码暴露出来。 python3.5目前也是被遗弃的…...
Python 内置函数 round() 详解
在 Python 编程中,round() 函数是一个非常实用的内置函数,用于对数字进行四舍五入。无论是在数据处理、财务计算还是科学计算中,round() 函数都能帮助我们得到所需的精确值。本文将详细介绍 round() 函数的用法和注意事项。 1. round() 函数…...
JavaScript入门中-流程控制语句
本文转载自:https://fangcaicoding.cn/article/52 大家好!我是方才,目前是8人后端研发团队的负责人,拥有6年后端经验&3年团队管理经验,截止目前面试过近200位候选人,主导过单表上10亿、累计上100亿数据…...
kconfig语法(一)
一、安装Kconfiglib python -m pip install windows-curses python -m pip install kconfiglib二、使用样例 ①创建kconfig文件。 ②在kconfig文件添加内容: config KCONFIG_DEMO_ITEM1boolprompt "demonstate item1 for bool learning"config KCONFIG_DEMO_ITE…...
十七、行为型(命令模式)
命令模式(Command Pattern) 概念 命令模式是一种行为型设计模式,它将请求封装成一个对象,从而使您可以使用不同的请求对客户进行参数化,排队请求,以及支持可撤销操作。通过这种模式,调用操作的…...
原材料供应商的GRS认证证书过期了怎么办?
在全球纺织和时尚产业中,GRS(Global Recycle Standard,全球再生标准)认证已成为衡量企业环保和可持续发展的重要指标。然而,当原材料供应商的GRS认证证书过期时,企业需迅速采取行动,以确保供应链…...
C++编程:实现一个基于原始指针的环形缓冲区(RingBuffer)缓存串口数据
文章目录 0. 引言1. 使用示例2. 流程图2.1 追加数据流程2.2 获取空闲块流程2.3 处理特殊字符流程2.4 释放块流程2.5 获取下一个使用块流程 3. 代码详解3.1 Block 结构体3.2 RingBuffer 类3.3 主要方法解析append 方法currentUsed 和 currentUsing 方法release 方法nextUsed 方法…...
LangChain 创始人万字科普:手把手教你设计 Agent 用户交互
LangChain 可以算是 LLM 时代做 AI 应用开发必备的框架和平台,从模型选择、数据库链接与各种 Agent 搭建等,AI 应用的搭建、运行和管理都可以在 LangChain 上进行。 某种意义上,LangChain 可能是最了解 Agent(智能体)…...
Docker 用例:15 种最常见的 Docker 使用方法
容器化应用程序而不是将它们托管在虚拟机上是过去几年一直流行的概念,使容器管理流行起来。Docker 处于这一转变的核心,帮助组织无缝地采用容器化技术。最近,Docker 用例遍布所有行业,无论规模大小和性质如何。 什么是Docker&…...
若依 RuoYi4.6.0 代码审计
环境布置: 到官网下载源码:https://github.com/yangzongzhuan/RuoYi 采用phpstudy集成数据库,5.7版本。JDK1.8。 IDEA打开项目,等待自动加载,修改application-druid.yml配置文件:数据库名,账…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
重启Eureka集群中的节点,对已经注册的服务有什么影响
先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...
华为OD机考-机房布局
import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...
TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?
在工业自动化持续演进的今天,通信网络的角色正变得愈发关键。 2025年6月6日,为期三天的华南国际工业博览会在深圳国际会展中心(宝安)圆满落幕。作为国内工业通信领域的技术型企业,光路科技(Fiberroad&…...
elementUI点击浏览table所选行数据查看文档
项目场景: table按照要求特定的数据变成按钮可以点击 解决方案: <el-table-columnprop"mlname"label"名称"align"center"width"180"><template slot-scope"scope"><el-buttonv-if&qu…...
基于鸿蒙(HarmonyOS5)的打车小程序
1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...
