当前位置: 首页 > news >正文

Flink State 状态后端分析

flink状态实现分析

state

 *             State*               |*               +-------------------InternalKvState*               |                         |*          MergingState                   |*               |                         |*               +-----------------InternalMergingState*               |                         |*      +--------+------+                  |*      |               |                  |* ReducingState    ListState        +-----+-----------------+*      |               |            |                       |*      +-----------+   +-----------   -----------------InternalListState*                  |                |*                  +---------InternalReducingState

MemoryState

AbstractHeapState
HeapMapState
InternalMapState
InternalKvState
State
AbstractHeapMergingState
HeapListState
InternalListState
AbstractHeapAppendingState
InternalMergingState
InternalAppendingState
HeapValueState
InternalValueState

RocksDBState

State
InternalKvState
AbstractRocksDBState
RocksDBMapState
RocksDBListState
RocksDBValueState
RocksDBReducingState
RocksDBAggregatingState
class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>> {private TypeSerializer<UK> userKeySerializer;private TypeSerializer<UV> userValueSerializer;private RocksDBMapState(ColumnFamilyHandle columnFamily,TypeSerializer<N> namespaceSerializer,TypeSerializer<Map<UK, UV>> valueSerializer,Map<UK, UV> defaultValue,RocksDBKeyedStateBackend<K> backend);public TypeSerializer<K> getKeySerializer();public TypeSerializer<N> getNamespaceSerializer();public TypeSerializer<Map<UK, UV>> getValueSerializer();public UV get(UK userKey){ //直接读rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);return (rawValueBytes == null? null: deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));}public void put(UK userKey, UV userValue){ //直接写rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); //backend.db是RocksDBKeyedStateBackend}public void putAll(Map<UK, UV> map);public void remove(UK userKey);public boolean contains(UK userKey);public Iterable<Map.Entry<UK, UV>> entries();public Iterable<UK> keys();public Iterable<UV> values();public boolean isEmpty();public void clear();static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc,Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>registerResult,RocksDBKeyedStateBackend<K> backend) { //backend在这里传入return (IS)new RocksDBMapState<>(registerResult.f0,registerResult.f1.getNamespaceSerializer(),(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),(Map<UK, UV>) stateDesc.getDefaultValue(),backend);}
}

backend与checkpoint

AbstractKeyedStateBackend
RocksDBKeyedStateBackend
CheckpointableKeyedStateBackend
KeyedStateBackend
Snapshotable
HeapKeyedStateBackend
OperatorStateBackend
DefaultOperatorStateBackend
OperatorStateStore
public interface Snapshotable<S extends StateObject> {RunnableFuture<S> snapshot(long checkpointId,long timestamp,@Nonnull CheckpointStreamFactory streamFactory,@Nonnull CheckpointOptions checkpointOptions)throws Exception;
}

FSBackend

  • FsStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackend
  • FsStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
  • DefaultOperatorStateBackend创建了PartitionableListState, 是State的子类
AbstractFileStateBackend
FsStateBackend
AbstractStateBackend
CheckpointStorage
StateBackend
ConfigurableStateBackend
public interface StateBackend extends java.io.Serializable {default String getName() {return this.getClass().getSimpleName();}<K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;/** Whether the state backend uses Flink's managed memory. */default boolean useManagedMemory() {return false;}}
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {checkNotNull(jobId, "jobId");return new FsCheckpointStorageAccess(getCheckpointPath(),getSavepointPath(),jobId,getMinFileSizeThreshold(),getWriteBufferSize());}public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {TaskStateManager taskStateManager = env.getTaskStateManager();LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);LatencyTrackingStateConfig latencyTrackingStateConfig =latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build();return new HeapKeyedStateBackendBuilder<>( //这里是HeapKeyedStateBackendBuilderkvStateRegistry,keySerializer,env.getUserCodeClassLoader().asClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,latencyTrackingStateConfig,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),localRecoveryConfig,priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();}@Overridepublic OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {return new DefaultOperatorStateBackendBuilder(  //这里是DefaultOperatorStateBackendBuilderenv.getUserCodeClassLoader().asClassLoader(),env.getExecutionConfig(),isUsingAsynchronousSnapshots(),stateHandles,cancelStreamRegistry).build();}
}

memory backend

  • MemoryStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
  • MemoryStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackendBackend
  • 最终调用了HeapMapState::Create创建state
AbstractFileStateBackend
MemoryStateBackend
ConfigurableStateBackend
AbstractStateBackend
CheckpointStorage
StateBackend

flink checkpoint

CheckpointStorage
+resolveCheckpoint(String externalPointer)
+createCheckpointStorage(JobID jobId)
RocksDBStateBackend
+checkpointStreamBackend : StateBackend
CheckpointStorageAccess
AbstractFsCheckpointStorageAccess
FsCheckpointStorageAccess
MemoryBackendCheckpointStorageAccess
RestoreOperation
RocksDBRestoreOperation
RocksDBFullRestoreOperation
RocksDBHeapTimersFullRestoreOperation
RocksDBIncrementalRestoreOperation
RocksDBSnapshotOperation
RocksDBIncrementalSnapshotOperation
RocksDBNativeFullSnapshotOperation

参考资料

https://www.jianshu.com/p/569a7e67c1b3
https://blog.csdn.net/u010942041/article/details/114944767
https://cloud.tencent.com/developer/article/1792720
https://blog.51cto.com/dataclub/5351042
https://www.cnblogs.com/lighten/p/13234350.html
https://cloud.tencent.com/developer/article/1765572
https://blog.csdn.net/m0_63475429/article/details/127417649
https://blog.csdn.net/Direction_Wind/article/details/125646616

相关文章:

Flink State 状态后端分析

flink状态实现分析 state * State* |* -------------------InternalKvState* | |* MergingState |* | |* …...

和年薪30W的阿里测开工程师聊过后,才知道我的工作就是打杂的...

前几天和一个朋友聊面试&#xff0c;他说上个月同时拿到了腾讯和阿里的offer&#xff0c;最后选择了阿里。 阿里内部将员工一共分为了14个等级&#xff0c;P6是资深工程师&#xff0c;P7是技术专家。 其中P6和P7就是一个分水岭了&#xff0c;P6是最接近P7的不持股员工&#x…...

C#开发的OpenRA的界面布局数据加载

C#开发的OpenRA的界面布局数据加载 当显示完成加载界面之后,就是进行其它内容处理。 因为后面内容的加载会比较长时间,所以首先显示加载界面是一种非常友好的方法。 因此在软件设计里,尽可能先显示界面,让用户先看到程序正在运行, 然后再处理时间长的加载。如果不这样做,…...

并查集结构

文章目录并查集特点构建过程查找两个元素是否是同一集合优化查找领头元素设置两个元素为同一集合构建结构应用场景并行计算集合问题并查集特点 对于使用并查集构建的结构&#xff0c;可以使得查询两个元素是否在同一集合&#xff0c;以及合并集合的操作无限接近O(1) 构建过程…...

全国CSM敏捷教练认证将于2023年3月25-26开班,报名从速!

CSM&#xff0c;即Certified Scrum Master&#xff0c;是Scrum联盟发起的Scrum认证。 CSM可以帮助团队正确使用Scrum&#xff0c;从而提高项目整体成功的可能性。 CSM深刻理解Scrum的价值观、实践以及Scrum框架。 CSM是“服务型领导”&#xff0c;帮助Scrum团队一起紧密合作。 …...

JavaEE进阶第六课:SpringBoot ⽇志⽂件

上篇文章介绍了SpringBoot配置文件&#xff0c;这篇文章我们将会介绍SpringBoot ⽇志⽂件 荔枝1.日志有什么用2.自定义日志输出2.1获取程序日志对象2.2使用相关方法输出日志2.3日志级别2.3.1日志级别的作用2.3.2日志级别如何设置2.4日志格式3.持久化日志4.更简单的日志输出4.1使…...

外置MOS管平均电流型LED降压恒流驱动器

产品描述 AP5125 是一款外围电路简单的 Buck 型平均电 流检测模式的 LED 恒流驱动器&#xff0c;适用于 8-100V 电压 范围的非隔离式大功率恒流 LED 驱动领域。芯片采用 固定频率 140kHz 的 PWM 工作模式&#xff0c; 利用平均电 流检测模式&#xff0c;因此具有优异的负载调整…...

python+pytest接口自动化(6)-请求参数格式的确定

我们在做接口测试之前&#xff0c;先需要根据接口文档或抓包接口数据&#xff0c;搞清楚被测接口的详细内容&#xff0c;其中就包含请求参数的编码格式&#xff0c;从而使用对应的参数格式发送请求。例如某个接口规定的请求主体的编码方式为 application/json&#xff0c;那么在…...

开发手册——一、编程规约_3.代码格式

这篇文章主要梳理了在java的实际开发过程中的编程规范问题。本篇文章主要借鉴于《阿里巴巴java开发手册终极版》 下面我们一起来看一下吧。 1. 【强制】大括号的使用约定。如果是大括号内为空&#xff0c;则简洁地写成{}即可&#xff0c;不需要换行&#xff1b;如果是非空代码…...

十七、Django-restframework之序列化器(二)

1. 序列化器 REST framework提供了一个serializer类&#xff0c;它可以非常方便的序列化模型实例和查询集为JSON或者其他内容形式。它还提供反序列化&#xff0c;允许在验证传入数据后将解析的数据转换回复杂类型。 2. 定义序列化器 在crm应用目录下创建serializers.py文件&a…...

python GUI图形化编程-----wxpython

一、python gui&#xff08;图形化&#xff09;模块介绍&#xff1a; Tkinter :是python最简单的图形化模块&#xff0c;总共只有14种组建 Pyqt :是python最复杂也是使用最广泛的图形化 Wx :是python当中居中的一个图形化&#xff0c;学习结构很清晰 Pywin :是pyth…...

【Python 】yyyy-MM-dd HH:mm:ss 时间格式 时间戳 全面解读超详细

时间格式 时间格式(协议)描述gg时期或纪元。y不包含纪元的年份。不具有前导零。yy不包含纪元的年份。具有前导零。yyyy包含纪元的四位数的年份。M月份数字。一位数的月份没有前导零。MM月份数字。一位数的月份有一个前导零。MMM月份的缩写名称&#xff0c;在AbbreviatedMonthN…...

【C++】C++11 异常

目录 1. C语言传统的处理错误的方式 2. C异常概念 3. 异常的使用 3.1. 异常的抛出和捕获 3.2. 在函数调用链中异常栈展开匹配原则 3.3. 异常的重新抛出 3.4. 异常安全 3.5. 异常规范 4.自定义异常体系 5. C标准库的异常体系 6. 异常的优缺点 6.1. C异常的优点&…...

关于Thread.start()后的困惑、imap

在for循环中&#xff0c;接着开thread&#xff0c;开完就start&#xff0c;当时有个困惑&#xff0c;就是比如开的一个thread的这个start执行完&#xff0c;但是这个for循环还没执行完&#xff0c;那程序会跑到for循环的后面逻辑吗&#xff1f;比如下面13行for循环开始开第一个…...

qml学习之qwidget与qml结合使用并调用信号槽交互

学习qml系列之一说明&#xff1a; 学习qml系列之qwiget和qml信号槽的交互使用&#xff0c;并在qwidget中显示qml界面 在qml中发送信号到qwidget里 在qwidget里发送信号给qml 在qwidget里面调用qml界面方式 方式一&#xff1a;使用QQuickView 这个是Qt5.0中提供的一个类&…...

【 华为OD机试 2023】 组装新的数组(C++ Java JavaScript Python)

文章目录 题目描述输入描述输出描述备注用例题目解析C++JavaScriptJavaPython题目描述 给你一个整数M和数组N,N中的元素为连续整数,要求根据N中的元素组装成新的数组R,组装规则: R中元素总和加起来等于MR中的元素可以从N中重复选取R中的元素最多只能有1个不在N中,且比N中…...

【洛谷 P2089】烤鸡(循环枚举)

烤鸡 题目背景 猪猪 Hanke 得到了一只鸡。 题目描述 猪猪 Hanke 特别喜欢吃烤鸡&#xff08;本是同畜牲&#xff0c;相煎何太急&#xff01;&#xff09;Hanke 吃鸡很特别&#xff0c;为什么特别呢&#xff1f;因为他有 101010 种配料&#xff08;芥末、孜然等&#xff09;…...

windows10安装ubantu双系统

windows10安装ubantu双系统 文章目录windows10安装ubantu双系统一、安装前准备1.前期说明2.制作U盘启动器3.设置硬盘分区相关4.设置给ubantu系统的硬盘大小&#xff0c;设置为未分配&#xff08;删除卷&#xff09;二、进行安装1.设置bios相关2.进入bios启动界面选择U盘安装3.进…...

【华为OD机试 2023】 人数最多的站点/小火车最多人时所在园区站点(C++ Java JavaScript Python)

文章目录 题目描述输入描述输出描述用例题目解析C++JavaScriptJavaPython励志做全网最全、解法最多的华为OD机考算法题库,帮助你上岸华为。提供C++/Java、JavaScript、Python四种语言的解法。每篇文章都有详细的结题步骤。有问题,随时解答。😁😁😁😁 目前为了造福广大…...

2024届暑期实习实录(阿里云大数据研发平台)

1. 项目介绍&#xff08;介绍一下你觉得有挑战的项目 &#xff08;1&#xff09;项目的痛点需求&#xff08;配置变更的痛点、你做的目的是什么&#xff1f;&#xff09; 思考方向&#xff1a;业务背景&#xff0c;用户需求&#xff1b;产品发展&#xff0c;产品现有局限问题…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

稳定币的深度剖析与展望

一、引言 在当今数字化浪潮席卷全球的时代&#xff0c;加密货币作为一种新兴的金融现象&#xff0c;正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而&#xff0c;加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下&#xff0c;稳定…...

Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?

Redis 的发布订阅&#xff08;Pub/Sub&#xff09;模式与专业的 MQ&#xff08;Message Queue&#xff09;如 Kafka、RabbitMQ 进行比较&#xff0c;核心的权衡点在于&#xff1a;简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...