当前位置: 首页 > news >正文

【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析

文章目录

    • 1. 状态初始化总流程梳理
    • 2.创建StreamOperatorStateContext
    • 3. StateInitializationContext的接口设计。
    • 4. 状态初始化举例:UDF状态初始化

在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。

        // Allow invoking method 'invoke' without having to call 'restore' before it.if (!isRunning) {LOG.debug("Restoring during invoke will be called.");restoreInternal();}

StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。

RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {  Iterator var2 = this.getAllOperators(true).iterator();  while(var2.hasNext()) {  StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next();  StreamOperator<?> operator = operatorWrapper.getStreamOperator();  operator.initializeState(streamTaskStateInitializer);  operator.open();  }  }

 
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。

1. 状态初始化总流程梳理

AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:

# AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  throws Exception {  //1. 获取类型序列化器final TypeSerializer<?> keySerializer =  config.getStateKeySerializer(getUserCodeClassloader());  //2. get containingTaskfinal StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  final CloseableRegistry streamTaskCloseableRegistry =  Preconditions.checkNotNull(containingTask.getCancelables());  //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context =  streamTaskStateManager.streamOperatorStateContext(  getOperatorID(),  getClass().getSimpleName(),  getProcessingTimeService(),  this,  keySerializer,  streamTaskCloseableRegistry,  metrics,  config.getManagedMemoryFractionOperatorUseCaseOfSlot(  ManagedMemoryUseCase.STATE_BACKEND,  runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  runtimeContext.getUserCodeClassLoader()),  isUsingCustomRawKeyedState());  //4. create stateHandlerstateHandler =  new StreamOperatorStateHandler(  context, getExecutionConfig(), streamTaskCloseableRegistry);  timeServiceManager = context.internalTimerServiceManager();  //5. initialize OperatorStatestateHandler.initializeOperatorState(this);  //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。

状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。

2.创建StreamOperatorStateContext

接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  @Nonnull OperatorID operatorID,  @Nonnull String operatorClassName,  @Nonnull ProcessingTimeService processingTimeService,  @Nonnull KeyContext keyContext,  @Nullable TypeSerializer<?> keySerializer,  @Nonnull CloseableRegistry streamTaskCloseableRegistry,  @Nonnull MetricGroup metricGroup,  double managedMemoryFraction,  boolean isUsingCustomRawKeyedState)  throws Exception {  //1. 获取task实例信息TaskInfo taskInfo = environment.getTaskInfo();  OperatorSubtaskDescriptionText operatorSubtaskDescription =  new OperatorSubtaskDescriptionText(  operatorID,  operatorClassName,  taskInfo.getIndexOfThisSubtask(),  taskInfo.getNumberOfParallelSubtasks());  final String operatorIdentifierText = operatorSubtaskDescription.toString();  final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  taskStateManager.prioritizedOperatorState(operatorID);  CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  OperatorStateBackend operatorStateBackend = null;  CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  InternalTimeServiceManager<?> timeServiceManager;  try {  // 创建keyed类型的状态后端// -------------- Keyed State Backend --------------  keyedStatedBackend =  keyedStatedBackend(  keySerializer,  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry,  metricGroup,  managedMemoryFraction);  //创建operator类型的状态后端// -------------- Operator State Backend --------------  operatorStateBackend =  operatorStateBackend(  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry);  //创建原生类型状态后端// -------------- Raw State Streams --------------  rawKeyedStateInputs =  rawKeyedStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawKeyedState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  rawOperatorStateInputs =  rawOperatorStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawOperatorState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager --------------  if (keyedStatedBackend != null) {  // if the operator indicates that it is using custom raw keyed state,  // then whatever was written in the raw keyed state snapshot was NOT written            // by the internal timer services (because there is only ever one user of raw keyed            // state);            // in this case, timers should not attempt to restore timers from the raw keyed            // state.            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =  (prioritizedOperatorSubtaskStates.isRestored()  && !isUsingCustomRawKeyedState)  ? rawKeyedStateInputs  : Collections.emptyList();  timeServiceManager =  timeServiceManagerProvider.create(  keyedStatedBackend,  environment.getUserCodeClassLoader().asClassLoader(),  keyContext,  processingTimeService,  restoredRawKeyedStateTimers);  } else {  timeServiceManager = null;  }  // -------------- Preparing return value --------------  return new StreamOperatorStateContextImpl(  prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  operatorStateBackend,  keyedStatedBackend,  timeServiceManager,  rawOperatorStateInputs,  rawKeyedStateInputs);  } catch (Exception ex) {  。。。。
}

流程梳理:

  1. 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
  2. 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
  3. 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
  4. 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
  5. 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
  6. 将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。

 
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。

StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。

 

3. StateInitializationContext的接口设计。

StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
在这里插入图片描述

  1. ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。

  2. FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。

  3. StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等

  4. StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

在这里插入图片描述

 

4. 状态初始化举例:UDF状态初始化

在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。

AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。

 
《Flink设计与实现:核心原理与源码解析》张利兵

相关文章:

【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析

文章目录 1. 状态初始化总流程梳理2.创建StreamOperatorStateContext3. StateInitializationContext的接口设计。4. 状态初始化举例&#xff1a;UDF状态初始化 在TaskManager中启动Task线程后&#xff0c;会调用StreamTask.invoke()方法触发当前Task中算子的执行&#xff0c;在…...

python+flask人口普查数据的应用研究及实现django

作为一款人口普查数据的应用研究及实现&#xff0c;面向的是大多数学者&#xff0c;软件的界面设计简洁清晰&#xff0c;用户可轻松掌握使用技巧。在调查之后&#xff0c;获得用户以下需求&#xff1a; &#xff08;1&#xff09;用户注册登录后&#xff0c;可进入系统解锁更多…...

C语言:函数

C语言&#xff1a;函数 函数的概念库函数自定义函数实参与形参return语句数组做参数声明与定义externstatic 嵌套调用 函数的概念 在C语言中&#xff0c;存在一个函数的概念&#xff0c;有人也将其翻译为子程序。 在数学中&#xff0c;函数是一个完成特定功能的公式&#xff0…...

jmeter-问题一:关于线程组,线程数,用户数详解

文章目录 jmeter参数介绍1.线程数2.准备时长(Ramp-up)3.循环次数4.same user on each iteratio5.调度器 场景一&#xff1a;当你的线程组中线程数为1,循环为1场景二&#xff1a;当你的线程组中线程数为2&#xff0c;循环为1场景三&#xff1a;当你的线程组中线程数为1&#xff…...

golang 通过 cgo 调用 C++ 库

思路 将 C 库包装成 C 库 -> golang 通过 cgo 调用 C 库 C 相关文件 目录列表 include/ some.h C 库头文件some_wrapper.h < 用于将 C 库包装成 C 库的头文件 lib/ libsome.a C 库 src/ some_wrapper.cpp < 用于将 C 库包装成 C 库的源码文件 源码示例 some.h…...

使用 IDEA 开发一个简单易用的 SDK

目录 一、什么是 SDK 二、为什么要开发 SDK 三、开发 SDK 的详细步骤 四、导入 SDK 进行测试 附&#xff1a;ConfigurationProperties 注解的介绍及使用 一、什么是 SDK 1. 定义&#xff1a;软件开发工具包 Software Development Kit 2. 用于开发特定软件或应用程序的工…...

CSS transition(过渡效果)详解

CSS过渡效果&#xff08;Transition&#xff09;是一种在CSS3中引入的动画效果&#xff0c;它允许开发者在元素状态变化时&#xff08;如鼠标悬停、类更改等&#xff09;平滑地改变CSS属性值&#xff0c;从而创建出平滑的动画效果。过渡效果可以应用于多种CSS属性&#xff0c;如…...

Android13多媒体框架概览

Android13多媒体框架概览 Android 多媒体框架 Android 多媒体框架旨在为 Java 服务提供可靠的接口。它是一个系统&#xff0c;包括多媒体应用程序、框架、OpenCore 引擎、音频/视频/输入的硬件设备&#xff0c;输出设备以及一些核心动态库&#xff0c;比如 libmedia、libmedi…...

一文读懂:MybatisPlus从入门到进阶

快速入门 简介 在项目开发中&#xff0c;Mybatis已经为我们简化了代码编写。 但是我们仍需要编写很多单表CURD语句&#xff0c;MybatisPlus可以进一步简化Mybatis。 MybatisPlus官方文档&#xff1a;https://www.baomidou.com/&#xff0c;感谢苞米豆和黑马程序员。 Mybat…...

C语言--------指针(1)

0.指针&指针变量 32位平台&#xff0c;指针变量是4个字节&#xff08;32bit/84)--------x86 64位平台&#xff0c;指针变量是8个字节&#xff08;64bit/88)--------x64 编号指针地址&#xff1b;我们平常讲的p是指针就是说p是一个指针变量&#xff1b; ************只要…...

Vite 下一代的前端工具链,前端开发与构建工具

一、Vite 简介 官方中文网站&#xff1a;Vite | 下一代的前端工具链 官方定义&#xff1a; Vite&#xff0c;下一代的前端工具链&#xff0c;为开发提供极速响应。 Vue3.4版本&#xff0c;Vue新版本使用Vite构建、开发、调试、编译。 Vite的优势 极速的服务启动 使用原生…...

【SpringBoot】FreeMarker视图渲染

目录 一、FreeMarker 简介 1.1 什么是FreeMarker&#xff1f; 1.2 Freemarker模板组成部分 1.3 为什么要使用FreeMarker 二、Springboot集成FreeMarker 2.1 配置 2.2 数据类型 2.2.1 字符串 2.2.2 数值 2.2.3 布尔值 2.2.4 日期 2.3 常见指令 2.3.2 assign 2.3…...

巴尔加瓦算法图解:算法运用。

树 如果能将用户名插入到数组的正确位置就好了&#xff0c;这样就无需在插入后再排序。为此&#xff0c;有人设计了一种名为二叉查找树(binary search tree)的数据结构。 每个node的children 都不大于两个。对于其中的每个节点&#xff0c;左子节点的值都比它小&#xff0c;…...

Docker的镜像和容器的区别

1 Docker镜像 假设Linux内核是第0层&#xff0c;那么无论怎么运行Docker&#xff0c;它都是运行于内核层之上的。这个Docker镜像&#xff0c;是一个只读的镜像&#xff0c;位于第1层&#xff0c;它不能被修改或不能保存状态。 一个Docker镜像可以构建于另一个Docker镜像之上&…...

忘记 RAG:拥抱Agent设计,让 ChatGPT 更智能更贴近实际

RAG&#xff08;检索增强生成&#xff09;设计模式通常用于开发特定数据领域的基于实际情况的ChatGPT。 然而&#xff0c;重点主要是改进检索工具的效率&#xff0c;如嵌入式搜索、混合搜索和微调嵌入&#xff0c;而不是智能搜索。 这篇文章介绍了一种新的方法&#xff0c;灵感…...

利用路由懒加载和CDN分发策略,对Vue项目进行性能优化

目录 一、Vue项目 二、路由懒加载 三、CDN分发策略 四、如何对Vue项目进行性能优化 一、Vue项目 Vue是一种用于构建用户界面的JavaScript框架&#xff0c;它是一种渐进式框架&#xff0c;可以用于构建单页应用&#xff08;SPA&#xff09;和多页应用。Vue具有简单易学、灵…...

【Scala】1. 变量和数据类型

1. 变量和数据类型 1.1 for begining —— hello world 新建hello.scala文件&#xff0c;注意object名字与文件名一致。 object hello { def main(args:Array[String]): Unit { println("hello world!") } }运行后打印结果如下&#xff1a; hello world!Pr…...

何时以及如何选择制动电阻

制动电阻的选择是优化变频器应用的关键因素 制动电阻器在变频器中是如何工作的&#xff1f; 制动电阻器在 VFD 应用中的工作原理是将电机减速到驱动器设定的精确速度。它们对于电机的快速减速特别有用。制动电阻还可以将任何多余的能量馈入 VFD&#xff0c;以提升直流母线上的…...

消息中间件:Puslar、Kafka、RabbigMQ、ActiveMQ

消息队列 消息队列&#xff1a;它主要用来暂存生产者生产的消息&#xff0c;供后续其他消费者来消费。 它的功能主要有两个&#xff1a; 暂存&#xff08;存储&#xff09;队列&#xff08;有序&#xff1a;先进先出 从目前互联网应用中使用消息队列的场景来看&#xff0c;…...

Rust开发WASM,浏览器运行WASM

首先需要安装wasm-pack cargo install wasm-pack 使用cargo创建工程 cargo new --lib mywasm 编辑Cargo.toml文件&#xff0c;修改lib的类型为cdylib&#xff0c;并且添加依赖wasm-bindgen [package] name "mywasm" version "0.1.0" edition "…...

基于算法竞赛的c++编程(28)结构体的进阶应用

结构体的嵌套与复杂数据组织 在C中&#xff0c;结构体可以嵌套使用&#xff0c;形成更复杂的数据结构。例如&#xff0c;可以通过嵌套结构体描述多层级数据关系&#xff1a; struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩

目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

ElasticSearch搜索引擎之倒排索引及其底层算法

文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

算法笔记2

1.字符串拼接最好用StringBuilder&#xff0c;不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...

iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈

在日常iOS开发过程中&#xff0c;性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期&#xff0c;开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发&#xff0c;但背后往往隐藏着系统资源调度不当…...

使用Spring AI和MCP协议构建图片搜索服务

目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式&#xff08;本地调用&#xff09; SSE模式&#xff08;远程调用&#xff09; 4. 注册工具提…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别

【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而&#xff0c;传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案&#xff0c;能够实现大范围覆盖并远程采集数据。尽管具备这些优势&#xf…...