当前位置: 首页 > news >正文

Flink分流,合流,状态,checkpoint和精准一次笔记

第8章
分流
1.使用侧输出流

2.合流
2.1 union :使用 ProcessFunction 处理合流后的数据
2.2 Connect :
两条流的格式可以不一样, map操作使用CoMapFunction,process 传入:CoProcessFunction
2.2 BroadcastConnectedStream
keyBy 进行了按键分区,那么要传入的就是 KeyedBroadcastProcessFunction;
如果没有按键分区,就传入 BroadcastProcessFunction

3.基于时间的合流——双流联结(Join)3.1 窗口联结(Window Join)stream1.join(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)3.2 间隔联结(Interval Join)所以匹配的条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBoundprocess函数传入:ProcessJoinFunction3.3 窗口同组联结(Window CoGroup)stream1.coGroup(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.hours(1))).apply(<CoGroupFunction>)

第九章:状态编程

	1 状态的分类:托管状态(Managed State)和原始状态(Raw State)1.托管状态分为两类:算子状态(Operator State)和按键分区状态(Keyed State)1.1算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现 CheckpointedFunction 接口。ListState、UnionListState 和 BroadcastState1.2 按键分区状态(Keyed State):状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就 keyBy 之后才可以使用支持的数据结构:值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、归约状态(ReducingState)、聚合状态(AggregatingState)open中声明状态:getRuntimeContext.getMapState(new MapStateDescriptor[String,String]("my-map-state",classOf[String],classOf[String]))2.状态生存时间(TTL)StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))//这就是设定的状态生存时间.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//创建状态和更改状态(写操作)时更新失效时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//表示从不返回过期值.build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);stateDescriptor.enableTimeToLive(ttlConfig);3.状态持久化和状态后端1. 状态后端的分类:“哈希表状态后端”(HashMapStateBackend)、内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。

第十章:检查点(Checkpoint)

1.从检查点来恢复状态了。具体的步骤为:(1)重启应用,所有任务的状态会清空(2)读取检查点,重置状态。找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。(3)重放数据:保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现(4)继续处理数据2.检查点算法:Flink 使用了 Chandy-Lamport 算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行
下游任务发送 barrier 时,需要广播出去;而当多个上游任务向同一个下游任务传递 barrier 时,
需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区
的 barrier 都到齐,才可以开始状态的保存。具体过程如下:(1)JobManager 发送指令,触发检查点的保存;Source 任务保存状态,插入分界线(2)状态快照保存完成,分界线向下游传递(3)向下游多个并行子任务广播分界线,执行分界线对齐(4)分界线对齐后,保存状态到持久化存储(5)先处理缓存数据,然后正常继续处理3 端到端精确一次(end-to-end exactly-once)
3.1 输入端保证外部数据源就必须拥有重放数据的能力3.2输出端保证幂等写入事务写入:预写日志(WAL)和两阶段提交(2PC)(1)预写日志(write-ahead-log,WAL):缺点:再次确认可能会导致数据写出成功,但是确认消息失败,导致的数据重复写入①先把结果数据作为日志(log)状态保存起来②进行检查点保存时,也会将这些结果数据一并做持久化存储③在收到检查点完成的通知时,将所有结果一次性写入外部系统。

(2)两阶段提交(two-phase-commit,2PC)

		具体的实现步骤为:①当第一条数据到来时,或者收到检查点的分界线时,Sink 任务都会启动一个事务。②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。③当 Sink 任务收到 JobManager 发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了。当中间发生故障时,当前未提交的事务就会回滚,于是所有写入外部系统的数据也就实现了撤回2PC 对外部系统的要求外部系统必须提供事务支持,或者 Sink 任务必须能够模拟外部系统上的事务。⚫ 在检查点的间隔期间里,必须能够开启一个事务并接受数据写入。⚫ 在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失。⚫ Sink 任务必须能够在进程失败后恢复事务。⚫ 提交事务必须是幂等操作。也就是说,事务的重复提交应该是无效的。(3) kafka-flink-kafka 实现端到端 exactly-once 的具体过程可以分解如下 1.启动检查点保存:标志着我们进入了两阶段提交协议的“预提交”阶段2.算子任务对状态做快照保存到状态后端3.Sink 任务开启事务,进行预提交4.检查点保存完成,提交事务当所有算子的快照都完成,JobManager 会向所有任务发确认通知,告诉大家当前检查点已成功保存,当 Sink 任务收到确认通知后,就会正式提交之前的事务需要的配置:必须启用检查点、 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE、Kafka 读取数据的消费者的隔离级别(read_committed)、事务超时配置

相关文章:

Flink分流,合流,状态,checkpoint和精准一次笔记

第8章 分流 1.使用侧输出流 2.合流 2.1 union &#xff1a;使用 ProcessFunction 处理合流后的数据 2.2 Connect &#xff1a; 两条流的格式可以不一样&#xff0c; map操作使用CoMapFunction&#xff0c;process 传入&#xff1a;CoProcessFunction 2.2 BroadcastConnectedSt…...

c# 实现sql查询DataTable数据集 对接SqlSugar ORM

有时候对于已经查询到的数据集&#xff0c;想要进行二次筛选或者查询&#xff0c;还得再查一遍数据库 或者其他的一些逻辑处理不太方便&#xff0c;就想着为什么不能直接使用sql来查询DataTable呢&#xff1f; 搜索全网没找到可用方案&#xff0c;所以自己实现了一个。 主要…...

记一次布尔盲注漏洞的挖掘与分析

在上篇文章记一次由于整型参数错误导致的任意文件上传的漏洞成因的分析过程中&#xff0c;发现menu_id貌似是存在注入的。 public function upload() {$menu_id $this->post(menu_id);if ($id) {$where "id {$id}";if ($menu_id) {$where . " and menu_id…...

C++11 新特性 ---- noexcept

1. 异常 异常通常用于处理逻辑上可能发生的错误 在C98中&#xff0c;提供了一套完善的异常处理机制&#xff0c;直接在程序中将各种类型的异常抛出&#xff0c;从而强制终止程序的运行。 1.1 基本语法 当函数抛出异常时&#xff0c;程序会停止执行&#xff0c;并显示异常信息…...

《Linux运维总结:Centos7.6之OpenSSH7.4p1升级版本至9.4p1》

Centos通过yum升级OpenSSH 在官方支持更新的CentOS版本&#xff0c;如果出现漏洞&#xff0c;都会通过更新版本来修复漏洞。这时候直接使用yum update就可以升级版本。 yum -y update openssh 但是&#xff0c;CentOS更新需要有一段时间&#xff0c;不能在漏洞刚出来的时候就有…...

七夕节日表白:七大网页风格与其适用人群

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…...

通达信指标公式16:使用BARSLAST函数写一个指标回测的思路

★★★★★博文原创不易&#xff0c;我的博文不需要打赏&#xff0c;也不需要知识付费&#xff0c;可以白嫖学习小技巧&#xff0c;喜欢的老铁可以多多帮忙点赞&#xff0c;小红牛在此表示感谢&#xff0c;就是对作者的最大支持。愿与诸君共勉&#xff0c;悟道于股市★★★★★…...

Jenkins自动化部署Vue项目

1、新建item&#xff0c;选择 Freestyle project 2、源码管理选择git&#xff0c;输入git仓库地址和授权账号&#xff0c;并指明要部署的分支 3、构建选择 Execute shell&#xff0c;输入vue项目打包命令 命令示例&#xff1a; source /etc/profile node -v npm config set re…...

Android JNI打印logcat日志

在 JNI 中打印日志可以使用 __android_log_print 函数来实现。该函数是 Android NDK 提供的一个用于在本地代码中输出日志消息到 logcat 的方法。 要在 JNI 中打印日志&#xff0c;请按照以下步骤进行操作&#xff1a; 在你的 JNI C/C 代码中包含 <android/log.h> 头文件…...

第28次CCF计算机软件能力认证(测试)

测试300分要是考试的时候也能这么发挥就好 第一题&#xff1a;现值计算 解题思路&#xff1a;直接模拟 n , m input().split() n int(n);m float(m) l list(map(int , input().split())) res 0 for i in range(0 , n 1):res pow(1 m , -i) * l[i] print(res) 第二题…...

九耶丨阁瑞钛伦特-Java高频面试题-请谈谈 ReadWriteLock 和 StampedLock

ReadWriteLock包括两种子锁 &#xff08;1&#xff09;ReadWriteLock ReadWriteLock 可以实现多个读锁同时进行&#xff0c;但是读与写和写于写互斥&#xff0c;只能有一个写锁线程在进行。 &#xff08;2&#xff09;StampedLock StampedLock是Jdk在1.8提供的一种读写锁&a…...

【Linux操作系统】深入探索Linux系统编程中的信号集操作函数

在Linux系统编程中&#xff0c;信号集操作函数是非常重要的工具&#xff0c;它们允许我们对信号进行管理和控制。本篇博客将详细介绍Linux系统编程中的信号集操作函数&#xff0c;包括信号集的创建、添加和删除信号&#xff0c;以及对信号集进行操作的常用函数。通过深入了解这…...

[C初阶笔记]P2

Git 1、Git是Linus为了帮助管理Linux内核开发 而开发的一个开放源码的分布式版本控制软件。 2、Git和TortoiseGit的作用。 Git中有各种命令行操作&#xff0c;来维护代码&#xff0c;可以将代码推送到代码托管平台。 TortoiseGit是将Git中各自命令行操作转化为图形化操作。 …...

C++并发编程学习01——hello concurrent world

经典用例 #include <iostream> #include <thread>void hello() {std::cout << "hello concurrent world" << std::endl; }int main() {std::thread t(hello);t.join(); }编译 g -g test.cpp -o out -lpthreadgdb调试 (gdb) r Starting pr…...

大数据扫盲(2): 数据分析BI与ETL的紧密关系——ETL是成功BI的先决条件

着业务的发展每个企业都将产生越来越多的数据&#xff0c;然后这些数据本身并不能直接带来洞察力并产生业务价值。为了释放数据的潜力&#xff0c;数据分析BI&#xff08;商业智能&#xff09;成为了现代企业不可或缺的一部分。然而&#xff0c;在数据分析的背后&#xff0c;有…...

Java web 中的 jsp

JSP是什么 JSP是动态网页编程技术 JSP的四大作用域 1.page 表示在当前页面有效 2.request 表现在一次请求中有效 3.session 表示在一次会话中有效 4.application 表示在整个应用程序中有效 jsp内置对象是什么 在jsp开发中会频繁使用到一些对象,如果每次我们在jsp页面中需要…...

uniapp 数组操作

字符串转数组 let string "12345,56789" string.split(,) // [12345,56789] 数组转字符串 let array ["123","456"] array.join(",") // "123,456" 数组元素删除 let array [123,456] // 删除起始下标为1&#xff0…...

数据结构算法--4堆排序

堆排序过程: >建立堆(大根堆) >得到堆顶元素&#xff0c;为最大元素 >去掉堆顶&#xff0c;将堆最后一个元素放到堆顶&#xff0c;此时可通过一次调整使堆重新有序 >堆顶元素为第二大元素 >重复步骤3&#xff0c;直到堆变空 此时是建立堆后的大根堆模型 将…...

C++学习系列之DLL动态库使用

C学习系列之DLL动态库使用 啰嗦动态库的创建动态库的调用函数生成1.需要头文件函数定义&#xff08;头文件&#xff09;2.需要函数定义&#xff08;函数文件&#xff09;3.动态库中的头文件4.动态库中的主文件5.运行查看是否存在C#的调用的入口点6.C#调用 总结 啰嗦 项目需要&…...

Java实现钉钉企业内部应用机器和自定义机器人发送消息

前言 公司让写一个服务监控的功能,当监测到服务停止时,向钉钉群里推送报警信息。之前大概看到钉钉的开放平台的API文档,好像能群发消息的只有机器人。 钉钉开放平台目前提供三种机器人: 企业内部应用机器人 群模板机器人 自定义机器人 本来向用自己比较熟悉的自定义机器人…...

基于QT4的GPX文件编辑器开发

GPX文件是记录地理点的文件,本质是一种xml文件。GPX文件目前没有很好的编辑器,因此作者决定开发一款无需安装的绿色编辑器。 在QT4开发中,XML可以用DOM来实现,但其逻辑并不是很清晰。使用模型视图反而会更加可读。因此在开发中,使用model-view模式来实现数据读写。 1 需…...

树结构使用实例---实现数组和树结构的转换

文章目录 一、为什么要用树结构&#xff1f;二、使用步骤 1.引入相关json2.树结构的转换总结 一、为什么要用树结构&#xff1f; 本文将讲述一个实例&#xff0c;构造一棵树来实现数组和tree的转换&#xff0c;这在前端树结构中是经常遇到的 后端返回树结构方便管理&#xff…...

论文阅读_条件控制_ControlNet

name_en: Adding Conditional Control to Text-to-Image Diffusion Models name_ch: 向文本到图像的扩散模型添加条件控制 paper_addr: http://arxiv.org/abs/2302.05543 date_read: 2023-08-17 date_publish: 2023-02-10 tags: [‘图形图像’,‘大模型’,‘多模态’] author: …...

全链路数据湖开发治理解决方案2.0重磅升级,全面增强数据入湖、调度和治理能力

简介&#xff1a; 阿里云全链路数据湖开发治理解决方案能力持续升级&#xff0c;发布2.0版本。解决方案包含开源大数据平台E-MapReduce(EMR) &#xff0c; 一站式大数据数据开发治理平台DataWorks &#xff0c;数据湖构建DLF&#xff0c;对象存储OSS等核心产品。支持EMR新版数据…...

【算法题】2769. 找出最大的可达成数字

题目&#xff1a; 给你两个整数 num 和 t 。 如果整数 x 可以在执行下述操作不超过 t 次的情况下变为与 num 相等&#xff0c;则称其为 可达成数字 &#xff1a; 每次操作将 x 的值增加或减少 1 &#xff0c;同时可以选择将 num 的值增加或减少 1 。 返回所有可达成数字中的…...

023:vue中解决el-date-picker更改样式不生效问题

第023个 查看专栏目录: VUE ------ element UI 本文章目录 修改后的效果示例源代码&#xff08;共52行&#xff09;核心内容步骤&#xff1a;&#xff08;1&#xff09;更改样式&#xff08;2&#xff09;添加参数 专栏目标 在vue项目开发中&#xff0c;我们打算保持颜色的一致…...

爬虫借助代理会让网速快点吗?

亲爱的程序员朋友们&#xff0c;你曾经遇到过爬虫网速慢的情况吗&#xff1f;别着急&#xff01;今天我将和你一起探讨一下使用代理是否可以加速爬虫&#xff0c;让我们一起进入这个轻松又专业的知识分享。 一、原因和机制的解析 1.IP限制 某些网站为了保护资源和防止爬虫行…...

探索智能文字识别:技术、应用与发展前景

探索智能文字识别&#xff1a;技术、应用与发展前景 前言一张图全览大赛作品解读随心记你不对我对小结 智能文字识别体系化解读图像预处理文字定位和分割文字区域识别图像校正字体识别和匹配结果后处理小结 如何应对复杂场景下挑战复杂场景应对方法小结 人才时代对人才要求合合…...

STL——list用法

一、list介绍 1、list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。 2、list就是一个带头双向循环链表&#xff0c;list通常在任意位置进行插入、移除元素的执行效率更好。 3、list最大的缺陷是不支持任意位置的随机访问…...

Linux的基础指令

目录 1、ls指令 .和..意义 2、pwd指令 3、cd指令 ①cd ~ ②cd - 关于cd ..的用法 绝对路径和相对路径 4、touch指令 5、mkdir指令 tree指令 6、rmdir指令 7、rm指令 * 8、man指令 9、cp指令 nano&#xff1a; 10、mv指令 11、cat指令 12、more指令 13、less…...

河北住房和城乡建设网站/谷歌浏览器手机版免费官方下载

启动项目java -jar /目录 &例如 nohup java -jar /data/wwwroot/springboot.kingsuper.net/demo.jar & 重新启动项目&#xff0c;要先将之前的端口杀死查询端口netstat -tunlp 杀死端口 kill -9 端口例如 kill -9 18607转载于:https://www.cnblogs.com/SeaWxx/p/109377…...

网站购物车作用/手机网站建设平台

水仙花数&#xff08;Narcissistic number&#xff09;也被称为超完全数字不变数&#xff08;pluperfect digital invariant, PPDI&#xff09;、自恋数、自幂数、阿姆斯壮数或阿姆斯特朗数&#xff08;Armstrong number&#xff09;&#xff0c;水仙花数是指一个 n 位数&#…...

海口模板建站平台/网络营销策划

ExecuteNonQuery()方法主要用户更新数据&#xff0c;通常它使用Update,Insert,Delete语句来操作数据库.因为名中有“Non Query”,就是执行非查询类的语句,比如update、delete、insert等数据更新操作。 【返回值】 对于 Update,Insert,Delete 语句 执行成功是返回值为该命令所影…...

浙江市建设网站/优化网站标题名词解释

先创建一个分区&#xff0c;具体步骤参考这个链接然后挂载&#xff1a; 提示错误的话&#xff0c;试试 mount -t ext4 /dev/sdb4 /mnt/sdb4/...

我的网站模板/软文接单平台

首先来看一个完整的sql语句组成部分&#xff1a; SELECT DISTINCT <column> FROM <left_table> <join_type> JOIN <right_table> ON <join_condition> WHERE <where_condition> GROUP BY <group_by_list> HAVING &l…...

怎样建设传奇网站空间/大连网站搜索排名

/*游戏或者运动才能让我短暂的忘记心痛&#xff0c;现如今感觉学习比游戏和运动还重要——曾少锋*/ 在Git-Bash中配置自己的名字和Email&#xff1a; git config --global user.name "your name"git config --global user.email "emailexample.com" 仓库&a…...