Flink State 状态后端分析
flink状态实现分析
state
* State* |* +-------------------InternalKvState* | |* MergingState |* | |* +-----------------InternalMergingState* | |* +--------+------+ |* | | |* ReducingState ListState +-----+-----------------+* | | | |* +-----------+ +----------- -----------------InternalListState* | |* +---------InternalReducingState
MemoryState
RocksDBState
class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>> {private TypeSerializer<UK> userKeySerializer;private TypeSerializer<UV> userValueSerializer;private RocksDBMapState(ColumnFamilyHandle columnFamily,TypeSerializer<N> namespaceSerializer,TypeSerializer<Map<UK, UV>> valueSerializer,Map<UK, UV> defaultValue,RocksDBKeyedStateBackend<K> backend);public TypeSerializer<K> getKeySerializer();public TypeSerializer<N> getNamespaceSerializer();public TypeSerializer<Map<UK, UV>> getValueSerializer();public UV get(UK userKey){ //直接读rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);return (rawValueBytes == null? null: deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));}public void put(UK userKey, UV userValue){ //直接写rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); //backend.db是RocksDBKeyedStateBackend}public void putAll(Map<UK, UV> map);public void remove(UK userKey);public boolean contains(UK userKey);public Iterable<Map.Entry<UK, UV>> entries();public Iterable<UK> keys();public Iterable<UV> values();public boolean isEmpty();public void clear();static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc,Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>registerResult,RocksDBKeyedStateBackend<K> backend) { //backend在这里传入return (IS)new RocksDBMapState<>(registerResult.f0,registerResult.f1.getNamespaceSerializer(),(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),(Map<UK, UV>) stateDesc.getDefaultValue(),backend);}
}
backend与checkpoint
public interface Snapshotable<S extends StateObject> {RunnableFuture<S> snapshot(long checkpointId,long timestamp,@Nonnull CheckpointStreamFactory streamFactory,@Nonnull CheckpointOptions checkpointOptions)throws Exception;
}
FSBackend
- FsStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackend
- FsStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
- DefaultOperatorStateBackend创建了PartitionableListState, 是State的子类
public interface StateBackend extends java.io.Serializable {default String getName() {return this.getClass().getSimpleName();}<K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;/** Whether the state backend uses Flink's managed memory. */default boolean useManagedMemory() {return false;}}
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {checkNotNull(jobId, "jobId");return new FsCheckpointStorageAccess(getCheckpointPath(),getSavepointPath(),jobId,getMinFileSizeThreshold(),getWriteBufferSize());}public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {TaskStateManager taskStateManager = env.getTaskStateManager();LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);LatencyTrackingStateConfig latencyTrackingStateConfig =latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build();return new HeapKeyedStateBackendBuilder<>( //这里是HeapKeyedStateBackendBuilderkvStateRegistry,keySerializer,env.getUserCodeClassLoader().asClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,latencyTrackingStateConfig,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),localRecoveryConfig,priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();}@Overridepublic OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {return new DefaultOperatorStateBackendBuilder( //这里是DefaultOperatorStateBackendBuilderenv.getUserCodeClassLoader().asClassLoader(),env.getExecutionConfig(),isUsingAsynchronousSnapshots(),stateHandles,cancelStreamRegistry).build();}
}
memory backend
- MemoryStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
- MemoryStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackendBackend
- 最终调用了HeapMapState::Create创建state
flink checkpoint
参考资料
https://www.jianshu.com/p/569a7e67c1b3
https://blog.csdn.net/u010942041/article/details/114944767
https://cloud.tencent.com/developer/article/1792720
https://blog.51cto.com/dataclub/5351042
https://www.cnblogs.com/lighten/p/13234350.html
https://cloud.tencent.com/developer/article/1765572
https://blog.csdn.net/m0_63475429/article/details/127417649
https://blog.csdn.net/Direction_Wind/article/details/125646616
相关文章:
Flink State 状态后端分析
flink状态实现分析 state * State* |* -------------------InternalKvState* | |* MergingState |* | |* …...

和年薪30W的阿里测开工程师聊过后,才知道我的工作就是打杂的...
前几天和一个朋友聊面试,他说上个月同时拿到了腾讯和阿里的offer,最后选择了阿里。 阿里内部将员工一共分为了14个等级,P6是资深工程师,P7是技术专家。 其中P6和P7就是一个分水岭了,P6是最接近P7的不持股员工&#x…...
C#开发的OpenRA的界面布局数据加载
C#开发的OpenRA的界面布局数据加载 当显示完成加载界面之后,就是进行其它内容处理。 因为后面内容的加载会比较长时间,所以首先显示加载界面是一种非常友好的方法。 因此在软件设计里,尽可能先显示界面,让用户先看到程序正在运行, 然后再处理时间长的加载。如果不这样做,…...

并查集结构
文章目录并查集特点构建过程查找两个元素是否是同一集合优化查找领头元素设置两个元素为同一集合构建结构应用场景并行计算集合问题并查集特点 对于使用并查集构建的结构,可以使得查询两个元素是否在同一集合,以及合并集合的操作无限接近O(1) 构建过程…...

全国CSM敏捷教练认证将于2023年3月25-26开班,报名从速!
CSM,即Certified Scrum Master,是Scrum联盟发起的Scrum认证。 CSM可以帮助团队正确使用Scrum,从而提高项目整体成功的可能性。 CSM深刻理解Scrum的价值观、实践以及Scrum框架。 CSM是“服务型领导”,帮助Scrum团队一起紧密合作。 …...

JavaEE进阶第六课:SpringBoot ⽇志⽂件
上篇文章介绍了SpringBoot配置文件,这篇文章我们将会介绍SpringBoot ⽇志⽂件 荔枝1.日志有什么用2.自定义日志输出2.1获取程序日志对象2.2使用相关方法输出日志2.3日志级别2.3.1日志级别的作用2.3.2日志级别如何设置2.4日志格式3.持久化日志4.更简单的日志输出4.1使…...
外置MOS管平均电流型LED降压恒流驱动器
产品描述 AP5125 是一款外围电路简单的 Buck 型平均电 流检测模式的 LED 恒流驱动器,适用于 8-100V 电压 范围的非隔离式大功率恒流 LED 驱动领域。芯片采用 固定频率 140kHz 的 PWM 工作模式, 利用平均电 流检测模式,因此具有优异的负载调整…...

python+pytest接口自动化(6)-请求参数格式的确定
我们在做接口测试之前,先需要根据接口文档或抓包接口数据,搞清楚被测接口的详细内容,其中就包含请求参数的编码格式,从而使用对应的参数格式发送请求。例如某个接口规定的请求主体的编码方式为 application/json,那么在…...

开发手册——一、编程规约_3.代码格式
这篇文章主要梳理了在java的实际开发过程中的编程规范问题。本篇文章主要借鉴于《阿里巴巴java开发手册终极版》 下面我们一起来看一下吧。 1. 【强制】大括号的使用约定。如果是大括号内为空,则简洁地写成{}即可,不需要换行;如果是非空代码…...
十七、Django-restframework之序列化器(二)
1. 序列化器 REST framework提供了一个serializer类,它可以非常方便的序列化模型实例和查询集为JSON或者其他内容形式。它还提供反序列化,允许在验证传入数据后将解析的数据转换回复杂类型。 2. 定义序列化器 在crm应用目录下创建serializers.py文件&a…...

python GUI图形化编程-----wxpython
一、python gui(图形化)模块介绍: Tkinter :是python最简单的图形化模块,总共只有14种组建 Pyqt :是python最复杂也是使用最广泛的图形化 Wx :是python当中居中的一个图形化,学习结构很清晰 Pywin :是pyth…...
【Python 】yyyy-MM-dd HH:mm:ss 时间格式 时间戳 全面解读超详细
时间格式 时间格式(协议)描述gg时期或纪元。y不包含纪元的年份。不具有前导零。yy不包含纪元的年份。具有前导零。yyyy包含纪元的四位数的年份。M月份数字。一位数的月份没有前导零。MM月份数字。一位数的月份有一个前导零。MMM月份的缩写名称,在AbbreviatedMonthN…...

【C++】C++11 异常
目录 1. C语言传统的处理错误的方式 2. C异常概念 3. 异常的使用 3.1. 异常的抛出和捕获 3.2. 在函数调用链中异常栈展开匹配原则 3.3. 异常的重新抛出 3.4. 异常安全 3.5. 异常规范 4.自定义异常体系 5. C标准库的异常体系 6. 异常的优缺点 6.1. C异常的优点&…...

关于Thread.start()后的困惑、imap
在for循环中,接着开thread,开完就start,当时有个困惑,就是比如开的一个thread的这个start执行完,但是这个for循环还没执行完,那程序会跑到for循环的后面逻辑吗?比如下面13行for循环开始开第一个…...

qml学习之qwidget与qml结合使用并调用信号槽交互
学习qml系列之一说明: 学习qml系列之qwiget和qml信号槽的交互使用,并在qwidget中显示qml界面 在qml中发送信号到qwidget里 在qwidget里发送信号给qml 在qwidget里面调用qml界面方式 方式一:使用QQuickView 这个是Qt5.0中提供的一个类&…...
【 华为OD机试 2023】 组装新的数组(C++ Java JavaScript Python)
文章目录 题目描述输入描述输出描述备注用例题目解析C++JavaScriptJavaPython题目描述 给你一个整数M和数组N,N中的元素为连续整数,要求根据N中的元素组装成新的数组R,组装规则: R中元素总和加起来等于MR中的元素可以从N中重复选取R中的元素最多只能有1个不在N中,且比N中…...
【洛谷 P2089】烤鸡(循环枚举)
烤鸡 题目背景 猪猪 Hanke 得到了一只鸡。 题目描述 猪猪 Hanke 特别喜欢吃烤鸡(本是同畜牲,相煎何太急!)Hanke 吃鸡很特别,为什么特别呢?因为他有 101010 种配料(芥末、孜然等)…...

windows10安装ubantu双系统
windows10安装ubantu双系统 文章目录windows10安装ubantu双系统一、安装前准备1.前期说明2.制作U盘启动器3.设置硬盘分区相关4.设置给ubantu系统的硬盘大小,设置为未分配(删除卷)二、进行安装1.设置bios相关2.进入bios启动界面选择U盘安装3.进…...
【华为OD机试 2023】 人数最多的站点/小火车最多人时所在园区站点(C++ Java JavaScript Python)
文章目录 题目描述输入描述输出描述用例题目解析C++JavaScriptJavaPython励志做全网最全、解法最多的华为OD机考算法题库,帮助你上岸华为。提供C++/Java、JavaScript、Python四种语言的解法。每篇文章都有详细的结题步骤。有问题,随时解答。😁😁😁😁 目前为了造福广大…...
2024届暑期实习实录(阿里云大数据研发平台)
1. 项目介绍(介绍一下你觉得有挑战的项目 (1)项目的痛点需求(配置变更的痛点、你做的目的是什么?) 思考方向:业务背景,用户需求;产品发展,产品现有局限问题…...

Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...

简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...

K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...

centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...

基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

04-初识css
一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...