Flink系列之:Upsert Kafka SQL 连接器
Flink系列之:Upsert Kafka SQL 连接器
- 一、Upsert Kafka SQL 连接器
- 二、依赖
- 三、完整示例
- 四、可用元数据
- 五、键和值格式
- 六、主键约束
- 七、一致性保证
- 八、为每个分区生成相应的watermark
- 九、数据类型映射
一、Upsert Kafka SQL 连接器
- Scan Source: Unbounded 、
- Sink: Streaming Upsert Mode
Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
二、依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.0.2-1.18</version></dependency>
三、完整示例
下面的示例展示了如何创建和使用 Upsert Kafka 表:
CREATE TABLE pageviews_per_region (user_region STRING,pv BIGINT,uv BIGINT,PRIMARY KEY (user_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'pageviews_per_region','properties.bootstrap.servers' = '...','key.format' = 'avro','value.format' = 'avro'
);CREATE TABLE pageviews (user_id BIGINT,page_id BIGINT,viewtime TIMESTAMP,user_region STRING,WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH ('connector' = 'kafka','topic' = 'pageviews','properties.bootstrap.servers' = '...','format' = 'json'
);-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO pageviews_per_region
SELECTuser_region,COUNT(*),COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
确保在 DDL 中定义主键。
这段代码是用来创建两个表,一个是"pageviews_per_region",另一个是"pageviews",并定义了它们的结构和连接器。
-
"pageviews_per_region"表包含了三个字段:user_region(用户所在地区,字符串类型)、pv(页面访问量,长整型)和uv(独立访客量,长整型)。该表的主键为user_region,但不强制执行。
-
"pageviews"表包含了四个字段:user_id(用户ID,长整型)、page_id(页面ID,长整型)、viewtime(访问时间,时间戳类型)和user_region(用户所在地区,字符串类型)。该表还定义了一个称为"viewtime"的水位线(watermark),它指定了在两秒之前的数据不再考虑为计算pv和uv。
这两个表都使用了Kafka连接器来读写数据。'connector’属性指定了使用的连接器类型,'topic’属性指定了连接器读写的Kafka主题,'properties.bootstrap.servers’属性指定了Kafka集群的地址。
对于"pageviews_per_region"表,'key.format’和’value.format’属性指定了数据的序列化格式为Avro。
对于"pageviews"表,'format’属性指定了数据的序列化格式为JSON。
最后,使用INSERT INTO语句,在"pageviews_per_region"表中计算出每个地区的pv和uv,并将结果插入到upsert-kafka sink中。
总之,这段代码的作用是通过Kafka连接器创建两个表,并将"pageviews"表中的数据计算出每个地区的pv和uv,并插入到"pageviews_per_region"表中。
四、可用元数据
连接器参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必选 | (none) | String | 指定要使用的连接器,Upsert Kafka 连接器使用:‘upsert-kafka’。 |
topic | 必选 | (none) | String | 用于读取和写入的 Kafka topic 名称。 |
properties.bootstrap.servers | 必选 | (none) | String | 以逗号分隔的 Kafka brokers 列表。 |
properties.* | 可选 | (none) | String | 该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。 |
key.format | 必选 | (none) | String | 用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 ‘csv’、‘json’、‘avro’ |
key.fields-prefix | 可选 | (none) | String | 为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,则表架构和“key.fields”都将使用前缀名称。构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求“value.fields-include”必须设置为“EXCEPT_KEY”。 |
value.format | 必选 | (none) | String | 用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 ‘csv’、‘json’、‘avro’。 |
value.fields-include | 必选 | ‘ALL’ | String | 控制哪些字段应该出现在 value 中。可取值:ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。 |
sink.parallelism | 可选 | (none) | Integer | 定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。 |
sink.buffer-flush.max-rows | 可选 | 0 | Integer | 缓存刷新前,最多能缓存多少条记录。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 ‘0’ 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 ‘sink.buffer-flush.max-rows’ 和 ‘sink.buffer-flush.interval’ 两个选项为大于零的值。 |
sink.buffer-flush.interval | 可选 | 0 | Duration | 缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 ‘0’ 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 ‘sink.buffer-flush.max-rows’ 和 ‘sink.buffer-flush.interval’ 两个选项为大于零的值。 |
五、键和值格式
此连接器需要键和值格式,其中键字段源自 PRIMARY KEY 约束。
以下示例显示如何指定和配置键和值格式。格式选项以“键”或“值”加上格式标识符作为前缀。
CREATE TABLE KafkaTable (`ts` TIMESTAMP(3) METADATA FROM 'timestamp',`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka',...'key.format' = 'json','key.json.ignore-parse-errors' = 'true','value.format' = 'json','value.json.fail-on-missing-field' = 'false','value.fields-include' = 'EXCEPT_KEY'
)
六、主键约束
Upsert Kafka 始终以 upsert 方式工作,并且需要在 DDL 中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下,在 changelog source 定义主键意味着 在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在 Kafka 消息的 key 中。
七、一致性保证
默认情况下,如果启用 checkpoint,Upsert Kafka sink 会保证至少一次将数据插入 Kafka topic。
这意味着,Flink 可以将具有相同 key 的重复记录写入 Kafka topic。但由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。因此,upsert-kafka 连接器可以像 HBase sink 一样实现幂等写入。
八、为每个分区生成相应的watermark
Flink 支持根据 Upsert Kafka 的 每个分区的数据特性发送相应的 watermark。当使用这个特性的时候,watermark 是在 Kafka consumer 内部生成的。 合并每个分区 生成的 watermark 的方式和 stream shuffle 的方式是一致的。 数据源产生的 watermark 是取决于该 consumer 负责的所有分区中当前最小的 watermark。如果该 consumer 负责的部分分区是 idle 的,那么整体的 watermark 并不会前进。在这种情况下,可以通过设置合适的 table.exec.source.idle-timeout 来缓解这个问题。
九、数据类型映射
Upsert Kafka 用字节存储消息的 key 和 value,因此没有 schema 或数据类型。消息按格式进行序列化和反序列化,例如:csv、json、avro。因此数据类型映射表由指定的格式确定。
相关文章:
Flink系列之:Upsert Kafka SQL 连接器
Flink系列之:Upsert Kafka SQL 连接器 一、Upsert Kafka SQL 连接器二、依赖三、完整示例四、可用元数据五、键和值格式六、主键约束七、一致性保证八、为每个分区生成相应的watermark九、数据类型映射 一、Upsert Kafka SQL 连接器 Scan Source: Unbounded 、Sink…...
前端与后端的异步编排(promise、async、await 、CompletableFuture)
前端与后端的异步编排 文章目录 前端与后端的异步编排1、为什么需要异步编排2、前端中的异步2.1 、Promise的使用2.1.1、Promise的基础概念2.1.2、Promise中的两个回调函数2.1.3、工具方法1、Promise.all()2、Promise.race()3、Promise.resolve() 2.2 、async 与 aw…...
python打开opencv图像与QImage图像及其转化
目录 1、Qimage图像 2、opencv图像 3、python打开QImage图像通过Qlabel控件显示 4、python打开QImage图像通过opencv显示 5、python打开opencv图像并显示 6、python打开opencv图像通过Qlabel控件显示 1、Qimage图像 QImage是Qt库中用于存储和处理图像的类。它可以存储多种…...
linux 其他版本RCU
1、不可抢占RCU 如果我们的需求是“不管内核是否编译了可抢占RCU,都要使用不可抢占RCU”,那么应该使用不可抢占RCU的专用编程接口。 读者使用函数rcu_read_lock_sched()标记进入读端临界区,使用函数rcu_read_unlock_ sched()标记退出读端临界…...
【单调栈】LeetCode:2818操作使得分最大
作者推荐 map|动态规划|单调栈|LeetCode975:奇偶跳 涉及知识点 单调栈 题目 给你一个长度为 n 的正整数数组 nums 和一个整数 k 。 一开始,你的分数为 1 。你可以进行以下操作至多 k 次,目标是使你的分数最大: 选择一个之前没有选过的 非…...
uniapp 添加分包页面,配置分包预下载
为什么要分包 ? 分包即将小程序代码分成多个部分打包,可以减少小程序的加载时间,提升用户体验 添加分包页面 比较便捷的方法是使用vscode插件 uni-create-view 新建分包文件夹 以在我的页面,添加分包的设置页面为例,新建文件夹 s…...
成功案例分享:物业管理小程序如何助力打造智慧社区
随着科技的进步和互联网的普及,数字化转型已经渗透到各个行业,包括物业管理。借助小程序这一轻量级应用,物业管理可以实现线上线下服务的无缝对接,提升服务质量,优化用户体验。本文将详细介绍如何通过乔拓云网设计小程…...
Electron执行本地cmd命令
javascript执行本地cmd命令,javascript代码怎么执行_js调用本机cmd-CSDN博客 使用 Node.js 打开本地应用_nodejs启动应用-CSDN博客 笔记:nodejs脚本唤醒本地应用程序或者调用命令-CSDN博客 electron调起本地应用_electron 调用本地程序-CSDN博客 命令行打开vscode 你可以使用…...
YOLOv8改进 | 主干篇 | 利用MobileNetV3替换Backbone(轻量化网络结构)
一、本文介绍 本文给大家带来的改进机制是MobileNetV3,其主要改进思想集中在结合硬件感知的网络架构搜索(NAS)和NetAdapt算法,以优化移动设备CPU上的性能。它采用了新颖的架构设计,包括反转残差结构和线性瓶颈层&…...
MATLAB Mobile - 使用预训练网络对手机拍摄的图像进行分类
系列文章目录 前言 此示例说明如何使用深度学习对移动设备摄像头采集的图像进行分类。 在您的移动设备上安装和设置 MATLAB Mobile™。然后,从 MATLAB Mobile 的“设置”登录 MathWorks Cloud。 在您的设备上启动 MATLAB Mobile。 一、在您的设备上安装 MATLAB M…...
LangChain入门指南:定义、功能和工作原理
LangChain入门指南:定义、功能和工作原理 引言LangChain是什么?LangChain的核心功能LangChain的工作原理LangChain实际应用案例如何开始使用LangChain 引言 在人工智能的浪潮中,语言模型已成为推动技术革新的重要力量。从简单的文本生成到复…...
关键字:import关键字
在 Java 中,import关键字用于导入类或接口,使你可以在代码中使用它们而无需完全限定其名称。以下是使用import关键字的示例代码: 在上述示例中,通过使用import关键字导入了java.util.ArrayList类,这样就可以在代码中直…...
【C#】.net core 6.0 通过依赖注入注册和使用上下文服务
给自己一个目标,然后坚持一段时间,总会有收获和感悟! 请求上下文是指在 Web 应用程序中处理请求时,包含有关当前请求的各种信息的对象。这些信息包括请求的头部、身体、查询字符串、路由数据、用户身份验证信息以及其他与请求相关…...
关于redis单线程和IO多路复用的理解
首先,Redis是一个高性能的分布式缓存中间件。其复杂性不言而喻,对于Redis整体而言肯定不是只有一个线程。 我们常说的Redis 是单线程,主要是指 Redis 在网络 IO和键值对读写是采用一个线程来完成的,这也是 Redis 对外提供键值存储…...
第四十一章 XML 映射参数摘要
文章目录 第四十一章 XML 映射参数摘要 第四十一章 XML 映射参数摘要 TopicParameters启用 XML 映射。XMLENABLED 类参数将属性映射到元素或属性。XMLPROJECTION property parameter ("NONE", "ATTRIBUTE", "XMLATTRIBUTE", "CONTENT"…...
redis之五种基本数据类型
一) 字符串(String) 1 使用场景 2 编码 3 编码转换 二) List(列表) 1 使用场景 2 编码 三) Set(无序集合) 1 使用场景 2 编码 3 编码转换 四) ZSet(有序集合) 1 使用场景 2 编码 3 编码转换 五) Hash 1 使用场景 2 编码 3 编码转换 五种基本数据类型 redis…...
RocketMQ系统性学习-RocketMQ高级特性之消息大量堆积处理、部署架构和高可用机制
🌈🌈🌈🌈🌈🌈🌈🌈 【11来了】文章导读地址:点击查看文章导读! 🍁🍁🍁🍁🍁🍁dz…...
Angular 进阶之五: Signals到底用不用?
Angular 在V16的时候推出了Signals,在17正式作为主打功能之一强烈推荐,看过了各种博主的各种科普文章也没说明白,到底这东西值不值得用?毕竟项目大了,重构代码也不是闹着玩儿的。各种科普文章主要在说两点:…...
构建数字化金融生态系统:云原生的创新方法
内容来自演讲:曾祥龙 | DaoCloud | 解决方案架构师 摘要 本文探讨了金融企业在实施云原生体系时面临的挑战,包括复杂性、安全、数据持久化、服务网格使用和高可用容灾架构等。针对网络管理复杂性,文章提出了Spiderpool开源项目,…...
前端性能优化五:css和js位置
1. 精简HTML代码: ①. css链接文件尽量放在页面头部:a. css的加载不会阻塞DOM Tree的解析.b. 但会阻塞DOM Tree渲染,也会阻塞后面JS的执行.c. 将css放在任何body元素之前:(1). 可以确保在文档中解析了所有css的样式包括内联样式和外联的.(2). 减少了浏览器必须重排文档的次数.…...
苏州耕耘无忧物联网:降本增效,设备维护管理数字化转型的引领者
随着科技的快速发展和工业4.0的推动,设备维护管理已经从传统的被动式、经验式维护,转向了更加积极主动、数据驱动的维护模式。在这个过程中,苏州耕耘无忧物联科技有限公司以其深厚的技术积累和丰富的管理经验,引领着设备维护管理数…...
15个热门的开源数据可视化项目
数据可视化(即 BI仪表盘)是图形表示的数据。它涉及产生将表示的数据之间的关系传达给图像查看者的图像。这种通信是通过在可视化过程中使用图形标记和数据值之间的系统映射来实现的。该映射建立了如何在视觉上表示数据值,确定图形标记的属性(例如大小或颜色)如何以及在多大程…...
【第七在线】数据分析与人工智能在商品计划中的应用
随着技术的不断进步,数据分析和人工智能(AI)已经成为了现代商品计划的关键组成部分。在服装行业,这两项技术正在帮助企业更好地理解市场需求、优化库存管理、提高生产效率和提供更好的客户体验。本文将深入探讨数据分析和人工智能…...
【圣诞】极安云科赠书活动第①期:CTF实战:从入门到提升
【圣诞】极安云科赠书活动第①期:CTF实战:从入门到提升 9787111724834 9787121376955 9787302556275 ISBN编号:9787111724834 书名:CTF实战:从入门到提升 定:99.00元 开本:184mm260ÿ…...
分布式搜索elasticsearch概念
什么是elasticsearch? elasticsearch是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容 目录 elasticsearch的场景 elasticsearch的发展 Lucene篇 Elasticsearch篇 elasticsearch的安装 elasticsearch的场景 elasticsear…...
Linux环境安装Hadoop
(1)下载Hadoop安装包并上传 下载Hadoop安装包到本地,并导入到Linux服务器的/opt/software路径下 (2)解压安装包 解压安装文件并放到/opt/module下面 [roothadoop100 ~]$ cd /opt/software [roothadoop100 software…...
swing快速入门(二十五)
注释很详细,直接上代码 新增内容 1.ImageIO.write读取并显示图片 2.ImageIO.writeImageIO.write读取并保存图片 package swing21_30;import javax.imageio.ImageIO; import java.awt.*; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent…...
智能优化算法应用:基于卷尾猴算法3D无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于卷尾猴算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于卷尾猴算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.卷尾猴算法4.实验参数设定5.算法结果6.参考文…...
前端传输formDate格式的数据,后端不能用@RequestBody接收
写了个接口,跟前端对接,前端说怎么一直415的报错 我寻思不对啊,我swagger都请求成功了,后来发现前端一直是以formdata格式提交的数据,这样我其实是可以不加RequestBody的; 知识点: RequestBody…...
【AivaAI】做音乐,无人能比它更专业
关于Aiva Aiva AIVA是音乐制作初创公司AIVA Technologies打造的一款人工智能产品。是人工智能领域头款获得国际认证的虚拟作曲家。 Aiva登录 可以选择Google登录,或者其他邮箱登录。 输入用户名,登录完成。 开始制作音乐 在主页选择“创建曲目…...
互联网站建设维护有关岗位/360收录提交入口网址
BEL (7)BELl,响铃。在 ASCII 编码中,BEL 是个比较有意思的东西。BEL 用一个可以听得见的声音来吸引人们的注意,既可以用于计算机,也可以用于周边设备(比如打印机)。注意,BEL 不是声卡或者喇叭发出的声音,而…...
wordpress精简化教程/seo网站关键词优化方式
问题 1:C 中的类可以定义多个对象,那么对象构造的顺序是怎样的? 问题 2:对象构造顺序会带来什么影响呢? 对象构造往往与构造函数相关联,构造函数体有可能是非常复杂的程序逻辑组成,不同类的构造…...
如何找网站制作/开发网站用什么软件
描述 将一个整数中的数字进行颠倒,当颠倒后的整数溢出时,返回 0 (标记为 32 位整数)。 样例 - 样例 1:输入:123 输出:321- 样例 2:输入:-123 输出:-321解析 if(n1534236469) ret…...
买了域名怎么做网站/百度集团公司简介
无悔入华夏怎么通关快呢?下面小编为大家带来无悔入华夏快速通关攻略,一起看看吧.名臣带的:墨子(墨守加防御),李牧(神,前期点出据守加防御,中期点出另一个加兵,相当于子弟兵),商鞅(拿来治国的),…...
如何建设部网站查职称/营销网站建设流程
本文首发于个人微信公众号《andyqian》,期待你的关注~前言我们都知道,在计算机世界里,再复杂,再美的程序,到最后都会变成0与1。也就是我们常说的:二进制。二进制相信大家都很熟悉。与现实世界不同的是&…...
白石龙做网站/深圳网络推广哪家公司好
转眼间,来北京已经三个月了,来北京后更大的责任与压力让我一直处于一种紧张的工作状态,几乎每天工作十几个小时。最近感觉比较累,工作效率不高,为了保持更好的工作状态,这段时间需要调整一下工作的节奏&…...