当前位置: 首页 > news >正文

基于 Flink SQL CDC 数据处理的终极武器

文章目录

    • 一、传统的数据同步方案与 Flink SQL CDC 解决方案
      • 1.1 Flink SQL CDC 数据同步与原理解析
      • 1.2 基于日志的 CDC 方案介绍
      • 1.3 选择 Flink 作为 ETL 工具
    • 二、 基于 Flink SQL CDC 的数据同步方案实践
    • 2.1 CDC Streaming ETL
    • 2.2 Flink-CDC实践之mysql案例


来源互联网多篇文章总结

一、传统的数据同步方案与 Flink SQL CDC 解决方案

业务系统经常会遇到需要更新数据到多个存储的需求。例如:一个订单系统刚刚开始只需要写入数据库即可完成业务使用。某天 BI 团队期望对数据库做全文索引,于是我们同时要写多一份数据到 ES 中,改造后一段时间,又有需求需要写入到 Redis 缓存中。

很明显这种模式是不可持续发展的,这种双写到各个数据存储系统中可能导致不可维护和扩展,数据一致性问题等,需要引入分布式事务,成本和复杂度也随之增加。

我们可以通过 CDC(Change Data Capture)工具进行解除耦合,同步到下游需要同步的存储系统,实现一份变动记录,实时处理并投递到多个目的地。通过这种方式提高系统的稳健性,也方便后续的维护。

1.1 Flink SQL CDC 数据同步与原理解析

CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。

业界主要有基于查询的 CDC 和基于日志的 CDC ,可以从下面表格对比他们功能和差异点。

  • 基于查询的 CDC

    用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。这种方式优点是不涉及数据库底层特性,实现比较通用;缺点是要对业务表做改造,且实时性不高,不能确保跟踪到所有的变更记录,且持续的频繁查询对数据库的压力较大。

    特点:基于批处理,不能捕获到所有数据的变化、高延迟、需要查询数据库,会增加数据库压力

  • 基于日志的 CDC

    可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动;缺点是部署数据库的事件接收和解析器(例如 Debezium、Canal 等),有一定的学习和运维成本,对一些冷门的数据库支持不够。综合来看,事件接收模式整体在实时性、吞吐量方面占优,如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现,建议使用Debezium来实现变更数据的捕获。

    特点: 基于streaming模式、能捕捉所有数据的变化、低延迟、不会增加数据库压力。

经过以上对比,我们可以发现基于日志 CDC 有以下这几种优势:

  • 能够捕获所有数据的变化,捕获完整的变更记录。在异地容灾,数据备份等场景中得到广泛应用,如果是基于查询的 CDC 有可能导致两次查询的中间一部分数据丢失
  • 每次 DML 操作均有记录无需像查询 CDC 这样发起全表扫描进行过滤,拥有更高的效率和性能,具有低延迟,不增加数据库负载的优势
  • 无需入侵业务,业务解耦,无需更改业务模型
  • 捕获删除事件和捕获旧记录的状态,在查询 CDC 中,周期的查询无法感知中间数据是否删除

常见开源CDC方案比较

1.2 基于日志的 CDC 方案介绍

从 ETL 的角度进行分析,一般采集的都是业务库数据,这里使用 MySQL 作为需要采集的数据库,通过 Debezium 把 MySQL Binlog 进行采集后发送至 Kafka 消息队列,然后对接一些实时计算引擎或者 APP 进行消费后把数据传输入 OLAP 系统或者其他存储介质。

Flink 希望打通更多数据源,发挥完整的计算能力。我们生产中主要来源于业务日志和数据库日志,Flink 在业务日志的支持上已经非常完善,但是在数据库日志支持方面在 Flink 1.11 前还属于一片空白,这就是为什么要集成 CDC 的原因之一。

Flink SQL 内部支持了完整的 changelog 机制,所以 Flink 对接 CDC 数据只需要把CDC 数据转换成 Flink 认识的数据

1.3 选择 Flink 作为 ETL 工具

之前的mysql binlog日志处理流程,例如canal监听binlog把日志写入到kafka中。而Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等。

拆分来说整体上可以分为以下几个阶段:

  • mysql开启binlog
  • canal同步binlog数据写入到kafka
  • flink读取kakfa中的binlog数据进行相关的业务处理。
  • 整体的处理链路较长,需要用到的组件也比较多。

Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析。简单来说链路如下图:

社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:

https://github.com/ververica/flink-cdc-connectors

flink-cdc-connectors 可以用来替换 Debezium+Kafka 的数据采集模块,从而实现 Flink SQL 采集+计算+传输(ETL)一体化,这样做的优点有以下:

  • 开箱即用,简单易上手
  • 减少维护的组件,简化实时链路,减轻部署成本
  • 减小端到端延迟
  • Flink 自身支持 Exactly Once 的读取和计算
  • 数据不落地,减少存储成本
  • 支持全量和增量流式读取
  • binlog 采集位点可回溯*

二、 基于 Flink SQL CDC 的数据同步方案实践

下面给大家带来2个关于 Flink SQL + CDC 在实际场景中使用较多的案例。在完成实验时候,你需要 Docker、MySQL、Elasticsearch 等组件,具体请参考每个案例参考文档。

2.1 CDC Streaming ETL

模拟电商公司的订单表和物流表,需要对订单数据进行统计分析,对于不同的信息需要进行关联后续形成订单的大宽表后,交给下游的业务方使用 ES 做数据分析,这个案例演示了如何只依赖 Flink 不依赖其他组件,借助 Flink 强大的计算能力实时把 Binlog 的数据流关联一次并同步至 ES 。

例如如下的这段 Flink SQL 代码就能完成实时同步 MySQL 中 orders 表的全量+增量数据的目的。

CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN
) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders'
);SELECT * FROM orders

2.2 Flink-CDC实践之mysql案例

  1. 开启mysql binlog

查看mysql-binlog状态并开启mysql-binlog

上图是开始的状态。如果没有开始,则log_bin=off,log_bin_basename和log_bin_index值为空。开启方式如下:

vim vim /etc/my.cnf

在添加以下信息:

#开启binglog
server-id=1
log-bin=/var/lib/mysql/mysql-bin
  • server-id表示单个结点的id,这里由于只有一个结点,所以可以把id随机指定为一个数,这里将id设置成1。若集群中有多个结点,则id不能相同
  • 第二句是指定binlog日志文件的名字为mysql-bin,以及其存储路径。
    添加完成后保存退出。

重启mysql服务:

service mysqld restart
  1. 编写flinksql
  • 源表:
create table Flink_source(id bigint, name string, age int,dt string)
with('connector' = 'mysql-cdc','hostname' = '192.168.1.180','port' = '3306','username' = 'root','password' = '123456','database-name' = 'test','table-name' = 'Flink_source'
);

可以知道,我们要去实时取Flink_source表,而这张表已经存储于mysql数据库的。

  • 目标表:
create table Flink_target(id bigint primary key, name string, age int,dt string)
with('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.1.180:3306/test','username'='root','password'='123456','table-name' = 'Flink_target','sink.buffer-flush.max-rows'='1','sink.buffer-flush.interval'='0'
);

可以知道,我们到实时存入目标表Flink_target,而这张表已经存储于mysql数据库。

  • 插入数据
insert into Flink_target select * from Flink_source;

相关文章:

基于 Flink SQL CDC 数据处理的终极武器

文章目录 一、传统的数据同步方案与 Flink SQL CDC 解决方案1.1 Flink SQL CDC 数据同步与原理解析1.2 基于日志的 CDC 方案介绍1.3 选择 Flink 作为 ETL 工具 二、 基于 Flink SQL CDC 的数据同步方案实践2.1 CDC Streaming ETL2.2 Flink-CDC实践之mysql案例 来源互联网多篇文…...

uniapp使用HQChart的k线,用webSocket更新数据

项目:不借用HQChart的各种接口数据,即数据后端返回,但是数据格式要和原数据格式一样。 //k线图 CreateHQChartKLine(){var chartHeightuni.upx2px(this.ChartHeight);let hqchartCtrlthis.$refs.HQChartCtrl;hqchartCtrl.KLine.Option.Type&…...

idea的Plugins中搜索不到插件

1、ctrlalts 打开设置 ; 2、搜索框输入plugins; 3、点击plugins; 4、点齿轮按钮,选择HTTP Proxy settings; 如下操作: 5、刷新DNS,ipconfig /flushdns 6、重新打开idea 的plugins 插件列表出来了...

flask 实现简单的登录系统demo

你提供的代码是一个基本的Flask应用程序,实现了一个简单的登录系统。以下是代码的详细解释: 1. 导入必要的模块:os 用于生成密钥,Flask 用于创建Web应用程序。 2. 创建Flask应用程序的实例,并为会话管理设置一个密钥。…...

Spring Security安全配置

使用Spring Boot与Spring MVC进行Web开发时,如果项目引入spring-boot- starter-security依赖启动器,MVC Security 安全管理功能就会自动生效,其默认的安全配置是在SecurityAutoConfiguration和UserDetailsServiceAutoConfiguration中实现的。…...

2023Java后端开发之100道常见经典面试题

目录 1.重载和重写的区别? 2.String 和 StringBuffer、StringBuilder 的区别是什么? 3. 与 equals 的区别? 4.抽象类和接口的区别是什么? 5. 面向对象的特点 6.Collection 和 Collections 有什么区别? 7.List、Set、Map 之…...

Redis详解,包括安装命令,应用场景,优缺点,案列分析,各个开发语言如何应用

目录 1、安装命令2、应用场景3、优缺点4、案例分析5、各个开发语言如何应用? Redis 是一个基于内存的开源数据库系统,被广泛应用于 Web 应用、消息队列、缓存、实时统计等领域。下面是 Redis 的详解,包括安装命令、应用场景和优缺点&#xff…...

AI数字人:金融数字化转型的“关键先生”

今年年初ChatGPT的火热,在全球掀起一阵生成式AI(AIGC)热潮。国外的OpenAI、国内的百度等企业,都在AIGC上强力布局。 各种应用场景中,AIGC助力的数字人引起了市场注意。 事实上,数字人不是个新鲜事。早在1…...

mac关闭VPN之后,浏览器就不能够正常上网了(图解)

可能打开谷歌浏览器会显示无法正常连接网络等信息,这个时候可以按照以下步骤: 点击"检查代理服务器地址" 会显示以下这段话,按照这个步骤来操作就可以了。 打开系统偏好设置,点击网络 点击高级 取消掉所有已勾选代…...

YOLOv5改进系列(17)——更换IoU之MPDIoU(ELSEVIER 2023|超越WIoU、EIoU等|实测涨点)

【YOLOv5改进系列】前期回顾: YOLOv5改进系列(0)——重要性能指标与训练结果评价及分析 YOLOv5改进系列(1)——添加SE注意力机制...

基于WSL2、Ubuntu和VS Code的CUDA平台运行C语言程序

一、CUDA程序执行方法 执行步骤为: 安装Visual Studio Code。在Visual Studio Code中安装插件WSL与电脑的WSL2进行连接。点击左下角,然后再选择连接到WSL。 在WSL中创建以 .cu 为后缀的文件。 rootDESKTOP-HR6VO5J:~# mkdir CUDA /…...

构建外卖系统小程序,订单管理功能实现步骤详解

外卖系统小程序是近年来越来越受欢迎的一种订餐方式,方便快捷,并且可以减少人与人之间的接触,更加卫生安全。为了搭建一个完善的外卖系统小程序,订单管理功能是必不可少的一部分。在本文中,我们将详细介绍如何实现订单…...

用asp.net开发h5网页版视频播放网站,类似优酷,jellyfin,emby

之前用jellyfin开源软件搞了一个视频播放服务器,用来共享给家里人看电影和电视剧,jellyfin虽然各方面功能都很强大,但是界面和使用习惯都很不适合,于是就想着利用下班休息时间做一套自己喜欢的视频网站出来. 本来是打算直接用jellyfin的源码进行修改,源码是用C# netcore 写的服…...

Redis—相关背景

Redis—相关背景 🔎Redis—特性In-memory data structures—在内存中存储数据Programmability—可编程性Extensibility—可扩展性Persistence—持久化Clustering—集群High availability—高可用 🔎Redis 为什么快🔎Redis 的使用场景Real-tim…...

SSL 证书过期巡检脚本

哈喽大家好,我是咸鱼 我们知道 SSL 证书是会过期的,一旦过期之后需要重新申请。如果没有及时更换证书的话,就有可能导致网站出问题,给公司业务带来一定的影响 所以说我们要每隔一定时间去检查网站上的 SSL 证书是否过期 如果公…...

leetcode 面试题 01.03. URL化

⭐️ 题目描述 🌟 leetcode链接:面试题 01.03. URL化 思路: 计算出空格的个数,我们可以知道最后一个字符的位置 endPos,再从后 end 向前遍历若不是空格正常拷贝,是空格则替换成 %20,最终当空格…...

uni-app在小米手机上运行【步骤细节】

注意细节重点: 1.手机使用数据线与电脑连接,手机连接模式必须是传输文件模式 2.手机必须打开开发者模式 3.打开开发者模式后,仔细浏览并调整USB调试权限,重点打开USB是否允许安装按钮!!! 操作步…...

微信小程序实现日历功能、日历转换插件、calendar

文章目录 演示htmlJavaScript 演示 效果图 微信小程序实现交互 html <view wx:if"{{calendarArr.length}}"><view class"height_786 df_fdc_aic"><view class"grid_c7_104"><view class"font_weight_800 text_align…...

【浩鲸科技】济南Java后端面经

本文目录 写在前面试题总览题目解析1.说一下SpringBoot中常用的注解2.Redis中的基本数据类型3.TCP的网络协议4.java中常见的锁5.Hashmap的底层数据结构、底层源码、扩容机制源码6.java面向对象的特点 写在前面 关于这个专栏&#xff1a; 本专栏记录一些互联网大厂、小厂的面试实…...

VMware搭建Hadoop集群 for Windows(完整详细,实测可用)

目录 一、VMware 虚拟机安装 &#xff08;1&#xff09;虚拟机创建及配置 &#xff08;2&#xff09;创建工作文件夹 二、克隆虚拟机 三、配置虚拟机的网络 &#xff08;1&#xff09;虚拟网络配置 &#xff08;2&#xff09;配置虚拟机 主机名 &#xff08;3&#xf…...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

基于Uniapp开发HarmonyOS 5.0旅游应用技术实践

一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架&#xff0c;支持"一次开发&#xff0c;多端部署"&#xff0c;可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务&#xff0c;为旅游应用带来&#xf…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版

7种色调职场工作汇报PPT&#xff0c;橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版&#xff1a;职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...