Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案
Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案
- 准备
- 准备 Flink Standalone 集群
- 准备 docker compose
- 为 MySQL 准备记录
- 使用 Flink CDC CLI 提交作业
- 同步架构和数据更改
- 路由变更
- 清理
本教程将展示如何使用 Flink CDC 快速构建从 MySQL 到 StarRocks 的 Streaming ELT 作业,包括同步一个数据库的所有表、模式变更演变和将分片表同步到一张表的功能。
本教程中的所有练习都在 Flink CDC CLI 中执行,整个过程使用标准 SQL 语法,无需一行 Java/Scala 代码或 IDE 安装。
准备
准备一台安装了 Docker 的 Linux 或 MacOS 电脑。
准备 Flink Standalone 集群
下载 Flink 1.18.0 ,解压得到 flink-1.18.0 目录。
使用以下命令进入 Flink 目录,并将 FLINK_HOME 设置为 flink-1.18.0 所在的目录。
cd flink-1.18.0
通过将以下参数附加到 conf/flink-conf.yaml 配置文件来启用检查点,每 3 秒执行一次检查点。
execution.checkpointing.interval: 3000
使用以下命令启动 Flink 集群。
./bin/start-cluster.sh
如果启动成功,你就可以通过http://localhost:8081/访问Flink Web UI,如下所示。

多次执行start-cluster.sh可以启动多个TaskManager。
准备 docker compose
以下教程将使用 docker-compose 准备所需的组件。使用下面提供的内容创建 docker-compose.yml 文件:
version: '2.1'
services:StarRocks:image: starrocks/allin1-ubuntu:3.2.6ports:- "8080:8080"- "9030:9030"MySQL:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw
Docker Compose 应包含以下服务(容器):
- MySQL:包含一个名为 app_db 的数据库
- StarRocks:存储来自 MySQL 的表
docker-compose up -d
该命令会自动以分离模式启动 Docker Compose 配置中定义的所有容器。运行 docker ps 检查这些容器是否正常运行。您也可以访问 http://localhost:8030/ 检查 StarRocks 是否正在运行。
为 MySQL 准备记录
进入 MySQL 容器
docker-compose exec mysql mysql -uroot -p123456
创建 app_db 数据库和订单、产品、发货表,然后插入记录
-- create database
CREATE DATABASE app_db;USE app_db;-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
使用 Flink CDC CLI 提交作业
- 下载下面列出的二进制压缩包并解压到目录 flink cdc-3.1.0’:
flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 目录下会包含四个目录:bin、lib、log、conf。 - 下载下面列出的连接器包并移动到 lib 目录
下载链接只针对稳定版本,SNAPSHOT 依赖需要自行基于 master 或 release 分支构建。请注意,需要将 jar 移动到 Flink CDC Home 的 lib 目录,而不是 Flink Home 的 lib 目录。- MySQL 管道连接器 3.1.0
- StarRocks 管道连接器 3.1.0
您还需要将 MySQL 连接器放入 Flink lib 文件夹或使用 --jar 参数传递它,因为它们不再与 CDC 连接器一起打包:
- MySQL Connector Java
编写任务配置yaml文件。下面是同步整个数据库的示例文件mysql-to-starrocks.yaml:
################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to StarRocksparallelism: 2
注意:
- source 中的 tables: app_db..* 通过正则匹配同步 app_db 中的所有表。
- sink 中的 table.create.properties.replication_num 是因为 Docker 镜像中只有一个 StarRocks BE 节点。
最后,使用Cli将作业提交到Flink Standalone集群。
bash bin/flink-cdc.sh mysql-to-starrocks.yaml
提交成功后返回信息如下:
Pipeline has been submitted to cluster.
Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
Job Description: Sync MySQL Database to StarRocks
我们可以通过 Flink Web UI 找到一个名为“Sync MySQL Database to StarRocks“的作业正在运行。

通过Dbeaver等数据库连接工具使用mysql://127.0.0.1:9030连接jdbc,可以在StarRocks中查看写入三张表的数据。

同步架构和数据更改
进入MySQL容器
docker-compose exec mysql mysql -uroot -p123456
然后修改MySQL中的schema和记录,Doris的表也会实时改变:
在MySQL中的orders中插入一条记录:
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
在 MySQL 的订单中添加一列:
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
从 MySQL 更新订单中的一条记录:
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
从 MySQL 中删除订单中的一条记录:
DELETE FROM app_db.orders WHERE id=2;
每执行一步刷新一下Dbeaver,可以看到StarRocks中展示的订单表会实时更新,如下图:

同样的,通过修改shipping和products表,你也可以在StarRocks中实时看到同步修改的结果。
路由变更
Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置。
利用此功能,我们可以实现表名、数据库名替换、全库同步等功能。以下是使用路由功能的示例文件:
################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to StarRocksparallelism: 2
通过上面的路由配置,我们可以将app_db.orders的表结构和数据同步到ods_db.ods_orders中,从而实现数据库迁移的功能。具体来说,source-table支持正则匹配多表来同步分库分表,如下:
route:- source-table: app_db.order\.*sink-table: ods_db.ods_orders
这样我们就可以将app_db.order01、app_db.order02、app_db.order03等分片表同步到一张ods_db.ods_orders表中了。
注意,目前还不支持多张表存在相同主键数据的场景,后续版本会支持。
清理
完成教程后,运行以下命令停止docker-compose.yml目录中的所有容器:
docker-compose down
在Flink flink-1.18.0目录下,执行以下命令停止Flink集群:
./bin/stop-cluster.sh
相关文章:
Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案
Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案 准备准备 Flink Standalone 集群准备 docker compose为 MySQL 准备记录使用 Flink CDC CLI 提交作业 同步架构和数据更改路由变更清理 本教程将展示如何使用 Flink CDC 快速构建从 MySQ…...
一次元空间FullGC导致OOM问题分析
原文,作者:kkyeer 原文需要翻墙,所以转载。 现象 观测平台告警:FullGC次数大于阈值,5分钟内大于11次,频次大概1-2周有一次 告警后服务概率性会自动恢复,控制台打印 Exception: java.lang.OutOf…...
Web前端开发工具和依赖安装
各种安装: node.js https://nodejs.org/zh-cn/ 安装完node.js 可以使用npm,npm跟随nodejs一起安装 node --version 查看已安装node.js的版本,确认是否安装nodejs npm -v 查看npm版本npm install <Module Name> 安装模块 npm insta…...
【学习心得】远程root用户访问服务器中的MySQL8
一、Ubuntu下的MySQL8安装 在Ubuntu系统中安装MySQL 8.0可以通过以下步骤进行1. 更新包管理工具的仓库列表: sudo apt update 2. 安装MySQL 8.0,root用户默认没有密码: sudo apt install mysql-server sudo apt install mysql-client 【…...
lust变频器维修电梯变频器CDD34.014.W2.1LSPC1
LUST伺服在安装时须注意,不可有任何的铁屑、螺丝、导线等掉人驱动器内。在安装完成后应作基本的检测动作,如对地阻抗,和短路检测等。 所有的安装及使用事项需要符合安全规定,并且也需要符合当地的相关规定和灾害预防措施。DC BUS…...
跨越地域限制:在线原型设计软件的自由与便捷
网络原型设计软件因其便捷性和灵活性,在现代设计工作中扮演着至关重要的角色。与传统的桌面端软件相比,网络原型设计工具无需安装,不受地域限制,且兼容各种操作系统,无论是Linux、Solaris、Mac还是Windows,…...
flash-waimai:高仿饿了么外卖平台,使用他轻松打造自己的外卖平台
嗨,大家好,我是小华同学,关注我们获得“最新、最全、最优质”开源项目和工作学习方法 flash-waimai 是一个完整的外卖平台解决方案,包括手机端、后台管理端和 API 服务。该项目仿照了饿了么的外卖服务,为用户提供了一个…...
2.5 塑性力学—应变状态
个人专栏—塑性力学 1.1 塑性力学基本概念 塑性力学基本概念 1.2 弹塑性材料的三杆桁架分析 弹塑性材料的三杆桁架分析 1.3 加载路径对桁架的影响 加载路径对桁架的影响 2.1 塑性力学——应力分析基本概念 应力分析基本概念 2.2 塑性力学——主应力、主方向、不变量 主应力、主…...
1.机器人抓取与操作介绍-深蓝学院
介绍 操作任务 操作 • Insertion • Pushing and sliding • 其它操作任务 抓取 • 两指(平行夹爪)抓取 • 灵巧手抓取 7轴 Franka 对应人的手臂 6轴 UR构型去掉一个自由度 课程大纲 Robotic Manipulation 操作 • Robotic manipulation refers…...
六,Linux基础环境搭建(CentOS7)- 安装HBase
Linux基础环境搭建(CentOS7)- 安装HBase 大家注意以下的环境搭建版本号,如果版本不匹配有可能出现问题! 一、HBase下载及安装 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“…...
《计算机网络网络层:连接虚拟世界的关键桥梁》
一、网络层概述 网络层在计算机网络中占据着至关重要的地位,它作为连接不同网络的关键层次,起着承上启下的作用。网络层的主要任务是实现网络互连,将数据设法从源端经过若干个中间节点传送到目的端,为分组交换网上的不同主机提供通…...
【AIGC】2024-arXiv-CtrLoRA:一种可扩展且高效的可控图像生成框架
2024-arXiv-CtrLoRA: An Extensible and Efficient Framework for Controllable Image Generation CtrLoRA:一种可扩展且高效的可控图像生成框架摘要1. 引言相关工作3. 方法3.1 准备工作3.3 有效适应新条件3.4 条件嵌入网络的设计 4. 实验4.1 实验设置4.2 与现有方法…...
立仪光谱共焦在玻璃上奥秘与应用
在现代工业和科学研究中,玻璃因其透明、坚硬和易加工的特性被广泛应用于各个领域。然而,玻璃的厚度测量一直是困扰业界的一大难题。传统的千分尺或电容式传感器虽然在一定程度上能满足生产需求,但在精度、效率以及适用范围上存在明显的局限。…...
【天气识别系统】Python+卷积神经网络算法+人工智能+深度学习+TensorFlow+算法模型训练+Django网页界面
一、介绍 天气识别系统,以Python作为主要编程语言,通过收集了4种常见的天气图像数据集(多云、雨天、晴天、日出),然后基于TensorFlow搭建卷积神经网络算法模型,通过多轮迭代训练,最后得到一个识…...
MiniCTX:面向大语言模型定理证明的上下文相关基准测试系统
卡内基梅隆大学的研究人员推出MiniCTX,这是一个强大的基准测试系统,旨在通过整合前所未有的多重上下文元素(包括前提、先前证明、注释、符号以及导入和声明等结构组件)来彻底改变大型语言模型中定理证明能力的评估方式,…...
树莓派开发相关知识三PWM控制转速
基于树莓派PWM控制 控制L298N马达驱动转速 马达驱动转速 1、L298N电路图: 2、需要留意的有几点 INA~IND四个引脚分别控制OUTA-OUTD,即,INA高电平则OUTA有电。 ENA,ENB分别使能控制OUTA~OUTB以及OUTC~OUTD。 OUT口有VCC电压驱动…...
SpringBoot最常用的注解
1、RestController 作用:与Controller类似,但是RestController会自动将返回值转换为JSON格式。 2、RequestMapping 作用:用于映射请求URL和处理方法。 RequestMapping是Spring MVC框架中的一个核心注解,它用于映射HTTP请求和控…...
js 获取当前时间与前一个月时间
// 获取当前时间的毫秒数 var currentTimeMillis new Date().getTime();// 获取前一个月的Date对象 var dateLastMonth new Date(); dateLastMonth.setMonth(dateLastMonth.getMonth() - 1);// 获取前一个月的毫秒数 var timeMillisLastMonth dateLastMonth.getTime();conso…...
深度了解flink rpc机制(四) 组件启动流程源码分析
前言 目前已发布了3篇关于Flink RPC相关的文章,分别从底层通信系统akka/Pekko,RPC实现方式动态代理以及Flink RPC相关的组件做了介绍 深度了解flink rpc机制(一)-Akka/Pekko_flink pekko akka-CSDN博客 深度了解flink rpc机制&…...
C++基于opencv的视频质量检测--遮挡检测
文章目录 0.引言1. 原始代码分析1.1 存在的问题 2. 优化方案3. 优化后的代码4. 代码详细解读4.1. 输入检查4.2. 图像预处理4.3. 高斯模糊4.4. 梯度计算4.5. 计算梯度幅值和方向4.6. 边缘检测4.7. 计算边缘密度4.8. 估计遮挡程度4.9. 限定结果范围4.10. 返回结果 0.引言 视频质…...
如何设计一个安全的 RESTful API?
如何设计一个安全的 RESTful API?在当今数字化时代,RESTful API 已成为不同系统间数据交互的核心桥梁。随着网络攻击手段的日益复杂,API 的安全性已成为开发者不可忽视的挑战。一个设计不当的 API 可能导致数据泄露、服务瘫痪甚至法律风险。那…...
【HDLBits 刷题 13】Buliding Larger Circuits
目录 写在前面 Buliding Larger Circuits count1k shiftcount fsm seq fsmshift fsm fancytimer fsm onehot 写在前面 以下的解题方法不一定为最佳解决方案,有更好的方法欢迎提出,共同学习,共同进步! Buliding Larger …...
diffvg底层原理揭秘:可微光栅化技术如何让矢量图形支持梯度下降优化
diffvg底层原理揭秘:可微光栅化技术如何让矢量图形支持梯度下降优化 【免费下载链接】diffvg Differentiable Vector Graphics Rasterization 项目地址: https://gitcode.com/gh_mirrors/di/diffvg diffvg是一个创新的开源项目,它实现了可微矢量图…...
终极指南:如何利用Certbot与机器学习构建智能证书异常监控系统
终极指南:如何利用Certbot与机器学习构建智能证书异常监控系统 【免费下载链接】certbot Certbot is EFFs tool to obtain certs from Lets Encrypt and (optionally) auto-enable HTTPS on your server. It can also act as a client for any other CA that uses t…...
百考通AI赋能文献综述,精准破解文献梳理难题
在学术研究的道路上,文献综述是承前启后的关键环节,它既是对领域内已有研究的系统梳理,也是确立自身研究创新点的核心基础。然而,海量文献的筛选、观点的整合、逻辑的搭建,往往让科研工作者与学生耗费大量时间与精力。…...
如何通过社区反馈打造更强大的Mousetrap.js快捷键库:开发者指南
如何通过社区反馈打造更强大的Mousetrap.js快捷键库:开发者指南 【免费下载链接】mousetrap Simple library for handling keyboard shortcuts in Javascript 项目地址: https://gitcode.com/gh_mirrors/mo/mousetrap Mousetrap.js是一款轻量级的JavaScript键…...
Qwen3与Transformer模型深度结合:提升字幕语义理解
Qwen3与Transformer模型深度结合:提升字幕语义理解 不知道你有没有过这样的体验:看视频时,字幕要么跟不上语速,要么翻译得生硬别扭,甚至完全曲解了说话人的意思。尤其是在处理口语化表达、网络流行语或者带有歧义的句…...
Qwen2.5-7B-Instruct参数详解:28层GQA架构与RMSNorm优化原理
Qwen2.5-7B-Instruct参数详解:28层GQA架构与RMSNorm优化原理 1. 引言:为什么我们需要了解模型参数? 你可能已经听说过Qwen2.5-7B-Instruct这个模型,也知道它很强大,但当你看到技术文档里那些“28层”、“GQA”、“RM…...
Origin2017热力图的隐藏技巧:如何用折线图实现数据标签显示
Origin2017热力图数据标签的进阶实现方案 科研数据可视化中,热力图因其直观的色彩映射能力,成为展示高维数据的利器。但Origin2017版本存在一个明显的功能短板——无法直接为热力图添加数据标签。这给需要精确展示数值的学术工作者带来了困扰。本文将系统…...
RemoveWindowsAI:隐私保护与系统优化的Windows AI功能管理方案
RemoveWindowsAI:隐私保护与系统优化的Windows AI功能管理方案 【免费下载链接】RemoveWindowsAI Force Remove Copilot and Recall in Windows 项目地址: https://gitcode.com/GitHub_Trending/re/RemoveWindowsAI 在数字化办公与娱乐日益融合的今天&#x…...
