Flink cdc3.0同步实例(动态变更表结构、分库分表同步)
文章目录
- 前言
- 准备
- flink环境
- docker构建mysql、doris环境
- 数据准备
- 通过 FlinkCDC cli 提交任务
- 整库同步
- 同步变更
- 路由变更
- 路由表结构不一致无法同步
- 结尾
前言
最近Flink CDC 3.0发布, 不仅提供基础的数据同步能力。schema 变更自动同步、整库同步、分库分表等增强功能使 Flink CDC 3.0 在更复杂的数据集成与用户业务场景中发挥作用:用户无需在数据源发生 schema 变更时手动介入,大大降低用户的运维成本;只需对同步任务进行简单配置即可将多表、多库同步至下游,并进行合并等逻辑,显著降低用户的开发难度与入门门槛。Flink CDC 3.0 正式发布。
我们今天基于 Flink CDC 3.0 同步 MySQL 到 Doris ,来体验下新上的整库同步、表结构变更同步和分库分表同步的功能。
准备
flink环境
准备 Flink Standalone 集群,下载最新版本 Flink 1.18.0 ,解压后得到 flink-1.18.0 目录。并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
通过在 conf/flink-conf.yaml
配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint,方便后续观察数据变更。
execution.checkpointing.interval: 3000
使用下面的命令启动 Flink 集群。
./bin/start-cluster.sh
启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
多次执行 start-cluster.sh 可以拉起多个 TaskManager,保证Total Task Slots >= 2, 不然提交任务会有资源不足异常,比如我这里执行了3次。 或者是修改 conf/flink-conf.yaml
资源配置。
docker构建mysql、doris环境
如果有安装这两个组件,就可以免去docker,接下来的教程将以 docker-compose 的方式准备所需要的组件。
由于 Doris 的运行需要内存映射支持,需在宿主机执行如下命令:
sysctl -w vm.max_map_count=2000000
docker 镜像启动,使用下面的内容创建一个 docker-compose.yml
文件:
version: '2.1'
services:doris:image: yagagagaga/doris-standaloneports:- "8030:8030"- "8040:8040"- "9030:9030"mysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw
该 Docker Compose 中包含的容器有:
MySQL: 包含商品信息的数据库 app_db
Doris: 存储从 MySQL 中根据规则映射过来的结果表
在 docker-compose.yml
所在目录下执行下面的命令来启动本教程需要的组件:
docker-compose up -d
该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8030/ 来查看 Doris 是否运行正常。
数据准备
进入 MySQL 容器, 或者通过客户端工具连接到mysql
docker-compose exec mysql mysql -uroot -p123456
创建数据库 app_db 和表 orders,products
并插入数据
-- 创建数据库
CREATE DATABASE app_db;USE app_db;-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);
Doris 暂时不支持自动创建数据库,需要先创建写入表对应的数据库。
进入 Doris Web UI。http://localhost:8030/,默认的用户名为 root,默认密码为空。
通过 Web UI 创建 app_db 数据库
create database if not exists app_db;
通过 FlinkCDC cli 提交任务
下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.0.0
:
flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。
下载下面列出的 connector 包,并且移动到 lib 目录下
- MySQL pipeline connector 3.0.0
- Apache Doris pipeline connector 3.0.0
整库同步
编写任务配置 yaml 文件,下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2
其中:
source 中的 tables: app_db.\.*
通过正则匹配同步 app_db
下的所有表。
sink 添加 table.create.properties.replication_num
参数是由于 Docker 镜像中只有一个 Doris BE 节点。
最后,通过命令行提交任务到 Flink Standalone cluster
bash bin/flink-cdc.sh conf/mysql-to-doris.yaml
提交成功后,返回信息如:
在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris
的任务正在运行。job id对应上面的cb049fe4a2112510a77ee46e197054a6
打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功写入。
同步变更
接下来,修改 MySQL 数据库中表的数据,Doris 中显示的订单数据也将实时更新:
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
DELETE FROM app_db.orders WHERE id=2;
-- 区别于官方再新增一条数据
INSERT INTO app_db.orders VALUES (4, 200, 200.00);
也可以拆开每执行一步,刷新一次 Doris Web UI,可以看到 Doris 中显示的 orders 数据将实时更新,如下所示:
同样的,去修改 shipments, products 表,也能在 Doris 中实时看到同步变更的结果。
路由变更
Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。
下面提供一个配置文件conf/mysql-to-doris-route.yaml
说明:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db1.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030benodes: 127.0.0.1:8040username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: app_db1.orders\.*sink-table: app_db1.ods_orderspipeline:name: Sync MySQL Database to Dorisparallelism: 2
通过上面的 route 配置,使用正则表达式,可以将诸如 app_db1.order_01、app_db1.order_02 的表汇总到 app_db1.ods_orders 中。从而实现分库分表同步的功能。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。
另外官方文档里的写法存在一个问题。
正则表达式前面加上’\‘转义,app_db1.orders\.*
,否则会抛出异常:java.util.regex.PatternSyntaxException: Dangling meta character ‘*’ near index 0 *
我们在mysql和doris分别创建数据库app_db1
, 然后初始化mysql
-- 创建表orders_01
CREATE TABLE `orders_01` (`id` int NOT NULL,`price` decimal(10,2) NOT NULL,`amount` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- 创建表orders_02
CREATE TABLE `orders_02` (`id` int NOT NULL,`price` decimal(10,2) NOT NULL,`amount` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
启动新的job。
然后在orders_01,orders_02分别插入数据
INSERT INTO `orders_01` (`id`, `price`) VALUES (11, 4.00);
INSERT INTO `orders_02` (`id`, `price`) VALUES (12, 100.00);
在doris里验证,数据都写入了app_db1.ods_orders
路由表结构不一致无法同步
看Schema Evolution 设计原理,Flink CDC 3.0 在作业拓扑中引入了 SchemaRegistry,结合 SchemaOperator 协调并控制作业拓扑中的 schema 变更事件处理。当上游数据源发生 schema 变更时,SchemaRegistry 会控制 SchemaOperator 以暂停数据流,并将流水线中的数据从 sink 全部刷出以保证 schema 一致性。当 schema 变更事件在外部系统处理成功后,SchemaOperator 恢复数据流,完成本次 schema 变更的处理。
所以考虑只修改orders_01,再插入数据看doris同步的变化。
-- 添加sku字段
ALTER TABLE app_db1.orders_01 ADD sku varchar(32) NULL;
-- 向orders_01插入id=13
INSERT INTO `orders_01` VALUES (13, 4.00, 8.00, 'apple01');
-- 向orders_02插入id=14
INSERT INTO `orders_02` VALUES (14, 1.00, 1.00);
可以看到doris中的app_db1.orders
表结构发生了变化,但是orders_02
的id=14这条数据没有正常写入。flink异常提示:java.lang.IllegalStateException: Column size does not match the data size
而当修改orders_02的表结构,也会有异常:Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: status of AddColumnEvent is already existed。并且之后写入的数据无法正常同步。
结尾
flink cdc的功能越来越强,也再尝试解决用户的使用痛点。不过放到生产环境使用还需要建立在更多的实践测试之上。
相关文章:
Flink cdc3.0同步实例(动态变更表结构、分库分表同步)
文章目录 前言准备flink环境docker构建mysql、doris环境数据准备 通过 FlinkCDC cli 提交任务整库同步同步变更路由变更路由表结构不一致无法同步 结尾 前言 最近Flink CDC 3.0发布, 不仅提供基础的数据同步能力。schema 变更自动同步、整库同步、分库分表等增强功…...
国产Apple Find My认证芯片哪里找,伦茨科技ST17H6x芯片可以帮到您
深圳市伦茨科技有限公司(以下简称“伦茨科技”)发布ST17H6x Soc平台。成为继Nordic之后全球第二家取得Apple Find My「查找」认证的芯片厂家,该平台提供可通过Apple Find My认证的Apple查找(Find My)功能集成解决方案。…...
肺癌相关知识
写在前面 大概想了解下肺癌相关的知识,开此贴做记录,看看后续有没有相关的生信文章思路。 综述 文章名期刊影响因子Lung cancer immunotherapy: progress, pitfalls, and promisesMol Cancer37.3 常见治疗手段有surgery, radiation therapy, chemoth…...
ChimeraX使用教程-安装及基本操作
ChimeraX使用教程-安装及基本操作 1、访问https://www.cgl.ucsf.edu/chimerax/download.html进行下载,然后安装 安装完成后,显示界面 2、基本操作 1、点击file,导入 .PDB 文件。 (注:在 alphafold在线预测蛋白》点…...
【小黑嵌入式系统第十一课】μC/OS-III程序设计基础(一)——任务设计、任务管理(创建基本状态内部任务)、任务调度、系统函数
上一课: 【小黑嵌入式系统第十课】μC/OS-III概况——实时操作系统的特点、基本概念(内核&任务&中断)、与硬件的关系&实现 文章目录 一、任务设计1.1 任务概述1.2 任务的类型1.2.1 单次执行类任务(运行至完成型&#…...
Redis一些常用的技术
文章目录 第1关:Redis 事务与锁机制第2关:流水线第3关:发布订阅第4关:超时命令第5关:使用Lua语言 第1关:Redis 事务与锁机制 编程要求 根据提示,在右侧编辑器Begin-End补充代码,根据…...
基于QPainter 绘图图片绕绘制设备中心旋转
项目地址:https://gitcode.com/m0_45463480/QPainter/tree/main 获取途径:进入CSDN->GitCode直接下载或者通过git拉取仓库内容。 QPainter是Qt框架中的一个类,用于在QWidget或QPixmap等设备上进行绘图操作。它提供了丰富的绘图功能,可以用于绘制线条、图形、文本等。Q…...
计算机网络(4):网络层
网络层提供的两种服务 虚电路服务(Virtual Circuit Service)和数据报服务(Datagram Service)是在网络层(第三层)提供的两种不同的通信服务。它们主要区别在于建立连接的方式和数据传输的方式。 虚电路服务…...
动态内存分配(malloc和free、calloc和realloc)
目录 一、为什么要有动态内存分配 二、C/C中程序内存区域划分 三、malloc和free 2.1、malloc 2.2、free 四、calloc和realloc 3.1、calloc 3.2、realloc 3.3realloc在调整内存空间的是存在两种情况: 3.4realloc有malloc的功能 五、常见的动…...
C语言---井字棋(三子棋)
Tic-Tac-Toe 1 游戏介绍和随机数1.1 游戏介绍1.2 随机数的生成1.3 棋盘大小和符号 2 设计游戏2.1 初始化棋盘2.2 打印棋盘2.3 玩家下棋2.4 电脑下棋2.5 判断输赢2.6 game()函数2.7 main()函数 3 完整三子棋代码3.1 Tic_Tac_Toe.h3.2 Tic_Tac_Toe.c3.3 Test.c 4 游戏代码的缺陷 …...
[Kubernetes]3. k8s集群Service详解
在上一节讲解了k8s 的pod,deployment,以及借助pod,deployment来部署项目,但会存在问题: 每次只能访问一个 pod,没有负载均衡自动转发到不同 pod访问还需要端口转发Pod重创后IP变了,名字也变了针对上面的问题,可以借助Service来解决,下面就来看看Service怎么使用 一.Service详…...
C++ 指定范围内递增初始化一个vector<int> | Python: list(range(31, 90))
通过lambda表达式 std::iota()实现: template <typename Tp> inline void print_vec(const std::vector<Tp>& vec) {fmt::print("[{}]\n", fmt::join(vec, ", ")); }// 相当于Python的lst list(range(31, 90))const std::ve…...
【Java之数据结构与算法】
选择排序 package Code01;public class Code01_SelectionSort {public static void selectionSort(int[] arr) {if(arrnull||arr.length<2) {return;}for(int i0;i<arr.length;i) {int minIndex i;for(int ji1;j<arr.length;j) {minIndex arr[minIndex] > arr[j…...
爬虫scrapy中间件的使用
爬虫scrapy中间件的使用 学习目标: 应用 scrapy中使用间件使用随机UA的方法应用 scrapy中使用代理ip的的方法应用 scrapy与selenium配合使用 1. scrapy中间件的分类和作用 1.1 scrapy中间件的分类 根据scrapy运行流程中所在位置不同分为: 下载中间件…...
普冉(PUYA)单片机开发笔记 [完结篇]:使用体会
失败的移植:FreeRTOS 当使用了 PY32F003 的各种接口和功能后,手痒痒想把 FreeRTOS 也搬到这个 MCU 上,参考 STM32 和 GD32 对 FreeRTOS 的移植步骤,把 FreeRTOS v202212.00 版本的源码搬到了 Keil 工程中,编译倒是通过…...
Elasticsearch:生成 AI 中的微调与 RAG
在自然语言处理 (NLP) 领域,出现了两种卓越的技术,每种技术都有其独特的功能:微调大型语言模型 (LLM) 和 RAG(检索增强生成)。 这些方法极大地影响了我们利用语言模型的方式,使它们更加通用和有效。 在本文…...
ip静态好还是dhcp好?
选择使用静态 IP 还是 DHCP(动态主机配置协议)取决于您的网络需求和环境。下面是它们的一些特点和适用场景: 静态 IP: 固定的 IP 地址:静态 IP 是手动配置在设备上的固定 IP 地址,不会随时间或网络变化而改…...
PolarDB-X、OceanBase、CockroachDB、TiDB二级索引写入性能测评
为什么要做这个测试 二级索引是关系型数据库相较于NoSQL数据库的一个关键差异。二级索引必须是强一致的,因此索引的写入需要与主键的写入放在一个事务当中,事务的性能是二级索引性能的基础。 目前市面上的分布式数据库中,从使用体验的角度看…...
Convolutional Neural Network(CNN)——卷积神经网络
1.NN的局限性 拓展性差 NN的计算量大性能差,不利于在不同规模的数据集上有效运行若输入维度发生变化,需要修改并重新训练网络容易过拟合 全连接导致参数量特别多,容易过拟合如果增加更多层,参数量会翻倍无法有效利用局部特征 输入…...
鸿蒙开发基本概念
1、开发准备 1.1、UI框架 HarmonyOS提供了一套UI开发框架,即方舟开发框架(ArkUI框架)。方舟开发框架可为开发者提供应用UI开发所必需的能力,比如多种组件、布局计算、动画能力、UI交互、绘制等。 方舟开发框架针对不同目的和技术…...
Open CV 图像处理基础:(二)从基本概念到实践操作
Open CV 图像处理基础:从基本概念到实践操作 一、引言 图像处理是计算机视觉领域的一个重要分支,它涉及对图像的各种操作和处理。了解图像的基本概念、读取和显示方法以及基本操作是图像处理的基础。本文将通过示例文章的形式,帮助初学者逐…...
【MAC】M2 安装docker 与 mysql
一、docker下载地址 下载地址 二、安装docker完成 罗列一下docker常用命令 # 查看docker版本 docker --version# 拉取镜像 docker pull 镜像名# 查看当前所有镜像 docker images# 查看运行中的容器 docker ps -a docker ps grep| 镜像名#镜像启动操作: sudo dock…...
轻量级web开发框架Flask本地部署及无公网ip远程访问界面
文章目录 前言1. 安装部署Flask2. 安装Cpolar内网穿透3. 配置Flask的web界面公网访问地址4. 公网远程访问Flask的web界面 前言 本篇文章讲解如何在本地安装Flask,以及如何将其web界面发布到公网上并进行远程访问。 Flask是目前十分流行的web框架,采用P…...
用最通俗的语言讲解 TCP “三次握手,四次挥手”
目录 一. 前言 二. TCP 报文的头部结构 三. 三次握手 3.1. 三次握手过程 3.2. 为什么要三次握手 四. 四次挥手 4.1. 四次挥手过程 4.2. 为什么要四次挥手 五. 大白话说 5.1. 大白话说三次握手 5.2. 大白话说四次挥手 六. 总结 一. 前言 TCP 是一种面向连接的、可靠…...
使用RedisCacheWriter#clean在线异步地批量扫描匹配删除缓存数据-spring-data-redis
1.背景 生产环境,某云的某个业务Redis实例,触发内存使用率,连续 3 次 平均值 > 85 %告警。 运维同学告知,看看需要怎么优化或者升级配置?分享了其实例RDB的内存剖析链接。 通过内存剖析详情发现,存在某…...
机器视觉:AI赋能缺陷检测,铸就芯片产品的大算力与高能效
导言:近年来,国内芯片行业快速发展,市场对芯片需求的不断增大,芯片的缺陷检测压力也越来越大。芯片产品在生产制造过程中,需要经历数道工序,每个生产环节的材料、环境、工艺参数等都有可能造成产品缺陷。不…...
(9)Linux Git的介绍以及缓冲区
💭 前言 本章我们先对缓冲区的概念进行一个详细的探究,之后会带着大家一步步去编写一个简陋的 "进度条" 小程序。最后我们来介绍一下 Git,着重讲解一下 Git 三板斧,一般只要掌握三板斧就基本够用了。 缓冲区ÿ…...
华为云之ECS云产品快速入门
华为云之ECS云产品快速入门 一、ECS云服务器介绍二、本次实践目标三、创建虚拟私有云VPC1.虚拟私有云VPC介绍2.进入虚拟私有云VPC管理页面3.创建虚拟私有云4.查看创建的VPC 四、创建弹性云服务器ECS——Linux1.进入ECS购买界面2.创建弹性云服务器(Linux)——基础配置步骤3.创建…...
tcp 的限制 (TCP_WRAPPERS)
#江南的江 #每日鸡汤:青春是打开了就合不上的书,人生是踏上了就回不了头的路,爱情是扔出了就收不回的赌注。 #初心和目标:拿到高级网络工程师 TCP_WRAPPERs Tcp_wrappers 对于七层模型中是位于第四层的安全工具,他…...
如何保证架构的质量
1. 如何保证架构的质量: ①. 稳定性、健壮性(1). 系统稳定性: ①. 定义:a. 当一个实际的系统处于一个平衡的状态时,如果受到外来作用的影响时,系统经过一个过渡过程仍然能够回到原来的平衡状态.b. 可以说这个系统是稳定的,否则系统不稳定c. 如一根绳子绑着小球,处于垂直状态,…...
网站规划与设计网站页面/优化模型数学建模
文章目录1、Table is in readonly mode (zookeeper path: /clickhouse/tables/iov/t_fault/2)2、Replica /clickhouse/tables/s1/dwd/xxxx/replicas/dt_fault already exists3、数据写入成功,但是数据库并不存在数据4、查询时(非MergeTree表引擎…...
做网站的价位/百度搜索引擎优化详解
可以访问 查看更多关于 消息中间件 的原创文章。移山是禧云自研的数据迁移平台,包含异构数据源的迁移、实时数据同步等服务。有兴趣的可以看这里:本文主要介绍移山实时数据同步服务产生的背景以及整体架构设计。可以访问一. 移山实时数据同步服务产生背…...
增加网站访问量/成都谷歌seo
在第一次安装的时候出现这个错误信息 解决办法: 修改config.inc.php文件里的两个属性值为: $tlCfg->log_path TL_ABS_PATH . logs . DIRECTORY_SEPARATOR ;$g_repositoryPath TL_ABS_PATH . "upload_area" . DIRECTORY_SEPARATOR; 配置中文 在config…...
做早餐的网站/seo关键词优化推广
1、查看物理CPU的个数[rootMysqlCluster01 ~]# cat /proc/cpuinfo |grep "physical id"|sort |uniq|wc -l12、查看逻辑CPU的个数[rootMysqlCluster01 ~]# cat /proc/cpuinfo |grep "processor"|wc -l43、查看CPU是几核(即,核心数)[rootMysqlClu…...
南阳建网站企业/百度学术论文查重官网
NPM酷库,每天两分钟,了解一个流行NPM库。 JSON是JS中数据交换时最常用的数据格式,其序列话和反序列化性能非常好,但是其语法却比较严格,比如以下是一个合法的JS声明,却不是一个合法的JSON: { fo…...
烟台建网站公司哪家好/谷歌浏览器 安卓下载2023版
我有一个Manager类,该类将数据保存在SQL表中,并从SQL表中获取结果并测试这些数据.当我运行程序时,将显示一个获取ID和密码的框架,如果它们正确,则另一个框架将但是我不知道为什么它只是测试SQL表的最后一行?我的意思是如果我用除最后一行以外的其他ID和密码设置那些…...