当前位置: 首页 > news >正文

Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

  • 准备
    • 准备 Flink Standalone 集群
    • 准备 docker compose
    • 为 MySQL 准备记录
    • 使用 Flink CDC CLI 提交作业
  • 同步架构和数据更改
  • 路由变更
  • 清理

本教程将展示如何使用 Flink CDC 快速构建从 MySQL 到 StarRocks 的 Streaming ELT 作业,包括同步一个数据库的所有表、模式变更演变和将分片表同步到一张表的功能。
本教程中的所有练习都在 Flink CDC CLI 中执行,整个过程使用标准 SQL 语法,无需一行 Java/Scala 代码或 IDE 安装。

准备

准备一台安装了 Docker 的 Linux 或 MacOS 电脑。

准备 Flink Standalone 集群

下载 Flink 1.18.0 ,解压得到 flink-1.18.0 目录。

使用以下命令进入 Flink 目录,并将 FLINK_HOME 设置为 flink-1.18.0 所在的目录。

cd flink-1.18.0

通过将以下参数附加到 conf/flink-conf.yaml 配置文件来启用检查点,每 3 秒执行一次检查点。

execution.checkpointing.interval: 3000

使用以下命令启动 Flink 集群。

./bin/start-cluster.sh

如果启动成功,你就可以通过http://localhost:8081/访问Flink Web UI,如下所示。
在这里插入图片描述
多次执行start-cluster.sh可以启动多个TaskManager。

准备 docker compose

以下教程将使用 docker-compose 准备所需的组件。使用下面提供的内容创建 docker-compose.yml 文件:

version: '2.1'
services:StarRocks:image: starrocks/allin1-ubuntu:3.2.6ports:- "8080:8080"- "9030:9030"MySQL:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw

Docker Compose 应包含以下服务(容器):

  • MySQL:包含一个名为 app_db 的数据库
  • StarRocks:存储来自 MySQL 的表
docker-compose up -d

该命令会自动以分离模式启动 Docker Compose 配置中定义的所有容器。运行 docker ps 检查这些容器是否正常运行。您也可以访问 http://localhost:8030/ 检查 StarRocks 是否正在运行。

为 MySQL 准备记录

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创建 app_db 数据库和订单、产品、发货表,然后插入记录

-- create database
CREATE DATABASE app_db;USE app_db;-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

使用 Flink CDC CLI 提交作业

  • 下载下面列出的二进制压缩包并解压到目录 flink cdc-3.1.0’:
    flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 目录下会包含四个目录:bin、lib、log、conf。
  • 下载下面列出的连接器包并移动到 lib 目录
    下载链接只针对稳定版本,SNAPSHOT 依赖需要自行基于 master 或 release 分支构建。请注意,需要将 jar 移动到 Flink CDC Home 的 lib 目录,而不是 Flink Home 的 lib 目录。
    • MySQL 管道连接器 3.1.0
    • StarRocks 管道连接器 3.1.0

您还需要将 MySQL 连接器放入 Flink lib 文件夹或使用 --jar 参数传递它,因为它们不再与 CDC 连接器一起打包:

  • MySQL Connector Java

编写任务配置yaml文件。下面是同步整个数据库的示例文件mysql-to-starrocks.yaml:

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to StarRocksparallelism: 2

注意:

  • source 中的 tables: app_db..* 通过正则匹配同步 app_db 中的所有表。
  • sink 中的 table.create.properties.replication_num 是因为 Docker 镜像中只有一个 StarRocks BE 节点。

最后,使用Cli将作业提交到Flink Standalone集群。

bash bin/flink-cdc.sh mysql-to-starrocks.yaml

提交成功后返回信息如下:

Pipeline has been submitted to cluster.
Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
Job Description: Sync MySQL Database to StarRocks

我们可以通过 Flink Web UI 找到一个名为“Sync MySQL Database to StarRocks“的作业正在运行。
在这里插入图片描述
通过Dbeaver等数据库连接工具使用mysql://127.0.0.1:9030连接jdbc,可以在StarRocks中查看写入三张表的数据。

在这里插入图片描述

同步架构和数据更改

进入MySQL容器

docker-compose exec mysql mysql -uroot -p123456

然后修改MySQL中的schema和记录,Doris的表也会实时改变:

在MySQL中的orders中插入一条记录:

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

在 MySQL 的订单中添加一列:

ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

从 MySQL 更新订单中的一条记录:

UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

从 MySQL 中删除订单中的一条记录:

DELETE FROM app_db.orders WHERE id=2;

每执行一步刷新一下Dbeaver,可以看到StarRocks中展示的订单表会实时更新,如下图:
在这里插入图片描述
同样的,通过修改shipping和products表,你也可以在StarRocks中实时看到同步修改的结果。

路由变更

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置。
利用此功能,我们可以实现表名、数据库名替换、全库同步等功能。以下是使用路由功能的示例文件:

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to StarRocksparallelism: 2

通过上面的路由配置,我们可以将app_db.orders的表结构和数据同步到ods_db.ods_orders中,从而实现数据库迁移的功能。具体来说,source-table支持正则匹配多表来同步分库分表,如下:

route:- source-table: app_db.order\.*sink-table: ods_db.ods_orders

这样我们就可以将app_db.order01、app_db.order02、app_db.order03等分片表同步到一张ods_db.ods_orders表中了。

注意,目前还不支持多张表存在相同主键数据的场景,后续版本会支持。

清理

完成教程后,运行以下命令停止docker-compose.yml目录中的所有容器:

docker-compose down

在Flink flink-1.18.0目录下,执行以下命令停止Flink集群:

./bin/stop-cluster.sh

相关文章:

Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案 准备准备 Flink Standalone 集群准备 docker compose为 MySQL 准备记录使用 Flink CDC CLI 提交作业 同步架构和数据更改路由变更清理 本教程将展示如何使用 Flink CDC 快速构建从 MySQ…...

一次元空间FullGC导致OOM问题分析

原文,作者:kkyeer 原文需要翻墙,所以转载。 现象 观测平台告警:FullGC次数大于阈值,5分钟内大于11次,频次大概1-2周有一次 告警后服务概率性会自动恢复,控制台打印 Exception: java.lang.OutOf…...

Web前端开发工具和依赖安装

各种安装&#xff1a; node.js https://nodejs.org/zh-cn/ 安装完node.js 可以使用npm&#xff0c;npm跟随nodejs一起安装 node --version 查看已安装node.js的版本&#xff0c;确认是否安装nodejs npm -v 查看npm版本npm install <Module Name> 安装模块 npm insta…...

【学习心得】远程root用户访问服务器中的MySQL8

一、Ubuntu下的MySQL8安装 在Ubuntu系统中安装MySQL 8.0可以通过以下步骤进行1. 更新包管理工具的仓库列表&#xff1a; sudo apt update 2. 安装MySQL 8.0&#xff0c;root用户默认没有密码&#xff1a; sudo apt install mysql-server sudo apt install mysql-client 【…...

lust变频器维修电梯变频器CDD34.014.W2.1LSPC1

LUST伺服在安装时须注意&#xff0c;不可有任何的铁屑、螺丝、导线等掉人驱动器内。在安装完成后应作基本的检测动作&#xff0c;如对地阻抗&#xff0c;和短路检测等。 所有的安装及使用事项需要符合安全规定&#xff0c;并且也需要符合当地的相关规定和灾害预防措施。DC BUS…...

跨越地域限制:在线原型设计软件的自由与便捷

网络原型设计软件因其便捷性和灵活性&#xff0c;在现代设计工作中扮演着至关重要的角色。与传统的桌面端软件相比&#xff0c;网络原型设计工具无需安装&#xff0c;不受地域限制&#xff0c;且兼容各种操作系统&#xff0c;无论是Linux、Solaris、Mac还是Windows&#xff0c;…...

flash-waimai:高仿饿了么外卖平台,使用他轻松打造自己的外卖平台

嗨&#xff0c;大家好&#xff0c;我是小华同学&#xff0c;关注我们获得“最新、最全、最优质”开源项目和工作学习方法 flash-waimai 是一个完整的外卖平台解决方案&#xff0c;包括手机端、后台管理端和 API 服务。该项目仿照了饿了么的外卖服务&#xff0c;为用户提供了一个…...

2.5 塑性力学—应变状态

个人专栏—塑性力学 1.1 塑性力学基本概念 塑性力学基本概念 1.2 弹塑性材料的三杆桁架分析 弹塑性材料的三杆桁架分析 1.3 加载路径对桁架的影响 加载路径对桁架的影响 2.1 塑性力学——应力分析基本概念 应力分析基本概念 2.2 塑性力学——主应力、主方向、不变量 主应力、主…...

1.机器人抓取与操作介绍-深蓝学院

介绍 操作任务 操作 • Insertion • Pushing and sliding • 其它操作任务 抓取 • 两指&#xff08;平行夹爪&#xff09;抓取 • 灵巧手抓取 7轴 Franka 对应人的手臂 6轴 UR构型去掉一个自由度 课程大纲 Robotic Manipulation 操作 • Robotic manipulation refers…...

六,Linux基础环境搭建(CentOS7)- 安装HBase

Linux基础环境搭建&#xff08;CentOS7&#xff09;- 安装HBase 大家注意以下的环境搭建版本号&#xff0c;如果版本不匹配有可能出现问题&#xff01; 一、HBase下载及安装 HBase是一个分布式的、面向列的开源数据库&#xff0c;该技术来源于 Fay Chang 所撰写的Google论文“…...

《计算机网络网络层:连接虚拟世界的关键桥梁》

一、网络层概述 网络层在计算机网络中占据着至关重要的地位&#xff0c;它作为连接不同网络的关键层次&#xff0c;起着承上启下的作用。网络层的主要任务是实现网络互连&#xff0c;将数据设法从源端经过若干个中间节点传送到目的端&#xff0c;为分组交换网上的不同主机提供通…...

【AIGC】2024-arXiv-CtrLoRA:一种可扩展且高效的可控图像生成框架

2024-arXiv-CtrLoRA: An Extensible and Efficient Framework for Controllable Image Generation CtrLoRA&#xff1a;一种可扩展且高效的可控图像生成框架摘要1. 引言相关工作3. 方法3.1 准备工作3.3 有效适应新条件3.4 条件嵌入网络的设计 4. 实验4.1 实验设置4.2 与现有方法…...

立仪光谱共焦在玻璃上奥秘与应用

在现代工业和科学研究中&#xff0c;玻璃因其透明、坚硬和易加工的特性被广泛应用于各个领域。然而&#xff0c;玻璃的厚度测量一直是困扰业界的一大难题。传统的千分尺或电容式传感器虽然在一定程度上能满足生产需求&#xff0c;但在精度、效率以及适用范围上存在明显的局限。…...

【天气识别系统】Python+卷积神经网络算法+人工智能+深度学习+TensorFlow+算法模型训练+Django网页界面

一、介绍 天气识别系统&#xff0c;以Python作为主要编程语言&#xff0c;通过收集了4种常见的天气图像数据集&#xff08;多云、雨天、晴天、日出&#xff09;&#xff0c;然后基于TensorFlow搭建卷积神经网络算法模型&#xff0c;通过多轮迭代训练&#xff0c;最后得到一个识…...

MiniCTX:面向大语言模型定理证明的上下文相关基准测试系统

卡内基梅隆大学的研究人员推出MiniCTX&#xff0c;这是一个强大的基准测试系统&#xff0c;旨在通过整合前所未有的多重上下文元素&#xff08;包括前提、先前证明、注释、符号以及导入和声明等结构组件&#xff09;来彻底改变大型语言模型中定理证明能力的评估方式&#xff0c…...

树莓派开发相关知识三PWM控制转速

基于树莓派PWM控制 控制L298N马达驱动转速 马达驱动转速 1、L298N电路图&#xff1a; 2、需要留意的有几点 INA~IND四个引脚分别控制OUTA-OUTD&#xff0c;即&#xff0c;INA高电平则OUTA有电。 ENA&#xff0c;ENB分别使能控制OUTA~OUTB以及OUTC~OUTD。 OUT口有VCC电压驱动…...

SpringBoot最常用的注解

1、RestController 作用&#xff1a;与Controller类似&#xff0c;但是RestController会自动将返回值转换为JSON格式。 2、RequestMapping 作用&#xff1a;用于映射请求URL和处理方法。 RequestMapping是Spring MVC框架中的一个核心注解&#xff0c;它用于映射HTTP请求和控…...

js 获取当前时间与前一个月时间

// 获取当前时间的毫秒数 var currentTimeMillis new Date().getTime();// 获取前一个月的Date对象 var dateLastMonth new Date(); dateLastMonth.setMonth(dateLastMonth.getMonth() - 1);// 获取前一个月的毫秒数 var timeMillisLastMonth dateLastMonth.getTime();conso…...

深度了解flink rpc机制(四) 组件启动流程源码分析

前言 目前已发布了3篇关于Flink RPC相关的文章&#xff0c;分别从底层通信系统akka/Pekko&#xff0c;RPC实现方式动态代理以及Flink RPC相关的组件做了介绍 深度了解flink rpc机制&#xff08;一&#xff09;-Akka/Pekko_flink pekko akka-CSDN博客 深度了解flink rpc机制&…...

C++基于opencv的视频质量检测--遮挡检测

文章目录 0.引言1. 原始代码分析1.1 存在的问题 2. 优化方案3. 优化后的代码4. 代码详细解读4.1. 输入检查4.2. 图像预处理4.3. 高斯模糊4.4. 梯度计算4.5. 计算梯度幅值和方向4.6. 边缘检测4.7. 计算边缘密度4.8. 估计遮挡程度4.9. 限定结果范围4.10. 返回结果 0.引言 视频质…...

手机玩潜水员戴夫?GameViewer远程如何随时随地玩潜水员戴夫教程

如果你是潜水员戴夫的忠实玩家&#xff0c;你知道如何在手机上玩潜水员戴夫吗&#xff1f;潜水员戴夫是一个以神秘蓝洞为背景的海洋冒险游戏。在这个游戏里你白天可以在美丽的大海里打鱼&#xff0c;晚上可以经营寿司店。现在这个游戏也能实现用手机随时随地畅玩了&#xff01;…...

UE5 喷射背包

首选创建一个输入操作 然后在输入映射中添加&#xff0c;shift是向上飞&#xff0c;ctrl是向下飞 进入人物蓝图中编写逻辑&#xff0c;变量HaveJatpack默认true&#xff0c;Thrust为0 最后...

【Vue3】第三篇

Vue3学习第三篇 01. 组件组成02. 组件嵌套关系03. 组件注册方式04. 组件传递数据Props05. 组件传递多种数据类型06. 组件传递Props校验07. 组件事件08. 组件事件配合v-model使用09. 组件数据传递10. 透传Attributes 01. 组件组成 在vue当中&#xff0c;组件是最重要的知识&…...

c++二级指针

如果要通过函数改变一个指针的值&#xff0c;要往函数中传入指针的指针 如果要通过函数改变一个变量的值&#xff0c;那就要往函数中传入这个变量的地址 改变a的值和b的值 #include <iostream>using namespace std;void swap(int* a, int* b) {int temp *a;*a *b;*b …...

客户端存储 — IndexedDB 实现分页查询

前言 相信 IndexedDB 大家都有过了解&#xff0c;但是不一定每个人都有过实践&#xff0c;并且其中涉及到事务、游标等概念&#xff0c;会导致在初次使用时会有些不适应&#xff0c;那么本文会通过 IndexedDB 实现分页查询的形式进行实践&#xff0c;在开始之前&#xff0c;可…...

logback 如何将日志输出到文件

如何作 将日志输出到文件需要使用 RollingFileAppender&#xff0c;该 Appender 必须定义 rollingPolicy &#xff0c;另外 rollingPollicy 下必须定义 fileNamePattern 和 encoder <appender name"fileAppender" class"ch.qos.logback.core.rolling.Rollin…...

Files.newBufferedReader和Files.readAllLines

在Java中&#xff0c;Files.newBufferedReader 和 Files.readAllLines 都是用于从文件中读取数据的工具方法&#xff0c;但它们的使用场景和功能有所不同。下面我将详细解释这两个方法的含义、用途、区别、优缺点以及各自的使用场景。 1. Files.newBufferedReader 含义和用途…...

MySQL 数据库备份与恢复全攻略

MySQL 数据库备份与恢复全攻略 引言 在现代应用中&#xff0c;数据库是核心组件之一。无论是个人项目还是企业级应用&#xff0c;数据的安全性和完整性都至关重要。为了防止数据丢失、损坏或意外删除&#xff0c;定期备份数据库是必不可少的。本文将详细介绍 MySQL 数据库的备…...

Appium中的api(一)

目录 1.基础python代码准备 1--参数的一些说明 2--python内所要编写的代码 解释 2.如何获取包名和界面名 1-api 2-完整代码 代码解释 3.如何关闭驱动连接 4.安装卸载app 1--卸载 2--安装 5.判断app是否安装 6.将应用放到后台在切换为前台的时间 7.UIAutomatorViewer的使用 1--找…...

【AI辅助设计】没错!训练FLUX LoRA就这么简单!

前言 得益于开源社区的力量&#xff0c;在各位大佬的努力下&#xff0c;现在16G VRAM的家用电脑也可以训练FLUX的LoRA了 &#x1f44f;。 今天我使用fluxgym这个方法&#xff0c;训练LoRA&#xff0c;并记录过程。 篇幅有限&#xff0c;这里就不一一展示了&#xff0c;有需要的…...

wordpress 通讯录插件/seo外链在线工具

我是卢松松&#xff0c;点点上面的头像&#xff0c;欢迎关注我哦&#xff01; 上个月我还发文章说《虚拟人能否代替直播带货?》结果这个月快手就推出了“快手虚拟演播助手”&#xff0c;而且还支持多平台推流直播&#xff0c;用户也能化身虚拟形象进入元宇宙直播间了。 这对…...

wordpress 前台发帖/怎么把产品快速宣传并推广

本节内容总结来自传智播客毕向东老师的公开课&#xff0c;感谢毕向东老师 &#xff01;如有错误之处&#xff0c;欢迎大家指教 &#xff01; String s “abc”; // “abc”是一个对象&#xff1b; 只要是字符串&#xff0c;都是String类的实例对象&#xff1b; String s “a…...

上海免费注册公司官网/搜索引擎网站排名优化方案

尽管广泛阅读了JDK源代码并检查了内在例程,但我还是不能一概而论.我正在测试清除使用ByteBuffer.putLong(int index,long value)用allocateDirect分配的ByteBuffer.基于JDK代码,如果缓冲区为“本机字节顺序”,则将导致单个8字节的写操作&#xff1b;如果不按字节交换,则将导致相…...

如何做企业网站后台管理/网络推广引流

1. 迁移背景介绍目前我们的图数据库数据量为 顶点 20 亿&#xff0c;边 200 亿的规模。在迁移之前我们使用的 AgensGraph 数据库一个主库四个备库&#xff0c;机器的配置都比较高&#xff0c;256G 内存 SSD 的磁盘&#xff0c;单机数据量为 3T左右。在数据量比较小的情况下 Age…...

wordpress 如何修改主题宽度/阐述网络营销策略的内容

解析&#xff1a;〈#ffffffff〉 #ffffffff由#加八位数字或字母组成&#xff0c;前两个ff为透明度&#xff08;十六进制&#xff09;&#xff0c;后面六位ffffff为颜色代码&#xff0c;采用RGB配色&#xff08;十六进制&#xff09; 需要修改的机油一般都要查找相关资料&#xf…...

wordpress 餐饮 主题/搜索引擎广告案例

php保存二进制原始数据为图片的程序代码得到post过来的二进制原始数据&#xff0c;选择一个生成路径及图片的名字&#xff0c;之后写入&#xff0c;思路很显而易见//生成图片$imgDir uploadImg/;$filename"nissangcj".$mobile.".jpg";///要生成的图片名字…...