谈谈Flink消费kafka的偏移量
offset配置:
flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…):从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromGroupOffsets():默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。
消费者offset提交配置:
配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。5000单位毫秒,代表每5秒进行依次checkpoint
关闭checkpoint:如何禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。
开启checkpoint:如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。
总结:Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前,因为0.8之前offset是维护在zookeeper中的)的配置 ;关闭checkpoint的话,flink消费kafka数据 offset取决于kafka客户端的配置;开启checkpoint的话,flink消费kafka offset由jobmanager中的checkpoint维护,并同步到kafka中保持一置,注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况。
使用checkpoint + 两阶段提交来保证仅消费一次kafka中的数据:
Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。
当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时,也没有重复的数据或者未被处理的数据出现,实现仅一次处理的语义。
checkpoint中包含:
当前应用的状态;
当前消费流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;
使用两阶段提交协议保证flink连接外部系统数据仅一次处理
Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。
当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时,也没有重复的数据或者未被处理的数据出现,实现仅一次处理的语义。
checkpoint中包含:
当前应用的状态;
当前消费流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;
使用两阶段提交协议保证flink连接外部系统数据仅一次处理;
当Flink处理完的数据需要写入外部系统时,不保证仅一次处理数据。为了提供端到端的仅一次处理数据,在将数据写入外部系统时也要保证仅一次处理数据,这些外部系统必须提供一种手段来允许程序提交或者回滚写入操作,同时还要保证与Flink的checkpoint机制协调使用,在分布式系统中协调提交和回滚的常见方法就是两阶段提交协议。下面给出一个实例了解Flink如何使用两阶段提交协议来实现数据仅一次处理语义。
该实例是从kafka中读取数据,经过处理数据之后将结果再写回kafka。kafka0.11版本之后支持事务,这也是Flink与kafka交互时仅一次处理的必要条件。【注意:当Flink处理完的数据写入kafka时,即当sink为kafka时,自动封装了两阶段提交协议】。Flink支持仅一次处理数据不仅仅限于和Kafka的结合,只要sink提供了必要的两阶段协调实现,可以对任何sink都能实现仅一次处理数据语义。
其原理如下:
上图Flink程序包含以下组件:
一个从kafka中读取数据的source
一个窗口聚合操作
一个将结果写往kafka的sink。
要使sink支持仅一次处理数据语义,必须以事务的方式将数据写往kafka,将两次checkpoint之间的操作当做一个事务提交,确保出现故障时操作能够被回滚。假设出现故障,在分布式多并发执行sink的应用程序中,仅仅执行单次提交或回滚事务是不够的,因为分布式中的各个sink程序都必须对这些提交或者回滚达成共识,这样才能保证两次checkpoint之间的数据得到一个一致性的结果。Flink使用两阶段提交协议(pre-commit+commit)来实现这个问题。
Filnk checkpointing开始时就进入到pre-commit阶段,具体来说,一旦checkpoint开始,Flink的JobManager向输入流中写入一个checkpoint barrier将流中所有消息分隔成属于本次checkpoint的消息以及属于下次checkpoint的消息,barrier也会在操作算子间流转,对于每个operator来说,该barrier会触发operator的State Backend来为当前的operator来打快照。如下图示:

Flink DataSource中存储着Kafka消费的offset,当完成快照保存后,将chechkpoint barrier传递给下一个operator。这种方式只有在Flink内部状态的场景是可行的,内部状态指的是由Flink的State Backend管理状态,例如上面的window的状态就是内部状态管理。只有当内部状态时,pre-commit阶段无需执行额外的操作,仅仅是写入一些定义好的状态变量即可,checkpoint成功时Flink负责提交这些状态写入,否则就不写入当前状态。
但是,一旦operator操作包含外部状态,事情就不一样了。我们不能像处理内部状态一样处理外部状态,因为外部状态涉及到与外部系统的交互。这种情况下,外部系统必须要支持可以与两阶段提交协议绑定的事务才能保证仅一次处理数据。
本例中的data sink是将数据写往kafka,因为写往kafka是有外部状态的,这种情况下,pre-commit阶段下data sink 在保存状态到State Backend的同时,还必须pre-commit外部的事务。如下图:
当checkpoint barrier在所有的operator都传递一遍切对应的快照都成功完成之后,pre-commit阶段才算完成。这个过程中所有创建的快照都被视为checkpoint的一部分,checkpoint中保存着整个应用的全局状态,当然也包含pre-commit阶段提交的外部状态。当程序出现崩溃时,我们可以回滚状态到最新已经完成快照的时间点。
下一步就是通知所有的operator,告诉它们checkpoint已经完成,这便是两阶段提交的第二个阶段:commit阶段。这个阶段中JobManager会为应用中的每个operator发起checkpoint已经完成的回调逻辑。本例中,DataSource和Winow操作都没有外部状态,因此在该阶段,这两个operator无需执行任何逻辑,但是Data Sink是有外部状态的,因此此时我们需要提交外部事务。如下图示:
汇总以上信息,总结得出:
一旦所有的operator完成各自的pre-commit,他们会发起一个commit操作。
如果一个operator的pre-commit失败,所有其他的operator 的pre-commit必须被终止,并且Flink会回滚到最近成功完成的checkpoint位置。
一旦pre-commit完成,必须要确保commit也要成功,内部的operator和外部的系统都要对此进行保证。假设commit失败【网络故障原因】,Flink程序就会崩溃,然后根据用户重启策略执行重启逻辑,重启之后会再次commit。
因此,所有的operator必须对checkpoint最终结果达成共识,即所有的operator都必须认定数据提交要么成功执行,要么被终止然后回滚。
参考:https://blog.51cto.com/u_16213620/7923389
相关文章:
谈谈Flink消费kafka的偏移量
offset配置: flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。 flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据,如果kafka中保存有消费…...
MySQL 高级SQL高级语句(二)
一.CREATE VIEW 视图 可以被当作是虚拟表或存储查询。 视图跟表格的不同是,表格中有实际储存数据记录,而视图是建立在表格之上的一个架构,它本身并不实际储存数据记录。 临时表在用户退出或同数据库的连接断开后就自动消失了,而…...
MySQL之高可用性(四)
高可用性 故障转移和故障恢复 冗余是很好的技术,但实际上只有在遇到故障需要恢复时才会用到。(见鬼,这可以用备份来实现)。冗余一点儿也不会增加可用性或减少宕机。在故障转移的过程中,高可用性是建立在冗余的基础上。当有一个组件失效&…...
招聘智能管理系统设计
设计一个招聘智能管理系统,需要从多个维度考虑,包括但不限于用户界面、功能模块、数据安全、算法模型等。以下是一个基本的设计框架: 1. 系统架构: 前端:提供直观的用户界面,包括应聘者和招聘者的登录/注册…...
达梦数据库系列—15. 表的备份和还原
目录 1、表备份 2、表还原 1、表备份 表备份和表还原恢复,都必须在联机状态下进行。 与备份数据库与表空间不同,不需要备份归档日志,不存在增量备份之说。 CREATE TABLE TAB_FOR_RES_02(C1 INT);CREATE INDEX I_TAB_FOR_RES_02 ON TAB_F…...
无线领夹麦克风哪个品牌音质最好,直播用领夹麦克风还是声卡麦
随着社交媒体的兴起,直播和Vlog已经成为内容创作的新趋势,这些变化不仅改变了人们分享生活的方式,也带动了音频设备市场的增长。无线领夹麦克风,以其便携性和卓越的录音品质,迅速成为视频制作者的重要工具。它们在直播…...
《Windows API每日一练》6.2 客户区鼠标消息
第五章已经讲到,Windows只会把键盘消息发送到当前具有输入焦点的窗口。鼠标消息则不同:当鼠标经过窗口或在窗口内被单击,则即使该窗口是非活动窗口或不带输入焦点, 窗口过程还是会收到鼠标消息。Windows定义了 21种鼠标消息。不过…...
体验升级:扫描全能王智能高清滤镜2.0全面测评
🤵♂️ 个人主页:艾派森的个人主页 ✍🏻作者简介:Python学习者 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话, 欢迎评论 💬点赞Ǵ…...
【JVM系列】JVM调优
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
Linux基础 - Postfix 与 Dovecot 部署邮件系统
目录 零. 简介 一. 部署 二. 设置用户别名信箱 三. Linux 邮件客户端 零. 简介 Postfix 和 Dovecot 是在 Linux 系统中常用于部署邮件系统的两个重要组件。 Postfix 是一种邮件传输代理(MTA),主要负责接收、转发和发送邮件。它具有高性能…...
Qt的安装
一、Qt安装 下载地址:https://download.qt.io/archive/qt/ opencv下载安装 下载地址:https://opencv.org/releases/ 陈年旧文,没有下文,以此纪念。。。。。...
ThreeJS-3D教学十二:ShaderMaterial
一、首先 Shader 是做什么的 Shader 可以自定义每个顶点、每个片元/像素如何显示,而控制顶点和片元显示是通过设置 vertexShader 顶点着色器和 fragmentShader 片元着色器,这两个着色器用在 ShaderMaterial 和 RawShaderMaterial 材质上。 我们先看一个例…...
计算机网络面试TCP篇之TCP三次握手与四次挥手
TCP 三次握手与四次挥手面试题 任 TCP 虐我千百遍,我仍待 TCP 如初恋。 巨巨巨巨长的提纲,发车!发车! PS:本次文章不涉及 TCP 流量控制、拥塞控制、可靠性传输等方面知识,这些知识在这篇: TCP …...
Python-数据分析组合可视化实例图【附完整源码】
数据分析组合可视化实例图 开篇:应女朋友的要求,于是写下了这篇详细的数据可视化代码及完整注释 一:柱状图、折线图横向组合网格布局 本段代码使用了pyecharts库来创建一个包含多个图表(柱状图、折线图)和网格布局的…...
【JavaEE】Spring Web MVC详解
一.基本概念. 1.什么是Spring Web MVC? 官方链接: https://docs.spring.io/spring-framework/reference/web/webmvc.html Spring Web MVC is the original web framework built on the Servlet API and has been included in the Spring Framework from the very beginning…...
docker安装rocketMq5x以上的版本
1.背景 安装RocketMQ 5.x以上的版本主要是因为新版本引入了许多性能优化、新功能以及对已有特性的增强,这些改进可以帮助提升消息队列系统的稳定性和效率。 1.性能提升:RocketMQ 5.x版本通常包括了对消息处理速度、吞吐量和延迟的优化,使得系…...
【Spring】DAO 和 Repository 的区别
DAO 和 Repository 的区别 1.概述2.DAO 模式2.1 User2.2 UserDao2.3 UserDaoImpl 3.Repository 模式3.1 UserRepository3.2 UserRepositoryImpl 4.具有多个 DAO 的 Repository 模式4.1 Tweet4.2 TweetDao 和 TweetDaoImpl4.3 增强 User 域4.4 UserRepositoryImpl 5.比较两种模式…...
高阶面试-秒杀系统的设计
场景 特价商品如茅台,在8月1日22点10分0秒开始秒杀 平台用户量:几千万,预计几十万用户感兴趣 需求 临时性的活动,不要太大技术改动 原则 商品不能超卖下单成功的订单不能丢失服务器和数据库不能崩溃尽量不让机器人抢走商品 …...
四十五、 证券基金业数据出境有无特别规范需要注意?
证券基金业数据合规除应遵守本《实务问答》前述的各项通用规定外,还应注意中国证券监督管理委员会等其他机构发布的相关规范。其中,与数据出境相关的主要包括《证券期货业数据分类分级指引》(JR/T 0158—2018,2018年 9月 27日实施…...
02.Linux下安装FFmpeg
目录 一、下载FFmpeg的编译源码 二、编译源码 三、ffmpeg工具结构解析 1、bin目录 2、include库 3、lib库 四、注意事项 五、可能出现的一些问题 1、某些工具未安装/版本过久 2、缺少pkg-config工具 3、缺少ffmplay FFmpeg 是一个开源的跨平台音视频处理工具集&…...
日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...
关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
嵌入式学习笔记DAY33(网络编程——TCP)
一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...
Vue 3 + WebSocket 实战:公司通知实时推送功能详解
📢 Vue 3 WebSocket 实战:公司通知实时推送功能详解 📌 收藏 点赞 关注,项目中要用到推送功能时就不怕找不到了! 实时通知是企业系统中常见的功能,比如:管理员发布通知后,所有用户…...
