Flink系列之:Savepoints
Flink系列之:Savepoints
- 一、Savepoints
- 二、分配算子ID
- 三、Savepoint 状态
- 四、算子
- 五、触发Savepoint
- 六、Savepoint 格式
- 七、触发 Savepoint
- 八、使用 YARN 触发 Savepoint
- 九、使用 Savepoint 停止作业
- 十、从 Savepoint 恢复
- 十一、跳过无法映射的状态恢复
- 十二、Restore 模式
- 十三、NO_CLAIM (默认的)
- 十四、CLAIM
- 十五、LEGACY
- 十六、删除 Savepoint
- 十七、配置
一、Savepoints
Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。
二、分配算子ID
强烈建议你按照本节所述调整你的程序,以便将来能够升级你的程序。主要通过 uid(String) 方法手动指定算子 ID 。这些 ID 将用于恢复每个算子的状态。
DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID
这段代码使用Apache Flink框架创建了一个名为stream的数据流(DataStream类型)。代码中的每一个操作都代表了流处理的一步。
首先,代码通过调用addSource方法添加一个名为StatefulSource的有状态源。这个源可以是任何有状态的数据源,比如Kafka。uid(“source-id”)方法用于给源操作符指定一个唯一的ID。
接下来,代码调用shuffle方法对流进行一次shuffle操作,该操作可用于重新分区数据。
然后,代码调用map方法,使用一个名为StatefulMapper的有状态映射函数对流进行映射操作。同样地,uid(“mapper-id”)方法用于给映射操作符指定一个唯一的ID。
最后,代码调用print方法通过无状态的打印sink将流的内容输出到控制台。这个操作会自动生成一个ID。
整个代码段构建了一个有状态的数据流处理图,其中包含了源操作符、映射操作符和一个打印sink。该数据流会根据源生成的数据进行一系列的转换和处理,并将结果打印输出。
如果不手动指定 ID ,则会自动生成 ID 。只要这些 ID 不变,就可以从 Savepoint 自动恢复。生成的 ID 取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些 ID 。
三、Savepoint 状态
可以将 Savepoint 想象为每个有状态的算子保存一个映射“算子 ID ->状态”:
Operator ID | State
------------+------------------------
source-id | State of StatefulSource
mapper-id | State of StatefulMapper
在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。
四、算子
你可以使用命令行客户端来触发 Savepoint,触发 Savepoint 并取消作业,从 Savepoint 恢复,以及删除 Savepoint。
从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复。
五、触发Savepoint
当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录(参见:targetDirectory参数来控制该目录的位置。
注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统(或者对象存储系统)上的位置。
以 FsStateBackend 或 RocksDBStateBackend 为例:
# Savepoint 目标目录
/savepoint/# Savepoint 目录
/savepoint/savepoint-:shortjobid-:savepointid/# Savepoint 文件包含 Checkpoint元数据
/savepoint/savepoint-:shortjobid-:savepointid/_metadata# Savepoint 状态
/savepoint/savepoint-:shortjobid-:savepointid/...
从 1.11.0 开始,你可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。
在如下两种情况中不支持 savepoint 目录的移动:1)如果启用了 entropy injection :这种情况下,savepoint 目录不包含所有的数据文件,因为注入的路径会分散在各个路径中。 由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。2)作业包含了 task-owned state(比如 GenericWriteAhreadLog sink)。
和 savepoint 不同,checkpoint 不支持任意移动文件,因为 checkpoint 可能包含一些文件的绝对路径。
如果你使用 MemoryStateBackend 的话,metadata 和 savepoint 的数据都会保存在 _metadata 文件中,因此不要因为没看到目录下没有数据文件而困惑。
注意: 不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对精确一次的接收器有副作用,为了确保精确一次的语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。
六、Savepoint 格式
你可以在 savepoint 的两种二进制格式之间进行选择:
- 标准格式 - 一种在所有 state backends 间统一的格式,允许你使用一种状态后端创建 savepoint 后,使用另一种状态后端恢复这个 savepoint。这是最稳定的格式,旨在与之前的版本、模式、修改等保持最大兼容性。
- 原生格式 - 标准格式的缺点是它的创建和恢复速度通常很慢。原生格式以特定于使用的状态后端的格式创建快照(例如 RocksDB 的 SST 文件)。
以原生格式创建 savepoint 的能力在 Flink 1.15 中引入,在那之前 savepoint 都是以标准格式创建的。
七、触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory]
这将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径。 你需要此路径来恢复和删除 Savepoint 。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。
$ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]
八、使用 YARN 触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
这将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。
九、使用 Savepoint 停止作业
$ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
这将自动触发 ID 为 :jobid 的作业的 Savepoint,并停止该作业。此外,你可以指定一个目标文件系统目录来存储 Savepoint 。该目录需要能被 JobManager(s) 和 TaskManager(s) 访问。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。
十、从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]
这将提交作业并指定要从中恢复的 Savepoint 。 你可以给出 Savepoint 目录或 _metadata 文件的路径。
十一、跳过无法映射的状态恢复
默认情况下,resume 操作将尝试将 Savepoint 的所有状态映射回你要还原的程序。 如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态:
十二、Restore 模式
Restore 模式 决定了在 restore 之后谁拥有Savepoint 或者 externalized checkpoint的文件的所有权。在这种语境下 Savepoint 和 externalized checkpoint 的行为相似。 这里我们将它们都称为“快照”,除非另有明确说明。
如前所述,restore 模式决定了谁来接管我们从中恢复的快照文件的所有权。快照可被用户或者 Flink 自身拥有。如果快照归用户所有,Flink 不会删除其中的文件,而且 Flink 不能依赖该快照中文件的存在,因为它可能在 Flink 的控制之外被删除。
每种 restore 模式都有特定的用途。尽管如此,我们仍然认为默认的 NO_CLAIM 模式在大多数情况下是一个很好的折中方案,因为它在提供明确的所有权归属的同时只给恢复后第一个 checkpoint 带来较小的代价。
你可以通过如下方式指定 restore 模式:
$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]
十三、NO_CLAIM (默认的)
在 NO_CLAIM 模式下,Flink 不会接管快照的所有权。它会将快照的文件置于用户的控制之中,并且永远不会删除其中的任何文件。该模式下可以从同一个快照上启动多个作业。
为保证 Flink 不会依赖于该快照的任何文件,它会强制第一个(成功的) checkpoint 为全量 checkpoint 而不是增量的。这仅对state.backend: rocksdb 有影响,因为其他 backend 总是创建全量 checkpoint。
一旦第一个全量的 checkpoint 完成后,所有后续的 checkpoint 会照常创建。所以,一旦一个 checkpoint 成功制作,就可以删除原快照。在此之前不能删除原快照,因为没有任何完成的 checkpoint,Flink 会在故障时尝试从初始的快照恢复。
十四、CLAIM
另一个可选的模式是 CLAIM 模式。该模式下 Flink 将声称拥有快照的所有权,并且本质上将其作为 checkpoint 对待:控制其生命周期并且可能会在其永远不会被用于恢复的时候删除它。因此,手动删除快照和从同一个快照上启动两个作业都是不安全的。Flink 会保持配置数量的 checkpoint。
注意:
Retained checkpoints 被存储在 <checkpoint_dir>/<job_id>/chk- 这样的目录中。Flink 不会接管 <checkpoint_dir>/<job_id> 目录的所有权,而只会接管 chk- 的所有权。Flink 不会删除旧作业的目录。
Native 格式支持增量的 RocksDB savepoints。对于这些 savepoints,Flink 将所有 SST 存储在 savepoints 目录中。这意味着这些 savepoints 是自包含和目录可移动的。然而,在 CLAIM 模式下恢复时,后续的 checkpoints 可能会复用一些 SST 文件,这反过来会阻止在 savepoints 被清理时删除 savepoints 目录。 Flink 之后运行期间可能会删除复用的SST 文件,但不会删除 savepoints 目录。因此,如果在 CLAIM 模式下恢复,Flink 可能会留下一个空的 savepoints 目录。
十五、LEGACY
Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除初始恢复的 checkpoint。同时,用户也不清楚是否可以删除它。导致该的问题原因是, Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint,因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint。总而言之,恢复的 checkpoint 的所有权没有明确的界定。
十六、删除 Savepoint
$ bin/flink savepoint -d :savepointPath
这将删除存储在 :savepointPath 中的 Savepoint。
请注意,还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint(请记住,每个 Savepoint 都是自包含的)。 在 Flink 1.2 之前,使用上面的 Savepoint 命令执行是一个更乏味的任务。
十七、配置
你可以通过 state.savepoints.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。 你可以通过使用触发器命令指定自定义目标目录来覆盖缺省值
# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints
如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。
注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。
相关文章:
Flink系列之:Savepoints
Flink系列之:Savepoints 一、Savepoints二、分配算子ID三、Savepoint 状态四、算子五、触发Savepoint六、Savepoint 格式七、触发 Savepoint八、使用 YARN 触发 Savepoint九、使用 Savepoint 停止作业十、从 Savepoint 恢复十一、跳过无法映射的状态恢复十二、Resto…...
使用宝塔面板部署前端项目到服务器
目录 文章目录 前言 一、第一步:创建文件夹 二、第二步:部署前端项目 三、第三步:打开防火墙 文章目录 前言第一步:创建文件夹第二步:部署前端项目第三步:打开防火墙总结 前言 在此之前,我…...
Enge问题解决教程
目录 解决问题的一般步骤: 针对"Enge问题"的具体建议: 以下是一些普遍适用的解决问题的方法: 以下是一些更深入的Enge浏览器问题和解决办法: 浏览器性能问题: 浏览器插件与网站冲突: 浏览…...
使用yarn安装electron时手动选择版本
访问1Password或者其他可以提供随机字符的网站,获取随机密码运行安装命令 操作要点,必须触发Couldnt find any versions for "electron" that matches "*"才算成功 将复制的随机密码粘贴到后面 例如:yarn add --dev elec…...
AIGC:阿里开源大模型通义千问部署与实战
1 引言 通义千问-7B(Qwen-7B)是阿里云研发的通义千问大模型系列的70亿参数规模的模型。Qwen-7B是基于Transformer的大语言模型, 在超大规模的预训练数据上进行训练得到。预训练数据类型多样,覆盖广泛,包括大量网络文本、专业书籍…...
Java小案例-Java实现人事管理系统
前言 《人事管理系统》该项目采用技术jsp、Struts2、Mybatis、dwr、tomcat服务器、mysql数据库 开发工具eclipse/idea。 【项目使用技术】 Struts2Mybatisdwrjqueryjscss等技术 前端使用技术:JSP, dwr、jquery、js、css等 后端使用技术:Struts2Myba…...
Win系统修改Nginx配置结合内网穿透实现远程访问多个Web站点
文章目录 1. 下载windows版Nginx2. 配置Nginx3. 测试局域网访问4. cpolar内网穿透5. 测试公网访问6. 配置固定二级子域名7. 测试访问公网固定二级子域名 1. 下载windows版Nginx 进入官方网站(http://nginx.org/en/download.html)下载windows版的nginx 下载好后解压进入nginx目…...
如何使用 NFTScan NFT API 在 Base 网络上开发 Web3 应用
Base 是 Coinbase 使用 OP Stack 开发的最新以太坊第 2 层(L2)网络,用于解决以太坊等主要区块链面临的可扩展性和成本挑战。Coinbase 将其描述为“安全、低成本、对开发人员友好的以太坊 L2,旨在将下一个 10 亿用户带入 Web3”。B…...
【Chrome】ERR_SSL_PROTOCOL_ERROR问题
文章目录 前言一、下载二、使用步骤总结 前言 Edge升级最新版后,有的https访问不了,报如下错误 发现新版Chrome以及Chromium内核访问nginx ssl时报错,顺着这个思路接着查看到大佬的结论:服务器nginx使用的openssl版本过低&#…...
Codeforces Round 916 (Div. 3)(E:贪心 F贪心dfs G tarjan+topsort +线段树优化建图)
A:直接暴力统计每个字符的次数是否达标即可 #include<bits/stdc.h> using namespace std; const int N 3e510,mod998244353; #define int long long typedef long long LL; typedef pair<int, int> PII; typedef unsigned long long ULL;const long l…...
eNSP错误40,原因三:windows10自带虚拟化软件Hyper-V
问题描述 Hyper-V软件与VirtualBox不兼容,一旦开启Hyper-V的话eNSP的路由器就会无法开启,显示ERROR 40 原理 大家注意看hypervisor的两种类型: 左边的是开启hypervisor的Type-1,hypervisor在启用的时候,宿主机也相…...
Maven将Jar包打入本地仓库
Maven将Jar包打入本地仓库 Maven将Jar包打入本地仓库嘚吧嘚下载Maven配置Maven新建MAVEN_HOME编辑Path验证Maven配置 Jar包打入Maven仓库 Maven将Jar包打入本地仓库 嘚吧嘚 最近项目用到一个Jar包,不能从远程仓库拉取,只有一个Jar包,所以需…...
如何使用 Helm 在 K8s 上集成 Prometheus 和 Grafana|Part 1
本系列将分成三个部分,您将学习如何使用 Helm 在 Kubernetes 上集成 Prometheus 和 Grafana,以及如何在 Grafana 上创建一个简单的控制面板。Prometheus 和 Grafana 是 Kubernetes 最受欢迎的两种开源监控工具。学习如何使用 Helm 集成这两个工具&#x…...
Observability:捕获 Elastic Agent 和 Elasticsearch 之间的延迟
在现代 IT 基础设施的动态环境中,高效的数据收集和分析至关重要。 Elastic Agent 是 Elastic Stack 的关键组件,通过促进将数据无缝摄取到 Elasticsearch 中,在此过程中发挥着至关重要的作用。 然而,显着影响此过程整体有效性的关…...
Ubuntu 常用命令之 less 命令用法介绍
📑Linux/Ubuntu 常用命令归类整理 less命令是一个在Unix和Unix-like系统中用于查看文件内容的命令行工具。与more命令相比,less命令提供了更多的功能和灵活性,例如向前和向后滚动查看文件,搜索文本,查看长行等。 les…...
探索Node.js包管理器npm:介绍与使用指南
引言: 在现代软件开发中,包管理器已经成为了不可或缺的工具。它们简化了软件的安装、升级和管理过程,使得开发者能够更加高效地构建项目。而作为Node.js的官方包管理器,npm(Node Package Manager)无疑是最受…...
探讨APP自动化测试工具的重要性
随着移动应用市场的蓬勃发展,企业对于保证其移动应用质量和用户体验的需求日益迫切。在这一背景下,APP自动化测试工具正变得越来越重要,成为企业成功的关键组成部分。本文将探讨APP自动化测试工具对企业的重要性,并为您解析其在提…...
el-date-picker日期时间插件只允许选择年月日小时并做可选择范围限制(精确到小时的范围)
一、首先明确下这个需求 1、要求只能选择年月日时,不要分钟和秒 2、根据后台返回的开始时间和天数设置可选择的开始时间和结束时间范围(包含小时)比如后台返回的开始时间是2023-12-20 13:24:30,天数365天,那么我们需要限制当前可选日期为2023-12-20 14时(不能选小于13:2…...
在MongoDB中使用数组字段和子文档字段进行索引
本文主要介绍在MongoDB使用数组字段和子文档字段进行索引。 目录 MongoDB的高级索引一、索引数组字段二、索引子文档字段 MongoDB的高级索引 MongoDB是一个面向文档的NoSQL数据库,它提供了丰富的索引功能来加快查询性能。除了常规的单字段索引之外,Mong…...
<JavaEE> 网络编程 -- 网络编程和 Socket 套接字
目录 一、网络编程的概念 1)什么是网络编程? 2)网络编程中的基本概念 1> 收发端 2> 请求和响应 3> 客户端和服务端 二、Socket套接字 1)什么是“套接字”? 2)Socket套接字的概念 3&…...
【计算机系统结构实验】实验2 流水线中的冲突实验
2.1 实验目的 加深对计算机流水线基本概念的理解; 理解MIPS结构如何用5段流水线来实现,理解各段的功能和基本操作; 加深对结构冲突/数据冲突/控制冲突的理解; 进一步理解解决数据冲突的方法,掌握如何应用定向技术来…...
conda环境下执行conda命令提示无法识别解决方案
1 问题描述 win10环境命令行执行conda命令,报命令无法识别,错误信息如下: PS D:\code\cv> conda activate pt conda : 无法将“conda”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写,如果包括路径&a…...
链接未来:深入理解链表数据结构(二.c语言实现带头双向循环链表)
上篇文章简述讲解了链表的基本概念并且实现了无头单向不循环链表:链接未来:深入理解链表数据结构(一.c语言实现无头单向非循环链表)-CSDN博客 那今天接着给大家带来带头双向循环链表的实现: 文章目录 一.项目文件规划…...
论文笔记 | Nature 2023 FunSearch:利用大语言模型在数学科学领域探索新的发现
文章目录 一、前言二、主要内容三、总结🍉 CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 一、前言 科学中有许多难以解决的问题,这些问题难以获得确切解答,但却相对容易进行验证。在数学和计算机科学领域,这类问题被称为 NP 完全优化问题(NP-complete optimization pr…...
JavaScript 对象和 JSON 字符串的区别
JavaScript 对象和 JSON 字符串是两种不同的数据表示形式,它们有以下区别: 语法格式:JavaScript 对象是 JavaScript 语言中的一种数据类型,使用花括号 {} 包裹,属性和值之间使用冒号 : 分隔,并且使用逗号 …...
基于 Flink SQL 和 Paimon 构建流式湖仓新方案
目录 1. 数据分析架构演进 2. Apache Paimon 3. Flink + Paimon 流式湖仓 Consumer 机制 Changelog 生成编辑...
MFC静态链接+libtiff静态链接提示LNK2005和LNK4098
编译报错 1>msvcrt.lib(ti_inst.obj) : error LNK2005: "private: __thiscall type_info::type_info(class type_info const &)" (??0type_infoAAEABV0Z) 已经在 libcmtd.lib(typinfo.obj) 中定义 1>msvcrt.lib(ti_inst.obj) : error LNK2005: "pr…...
桶装水送水小程序:提升服务质量的利器
随着移动互联网的发展,越来越多的消费者通过手机在线购物和订购商品。如果你是一名桶装水供应商,想要拓展线上业务,那么开发一个桶装水微信小程序将是一个明智的选择。本文将指导你从零开始开发一个桶装水微信小程序,让你轻松完成…...
深度学习在训练什么,什么是模型
深度学习是机器学习的一个分支,它主要通过使用称为神经网络的复杂结构来学习数据的表征。在深度学习中,"训练"和"模型"是两个核心概念。 训练 在深度学习中,"训练"是指用数据来训练一个神经网络。这个过程涉…...
Andorid 使用bp或者mk编译C文件生成so
在Aosp源码里编译C文件生成so 使用mk编译 文件夹列表 CMkDemo/Android.mk CMkDemo/cpp/SerialPort.c CMkDemo/cpp/SerialPort.hAndroid.mk 内容如下 LOCAL_PATH: $(call my-dir) include $(CLEAR_VARS)LOCAL_MODULE_TAGS : optional# All of the source files that we will…...
台湾做系统集成的公司网站/seo技术博客
clickhouse提供了update和delete的删除能力,但是和常规的例如mysql,redis这种立即见效的能力不一样。在clickhouse中这种操作称为mutation操作。 1.mutation操作有3个特点: 1.Mutations是一类允许对表的行记录进行删除或更新的ALTER操作。相…...
成都设计网站的公司名称/代理怎么引流推广
首先需要安装python环境 见本人发表的:python及pycharm安装python及pycharm安装_susan花雨的博客-CSDN博客 用pip install 安装 pip install xlrd pip install xlwt pip install xlsxwriter pip list查看 python合并多个表格:可以将每个表的不同she…...
wordpress搭建网站/腾讯广告投放推广平台价格
想测试STM32F429 和linux USB 串口速率,网上讲了各种软件好像都不能用,wireshark 的USB cap 也不管用,干脆自己写一个程序来测吧! 关于测试程序执行时间的方法在测试串口时也不管用(clock,time 等等&#x…...
广西网站/2022真实新闻作文400字
前些天,测试人员向我反映有一家企业的通讯录同步接口不返回数据,我查日志nginx的相应access.log状态码是499,请求超时,从网上查找资料分析有两种可能:1.客户端主动断开连接2.服务端响应超时造成客户端连接中断问了客户…...
北碚区网站建设/厦门人才网最新招聘信息网
Tengine是由淘宝网发起的Web服务器项目。它在Nginx的基础上,针对大访问量网站的需求,添加了很多高级功能和特性。Tengine的性能和稳定性已经在大型的网站如淘宝网,天猫商城等得到了很好的检验。它的最终目标是打造一个高效、稳定、安全、易用…...
专业做破碎机的网站/日本免费服务器ip地址
配置tokenstore.js,state中的数据一刷新就会消失,所以需要在localStorage中重新获取state: {//从localStorage中获取,没有则为空字符串token:localStorage.getItem(token)||},mutations: {//设置state.token对象setUtoken(state,token){state…...