FlinkSQL处理Canal-JSON数据
背景信息
Canal是一个CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将MySQL变更传输到其他系统。Canal为变更日志提供了统一的数据格式,并支持使用JSON或protobuf序列化消息(Canal默认使用protobuf)。支持Canal格式的连接器有消息队列Kafka和对象存储OSS。
Flink支持将Canal的JSON消息解析为INSERT、UPDATE或DELETE消息到Flink SQL系统中。在很多情况下,利用Canal这个特性非常的有用,例如:
-
将增量数据从数据库同步到其他系统
-
日志审计
-
数据库的实时物化视图
-
数据库表的temporal join变更历史
Flink还支持将Flink SQL中的INSERT、UPDATE或DELETE消息编码为Canal格式的JSON消息,输出到Kafka等存储中。
重要
目前Flink还不支持将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UPDATE_AFTER分别编码为DELETE和INSERT类型的Canal消息。
将Kafka topic注册成Flink表之后,您可以将Canal消息用作变更日志源。
-- 关于MySQL "products" 表的实时物化视图。
-- 计算相同产品的最新平均重量。
SELECT name, AVG(weight) FROM topic_products GROUP BY name;-- 将MySQL "products" 表的所有数据和增量更改同步到Elasticsearch "products" 索引以供将来搜索。
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
配置选项
选项 | 要求 | 默认 | 类型 | 描述 |
format | 必填 | (none) | String | 指定要使用的格式,使用Canal格式时,参数取值为canal-json。 |
canal-json.ignore-parse-errors | 选填 | false | Boolean | 参数取值如下:
|
canal-json.timestamp-format.standard | 选填 | SQL | String | 指定输入和输出时间戳格式。参数取值如下:
|
canal-json.map-null-key.mode | 选填 | FAIL | String | 指定处理Map中key值为空的方法。参数取值如下:
|
canal-json.map-null-key.literal | 选填 | null | String | 当canal-json.map-null-key.mode的值是LITERAL时,指定字符串常量替换Map中的空key值。 |
canal-json.encode.decimal-as-plain-number | 选填 | false | Boolean | 参数取值如下:
|
canal-json.database.include | 选填 | (none) | String | 一个可选的正则表达式,通过正则匹配Canal记录中的database元字段,仅读取指定数据库的changelog记录。正则字符串与Java的Pattern兼容。 |
canal-json.table.include | 选填 | (none) | String | 一个可选的正则表达式,通过正则匹配Canal记录中的table元字段,仅读取指定表的changelog记录。正则字符串与Java的Pattern兼容。 |
类型映射
目前,Canal使用JSON格式进行序列化和反序列化。有关数据类型映射的更多详细信息,请参阅JSON Format。Canal格式额外兼容了数据传输服务DTS在Kafka集群存储使用的Canal扩展变更类型(INIT)。请参见Kafka集群的数据存储格式。
其他使用说明
可用的元数据
下面的格式元数据可以在DDL语句中声明为只读(VIRTUAL)列。
重要
格式元数据字段只有在相应的连接器转发格式元数据时才可用。目前,只有Kafka连接器能够声明其值格式的元数据字段。
键 | 数据类型 | 说明 |
database | STRING NULL | 原始数据库。对应于Canal记录中的database字段。 |
table | STRING NULL | 原始数据库的表。对应于Canal记录中的table字段。 |
sql-type | MAP<STRING, INT> NULL | 各种sql类型的映射。对应于Canal记录中的sqlType字段。 |
pk-names | ARRAY<STRING> NULL | 主键名称数组。对应于Canal记录中的pkNames字段。 |
ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | 连接器处理事件时的时间戳。对应于Canal记录中的ts字段。 |
如何在Kafka中访问Canal元数据字段的代码示例如下。
CREATE TABLE KafkaTable (origin_database STRING METADATA FROM 'value.database' VIRTUAL,origin_table STRING METADATA FROM 'value.table' VIRTUAL,origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,user_id BIGINT,item_id BIGINT,behavior STRING
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','value.format' = 'canal-json'
);
常见问题
故障时投递重复的变更事件
在正常的操作环境下,Canal能够以exactly-once的语义投递每条变更事件,Flink能够正常消费Canal产生的变更事件。在非正常情况下(例如有故障发生),Canal只能保证at-least-once的投递语义。此时,Canal可能会投递重复的变更事件到Kafka中,当Flink从Kafka中消费的时候就会得到重复的事件,可能导致Flink query的运行得到错误的结果或者非预期的异常。因此,在这种情况下,建议将作业参数table.exec.source.cdc-events-duplicate设置成true,并在该source上定义PRIMARY KEY。Flink系统会生成一个额外的有状态算子,使用该PRIMARY KEY来对变更事件去重并生成一个规范化的changelog流。
参考:Canal格式的使用方法和类型映射_实时计算 Flink版(Flink)-阿里云帮助中心
相关文章:
FlinkSQL处理Canal-JSON数据
背景信息 Canal是一个CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将MySQL变更传输到其他系统。Canal为变更日志提供了统一的数据格式,并支持使用JSON或protobuf序列化消息(Canal默认使用…...
玩转贝启科技BQ3588C开源鸿蒙系统开发板 —— DevEco Studio下载与安装
一、下载DevEco Studio IDE开发工具 1. 登录鸿蒙官网 网址为: 华为HarmonyOS智能终端操作系统官网 | 应用设备分布式开发者生态 页面如下: 2. 搜索“DevEco Studio IDE” 点击右上角的“请输入关键词”,在其中搜索“DevEc…...
大模型上下文长度的超强扩展:从LongLora到LongQLora
前言 本文一开始是《七月论文审稿GPT第2版:从Meta Nougat、GPT4审稿到Mistral、LongLora Llama》中4.3节的内容,但考虑到 一方面,LongLora的实用性较高二方面,为了把LongLora和LongQLora更好的写清楚,而不至于受篇幅…...
pdf格式转换为txt格式
pdf文档转换为txt文档 首先在python3虚拟环境中安装PyPDF2 Python 3.6.8 (default, Jun 20 2023, 11:53:23) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux Type "help", "copyright", "credits" or "license" for more infor…...
scss使用for循环遍历,动态赋值类名并配置不同颜色
需求:后端要传入不同的等级,前端通过等级展示不同的字体颜色,通过scss遍历更有利于动态修改颜色或者增删等级 1.通过 for $i from 1 through 4 定义循环,索引值为i 2.nth($colors, $i) 取出对应的颜色 $colors: #ff0000, #00ff…...
GaussDB数据库使用COPY命令导数
目录 一、前言 二、GaussDB数据库使用COPY命令导数语法 1、语法COPY FROM 2、语法COPY TO 3、特别说明及参数示意 三、GaussDB数据库使用COPY命令导数示例 1、操作步骤 2、准备工作(示例) 3、把一个表的数据拷贝到一个文件(示例&…...
SunFMEA软件免费试用:FMEA的目标和限制是什么?
免费试用FMEA软件-免费版-SunFMEA FMEA,即故障模式与影响分析,是一种预防性的质量工具,旨在识别、评估和优先处理潜在的故障模式及其对系统性能的影响。其目标是提高产品和过程的可靠性和安全性,降低产品故障的风险,并…...
【Redis交响乐】Redis中的数据类型/内部编码/单线程模型
文章目录 一. Redis中的数据类型和内部编码二. Redis的单线程模型面试题: redis是单线程模型,为什么效率之高,速度之快呢? 在上一篇博客中我们讲述了Redis中的通用命令,本篇博客中我们将围绕每个数据结构来介绍相关命令. 一. Redis中的数据类型和内部编码 type命令实际返回的…...
APK 瘦身
APK 瘦身的主要原因是考虑应用的下载转化率和留存率,应用太大了,用户可能就不下载了。再者,因为手机空间问题,用户有可能会卸载一些占用空间比较大的应用,所以,应用的大小也会影响留存率。 1 APK 的结构 …...
GitHub上的15000个Go模块存储库易受劫持攻击
内容概要: 目前研究发现,GitHub上超过15000个Go模块存储库容易受到一种名为“重新劫持”的攻击。 由于GitHub用户名的更改会造成9000多个存储库容易被重新劫持,同时因为帐户删除,会对6000多个存储库造成重新劫持的危机。目前统计…...
避免3ds Max效果图渲染一片黑的4个正确解决方法
在进行3ds Max效果图渲染时,有时候会遇到渲染一片黑的情况,这给我们的工作带来了很大的困扰。为了解决这个问题,下面我将介绍4个正确的解决方法。 1.相机位置 首先需要考虑场景内的相机位置是否有问题。如果相机放在了模型的内部或者墙体的外…...
UI演示双视图立体匹配与重建
相关文章: PyQt5和Qt designer的详细安装教程:https://blog.csdn.net/qq_43811536/article/details/135185233?spm1001.2014.3001.5501Qt designer界面和所有组件功能的详细介绍:https://blog.csdn.net/qq_43811536/article/details/1351868…...
添加一个编辑的小功能(PHP的Laravel)
一个编辑的按钮可以弹出会话框修改断更天数 前台 加一个编辑按钮的样式,他的名字是固定好的 之前有人封装过直接用就好,但是一定放在class里面,不要放在id里面 看见不认识的方法一定要去看里面封装的是什么 之前就是没有看,所以…...
YOLOv8改进 | 主干篇 | ConvNeXtV2全卷积掩码自编码器网络
一、本文介绍 本文给大家带来的改进机制是ConvNeXtV2网络,ConvNeXt V2是一种新型的卷积神经网络架构,它融合了自监督学习技术和架构改进,特别是加入了全卷积掩码自编码器框架和全局响应归一化(GRN)层。我将其替换YOLOv8的特征提取网络,用于提取更有用的特征。经过我的实…...
elasticsearch7.17.9两节点集群改为单节点
需求 将数据从node-23-1节点中迁移到node-83-1节点。但是现在node-83-1并没有加入到集群中,因此首先将node-83-1加入到node-23-1的集群 解决方案 使用ES版本为7.17.9,最开始设置集群为一个节点,node-23-1的配置如下 cluster.name: my-app…...
二叉树的层序遍历,力扣
目录 题目地址: 题目: 我们直接看题解吧: 解题方法: 方法分析: 解题分析: 解题思路: 代码实现: 代码补充说明: 题目地址: 102. 二叉树的层序遍历 - 力扣&…...
构建Dockerfile报错/bin/sh: 1: cd: can‘t cd to /xxx/yyy问题记录
目录 关键的命令行 排查分析 原因 附:Dockerfile构建时打印命令输出的办法 关键的命令行 WORKDIR /app COPY record . RUN cd record && xxx 执行到RUN时报了错: /bin/sh: 1: cd: cant cd to /app/record 并且宿主机当前目录也准备好了re…...
Vue常用的修饰符详解(有哪些,怎么用)
文章目录 一、修饰符是什么二、修饰符的作用1.表单修饰符lazytrimnumber 2.事件修饰符stoppreventselfoncecapturepassivenative 3.鼠标按钮修饰符4.键盘修饰符5.v-bind修饰符asyncpropscamel 三、应用场景参考文献 一、修饰符是什么 在程序世界里,修饰符是用于限定…...
Linux C/C++ 获取CPUID
实现方式: INTEL CC 格式 AT^T CC 格式 GCC/C库 __cpuid 宏 大致讲义: AT^T 格式汇编很反人类,GCC可以改编译器选项为INTEL内嵌汇编,但一般在GCC还是按照默认的AT^T汇编来拽写把,不想用也可以让AI工具把INTEL内嵌…...
2023年“中银杯”安徽省网络安全B模块(部分解析)
前言 以下是2023年中银杯安徽省网络安全B模块题目,镜像可以私聊我 B模块安全事件响应/网络安全数据取证/应用安全(400 分) B-1:CMS网站渗透测试 任务环境说明: √服务器场景:Server2206(关…...
194.【2023年华为OD机试真题(C卷)】单行道汽车通行时间(迭代计算—JavaPythonC++JS实现)
请到本专栏顶置查阅最新的华为OD机试宝典 点击跳转到本专栏-算法之翼:华为OD机试 🚀你的旅程将在这里启航!本专栏所有题目均包含优质解题思路,高质量解题代码,详细代码讲解,助你深入学习,深度掌握! 文章目录 【2023年华为OD机试真题(C卷)】单行道汽车通行时间(…...
第二证券机构策略:股指预计维持蓄势震荡格局 关注煤炭、电力等板块
第二证券以为,技能面看,在元旦节前资金抄底推进指数收回2900整数关口,并向着3000点渠道压力前进。沪指在底部均线位支撑摆放较强,调整空间估计不大,在3000点渠道下方调整就是再次优化低吸的时机。操作上,在…...
Go 泛型之泛型约束
Go 泛型之泛型约束 文章目录 Go 泛型之泛型约束一、引入二、最宽松的约束:any三、支持比较操作的内置约束:comparable四、自定义约束五、类型集合(type set)六、简化版的约束形式七、约束的类型推断八、小结 一、引入 虽然泛型是…...
【数据仓库与联机分析处理】数据仓库
目录 一、数据仓库的概念 二、数据仓库与操作性数据库的区别 三、发展前期 四、数据仓库的系统结构 五、建模划分 六、主要案例 一、数据仓库的概念 目前很难给数据仓库(Data Warehouse)一个严格的定义,不准确地说,数据仓库…...
机器学习:贝叶斯估计在新闻分类任务中的应用
文章摘要 随着互联网的普及和发展,大量的新闻信息涌入我们的生活。然而,这些新闻信息的质量参差不齐,有些甚至包含虚假或误导性的内容。因此,对新闻进行有效的分类和筛选,以便用户能够快速获取真实、有价值的信息&…...
[C#]基于deskew算法实现图像文本倾斜校正
【算法介绍】 让我们开始讨论Deskeweing算法的一般概念。我们的主要目标是将旋转的图像分成文本块,并确定它们的角度。为了让您详细了解我将使用的方法: 照常-将图像转换为灰度。应用轻微的模糊以减少图像中的噪点。现在,我们的目标是找到带…...
Qt通过pos()获取坐标信息
背景:这是一个QWidget窗体,里面是各种布局的组合,一层套一层。 我希望得到绿色部分的坐标信息(x,y) QPoint get_pos(QWidget* w, QWidget* parent) {if ((QWidget*)w->parent() parent) {return w->pos();}else {QPoint pos(w->po…...
【Webpack】资源输入输出 - 配置资源出口
所有与出口相关的配置都集中在 output对象里 output对象里可以包含数十个配置项,这里介绍几个常用的 filename 顾名思义,filename的作用是控制输出资源的文件名,其形式为字符串,如: module.exports {entry: ./src/a…...
【XR806开发板试用】XR806串口驱动CM32M对小厨宝的控制实验
一.说明 非常感谢基于安谋科技STAR-MC1的全志XR806 Wi-FiBLE开源鸿蒙开发板试用活动,并获得开发板试用。 XR806是全志科技旗下子公司广州芯之联研发设计的一款支持WiFi和BLE的高集成度无线MCU芯片,支持OpenHarmony minisystem和FreeRTOS,具有集成度高、…...
中介者模式-Mediator Pattern-1
如果在一个系统中对象之间的联系呈现为网状结构, 对象之间存在大量的多对多联系,将导致系统非常复杂。 这些对象既会影响别的对象,也会被别的对象所影响。 这些对象称为同事对象,它们之间通过彼此的相互作用实现系统的行为。 在网…...
做网站设计需要学会哪些/网页设计html代码大全
Jquery中的选择器分为几大类:基本过滤选择器,层次选择器,内容过滤选择器,可见性过滤选择器,属性过滤选择器,子元素过滤选择器,表单对象选择器和表单对象属相过滤选择器。 1.非基本过滤选择器&am…...
wordpress系统密码忘记/快速提升网站排名
1.JavaScript打印 <input id"btnPrint" type"button" value"打印" οnclick"javascript:window.print();" /> 可以用样式控制,你想让那块打印就打印啊,样式如下: <style type"text…...
wordpress+dux5.0/seo优化教程视频
播放铃声: Uri notification RingtoneManager.getDefaultUri(RingtoneManager.TYPE_ALARM); Ringtone r RingtoneManager.getRingtone(context,notification); r.play(); 系统闹钟源码分析...
提供手机自适应网站公司/如何创建自己的网站
欢迎关注头条号:Java小野猫web项目部署到tomcat上之后,有时需要打断点单步调试,如果用的是Intellij idea,可以通过如下方法实现:开启debug端口,启动tomcat以tomcat7.0.75为例,打开bin目录下的ca…...
绚丽的网站/网址导航大全
前言 今天我们来学习一下动态sql,看起来很NB的感觉。我们来看看官网是怎么来介绍动态sql的。动态 SQL 是 MyBatis 的强大特性之一。如果你使用过 JDBC 或其它类似的框架,你应该能理解根据不同条件拼接 SQL 语句有多痛苦,例如拼接时要确保不能…...
上海公司注册核名查询/黑帽seo教程
为什么80%的码农都做不了架构师?>>> 首先说一下Redis公认的特点: Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用。Redis不仅仅支持简单的key-value类型的数据,同…...