当前位置: 首页 > news >正文

FlinkCDC实战:将 MySQL 数据同步至 ES

📌 当前需要处理的业务场景:

  • 将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询.

  • 同步数据到 es.

  • 概述

    • 1. 什么是 CDC

    • 2. 什么是 Flink CDC

    • 3. Flink CDC Connectors 和 Flink 的版本映射

  • 实战

    • 1. 宽表查询

      • 1.1 创建 mysql 表

      • 1.2 启动 Flink 集群和 Flink SQL CLI

      • 1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表

      • 1.4 关联订单数据并且将其写入 Elasticsearch 中

      • 1.5 修改 MySQL 中的数据

本文用到的 mysql 版本为 8.0.28,ES 版本为 7.17.3,flink 版本为 1.13.6,flink-sql-connector-mysql-cdc 版本为 2.2.1

flink-sql-connector-elasticsearch 版本为 7_2.11-1.13.2

概述

1. 什么是 CDC

CDC (Change Data Capture) 是 变更数据获取的简称。核心思想是监测并捕获数据库的变动(数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整地记录下来,写入到消息中间件中以供其他服务进行订阅并消费。

2. 什么是 Flink CDC

Flink 社区开发了 flink-cdc-connectors 组件,这个一个可以直接从 MySQL、PostgreSQL等数据库直接 读取全量数据增量变更数据的source 组件。

3. Flink CDC Connectors 和 Flink 的版本映射

Flink CDC Conectors VersionFlink Version
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*

实战

1. 宽表查询

在商城项目中,商品、订单、物流的数据往往是存储在 MySQL 中,为了加速查询的效率,可以将数据组织成一张宽表,并实时地把它写到 ElasticSearch 中。

确保要监听的 mysql 服务器开启了 binlog 和对应监听 binlog 的账号有相对应的权限。

1.1 创建 mysql 表

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
​
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");
​
CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
​
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);CREATE TABLE shipments (shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_id INTEGER NOT NULL,origin VARCHAR(255) NOT NULL,destination VARCHAR(255) NOT NULL,is_arrived BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),(default,10002,'Hangzhou','Shanghai',false),(default,10003,'Shanghai','Hangzhou',false);

1.2 启动 Flink 集群和 Flink SQL CLI

# 进入 Flink 目录
cd flink-13.6
​
# 启动 Flink 集群.启动成功的话,可以在 http://ip:8081/ 访问到对应的 Flink Web UI
./bin/start-cluster.sh
​
# 启动 Flink SQL CLI
./bin/sql-client.sh

1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表

首先开启 checkpoint, 每隔 3s 做一次 checkpoint.

-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;

然后,对于数据库中的表 productsordersshipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

-- Flink SQL
Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'products');
​
​
Flink SQL> CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'orders');
​
​
Flink SQL> CREATE TABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (shipment_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'shipments');

最后,创建 enriched_orders 表,用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,shipment_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://114.132.43.99:9200','index' = 'enriched_orders');

1.4 关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中。

-- Flink SQL
Flink SQL> INSERT INTO enriched_ordersSELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrivedFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.idLEFT JOIN shipments AS s ON o.order_id = s.order_id;

现在可以通过 Kibana 查询包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.

1.5 修改 MySQL 中的数据

--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
​
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);

每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新.

相关文章:

FlinkCDC实战:将 MySQL 数据同步至 ES

📌 当前需要处理的业务场景: 将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询. 同步数据到 es. 概述 1. 什么是 CDC 2. 什么是 Flink CDC 3. Flink CDC Connectors 和 Flink 的版本映射 实战 1. 宽表查…...

debug小记

红框: 步过:遇到方法不想进入方法 绿框:代码跑在第几行也可以看见 蓝框:可以显示变量的值,三种方式都可以看变量的值...

Qt C++ 显示多级结构体,包括结构体名、变量名和值

文章目录 mainwindow.hmainwindow.cppstructures.hmain.cpp QTreeView 和 QStandardItemModel 来实现。以下是实现这一功能的步骤和示例代码: 定义多级结构体: 假设你有一个多级结构体,如下所示: struct SubStruct {int subValue…...

【JAVA】旅游行业中大数据的使用

一、应用场景 数据采集与整合:全面收集旅游数据,如客流量、游客满意度等,整合形成统一数据集,为后续分析提供便利。 舆情监测与分析:实时监测旅游目的地的舆情信息,运用NLP算法进行智能处理,及…...

【AI+网络/仿真数据集】1分钟搭建云原生端到端5G网络

导语: 近期智慧网络开放创新平台上线了端到端网络仿真能力,区别于传统的网络仿真工具需要复杂的领域知识可界面操作,该平台的网络仿真能力主打一个小白友好和功能专业。 https://jiutian.10086.cn/open/​jiutian.10086.cn/open/ 端到端仿…...

微服务-01【续】

1.OpenFeign 上篇文章我们利用Nacos实现了服务的治理,利用利用RestTemplate实现了服务的远程调用。但是远程调用的代码太复杂了: 而且这种调用方式,与原本的本地方法调用差异太大,编程时的体验也不统一,一会儿远程调用…...

测试工程师八股文01|Linux系统操作

一、Linux系统操作 1、gzip tar和gzip结合使用 $ tar czf b.tar.gz *txt 以gzip方式打包并且压缩 $ tar xzf b.tar.gz -C btar 以gzip方式解压并解包,如果 btar 目录不存在,则需要先手动创建该目录。 代码第二行:如果没有指定 -C …...

【Qt】qt基础

目录 一、使用Qt Creator创建qt项目 二、项目文件解析 三、Qt中创建图形化界面的程序的两种方法 四、对象树 五、Qt中处理打印乱码问题的利器:qDebug() 一、使用Qt Creator创建qt项目 1.选择项目模板 选中第一类模板Application(Qt应用程序,包含普…...

UniScene:Video、LiDAR 和Occupancy全面SOTA

论文: https://arxiv.org/pdf/2412.05435 项目页面:https://arlo0o.github.io/uniscene/ 0. 摘要 生成高保真度、可控制且带有标注的训练数据对于自动驾驶至关重要。现有方法通常直接从粗糙的场景布局生成单一形式的数据,这不仅无法输出多样化下游任务…...

TensorFlow深度学习实战(1)——神经网络与模型训练过程详解

TensorFlow深度学习实战(1)——神经网络与模型训练过程详解 0. 前言1. 神经网络基础1.1 神经网络简介1.2 神经网络的训练1.3 神经网络的应用 2. 从零开始构建前向传播2.1 计算隐藏层节点值2.2 应用激活函数2.3 计算输出层值2.4 计算损失值2.4.1 在连续变…...

03篇--二值化与自适应二值化

二值化 定义 何为二值化?顾名思义,就是将图像中的像素值改为只有两种值,黑与白。此为二值化。 二值化操作的图像只能是灰度图,意思就是二值化也是一个二维数组,它与灰度图都属于单信道,仅能表示一种色调…...

基于python的一个简单的压力测试(DDoS)脚本

DDoS测试脚本 声明:本文所涉及代码仅供学习使用,任何人利用此造成的一切后果与本人无关 源码 import requests import threading# 目标URL target_url "http://47.121.xxx.xxx/"# 发送请求的函数 def send_request():while True:try:respo…...

基于 Spring Boot 实现图片的服务器本地存储及前端回显

??导读:本文探讨了在网站开发中图片存储的各种方法,包括本地文件系统存储、对象存储服务(如阿里云OSS)、数据库存储、分布式文件系统及内容分发网络(CDN)。文中详细对比了这些方法的优缺点,并…...

深入 TCP VJ-Style

接着 TCP 的文化内涵 继续扯一会儿。 自 30 instruction TCP receive 往前追溯,论文 Jacobson88 源自第一次拥塞崩溃,这篇著名文档在同时期的另一个缘起是另一篇考古文献 [Zhang86] Why TCP Timers Don’t Work Well,后面这篇文献提出了 TCP…...

go高性能单机缓存项目

代码 // Copyright 2021 ByteDance Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apach…...

数据结构绪论

文章目录 绪论数据结构三要素算法 🏡作者主页:点击! 🤖数据结构专栏:点击! ⏰️创作时间:2024年12月12日01点09分 绪论 数据是信息的载体,描述客观事物属性的数、字符及所有能输入…...

前端开发常用四大框架学习难度咋样?

前端开发常用四大框架指的是 jQuery vue react angular ‌jQuery‌: ‌学习难度‌:相对较低‌特点‌:jQuery 是一个快速、小巧、功能丰富的 JavaScript 库。它使得 HTML 文档遍历和操作、事件处理、动画和 Ajax 交互更加简单。‌适用场景‌&a…...

OWASP 十大安全漏洞的原理

1. Broken Access Control(访问控制失效) 原理:应用程序未正确实施权限检查,导致攻击者通过篡改请求、强制浏览或权限提升等手段绕过访问控制。 攻击手段: 修改 URL、HTML、或 API 请求以访问未经授权的资源。 删除…...

论文 | ChunkRAG: Novel LLM-Chunk Filtering Method for RAG Systems

本文详细介绍了一种新颖的检索增强生成(Retrieval-Augmented Generation, RAG)系统方法——ChunkRAG,该方法通过对文档的分块语义分析和过滤显著提升了生成系统的准确性和可靠性。 1. 研究背景与问题 1.1 检索增强生成的意义 RAG系统结合…...

ORACLE SQL思路: 多行数据有相同字段就合并成一条数据 分页展示

数据 分数表: 学号,科目名(A,B,C),分数 需求 分页列表展示, 如果一个学号的科目有相同的分数, 合并成一条数据,用 拼接 科目名 ORACLE SQL 实现 SELECT Z.*, SUBSTR(DECODE(f…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

Golang dig框架与GraphQL的完美结合

将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用&#xff0c;可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器&#xff0c;能够帮助开发者更好地管理复杂的依赖关系&#xff0c;而 GraphQL 则是一种用于 API 的查询语言&#xff0c;能够提…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)

上一章用到了V2 的概念&#xff0c;其实 Fiori当中还有 V4&#xff0c;咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务)&#xff0c;代理中间件&#xff08;ui5-middleware-simpleproxy&#xff09;-CSDN博客…...

搭建DNS域名解析服务器(正向解析资源文件)

正向解析资源文件 1&#xff09;准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2&#xff09;服务端安装软件&#xff1a;bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

【网络安全】开源系统getshell漏洞挖掘

审计过程&#xff1a; 在入口文件admin/index.php中&#xff1a; 用户可以通过m,c,a等参数控制加载的文件和方法&#xff0c;在app/system/entrance.php中存在重点代码&#xff1a; 当M_TYPE system并且M_MODULE include时&#xff0c;会设置常量PATH_OWN_FILE为PATH_APP.M_T…...