Flink和Kafka连接时的精确一次保证
Flink写入Kafka两阶段提交
端到端的 exactly-once(精准一次)
kafka -> Flink -> kafka
1)输入端
输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)
2)Flink内部
Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义
3)输出端
两阶段提交(2PC)
。
写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”
;等到检查点保存完毕,才会提交事务进行“正式提交”
。
如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。
必须的配置
1)必须启用检查点
2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE
3)配置 Kafka 读取数据的消费者的隔离级别
【默认kafka消费者隔离级别是读未提交
,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交
】
4)事务超时配置
【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟
】
代码实战
kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】
public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.读取 kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*3.写出到 Kafka精准一次 写入 Kafka,需要满足以下条件,【缺一不可】1、开启 checkpoint2、sink 设置保证级别为 精准一次3、sink 设置事务前缀4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 【3.3】 设置事务超时时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}
后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”
public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用【两阶段提交】写入的 TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}
处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。
相关文章:
Flink和Kafka连接时的精确一次保证
Flink写入Kafka两阶段提交 端到端的 exactly-once(精准一次) kafka -> Flink -> kafka 1)输入端 输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset) 2)Flink内…...
UE4动作游戏实例RPG Action解析三:实现效果,三连击Combo,射线检测,显示血条,火球术
一、三连Combo 实现武器三连击,要求: 1.下一段Combo可以随机选择, 2.在一定的时机才能再次检测输入 3. 等当前片段播放完才播放下一片段 1.1、蒙太奇设置 通过右键-新建蒙太奇片段,在蒙太奇里创建三个片段,并且移除相关连接,这样默认只会播放第一个片段 不同片段播…...
Linux/麒麟系统上部署Vue+SpringBoot前后端分离项目
目录 1. 前端准备工作 1.1 在项目根目录创建两份环境配置文件 1.2 环境配置 2. 后端准备工作 2.1 在项目resources目录创建两份环境配置文件 2.2 环境配置 3. 前后端打包 3.1 前端打包 3.2 后端打包 4、服务器前后端配置及部署 4.1 下载、安装、启动Nginx 4.2 前端项目部署…...
STM32在FreeRTOS下的us延时
STM32在FreeRTOS下的us延时 前言 freeRTOS下跑SPI时需要微秒级别的延时,但是freeRTOS只提供了毫秒级的,记录一下实现us延时的方法。 前期分析 最简单的方式就是开个定时器或者干脆直接计算一下用nop做都可以实现us延时,但是显然还是使用滴…...
软件测试/人工智能丨深入人工智能软件测试:PyTorch引领新时代
在人工智能的浪潮中,软件测试的角色变得愈发关键。本文将介绍在人工智能软件测试中的一些关键技术,以及如何借助PyTorch深度学习框架来推动测试的创新与升级。 PyTorch:深度学习的引擎 PyTorch作为一种开源的深度学习框架,为软件…...
Android 当中的 Fragment 协作解耦方式
Android 当中的 Fragment 协作解耦方式 文章目录 Android 当中的 Fragment 协作解耦方式第一章 前言介绍第01节 遇到的问题第02节 绘图说明 第二章 核心代码第01节 代理人接口第02节 中间人 Activity第03节 开发者A第04节 开发者B第05节 测试类 第一章 前言介绍 第01节 遇到的…...
城市网吧视频智能监控方案,实现视频远程集中监控
网吧环境较为复杂,电脑设备众多且人员流动性大,极易发生人员或消防事故,亟需改变,TSINGSEE青犀AI智能网吧视频监管方案可以帮助实现对网吧环境和用户活动的实时监控和管理。 1、视频监控系统 在网吧内部布置高清摄像头࿰…...
C#WPF视频播放器实例
本文实例演示C#WPF视频播放器 实例如下: 修改mainwindow的代码 <Windowx:Class="PlayerDemo.MainWindow"xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"xml…...
【uniapp】Google Maps
话不多说 直接上干货 提前申请谷歌地图账号一、新建地图 使用h5获取当前定位或者使用三方uniapp插件 var coords ""navigator.geolocation.getCurrentPosition(function(position) {coords {lat: position.coords.latitude,lng: position.coords.longitude};lats …...
C语言变量与常量
跟着肯哥(不是我)学C语言的变量和常量、跨文件访问、栈空间 栈空间还不清楚,期待明天的课程内容 C变量 变量(Variable)是用于存储和表示数据值的名称。 主要包括四个环节:定义、初始化、声明、使用 在我刚…...
AI创作系统ChatGPT网站源码/支持DALL-E3文生图/支持最新GPT-4-Turbo模型+Prompt应用
一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…...
二维码智慧门牌管理系统升级,异常门牌聚合解决方案助力高效管理
文章目录 前言一、异常门牌聚合解决方案 前言 在今天的数字化时代,智慧城市已成为发展趋势,其中二维码智慧门牌管理系统扮演着至关重要的角色。通过对门牌信息进行数字化管理,该系统极大提升了城市管理的效率和便捷性。然而,随着…...
【XTDrone Ubuntu20.04】XTDrone+ Ubuntu20.04 + PX4安装
XTDrone仿真平台配置 文章目录 XTDrone仿真平台配置依赖安装 ROS一键安装Marvos安装PX4 安装安装QTGroundControlXTDrone下载安装 环境: VMWare 16.0 Ubuntu 22.04 (因为没人配过)Ubuntu 20.04 参考文章: 仿真平台基础配置 (yuq…...
河北大学选择ZStack Cube超融合一体机打造实训云平台
河北大学通过云轴科技ZStack Cube超融合一体机构建校园实训云平台,部署测试仅耗时1天,该平台能够更快地为学生提供高性能、高可用的云主机、云存储和云网络服务;同时也能满足日常运维管理要求,为学生提供更好的实训环境。 河北省…...
IDEA远程一键部署SpringBoot到Docker
IDEA是Java开发利器,Spring Boot是Java生态中最流行的微服务框架,docker是时下最火的容器技术,那么它们结合在一起会产生什么化学反应呢? 一、开发前准备 1. Docker安装 可以参考:https://docs.docker.com/install/ 2…...
索引三星结构
三星索引的定义,可以先给我们对索引优化提供一个大概的思路: 满足第1颗星: 取出所有的等值谓词的列,作为索引最开头的列——以任意顺序都可以。 满足第2颗星: 将order by加入到索引列,不要改变这些列的顺…...
rust 笔记 高级错误处理
文章目录 错误处理组合器or() 和 and()or_else() 和 and_then()filtermap() 和 map_err()map_or() 和 map_or_else()ok_or() and ok_or_else() 自定义错误类型错误转换 From 特征 归一化不同的错误类型Box<dyn Error>自定义错误类型 简化错误处理thiserroranyhow 错误处理…...
python+Django 使用apscheduler实现定时任务 管理调度
apscheduler实现定时任务 管理调度 在Django 项目中经常会用到定时任务去处理一些业务处理 使用 APScheduler 可以轻松地实现定时任务的管理和调度。你可以通过以下步骤来创建、启动、停止和删除定时任务: 1.创建调度器对象: from apscheduler.schedu…...
Java编程中,异步操作流程中,最终一致性以及重试补偿的设计与实现
一、背景 微服务设计中,跨服务的调用,由于网络或程序故障等各种原因,经常会出现调用失败而需要重试。另外,在异步操作中,我们提供接口让外部服务回调。回调过程中,也可能出现故障。 这就要求我们主动向外…...
吴恩达《机器学习》8-7:多元分类
在机器学习领域,经常会遇到不止两个类别的分类问题。这时,需要使用多类分类技术。本文将深入探讨多类分类,并结合学习内容中的示例,了解神经网络在解决这类问题时的应用。 一、理解多类分类 多类分类问题是指当目标有多个类别时…...
Postman批量运行用例
近期在复习Postman的基础知识,在小破站上跟着百里老师系统复习了一遍,也做了一些笔记,希望可以给大家一点点启发。 一)注意点 有上传文件的接口,需要做如下设置: 1、打开能读取外部文件的开关 2、把需要…...
20个Golang最佳实践
在本教程中,我们将探讨 Golang 中的 20 个最佳编码实践。它将帮助您编写有效的 Go 代码。 #20:使用正确的缩进 良好的缩进使您的代码具有可读性。一致地使用制表符或空格(最好是制表符)并遵循 Go 标准缩进约定。 package main …...
Java 类之 java.lang.System
Java 类之 java.lang.System 文章目录 Java 类之 java.lang.System一、简介二、主要功能1、currentTimeMillis() - 获取当前时间的毫秒数说明代码示例 2、getProperty(String key) - 获取系统属性说明代码示例 3、exit(int status) - 终止虚拟机说明代码示例 4、arraycopy(Obje…...
认识Modbus通信协议(笔记)
Modbus 莫迪康 1979年 PLC 1969年 什么是modbus? 它是一个Bus,即总线协议。比如串口协议、IIC协议、SPI都是通信协议。 协议,顾名思义是一种规定和约束 Modbus协议是一种引用层的报文传输协议,RTU、ASCLL、TCP,都属于Modbus协…...
【算法】距离(最近公共祖先节点)
题目 给出 n 个点的一棵树,多次询问两点之间的最短距离。 注意: 边是无向的。所有节点的编号是 1,2,…,n。 输入格式 第一行为两个整数 n 和 m。n 表示点数,m 表示询问次数; 下来 n−1 行,每行三个整数 x,y,k&am…...
基于SpringBoot的SSMP整合案例(消息一致性处理与表现层开发)
消息一致性处理 在后端执行完相应的操作后,我们需要将执行操作后的结果与数据返回前端,前端 调用我们传回去的数据,前端是如何知道我们传回去的数据名称的? 答:前后端遵循了同一个"协议"。这个协议就是定义…...
c#之反射详解
总目录 文章目录 总目录一、反射是什么?1、C#编译运行过程2、反射与元数据3、反射的优缺点 二、反射的使用1、反射相关的类和命名空间1、System.Type类的应用2、System.Activator类的应用3、System.Reflection.Assembly类的应用4、System.Reflection.Module类的应用…...
synchronized jvm实现思考
底层实现时,为什么使用了cxq队列和entryList双向链表?这里为什么不跟AQS中使用一个队列就行了,加了一个entryList的目的是为了什么? 个人理解这里多一个entryList,可能是用于减少频繁的cas操作。假设存在很多锁竞争时&…...
【hive基础】hive常见操作速查
文章目录 一. hive变量操作1. 查看当前hive配置信息2. 设置变量3. 修改变量4. 进入hive终端重新加载配置 二. 执行hive sql三. 启动hive 一. hive变量操作 1. 查看当前hive配置信息 # 查看当前所有配置信息 hive > set ;# 查看某一项配置信息 hive >set hive.metastore…...
2024年山东省职业院校技能大赛中职组“网络安全”赛项竞赛试题-A
2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-A 一、竞赛时间 总计:360分钟 二、竞赛阶段 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A、B模块 A-1 登录安全加固 180分钟 200分 A-2 本地安全策略设置 A-3 流量完整性保护 A-4 …...
基于51单片机电子钟温度计数码显示设计( proteus仿真+程序+设计报告+讲解视频)
这里写目录标题 ✅1.主要功能:✅讲解视频:✅2.仿真设计✅3. 程序代码✅4. 设计报告✅5. 设计资料内容清单&&下载链接✅[资料下载链接:](https://docs.qq.com/doc/DS0Nja3BaQmVtWUpZ) 基于51单片机电子钟温度检测数码显示设计( proteu…...
jenkins+centos7上传发布net6+gitlab
工作中实践了一下jenkins的操作,所以记录一下这次经验,没有使用到docker 先看下成果: 选择发布项目 选择要发布的分支 构建中 发布成功 开始 首先安装好jenkins并注册自己的jenkins账号 因为我们的项目代码管理使用的是gitlab,…...
python趣味编程-5分钟实现一个F1 赛车公路游戏(含源码、步骤讲解)
Python 中的 F1 赛车公路游戏及其源代码 F1 Race Road Game是用Python编程语言开发的,它是一个桌面应用程序。 这款 Python 语言的 F1 赛道游戏可以免费下载开源代码,它是为想要学习 Python 的初学者创建的。 该项目系统使用了 Pygame 和 Random 函数。 Pygame 是一组跨平…...
Kafka快速入门
文章目录 Kafka快速入门1、相关概念介绍前言1.1 基本介绍1.2 常见消息队列的比较1.3 Kafka常见相关概念介绍 2、安装Kafka3、初体验前期准备编码测试配置介绍 bug记录 Kafka快速入门 1、相关概念介绍 前言 在当今信息爆炸的时代,实时数据处理已经成为许多应用程序和…...
基于Pytorch的从零开始的目标检测
引言 目标检测是计算机视觉中一个非常流行的任务,在这个任务中,给定一个图像,你预测图像中物体的包围盒(通常是矩形的) ,并且识别物体的类型。在这个图像中可能有多个对象,而且现在有各种先进的技术和框架来解决这个问…...
interview review
M: intrinsic matrix [ f x s c x 0 f y c y 0 0 1 ] \begin{bmatrix}f_x & s & c_x \\ 0 & f_y & c_y \\ 0 & 0 & 1\end{bmatrix} fx00sfy0cxcy1 ( c x , c y ) (c_x, c_y) (cx,cy): camera center in pixels ( f x , f y …...
layui表头多出一列(已解决)
问题描述 :layui表头多出来一列,但是表体没有内容,很影响美观。 好像是原本的表格有滚轮,我操作放大之后滚轮没有了,但是滚轮自带的表头样式还在, 之后手动把这个样式隐藏掉了,代码如下…...
LeetCode解法汇总307. 区域和检索 - 数组可修改
目录链接: 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目: https://github.com/September26/java-algorithms 原题链接:力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台 描述: 给你一个数…...
Java源码分析:Guava之不可变集合ImmutableMap的源码分析
原创/朱季谦 一、案例场景 遇到过这样的场景,在定义一个static修饰的Map时,使用了大量的put()方法赋值,就类似这样—— public static final Map<String,String> dayMap new HashMap<>(); static {dayMap.put("Monday&q…...
详解自动化测试之 Selenium
目录 1. 什么是自动化 2.自动化测试的分类 3. selenium(web 自动化测试工具) 1)选择 selenium 的原因 2)环境部署 3)什么是驱动? 4. 一个简单的自动化例子 5.selenium 常用方法 5.1 查找页面元素&…...
vue监听对象属性值变化
一、官方文档 二、实现方法 方法一、直接根据watch来监听 export default {data() {return {object: {username: ,password: }}},watch: {object.username(newVal, oldVal) {console.log(newVal, oldVal)}} }方法二:利用watch和computed来实现监听 利用computed定…...
Unicode编码的emoji表情如何在前端页面展示(未完成)
Unicode编码的emoji表情如何在前端页面展示 一、首先几个定义解决办法 一、首先几个定义 U1F601 和 0x1F601 表示同一个 Unicode 代码点,即笑脸 Emoji 的代码点。它们之间的区别在于表示方式和数据类型。 1.U1F601 是一种常见的表示方式,也称为 “U” 标…...
基于SSM的设备配件管理和设备检修系统
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…...
鸿蒙开发|鸿蒙系统项目开发前的准备工作
文章目录 鸿蒙项目开发的基本流程介绍鸿蒙项目开发和其他项目有什么不同成为华为开发者-注册和实名认证1.登录官方网站 鸿蒙项目开发的基本流程介绍 直接上图,简单易懂! 整个项目的开发通过4个模块进行:开发准备、开发应用、运行调试测试和发…...
Evil靶场
Evil 1.主机发现 使用命令探测存活主机,80.139是kali的地址,所以靶机地址就是80.134 fping -gaq 192.168.80.0/242.端口扫描 开放80,22端口 nmap -Pn -sV -p- -A 192.168.80.1343.信息收集 访问web界面 路径扫描 gobuster dir -u http…...
第77题. 组合
原题链接:第77题. 组合 全代码: class Solution { private:vector<vector<int>> result; // 存放符合条件结果的集合vector<int> path; // 用来存放符合条件结果void backtracking(int n, int k, int startIndex) {if (path.size() …...
读书笔记:彼得·德鲁克《认识管理》第21章 企业与政府
一、章节内容概述 企业社会责任最重要的维度之一是政企关系。无论对于企业的顺利运作,还是对于政府的顺利运作,政企关系都至关重要。然而,重商主义典范和宪政主义典范这两种传统理论越来越不适应社会现实,越来越失效。虽然当前尚…...
C/C++疫情集中隔离 2021年12月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析
目录 C/C疫情集中隔离 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C疫情集中隔离 2021年12月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 A同学12月初从国外回来,按照防疫要…...
052-第三代软件开发-系统监测
第三代软件开发-系统监测 文章目录 第三代软件开发-系统监测项目介绍系统监测 关键字: Qt、 Qml、 cpu、 内存、memory 项目介绍 欢迎来到我们的 QML & C 项目!这个项目结合了 QML(Qt Meta-Object Language)和 C 的强大功…...
向量矩阵范数pytorch
向量矩阵范数pytorch 矩阵按照某个维度求和(dim就是shape数组的下标)1. torch1.1 Tensors一些常用函数 一些安装问题cd进不去不去目录PyTorch里面_表示重写内容 在默认情况下,PyTorch会累积梯度,我们需要清除之前的值 范数是向量或…...