Flink State 状态后端分析
flink状态实现分析
state
* State* |* +-------------------InternalKvState* | |* MergingState |* | |* +-----------------InternalMergingState* | |* +--------+------+ |* | | |* ReducingState ListState +-----+-----------------+* | | | |* +-----------+ +----------- -----------------InternalListState* | |* +---------InternalReducingState
MemoryState
RocksDBState
class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>> {private TypeSerializer<UK> userKeySerializer;private TypeSerializer<UV> userValueSerializer;private RocksDBMapState(ColumnFamilyHandle columnFamily,TypeSerializer<N> namespaceSerializer,TypeSerializer<Map<UK, UV>> valueSerializer,Map<UK, UV> defaultValue,RocksDBKeyedStateBackend<K> backend);public TypeSerializer<K> getKeySerializer();public TypeSerializer<N> getNamespaceSerializer();public TypeSerializer<Map<UK, UV>> getValueSerializer();public UV get(UK userKey){ //直接读rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);return (rawValueBytes == null? null: deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));}public void put(UK userKey, UV userValue){ //直接写rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); //backend.db是RocksDBKeyedStateBackend}public void putAll(Map<UK, UV> map);public void remove(UK userKey);public boolean contains(UK userKey);public Iterable<Map.Entry<UK, UV>> entries();public Iterable<UK> keys();public Iterable<UV> values();public boolean isEmpty();public void clear();static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc,Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>registerResult,RocksDBKeyedStateBackend<K> backend) { //backend在这里传入return (IS)new RocksDBMapState<>(registerResult.f0,registerResult.f1.getNamespaceSerializer(),(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),(Map<UK, UV>) stateDesc.getDefaultValue(),backend);}
}
backend与checkpoint
public interface Snapshotable<S extends StateObject> {RunnableFuture<S> snapshot(long checkpointId,long timestamp,@Nonnull CheckpointStreamFactory streamFactory,@Nonnull CheckpointOptions checkpointOptions)throws Exception;
}
FSBackend
- FsStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackend
- FsStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
- DefaultOperatorStateBackend创建了PartitionableListState, 是State的子类
public interface StateBackend extends java.io.Serializable {default String getName() {return this.getClass().getSimpleName();}<K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;/** Whether the state backend uses Flink's managed memory. */default boolean useManagedMemory() {return false;}}
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {checkNotNull(jobId, "jobId");return new FsCheckpointStorageAccess(getCheckpointPath(),getSavepointPath(),jobId,getMinFileSizeThreshold(),getWriteBufferSize());}public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {TaskStateManager taskStateManager = env.getTaskStateManager();LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);LatencyTrackingStateConfig latencyTrackingStateConfig =latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build();return new HeapKeyedStateBackendBuilder<>( //这里是HeapKeyedStateBackendBuilderkvStateRegistry,keySerializer,env.getUserCodeClassLoader().asClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,latencyTrackingStateConfig,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),localRecoveryConfig,priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();}@Overridepublic OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {return new DefaultOperatorStateBackendBuilder( //这里是DefaultOperatorStateBackendBuilderenv.getUserCodeClassLoader().asClassLoader(),env.getExecutionConfig(),isUsingAsynchronousSnapshots(),stateHandles,cancelStreamRegistry).build();}
}
memory backend
- MemoryStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
- MemoryStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackendBackend
- 最终调用了HeapMapState::Create创建state
flink checkpoint
参考资料
https://www.jianshu.com/p/569a7e67c1b3
https://blog.csdn.net/u010942041/article/details/114944767
https://cloud.tencent.com/developer/article/1792720
https://blog.51cto.com/dataclub/5351042
https://www.cnblogs.com/lighten/p/13234350.html
https://cloud.tencent.com/developer/article/1765572
https://blog.csdn.net/m0_63475429/article/details/127417649
https://blog.csdn.net/Direction_Wind/article/details/125646616
相关文章:
Flink State 状态后端分析
flink状态实现分析 state * State* |* -------------------InternalKvState* | |* MergingState |* | |* …...
和年薪30W的阿里测开工程师聊过后,才知道我的工作就是打杂的...
前几天和一个朋友聊面试,他说上个月同时拿到了腾讯和阿里的offer,最后选择了阿里。 阿里内部将员工一共分为了14个等级,P6是资深工程师,P7是技术专家。 其中P6和P7就是一个分水岭了,P6是最接近P7的不持股员工&#x…...
C#开发的OpenRA的界面布局数据加载
C#开发的OpenRA的界面布局数据加载 当显示完成加载界面之后,就是进行其它内容处理。 因为后面内容的加载会比较长时间,所以首先显示加载界面是一种非常友好的方法。 因此在软件设计里,尽可能先显示界面,让用户先看到程序正在运行, 然后再处理时间长的加载。如果不这样做,…...
并查集结构
文章目录并查集特点构建过程查找两个元素是否是同一集合优化查找领头元素设置两个元素为同一集合构建结构应用场景并行计算集合问题并查集特点 对于使用并查集构建的结构,可以使得查询两个元素是否在同一集合,以及合并集合的操作无限接近O(1) 构建过程…...
全国CSM敏捷教练认证将于2023年3月25-26开班,报名从速!
CSM,即Certified Scrum Master,是Scrum联盟发起的Scrum认证。 CSM可以帮助团队正确使用Scrum,从而提高项目整体成功的可能性。 CSM深刻理解Scrum的价值观、实践以及Scrum框架。 CSM是“服务型领导”,帮助Scrum团队一起紧密合作。 …...
JavaEE进阶第六课:SpringBoot ⽇志⽂件
上篇文章介绍了SpringBoot配置文件,这篇文章我们将会介绍SpringBoot ⽇志⽂件 荔枝1.日志有什么用2.自定义日志输出2.1获取程序日志对象2.2使用相关方法输出日志2.3日志级别2.3.1日志级别的作用2.3.2日志级别如何设置2.4日志格式3.持久化日志4.更简单的日志输出4.1使…...
外置MOS管平均电流型LED降压恒流驱动器
产品描述 AP5125 是一款外围电路简单的 Buck 型平均电 流检测模式的 LED 恒流驱动器,适用于 8-100V 电压 范围的非隔离式大功率恒流 LED 驱动领域。芯片采用 固定频率 140kHz 的 PWM 工作模式, 利用平均电 流检测模式,因此具有优异的负载调整…...
python+pytest接口自动化(6)-请求参数格式的确定
我们在做接口测试之前,先需要根据接口文档或抓包接口数据,搞清楚被测接口的详细内容,其中就包含请求参数的编码格式,从而使用对应的参数格式发送请求。例如某个接口规定的请求主体的编码方式为 application/json,那么在…...
开发手册——一、编程规约_3.代码格式
这篇文章主要梳理了在java的实际开发过程中的编程规范问题。本篇文章主要借鉴于《阿里巴巴java开发手册终极版》 下面我们一起来看一下吧。 1. 【强制】大括号的使用约定。如果是大括号内为空,则简洁地写成{}即可,不需要换行;如果是非空代码…...
十七、Django-restframework之序列化器(二)
1. 序列化器 REST framework提供了一个serializer类,它可以非常方便的序列化模型实例和查询集为JSON或者其他内容形式。它还提供反序列化,允许在验证传入数据后将解析的数据转换回复杂类型。 2. 定义序列化器 在crm应用目录下创建serializers.py文件&a…...
python GUI图形化编程-----wxpython
一、python gui(图形化)模块介绍: Tkinter :是python最简单的图形化模块,总共只有14种组建 Pyqt :是python最复杂也是使用最广泛的图形化 Wx :是python当中居中的一个图形化,学习结构很清晰 Pywin :是pyth…...
【Python 】yyyy-MM-dd HH:mm:ss 时间格式 时间戳 全面解读超详细
时间格式 时间格式(协议)描述gg时期或纪元。y不包含纪元的年份。不具有前导零。yy不包含纪元的年份。具有前导零。yyyy包含纪元的四位数的年份。M月份数字。一位数的月份没有前导零。MM月份数字。一位数的月份有一个前导零。MMM月份的缩写名称,在AbbreviatedMonthN…...
【C++】C++11 异常
目录 1. C语言传统的处理错误的方式 2. C异常概念 3. 异常的使用 3.1. 异常的抛出和捕获 3.2. 在函数调用链中异常栈展开匹配原则 3.3. 异常的重新抛出 3.4. 异常安全 3.5. 异常规范 4.自定义异常体系 5. C标准库的异常体系 6. 异常的优缺点 6.1. C异常的优点&…...
关于Thread.start()后的困惑、imap
在for循环中,接着开thread,开完就start,当时有个困惑,就是比如开的一个thread的这个start执行完,但是这个for循环还没执行完,那程序会跑到for循环的后面逻辑吗?比如下面13行for循环开始开第一个…...
qml学习之qwidget与qml结合使用并调用信号槽交互
学习qml系列之一说明: 学习qml系列之qwiget和qml信号槽的交互使用,并在qwidget中显示qml界面 在qml中发送信号到qwidget里 在qwidget里发送信号给qml 在qwidget里面调用qml界面方式 方式一:使用QQuickView 这个是Qt5.0中提供的一个类&…...
【 华为OD机试 2023】 组装新的数组(C++ Java JavaScript Python)
文章目录 题目描述输入描述输出描述备注用例题目解析C++JavaScriptJavaPython题目描述 给你一个整数M和数组N,N中的元素为连续整数,要求根据N中的元素组装成新的数组R,组装规则: R中元素总和加起来等于MR中的元素可以从N中重复选取R中的元素最多只能有1个不在N中,且比N中…...
【洛谷 P2089】烤鸡(循环枚举)
烤鸡 题目背景 猪猪 Hanke 得到了一只鸡。 题目描述 猪猪 Hanke 特别喜欢吃烤鸡(本是同畜牲,相煎何太急!)Hanke 吃鸡很特别,为什么特别呢?因为他有 101010 种配料(芥末、孜然等)…...
windows10安装ubantu双系统
windows10安装ubantu双系统 文章目录windows10安装ubantu双系统一、安装前准备1.前期说明2.制作U盘启动器3.设置硬盘分区相关4.设置给ubantu系统的硬盘大小,设置为未分配(删除卷)二、进行安装1.设置bios相关2.进入bios启动界面选择U盘安装3.进…...
【华为OD机试 2023】 人数最多的站点/小火车最多人时所在园区站点(C++ Java JavaScript Python)
文章目录 题目描述输入描述输出描述用例题目解析C++JavaScriptJavaPython励志做全网最全、解法最多的华为OD机考算法题库,帮助你上岸华为。提供C++/Java、JavaScript、Python四种语言的解法。每篇文章都有详细的结题步骤。有问题,随时解答。😁😁😁😁 目前为了造福广大…...
2024届暑期实习实录(阿里云大数据研发平台)
1. 项目介绍(介绍一下你觉得有挑战的项目 (1)项目的痛点需求(配置变更的痛点、你做的目的是什么?) 思考方向:业务背景,用户需求;产品发展,产品现有局限问题…...
空口协议probe req和probe rsp 、auth req和auth rsp 、assoc req和assoc rsp讲解
我们经常可以看到抓到的报文主要有三种:probe req和probe rsp 、auth req和auth rsp 、assoc req和assoc rsp 。 建立联结的三个阶段 相互发现阶段:probe req和probe rsp probe是探测的意思 相互了解阶段:auth req和auth rsp auth是认证的缩写 建立关…...
vscode ssh一直卡在wget的解决方案
vscode ssh一直卡在wget的解决方案找到commit_id 在服务器下点进该目录 .vscode-server\bin 一般日期最新的那一串就是我们需要的commit_id下载vscode-server-linux-x64.tar https://update.code.visualstudio.com/commit:${commit_id}/server-linux-x64/stable 将加粗部分替换…...
【Python学习笔记】第二十五节 Python MySQL
Python 连接到 MySQL 数据库有几种不同的连接方法,而且不是所有的方法都能与不同的操作系统很好地配合.MySQL connector/Python模块是Oracle支持的官方驱动,用于通过Python连接MySQL。该连接器完全是Python语言,而mysqlclient是用C语言编写的…...
折叠屏手机自带的屏幕表面层为什么不能自己撕?
计划入手折叠屏手机的小伙伴看过来,因为折叠屏手机外观的特殊形态,在日常使用中与普通直板手机屏幕的养护还是有着很大的不同,比如直板机入手后新机开箱撕膜这个操作,对于折叠屏手机来说,万万不可!除此之外…...
20.hadoop系列之Yarn资源调度器
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序 1.Yarn基础架构 Yarn主要由ResourceManager、NodeManager、ApplicationMaster和Contai…...
206页16万字城市运行“一网统管”体系建设项目需求报告
本资料来源网络,知识分享,仅供个人学习,请勿商用。完整资料领取见文末,部分资料内容: 第 一 章 应用场景示例 一.1 一卡通人员管理针对企业厂区、办公楼等场所人员出入频繁、安保问题多样化、管理环节复杂等现状&#…...
【JS】数组Array的使用
arr.length 3 通过设置数组长度,实现截取数组,改变原数组 map 循环遍历数组,不改变原数组,返回一个新的数组(可用变量接收) forEach 循环遍历数组,不改变原数组 join 把数组转化为字符串&…...
2023年,软件测试怎么样?
2022年因为各种不可抗力原因,大厂裁员,失业等等频频受到关注。 不解释,确实存在,各行各业都很难,但是,说软件测试行业不吃香,我还真不认同(不是为培训机构说好话,大环境…...
【学习笔记】NOMURA Programming Competition 2020
C - Folia 不难想到自底向上确定树的形态。可能要多尝试一下 一开始想错了好几个地方,服了 假设某一层有XXX个节点,那么上一层可能有⌈X2⌉,⌈X2⌉1,...,X\lceil\frac{X}{2}\rceil,\lceil\frac{X}{2}\rceil1,...,X⌈2X⌉,⌈2X⌉1,...,X个节点&…...
iis下常用程序的伪静态规则列表(包括wordpress、thinkphp)
shopex discuz2.0 discuz2.5 discuz3.x 淘宝客 ecshop phpwind参照http://www.west.cn/faq/list.asp?unid797通过主机面板设置即可 wordpress设置: 第一步: 1.新建一个“chineseurl.php”文件:在里面写入以下代码上传到wordpress安装目录。…...
石景山网站建设制作公司/买友情链接有用吗
git add . :他会监控工作区的状态树,使用它会把工作时的所有变化提交到暂存区,包括文件内容修改(modified)以及新文件(new),但不包括被删除的文件。 git add -u :他仅监控已经被add的文件(即tracked file&a…...
一起看地图app下载手机版/seo优化顾问
1,自然演绎系统 1.1,定义 如果我们不过分追求精确,在形式系统中使用 作为逻辑连接词,使用 等。 的推演规则: Hyp:若 则 EHyp:若, 则 Ded:,则 Pÿ…...
台州网站制作服务/sem 优化软件
目录在 netmiko 中使用 TextFSM安装模板查看安装完成使用如何编写自定义TextFSM模板示例TextFSM 在线解析: http://textfsm.xdai.vip/ 在 netmiko 中使用 TextFSM 安装模板 $ cd ~ $ git clone https://github.com/networktocode/ntc-templates.git查看安装完成 …...
泉州建设网站开发/靠谱seo外包定制
可以设置收、发邮件白名单,当终端用户使用天锐绿盾终端自带的邮件工具、outlook、foxmail 或闪电邮往白名单邮箱地址收、发送邮件时,加密的附件就会自动解密,减少解密申请次数,方便用户使用。工具:绿盾加密软件收件人白…...
天津外贸网站建设公司/徐州seo建站
对象数组去重 用ES6的reduce方法 this.backTag [{id:111,name:哈哈哈1}, {id:222,name:哈哈哈2},{id:333,name:哈哈哈3},{id:444,name:哈哈哈4},{id:111,name:哈哈哈5}]; let obj {}this.backTag this.backTag.reduce(function(item, next) {obj[next.id] ? : obj[next.…...
极端页面 装修wordpress 主题/广西seo公司
我们经常使用Ping开查看网络是否畅通,随着安全意识的加强,预装系统有些默认会关闭Ping回显服务,我们要手动去打开它。 打开开始菜单输入防火墙 打开Windows防火墙,进入入站规则,找到文件和打印机共享(回显…...