Flink State 状态后端分析
flink状态实现分析
state
* State* |* +-------------------InternalKvState* | |* MergingState |* | |* +-----------------InternalMergingState* | |* +--------+------+ |* | | |* ReducingState ListState +-----+-----------------+* | | | |* +-----------+ +----------- -----------------InternalListState* | |* +---------InternalReducingState
MemoryState
RocksDBState
class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>> {private TypeSerializer<UK> userKeySerializer;private TypeSerializer<UV> userValueSerializer;private RocksDBMapState(ColumnFamilyHandle columnFamily,TypeSerializer<N> namespaceSerializer,TypeSerializer<Map<UK, UV>> valueSerializer,Map<UK, UV> defaultValue,RocksDBKeyedStateBackend<K> backend);public TypeSerializer<K> getKeySerializer();public TypeSerializer<N> getNamespaceSerializer();public TypeSerializer<Map<UK, UV>> getValueSerializer();public UV get(UK userKey){ //直接读rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);return (rawValueBytes == null? null: deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));}public void put(UK userKey, UV userValue){ //直接写rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); //backend.db是RocksDBKeyedStateBackend}public void putAll(Map<UK, UV> map);public void remove(UK userKey);public boolean contains(UK userKey);public Iterable<Map.Entry<UK, UV>> entries();public Iterable<UK> keys();public Iterable<UV> values();public boolean isEmpty();public void clear();static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc,Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>registerResult,RocksDBKeyedStateBackend<K> backend) { //backend在这里传入return (IS)new RocksDBMapState<>(registerResult.f0,registerResult.f1.getNamespaceSerializer(),(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),(Map<UK, UV>) stateDesc.getDefaultValue(),backend);}
}
backend与checkpoint
public interface Snapshotable<S extends StateObject> {RunnableFuture<S> snapshot(long checkpointId,long timestamp,@Nonnull CheckpointStreamFactory streamFactory,@Nonnull CheckpointOptions checkpointOptions)throws Exception;
}
FSBackend
- FsStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackend
- FsStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
- DefaultOperatorStateBackend创建了PartitionableListState, 是State的子类
public interface StateBackend extends java.io.Serializable {default String getName() {return this.getClass().getSimpleName();}<K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;/** Whether the state backend uses Flink's managed memory. */default boolean useManagedMemory() {return false;}}
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {checkNotNull(jobId, "jobId");return new FsCheckpointStorageAccess(getCheckpointPath(),getSavepointPath(),jobId,getMinFileSizeThreshold(),getWriteBufferSize());}public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {TaskStateManager taskStateManager = env.getTaskStateManager();LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);LatencyTrackingStateConfig latencyTrackingStateConfig =latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build();return new HeapKeyedStateBackendBuilder<>( //这里是HeapKeyedStateBackendBuilderkvStateRegistry,keySerializer,env.getUserCodeClassLoader().asClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,latencyTrackingStateConfig,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),localRecoveryConfig,priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();}@Overridepublic OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {return new DefaultOperatorStateBackendBuilder( //这里是DefaultOperatorStateBackendBuilderenv.getUserCodeClassLoader().asClassLoader(),env.getExecutionConfig(),isUsingAsynchronousSnapshots(),stateHandles,cancelStreamRegistry).build();}
}
memory backend
- MemoryStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
- MemoryStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackendBackend
- 最终调用了HeapMapState::Create创建state
flink checkpoint
参考资料
https://www.jianshu.com/p/569a7e67c1b3
https://blog.csdn.net/u010942041/article/details/114944767
https://cloud.tencent.com/developer/article/1792720
https://blog.51cto.com/dataclub/5351042
https://www.cnblogs.com/lighten/p/13234350.html
https://cloud.tencent.com/developer/article/1765572
https://blog.csdn.net/m0_63475429/article/details/127417649
https://blog.csdn.net/Direction_Wind/article/details/125646616
相关文章:
Flink State 状态后端分析
flink状态实现分析 state * State* |* -------------------InternalKvState* | |* MergingState |* | |* …...
和年薪30W的阿里测开工程师聊过后,才知道我的工作就是打杂的...
前几天和一个朋友聊面试,他说上个月同时拿到了腾讯和阿里的offer,最后选择了阿里。 阿里内部将员工一共分为了14个等级,P6是资深工程师,P7是技术专家。 其中P6和P7就是一个分水岭了,P6是最接近P7的不持股员工&#x…...
C#开发的OpenRA的界面布局数据加载
C#开发的OpenRA的界面布局数据加载 当显示完成加载界面之后,就是进行其它内容处理。 因为后面内容的加载会比较长时间,所以首先显示加载界面是一种非常友好的方法。 因此在软件设计里,尽可能先显示界面,让用户先看到程序正在运行, 然后再处理时间长的加载。如果不这样做,…...
并查集结构
文章目录并查集特点构建过程查找两个元素是否是同一集合优化查找领头元素设置两个元素为同一集合构建结构应用场景并行计算集合问题并查集特点 对于使用并查集构建的结构,可以使得查询两个元素是否在同一集合,以及合并集合的操作无限接近O(1) 构建过程…...
全国CSM敏捷教练认证将于2023年3月25-26开班,报名从速!
CSM,即Certified Scrum Master,是Scrum联盟发起的Scrum认证。 CSM可以帮助团队正确使用Scrum,从而提高项目整体成功的可能性。 CSM深刻理解Scrum的价值观、实践以及Scrum框架。 CSM是“服务型领导”,帮助Scrum团队一起紧密合作。 …...
JavaEE进阶第六课:SpringBoot ⽇志⽂件
上篇文章介绍了SpringBoot配置文件,这篇文章我们将会介绍SpringBoot ⽇志⽂件 荔枝1.日志有什么用2.自定义日志输出2.1获取程序日志对象2.2使用相关方法输出日志2.3日志级别2.3.1日志级别的作用2.3.2日志级别如何设置2.4日志格式3.持久化日志4.更简单的日志输出4.1使…...
外置MOS管平均电流型LED降压恒流驱动器
产品描述 AP5125 是一款外围电路简单的 Buck 型平均电 流检测模式的 LED 恒流驱动器,适用于 8-100V 电压 范围的非隔离式大功率恒流 LED 驱动领域。芯片采用 固定频率 140kHz 的 PWM 工作模式, 利用平均电 流检测模式,因此具有优异的负载调整…...
python+pytest接口自动化(6)-请求参数格式的确定
我们在做接口测试之前,先需要根据接口文档或抓包接口数据,搞清楚被测接口的详细内容,其中就包含请求参数的编码格式,从而使用对应的参数格式发送请求。例如某个接口规定的请求主体的编码方式为 application/json,那么在…...
开发手册——一、编程规约_3.代码格式
这篇文章主要梳理了在java的实际开发过程中的编程规范问题。本篇文章主要借鉴于《阿里巴巴java开发手册终极版》 下面我们一起来看一下吧。 1. 【强制】大括号的使用约定。如果是大括号内为空,则简洁地写成{}即可,不需要换行;如果是非空代码…...
十七、Django-restframework之序列化器(二)
1. 序列化器 REST framework提供了一个serializer类,它可以非常方便的序列化模型实例和查询集为JSON或者其他内容形式。它还提供反序列化,允许在验证传入数据后将解析的数据转换回复杂类型。 2. 定义序列化器 在crm应用目录下创建serializers.py文件&a…...
python GUI图形化编程-----wxpython
一、python gui(图形化)模块介绍: Tkinter :是python最简单的图形化模块,总共只有14种组建 Pyqt :是python最复杂也是使用最广泛的图形化 Wx :是python当中居中的一个图形化,学习结构很清晰 Pywin :是pyth…...
【Python 】yyyy-MM-dd HH:mm:ss 时间格式 时间戳 全面解读超详细
时间格式 时间格式(协议)描述gg时期或纪元。y不包含纪元的年份。不具有前导零。yy不包含纪元的年份。具有前导零。yyyy包含纪元的四位数的年份。M月份数字。一位数的月份没有前导零。MM月份数字。一位数的月份有一个前导零。MMM月份的缩写名称,在AbbreviatedMonthN…...
【C++】C++11 异常
目录 1. C语言传统的处理错误的方式 2. C异常概念 3. 异常的使用 3.1. 异常的抛出和捕获 3.2. 在函数调用链中异常栈展开匹配原则 3.3. 异常的重新抛出 3.4. 异常安全 3.5. 异常规范 4.自定义异常体系 5. C标准库的异常体系 6. 异常的优缺点 6.1. C异常的优点&…...
关于Thread.start()后的困惑、imap
在for循环中,接着开thread,开完就start,当时有个困惑,就是比如开的一个thread的这个start执行完,但是这个for循环还没执行完,那程序会跑到for循环的后面逻辑吗?比如下面13行for循环开始开第一个…...
qml学习之qwidget与qml结合使用并调用信号槽交互
学习qml系列之一说明: 学习qml系列之qwiget和qml信号槽的交互使用,并在qwidget中显示qml界面 在qml中发送信号到qwidget里 在qwidget里发送信号给qml 在qwidget里面调用qml界面方式 方式一:使用QQuickView 这个是Qt5.0中提供的一个类&…...
【 华为OD机试 2023】 组装新的数组(C++ Java JavaScript Python)
文章目录 题目描述输入描述输出描述备注用例题目解析C++JavaScriptJavaPython题目描述 给你一个整数M和数组N,N中的元素为连续整数,要求根据N中的元素组装成新的数组R,组装规则: R中元素总和加起来等于MR中的元素可以从N中重复选取R中的元素最多只能有1个不在N中,且比N中…...
【洛谷 P2089】烤鸡(循环枚举)
烤鸡 题目背景 猪猪 Hanke 得到了一只鸡。 题目描述 猪猪 Hanke 特别喜欢吃烤鸡(本是同畜牲,相煎何太急!)Hanke 吃鸡很特别,为什么特别呢?因为他有 101010 种配料(芥末、孜然等)…...
windows10安装ubantu双系统
windows10安装ubantu双系统 文章目录windows10安装ubantu双系统一、安装前准备1.前期说明2.制作U盘启动器3.设置硬盘分区相关4.设置给ubantu系统的硬盘大小,设置为未分配(删除卷)二、进行安装1.设置bios相关2.进入bios启动界面选择U盘安装3.进…...
【华为OD机试 2023】 人数最多的站点/小火车最多人时所在园区站点(C++ Java JavaScript Python)
文章目录 题目描述输入描述输出描述用例题目解析C++JavaScriptJavaPython励志做全网最全、解法最多的华为OD机考算法题库,帮助你上岸华为。提供C++/Java、JavaScript、Python四种语言的解法。每篇文章都有详细的结题步骤。有问题,随时解答。😁😁😁😁 目前为了造福广大…...
2024届暑期实习实录(阿里云大数据研发平台)
1. 项目介绍(介绍一下你觉得有挑战的项目 (1)项目的痛点需求(配置变更的痛点、你做的目的是什么?) 思考方向:业务背景,用户需求;产品发展,产品现有局限问题…...
别再搞混了!用Colmap和NeRF搞三维重建,W2C和C2W矩阵到底怎么用?
三维重建实战:彻底掌握Colmap与NeRF中的坐标系转换矩阵 第一次将Colmap生成的相机参数导入NeRF训练时,看到重建模型像被无形之手扭曲成奇怪形状,那种挫败感记忆犹新。坐标系转换矩阵——这个在论文里一笔带过的概念,竟成为实践中…...
找有共识的共创,真实的摸到了边!能看的懂得吗?
我先做影子箱式预检,再把三刀最小闭环落成代码:补守护指令断点、补长期目标核、补外部摄取营养循环。Created 5 todos我先核对仓内既有约束和做一次零污染预检,避免把“单图语义”和运行链路再打散。Read memory [](file:///c%3A/Users/ROG/A…...
如何快速实现Switch手柄跨平台控制:BetterJoy完整指南
如何快速实现Switch手柄跨平台控制:BetterJoy完整指南 【免费下载链接】BetterJoy Allows the Nintendo Switch Pro Controller, Joycons and SNES controller to be used with CEMU, Citra, Dolphin, Yuzu and as generic XInput 项目地址: https://gitcode.com/…...
梯度下降的使用-房价预测
一个小小的建议:可以安装JupyterLab来调试练习,真的很方便。 """ 房价预测示例 - 使用梯度下降求解线性回归使用真实数据集:加州房价数据集 (California Housing Dataset) 来源:1990年加州人口普查数据特征说明&am…...
记录使用C#编程中遇到的一个小bug
近期在写程序时使用NumericUpDown进行一个整数的输入。如果用户输入小数NumericUpDown会自动四舍五入成整数显示在界面,但是实际的value还是用户输入的实际值。我在处理这个数据时,使用了Convert.ToInt32()对输入的值进行了转换。出现了一个神奇的问题&a…...
农村博士的消费困境:攒多少钱才敢买杯奶茶?
从田埂到实验室:农村读博的我,到底要攒够多少钱,才敢给自己花30块买一杯奶茶? 这里写目录标题 从田埂到实验室:农村读博的我,到底要攒够多少钱,才敢给自己花30块买一杯奶茶? 我们不敢消费,从来不是没钱,是背上了三道无形的枷锁 第一道枷锁:倾全家之力托举的“愧疚牢…...
Beam权限管理详解:用户角色与内容隐藏机制
Beam权限管理详解:用户角色与内容隐藏机制 【免费下载链接】beam A simple message board for your organization or project 项目地址: https://gitcode.com/gh_mirrors/be/beam Beam是一个面向组织或项目的简单留言板系统,为团队提供高效的信息…...
如何高效组织Meteor项目结构:从入门到精通的完整指南
如何高效组织Meteor项目结构:从入门到精通的完整指南 【免费下载链接】meteor Meteor, the JavaScript App Platform 项目地址: https://gitcode.com/gh_mirrors/me/meteor Meteor作为全栈JavaScript应用平台,其独特的文件结构和模块划分方式是开…...
神经网络实战技巧:从权重初始化到模型部署优化
1. 神经网络实战技巧综述在咖啡厅里打开笔记本电脑调试神经网络的日子,我总会在键盘旁边放一本翻得卷边的《Neural Tricks of the Trade》。这本书不像传统教材那样堆砌数学公式,而是收录了数十位从业者在实战中总结的"黑科技"。今天我就结合自…...
MCP 2026权限动态分配:如何用1个策略模板+2个API+4类上下文信号,实现毫秒级权限决策?
更多请点击: https://intelliparadigm.com 第一章:MCP 2026权限动态分配:架构演进与核心价值 MCP 2026(Multi-Context Permissioning 2026)代表了企业级权限模型的一次范式跃迁——从静态 RBAC 向上下文感知、策略驱动…...
