Flink状态的理解
Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;
1. 状态
所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据,默认保存在 Java 的堆内存中。

根据算子是否存在按照Key进行分区,State可以划分为keyed state 和 Non-keyed state(Operator State、算子状态)
- operator state是task级别的state,说白了就是每个task对应一个state, 在逻辑上,由算子task下所有subtask共享

Operator State的经常被用在Source或Sink算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。 - keyed state 是基于KeyedStream上的状态,这个状态是跟特定的Key 绑定的。KeyedStream流上的每一个Key,都对应一个State

2. 状态数据结构
状态数据由Flink内置状态机制管理。keyed state提供了5种数据结构
2.1 keyed state 数据结构
| 状态 | 状态描述 |
|---|---|
| ValueState | 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的key |
| ListState | 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索 |
| MapState | 维护了一个映射列表 |
| ReducingState | 保存一个单值,表示添加到状态的所有值的聚合 |
| AggregateState | 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与添加到状态的元素的类型不同 |
2.2 operator state
| 状态 | 状态描述 |
|---|---|
| ListState | ListState的快照存储数据,系统重启后,list数据的重分配模式为: round-robin 轮询平均分配 |
| UnionListState | UnionListState的快照存储数据,在系统重启后,list数据的重分配模式为: 广播模式; 在每个subtask上都拥有一份完整的数据; |
3. 状态后端
默认情况下,state会保存在taskmanager的JVM堆内存,checkpoint会存储在JobManager的内存中。然而,状态数据的存储和checkpoint的存储位置可以改变,由state Backend(状态后端)配置实现
老版本(flink-1.12版及以前) Fsstatebackend MemoryStatebackend RocksdbStateBackend
flink1.12版本之后,可用的状态后端类型有两种
HashMapStateBackend、EmbeddedRocksDBStateBackend
而且其所生成的快照文件也统一了格式,因而在job重新部署或者版本升级时,可以任意替换statebackend
-
HashMapStateBackend
※ 状态数据是以java对象形式存储在heap内存中;
※ 内存空间不够时,也会溢出部分数据到本地磁盘文件;
※ 可以支撑大规模的状态数据;(只不过在状态数据规模超出内存空间时,读写效率就会明显降低) -
EmbeddedRocksDBStateBackend
※ RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器, 为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。
※ 它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地, RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
※ 使用RocksDB + HDFS进行state存储:首先
state先在taskManger的本地存储到RocksDB,然后异步写入到HDFS中,状态数量仅仅受限于本地磁盘容量限制。

4. 状态数据容错
Flink是一个stateful(带状态)的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;
一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的接续处理;因此,必须要一种机制能对系统内的各种状态进行持久化容错;Flink用checkpoint机制实现状态数据的容错
4.1 checkpoint
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
-
checkpoint默认关闭,需要手工开启。开启后,默认Exactly-once快照模式。还有一种快照模式为At-least-once
-
checkPoint的位置设置
flink-conf.yaml#state.checkpoints.dir -
Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略。配置参数
flink-conf.yaml#restart-strategy
♦ 如果没有启用 checkpointing,则使用无重启 (no restart) 策略 ♦ 如果启用了 checkpointing,但没有配置重启策略,则使用固定 间隔 (fixed-delay) 策略,尝试重启次数默认值是:Integer.MAX_VALUE。
♦ 另一种重启策略为Failure rate,某时间段内失败了N次就重启
全局配置
# 每隔3s重启一次,重试间隔为10s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
#5分钟内若失败了3次则认为该job失败,重试间隔为10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
# 不重启
restart-strategy: none
单个JOB内配置
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//开启状态检查点机制(它将会定期对整个系统中各个task的状态进行快照持久化,以便失败重启后还能从失败之前的状态恢复)env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE);
// checkpoint机制触发后,持久化保存各task状态数据的存储位置(生产中用hdfs
env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/tmp/flink/state");
// 指定状态后端存储(内存)
env.setStateBackend(new HashMapStateBackend());
// 开启自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.milliseconds(2000)));
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)));
// env.setRestartStrategy(RestartStrategies.noRestart())
- checkpoint个数默认保留最近成功生成的一个,支持保留多个,通过参数
flink-conf.yaml#state.checkpoints.num-retained控制,如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现
flink run -t yarn-per-job -yjm 1024 -ytm 1024 -s hdfs://node01:8020/tmp/flink/state/715b120fe8736a3af7842ea0a5264c46/chk-6/_metadata
4.2 savePoint
savePoint是检查点一种特殊实现,底层其实也是使用Checkpoint的机制。
savePoint是用户以手工命令的方式触发checkpoint,并将结果持久化到指定的存储目录中。
作业升级、代码修改、任务迁移和维护,都可以使用savePoint
-
savePoint的存储位置
savePoint的存储位置flink-conf.yaml#state.savepoints.dir,不是必须设置,但设置了后, 后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置。 -
savePoint的手动触发:
#【针对on standAlone模式】
bin/flink savepoint jobId [targetDirectory]#【针对on yarn模式需要指定-yid参数】
bin/flink savepoint jobId [targetDirectory] -yid yarnAppId
jobId 需要触发savepoint的jobId编号
targetDirectory 指定savepoint存储数据目录
-yid 指定yarnAppId
例如: flink savepoint 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001
- 查看HDFS上savepoint目录
#Savepoint directory /flink/savepoints/savepoint-:shortjobid-:savepointid/
#Savepoint file contains the checkpoint meta data /savepoints/savepoint-:shortjobid-:savepointid/_metadata

4. 触发savepoint并且停止作业
##语法: bin/flink stop jobId -yid yarnAppId
##例如: flink stop 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001
- 从指定的savepoint启动job
##语法: bin/flink run -s savepointPath [runArgs]
##例如: flink run -t yarn-per-job -yjm 1024 -ytm 1024
-s hdfs://node01:8020/flink/savepoints/savepoint-84e766-0591f3377ad0
-c com.loess.checkpoint.TestCheckPoint flink-study-1.0-SNAPSHOT.jar
- 清除savepoint数据
bin/flink savepoint -d savepointPath
##也可以手动删除某个savepoint,这通过常规的文件系统操作就可以做到,不影响其它的savepoints和checkpoints
- savePoint使用建议
为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,推荐通过 uid(String) 方法手动的给算子赋予 ID,这些
ID 将用于确定每一个算子的状态范围。不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点
(savepoint)将程序恢复回来。而这些自动生成的 ID
依赖于程序的结构,并且对代码的更改敏感。当程序改变时,ID会随之变化,所以建议用户手动设置 ID
DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID
4.3 savePoint与checkPoint的区别
- checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从checkpoint来恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
- savepoint是通过checkpoint机制创建的,所以savepoint本质上是特殊的checkpoint。
- checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
- checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。

5. checkpoint机制原理
checkPoint是所有 Operator / Task 的状态在某个时间点的一份拷贝(一份快照), 这个时间点应该是所有 Operator / Task 任务都恰好处理完一个相同的输入数据的时候。

若某个subTask挂了,则此时的状态都被清空,从checkpoint恢复最近一次的状态,重新启动应用程序,计算输入流
5.1 Barrier机制
Barrier是一种特殊事件,用来作为快照信号,由checkpoint 协调器向数据流中注入该信号,subtask任务收到该信号后,就会执行状态的快照。

- 首先是JobManager中的checkpoint Coordinator(协调器) 向任务中的所有source Task周期性发送barrier(栅栏)进行快照请求。
- source Task接受到barrier后, 会把当前自己的状态进行snapshot(可以保存在HDFS上)。
- source向checkpoint coordinator确认snapshot已经完成。
- source继续向下游transformation operator发送 barrier。
- transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成。
- coordinator确认完成
本周期的snapshot已经完成。
5.2 Barrier对齐
对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边 称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier,而且 不同输入里Checkpoint Barrier的流入进度可能不同。因此Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment)

算子对齐分为四部:
(1). 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐。
(2). 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。
(3). 第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
(4). 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。

6.快照性能优化方案
每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。
Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。
两种优化方案
① Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。通过基于 Chandy-Lamport 算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用。
② Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍
相关文章:
Flink状态的理解
Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化; 1. 状态 所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据…...
6.3.tensorRT高级(1)-yolov5模型导出、编译到推理(无封装)
目录 前言1. YOLOv5导出2. YOLOv5推理3. 补充知识总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-yolov5模…...
如何利用设备数字化平台推动精益制造?
人工智能驱动技术的不断发展,尤其是基于机器学习的预测分析工具的使用,为制造业带来了全新的效率和价值水平。一直以来,精益生产(也叫精益制造)在制造业中扮演着重要角色,而现在通过与工业 4.0的融合&#…...
使用Wps减小PDF文件的大小
第一步、打开左上角的文件 第二步、点击打印选项 第三步、点击打印按钮...
【深度学习】GPT-3
2020年5月,OpenAI在长达72页的论文《https://arxiv.org/pdf/2005.14165Language Models are Few-Shot Learners》中发布了GPT-3,共有1750亿参数量,需要700G的硬盘存储,(GPT-2有15亿个参数),它比GPT-2有了极大的改进。根…...
在登录界面中设置登录框、多选项和按钮(HTML和CSS)
登录框(Input框)的样式: /* 设置输入框的宽度和高度 */ input[type"text"], input[type"password"] {width: 200px;height: 30px; }/* 设置输入框的边框样式、颜色和圆角 */ input[type"text"], input[type&q…...
【语音识别】- 声学,词汇和语言模型
一、说明 语音识别是指计算机通过处理人类语言的音频信号,将其转换为可理解的文本形式的技术。也就是说,它可以将人类的口语语音转换为文本,以便计算机能够进一步处理和理解。它是自然语言处理技术的一部分,被广泛应用于语音识别助…...
【考研英语语法及长难句】小结
【 考场攻略汇总 】 考点汇总 考场攻略 #1 断开长难句只看谓语动词,不考虑非谓语动词先找从句,先看主句 考场攻略 #2 抓住谓语动词,抓住句子最核心的表述动作或内容通过定位谓语动词,找到复杂多变的主语通过谓语动词的数量&…...
C# 反射
反射的概念:C#通过类型(Type)来创建对象,调用对象中的方法,属性等信息;B超就是利用了反射原理将超声波打在人的肚子上,然后通过反射波进行体内器官的成员; 反射提供的类:…...
Pytorch(二)
一、分类任务 构建分类网络模型 必须继承nn.Module且在其构造函数中需调用nn.Module的构造函数无需写反向传播函数,nn.Module能够利用autograd自动实现反向传播Module中的可学习参数可以通过named_parameters()返回迭代器 from torch import nn import torch.nn.f…...
Python 使用http时间同步设置系统时间源码
Python方式实现使用http时间同步设置系统时间源码,系统环境是ubuntu 12.04、Python2.7版本。需要使用到time、os及httplib方法。 Python使用http时间同步设置系统时间,源码如下: #-*-coding:utf8 -*- import httplib as client import time…...
golang sync.singleflight 解决热点缓存穿透问题
在 go 的 sync 包中,有一个 singleflight 包,里面有一个 singleflight.go 文件,代码加注释,一共 200 行出头。内容包括以下几块儿: Group 结构体管理一组相关的函数调用工作,它包含一个互斥锁和一个 map,map 的 key 是…...
4、Linux驱动开发:设备-设备号设备号注册
目录 🍅点击这里查看所有博文 随着自己工作的进行,接触到的技术栈也越来越多。给我一个很直观的感受就是,某一项技术/经验在刚开始接触的时候都记得很清楚。往往过了几个月都会忘记的差不多了,只有经常会用到的东西才有可能真正记…...
C++(MFC)调用Python
环境: phyton版本:3.10 VS版本:VS2017 包含文件头:Python\Python310\include 包含库文件:Python\Python310\libs 程序运行期间,以下函数只需要调用一次即可,重复调用会导致崩溃 void Initial…...
深度学习实践——循环神经网络实践
系列实验 深度学习实践——卷积神经网络实践:裂缝识别 深度学习实践——循环神经网络实践 深度学习实践——模型部署优化实践 深度学习实践——模型推理优化练习 代码可见于: 深度学习实践——循环神经网络实践 0 概况1 架构实现1.1 RNN架构1.1.1 RNN架…...
docker简单web管理docker.io/uifd/ui-for-docker
要先pull这个镜像docker.io/uifd/ui-for-docker 这个软件默认只能使用9000端口,别的不行,因为作者在镜像制作时已加入这一层 刚下下来镜像可以通过docker history docker.io/uifd/ui-for-docker 查看到这个端口已被 设置 如果在没有设置br0网关时&…...
SpringBoot内嵌的Tomcat:
SpringBoot内嵌Tomcat源码: 1、调用启动类SpringbootdemoApplication中的SpringApplication.run()方法。 SpringBootApplication public class SpringbootdemoApplication {public static void main(String[] args) {SpringApplication.run(SpringbootdemoApplicat…...
企业级docker应用注意事项
现在很多企业使用容器化技术部署应用,绕不开的docker技术,在生产环境docker常用操作总结。参考:https://juejin.cn/post/7259275893796651069 1. 尽可能使用官方镜像 在docker hub 官方 使用后面带有 DOCKER OFFICIAL IMAGE 标签的镜像&…...
腾讯云高性能计算集群CPU服务器处理器说明
腾讯云高性能计算集群以裸金属云服务器为节点,通过RDMA互联,提供了高带宽和极低延迟的网络服务,能满足大规模高性能计算、人工智能、大数据推荐等应用的并行计算需求,腾讯云服务器网分享腾讯云服务器高性能计算集群CPU处理器说明&…...
tinkerCAD案例:23.Tinkercad 中的自定义字体
tinkerCAD案例:23.Tinkercad 中的自定义字体 原文 Tinkercad Projects Tinkercad has a fun shape in the Shape Generators section that allows you to upload your own font in SVG format and use it in your designs. I’ve used it for a variety of desi…...
调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...
【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力
引言: 在人工智能快速发展的浪潮中,快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型(LLM)。该模型代表着该领域的重大突破,通过独特方式融合思考与非思考…...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...
如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.
ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #:…...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...
【HarmonyOS 5】鸿蒙中Stage模型与FA模型详解
一、前言 在HarmonyOS 5的应用开发模型中,featureAbility是旧版FA模型(Feature Ability)的用法,Stage模型已采用全新的应用架构,推荐使用组件化的上下文获取方式,而非依赖featureAbility。 FA大概是API7之…...
内窥镜检查中基于提示的息肉分割|文献速递-深度学习医疗AI最新文献
Title 题目 Prompt-based polyp segmentation during endoscopy 内窥镜检查中基于提示的息肉分割 01 文献速递介绍 以下是对这段英文内容的中文翻译: ### 胃肠道癌症的发病率呈上升趋势,且有年轻化倾向(Bray等人,2018&#x…...
