当前位置: 首页 > news >正文

Flink源码之Checkpoint执行流程

checkpoint

Checkpoint完整流程如上图所示:

  1. JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint
  2. SourceTask向下游广播CheckpointBarrier
  3. SouceTask完成状态快照后向JobMaster发送快照结果
  4. 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果
  5. JobMaster保存SubTask快照结果
  6. JobMaster收到所有SubTask快照结果后保存快照信息,想SubTask通知Checkpoint完成

以下对整个流程具体说明。

CheckpointCoordinator

JobMaster将JobGraph转换为ExecutionGraph时,如果开启Checkpoint,会为ExecutionGraph生成一个CheckpointCoordinator

DefaultExecutionGraphBuilder.buildGraph//在此会将JobGraph转换为ExecutionGraphDefaultExecutionGraph::newDefaultExecutionGraph::attachJobGraph //创建ExecutionJobVertexDefaultExecutionTopology.fromExecutionGraph //创建ExecutionTopologyDefaultExecutionGraph::enableCheckpointing //创建CheckpointCoordinatorDefaultExecutionGraph::createCheckpointPlanCalculator//创建DefaultCheckpointPlanCalculatorCheckpointCoordinator::new 

CheckpointCoordinator封装了StateBackend和CheckpointStorage

StateBackend负责管理状态:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

CheckpointStorage则是负责存储StateBackend管理的状态:

  • JobManagerCheckpointStorage //checkpoint state放入JobManager内存
  • FileSystemCheckpointStorage //配置state.checkpoints.dir时

在为StreamTask构造SubtaskCheckpointCoordinatorImpl时会调用:

CheckpointStorage::createCheckpointStorage

创建CheckpointStorageAccess用于执行Checkpoint时解析状态存储位置

  • MemoryBackendCheckpointStorageAccess //对应JobManagerCheckpointStorage
  • FsCheckpointStorageAccess //对应FileSystemCheckpointStorage

CheckpointCoordinator在执行状态快照时会调用

CheckpointStorageAccess::resolveCheckpointStorageLocation

生成CheckpointStreamFactory用于生成读写状态数据流

  • MemCheckpointStreamFactory //对应JobManagerCheckpointStorage
  • FsCheckpointStreamFactory //对应FileSystemCheckpointStorage

Checkpoint触发流程

JobMaster状态转换为running后,通过CheckpointCoordinator向SourceTask发送TriggerCheckpoint

JobMaster端触发流程

JobMaster::start  //RPCServer启动
JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startJobMasterServices //获取RM地址后与RM建立连接
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultScheduler::startSchedulingInternal
SchedulerBase::transitionToRunningDefaultExecutionGraph::transitionToRunning //调用ExecutionGraph监听器通知状态变化CheckpointCoordinatorDeActivator::jobStatusChanges//触发checkpointCheckpointCoordinator::startCheckpointSchedulerCheckpointCoordinator::scheduleTriggerWithDelay //定时不断触发CheckpointCheckpointCoordinator::triggerCheckpointCheckpointCoordinator::startTriggeringCheckpointDefaultCheckpointPlanCalculator::calculateCheckpointPlan//Plan中会隔离出SourceTask作为作为Trigger Checkpoint的入口CheckpointCoordinator::createPendingCheckpointCheckpointCoordinator::triggerCheckpointRequestCheckpointCoordinator::triggerTasks Execution::triggerCheckpoint //向每个SourceTask发送TriggerCheckpoint请求Execution::triggerCheckpointHelperTaskManagerGateway::triggerCheckpoint//向TaskExecutor发RPC

StreamTask端执行流程

SourceTask

SourceTask由JobMaster RPC直接触发,执行时先广播CheckpointBarrier,然后对状态执行异步快照

TaskExecutor::triggerCheckpoint
Task::triggerCheckpointBarrier
AbstractInvokable::triggerCheckpointAsync
SourceStreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsyncInMailbox
StreamTask::performCheckpoint
SubtaskCheckpointCoordinatorImpl::checkpointStateOperatorChain.broadcastEvent //广播CheckpointBarrier
CheckpointStorage::createCheckpointStorage//为JobId创建CheckpointStorageAccess
SubtaskCheckpointCoordinatorImpl::takeSnapshotSync
CheckpointStorageWorkerView::resolveCheckpointStorageLocation//CheckpointStorageAccess创建 CheckpointStreamFactoryOperatorChain::snapshotState //对每个OperatorRegularOperatorChain::buildOperatorSnapshotFuturesRegularOperatorChain::checkpointStreamOperatorAbstractStreamOperator::snapshotStateStreamOperatorStateHandler::snapshotState//调用Operator/Keyed Backend的snapshotStateSnapshotContextSynchronousImpl::newAbstractUdfStreamOperator::snapshotState //调用UDF中snapshotState方法,一般用于更新OperatorStateDefaultOperatorStateBackend::snapshotSnapshotStrategyRunner::snapshotDefaultOperatorStateBackendSnapshotStrategy::syncPrepareResources//深copy operator state,便于后续进行异步快照DefaultOperatorStateBackendSnapshotStrategy::asyncSnapshot//异步快照					  	  CheckpointStateOutputStream::closeAndGetHandleOperatorStreamStateHandle::new //包装元信息及数据StreamStateHandleHeapKeyedStateBackend::snapshotSnapshotStrategyRunner::snapshotHeapSnapshotStrategy::syncPrepareResourcesHeapSnapshotStrategy::asyncSnapshot //采用COWSateTable异步快照CheckpointStateOutputStream::closeAndGetHandleKeyGroupsStateHandle::new //包装KeyGroup及数据StreamStateHandle
SubtaskCheckpointCoordinatorImpl::finishAndReportAsync //向JobMaster发送checkpoint的结果AsyncCheckpointRunnable::new AsyncCheckpointRunnable::runAsyncCheckpointRunnable::finalizeNonFinishedSnapshotsOperatorSnapshotFinalizer::new //等待TaskSnapshot状态信息序列化完成AsyncCheckpointRunnable::reportCompletedSnapshotStatesTaskStateManagerImpl::reportTaskStateSnapshotsRpcCheckpointResponder::acknowledgeCheckpoint//向JobMaster发送Ack,带上State信息
非SourceTask

在StreamTask启动后调用StreamTask::processInput不断读取数据进行处理, 非SourceTask在收到上游的CheckpointBarrier对齐后触发Checkpoint,

StreamTask::processInput
StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循环不断从buffer中读取StreamElement
处理CheckpointedInputGate::pollNextCheckpointedInputGate::handleEventSingleCheckpointBarrierHandler::processBarrierSingleCheckpointBarrierHandler::markCheckpointAlignedAndTransformStateWaitingForFirstBarrier::barrierReceivedAbstractAlignedBarrierHandlerState::barrierReceivedSingleCheckpointBarrierHandler.ControllerImpl::allBarriersReceived//判断对齐AbstractAlignedBarrierHandlerState::triggerGlobalCheckpointSingleCheckpointBarrierHandler.ControllerImpl::triggerGlobalCheckpointSingleCheckpointBarrierHandler::triggerCheckpointCheckpointBarrierHandler::notifyCheckpoint //触发StreamTask CheckpointStreamTask::triggerCheckpointOnBarrierStreamTask::performCheckpoint //后续调用过程与SourceTask一样SubtaskCheckpointCoordinatorImpl::checkpointState   		

根据调用栈看出,非SourceStreamTask执行Checkpoint只是触发时机不同,SourceTask由JobMaster RPC定时不断触发,非SourceTask则是在上游的CheckpointBarrier对齐后触发Checkpoint,最终执行逻辑都是将当前算子的信息写入CheckpointStorage后向JobMaster发送确认信息。

StreamTask向JobMaster ACK信息中包含状态元信息及StreamStateHandle,根据状态存储位置分为:

  • ByteStreamStateHandle //对应JobManagerCheckpointStorage,将状态序列化为byte[]发送给JobMaster
  • FileStateHandle //对应FileSystemCheckpointStorage,将状态写入文件系统后将文件路径发送给JobMaster

JobMaster端完成流程

JobMaster收到StreamTask的acknowledgeCheckpoint后:

JobMaster::acknowledgeCheckpoint
SchedulerBase::acknowledgeCheckpoint
ExecutionGraphHandler::acknowledgeCheckpoint
CheckpointCoordinator::receiveAcknowledgeMessagePendingCheckpoint::acknowledgeTask //某一个Task的确认PendingCheckpoint::updateOperatorState//更新SubTask状态信息CheckpointCoordinator::completePendingCheckpoint//所有Task Ack后PendingCheckpoint::finalizeCheckpointCheckpoints.storeCheckpointMetadata//保存CheckpointMetadataCompletedCheckpoint::newCheckpointCoordinator::sendAcknowledgeMessages//向Task通知Checkpoint完成消息ExecutionVertex::notifyCheckpointCompleteTaskManagerGateway.notifyCheckpointComplete

JobMaster收到所有StreamTask的Checkpoint状态信息后,标志一次Checkpoint完成,这时会通知StreamTask CheckPoint完成消息,便于SubTask监听Checkpoint完成后做后续动作。

相关文章:

Flink源码之Checkpoint执行流程

Checkpoint完整流程如上图所示: JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPointSourceTask向下游广播CheckpointBarrierSouceTask完成状态快照后向JobMaster发送快照结果非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结…...

【工具使用】Git的使用

dev代表开发版 1. git clone 命令 通过 git add <name> 对文件进行跟踪&#xff0c;把<name>加入到暂存区 git commit -m XXXXXXX 提交修改并补充XXXXX作为注释 “暂存”状态&#xff1a;出现了一些修改&#xff0c;但是还没有提交 对于Java来说&#xff0c;.cl…...

无涯教程-PHP Installation on Windows NT/2000/XP with IIS函数

在Windows Server上运行IIS的PHP的安装比在Unix上简单得多,因为它涉及的是预编译的二进制文件而不是源代码。 如果您打算在Windows上安装PHP,那么这是先决条件列表- 运行中的PHP支持的Web服务器。一个正确安装的PHP支持的数据库,如MySQL或Oracle等。(如果您打算使用的话) PHP…...

EureKa快速入门

EureKa快速入门 远程调用的问题 多个服务有多个端口&#xff0c;这样的话服务有多个&#xff0c;硬编码不太适合 eureKa的作用 将service的所有服务的端口全部记录下来 想要的话 直接从注册中心查询对于所有服务 每隔一段时间需要想eureKa发送请求 保证服务还存活 动手实践 …...

Sectigo EV代码签名申请步骤

一、EV代码签名申请前提 1、单位成立时间不低于&#xff1a;3个月 2、单位工商及企查查可查 3、单位经营正常 4、注册地址真实存在&#xff0c;禁止使用集中注册地址 5、企查查登记电话和邮箱&#xff0c;确定查询结果的电话可以接听、邮箱可以接收邮件&#xff0c;如果信…...

生信学院|08月25日《SOLIDWORKS PDM帮助企业对设计数据版本的管理应用》

课程主题&#xff1a;SOLIDWORKS PDM帮助企业对设计数据版本的管理应用 课程时间&#xff1a;2023年08月25日 14:00-14:30 主讲人&#xff1a;车立洋 生信科技 PDM专家 1、图纸&文档的版本管理对于企业的重要性 2、SolidWorks PDM对图纸&文档版本的管理 3、SolidW…...

vue页面转pdf后分页时文字被横向割裂

效果 预期效果 //避免分页被截断async outPutPdfFn (id, title) {const _t this;const A4_WIDTH 592.28;const A4_HEIGHT 841.89;// dom的id。let target document.getElementById(pdf);let pageHeight target.scrollWidth / A4_WIDTH * A4_HEIGHT;// 获取分割dom&#xf…...

数据结构——队列(C语言)

需求&#xff1a;无 本篇文章将解决一下几个问题&#xff1a; 队列是什么&#xff1f;如何实现一个队列&#xff1f;什么场景下会用队列&#xff1f; 队列的概念&#xff1a; 队列&#xff1a;一种只允许一端进行插入数据操作&#xff0c;在另一端进行删除操作的特殊线性表。…...

WGS84地球坐标系,GCJ02火星坐标系,BD09百度坐标系简介与转换 资料收集

野火 ATGM332D简介 高性能、低功耗 GPS、北斗双模定位模块 STM32 GPS定位_为了维护世界和平_的博客-CSDN博客 秉火多功能调试助手上位机开源&#xff01;共六款软件&#xff0c;学到你吐... , - 电脑上位机 - 野火电子论坛 - Powered by Discuz! https://www.firebbs.cn/for…...

【面试题】前端面试复习6---性能优化

前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 性能优化 一、性能指标 要在 Chrome 中查看性能指标&#xff0c;可以按照以下步骤操作&#xff1a; 打开 Chrome 浏览器&#xff0c;并访问你想要测试…...

隧道HTTP具备的条件

作为一名专业的爬虫代理供应商&#xff0c;我们都知道使用代理是保证爬虫的高效性和稳定性的重要手段之一。而隧道代理则是近年来备受推崇的一种代理形式&#xff0c;它通过将请求通过隧道传输&#xff0c;可以有效地隐藏爬虫的真实IP地址&#xff0c;提高爬虫的反爬能力。 在…...

部署FTP服务(二)

目录 2.访问FTP服务 1.使用ftp命令行工具 2.使用浏览器 3.使用FileZilla Client 3.Serv-U 1.定义新域 2.创建用户 4. windowsserver搭建ftp服务器 一、FTP工具 二、Windows资源管理器 三、IE浏览器访问 2.访问FTP服务 下面在一台装有Windows10操作系统的计算机中&#…...

缓存的变更(JVM本地缓存->Redis分布式缓存)

在一次需求修改中&#xff0c;下游的服务附加提出了&#xff0c;针对某个业务数据缓存的生效时间的要求 原JVM设计方案&#xff1a; 采用jvm本地缓存机制&#xff0c;定时任务30秒刷新一次 现在redis方案&#xff1a; 因为很多地方使用了这个业务数据缓存&#xff0c;使用方…...

springMVC Unix 文件参数变更漏洞修复

错误信息如下&#xff1a; 解决方案&#xff1a; 原因&#xff1a;未对用户输入正确执行危险字符清理 未检查用户输入中是否包含“…”&#xff08;两个点&#xff09;字符串&#xff0c;比如 url 为 /login?action…/webapps/RTJEKSWTN26635&typerandomCode cookie为Coo…...

【LeetCode】494.目标和

题目 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 或 - &#xff0c;然后串联起所有整数&#xff0c;可以构造一个 表达式 &#xff1a; 例如&#xff0c;nums [2, 1] &#xff0c;可以在 2 之前添加 &#xff0c;在 1 之前添加 - &#x…...

KaiwuDB 荣获哈佛商业评论 2023“高能韧性团队奖”

8月18日&#xff0c;《哈佛商业评论》中文版携手 FESCO 成功举办“第九届人才经济论坛”暨“2022-2023 高能团队奖颁奖典礼”。论坛秉承前沿的全球视野及权威的管理理念&#xff0c;发掘并展示本土企业组织管理的最佳实践&#xff0c;并重磅揭晓第二届“高能团队奖”评选结果。…...

删除ubuntu开始菜单中的图标

背景 本来是很好看干净的界面 更新谷歌浏览器后出现了Gmail&#xff0c;幻灯片&#xff0c;谷歌硬盘等跟谷歌相关的乱七八糟东西搞得界面就很丑 解决问题 删掉那个图标 输入命令 sudo nautilus /usr/share/applicationssudo nautilus ~/.local/share/applications可以…...

信息系统项目管理基础知识学习笔记 - IT 治理基础 - IT治理的驱动因素

信息系统项目管理基础知识学习笔记 - IT 治理基础 - IT治理的驱动因素 IT治理的驱动因素组织的IT战略驱动组织开展高质量IT治理因素IT治理的内涵IT 治理体系信息系统项目管理基础知识学习笔记 - IT 治理基础 - IT治理的驱动因素 IT治理的驱动因素 组织信息系统建设和运行需要…...

8月21-22日上课内容 第一章 MySQL数据库初始

本章结构 数据库的基本概念 概述&#xff08;总览&#xff09; 结构&#xff1a; 数据 表 数据库 数据库管理系统 数据库系统原理 数据 (Data) 描述事物的符号记录 包括数字&#xff0c;文字、图形、图像、声音、档案记录等以“记录”形式按统一的格式进行存储表 将不同…...

等级查询发布助手

考试成绩的发布是学校教学工作中的一项重要任务&#xff0c;传统的手工录入、统计和发布成绩的方式既耗时又容易出错。为了提高老师的工作效率和准确性&#xff0c;推荐老师们试一试易查分考试等级发布系统。 易查分是一个查询/发布发布平台 1. 快速高效&#xff1a;老师只需将…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制

使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下&#xff0c;限制某个 IP 的访问频率是非常重要的&#xff0c;可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案&#xff0c;使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...

华为OD机试-最短木板长度-二分法(A卷,100分)

此题是一个最大化最小值的典型例题&#xff0c; 因为搜索范围是有界的&#xff0c;上界最大木板长度补充的全部木料长度&#xff0c;下界最小木板长度&#xff1b; 即left0,right10^6; 我们可以设置一个候选值x(mid)&#xff0c;将木板的长度全部都补充到x&#xff0c;如果成功…...

【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验

Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...

DBLP数据库是什么?

DBLP&#xff08;Digital Bibliography & Library Project&#xff09;Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高&#xff0c;数据库文献更新速度很快&#xff0c;很好地反映了国际计算机科学学术研…...

​​企业大模型服务合规指南:深度解析备案与登记制度​​

伴随AI技术的爆炸式发展&#xff0c;尤其是大模型&#xff08;LLM&#xff09;在各行各业的深度应用和整合&#xff0c;企业利用AI技术提升效率、创新服务的步伐不断加快。无论是像DeepSeek这样的前沿技术提供者&#xff0c;还是积极拥抱AI转型的传统企业&#xff0c;在面向公众…...