当前位置: 首页 > news >正文

Flink SQL Checkpoint 学习总结

前言

学习总结Flink SQL Checkpoint的使用,主要目的是为了验证Flink SQL流式任务挂掉后,重启时还可以继续从上次的运行状态恢复。

验证方式

Flink SQL流式增量读取Hudi表然后sink MySQL表,任务启动后处于running状态,先查看sink表有数据,然后将对应的yarn kill掉,再通过设置的checkpoint重启任务,任务重启后验证sink表的数据量。Flink SQL流式增量读取Hudi表可以参考:Flink SQL增量查询Hudi表

版本

  • Flink 1.14.3
  • Hudi 0.13.0

Checkpoint 参数

一般需要设置的常用参数

-- checkpoint间隔时间,单位毫秒,没有默认值,如果想开启checkpoint,需要将该参数设置一个大于0的数值
-- 如果想提升sink性能,比如写hudi,需要将该值设置大一点,因为间隔时间决定了批次大小
-- checkpoint间隔时间不能设置太短也不能设置太长,太短影响写入性能,太长影响数据及时性。
set execution.checkpointing.interval=1000;
-- 保存checkpoint文件的目录
set state.checkpoints.dir=hdfs:///flink/checkpoints/hudi2mysql;
-- 任务取消后保留checkpoint,默认值NO_EXTERNALIZED_CHECKPOINTS,
-- 可选值NO_EXTERNALIZED_CHECKPOINTS、DELETE_ON_CANCELLATION、RETAIN_ON_CANCELLATION
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

从checkpoint恢复

set execution.savepoint.path=hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314;

其他参数

-- checkpoint模式,默认值EXACTLY_ONCE,可选值:EXACTLY_ONCE、AT_LEAST_ONCE
-- 要想支持EXACTLY_ONCE,需要sink端支持事务
set execution.checkpointing.mode=EXACTLY_ONCE;
-- checkpoint超时时间,默认10分钟
set execution.checkpointing.timeout=600000;
-- checkpoint文件保留数,默认1
set state.checkpoints.num-retained=3;

Checkpoint 目录结构

/user-defined-checkpoint-dir/{job-id}|+ --shared/+ --taskowned/+ --chk-1/+ --chk-2/+ --chk-3/...   

验证

创建Hudi和MySQL物理表

Hudi表

CREATE TABLE hudi_source (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
)
WITH ('connector' = 'hudi','path' = '/tmp/hudi_source'
);

MySQL表

CREATE TABLE `sink_mysql` (`id` int(11) NOT NULL,`name` text,`price` double DEFAULT NULL,`ts` int(11) DEFAULT NULL,`dt` text,`insert_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

造数

insert into hudi_source values(1,'hudi1',11.1,1000,'20230301');
insert into hudi_source values(2,'hudi2',22.2,1000,'20230301');
......

流读Hudi写MySQL

hudi2mysql.sql

set yarn.application.name=hudi2mysql;
set execution.checkpointing.interval=1000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/hudi2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;CREATE TABLE hudi_source_incr (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
)
WITH ('connector' = 'hudi','path' = '/tmp/hudi_source','read.streaming.enabled' = 'true', 'read.start-commit' = '202302', 'read.streaming.check-interval' = '4'
);create table sink_mysql (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8','username' = 'root','password' = 'password','table-name' = 'sink_mysql'
);insert into sink_mysql select * from hudi_source_incr;

执行上面的SQL

bin/sql-client.sh -f sql/hudi2mysql.sql

这样我们启动了一个常任务,在Flink界面上可以看到checkpoint的相关信息,如下图显示了checkpoint具体文件地址

可以用hdfs命令看一下checkpoint路径下有哪些文件

drwxr-xr-x   - hive hdfs          0 2023-03-01 14:47 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-589
drwxr-xr-x   - hive hdfs          0 2023-03-01 14:36 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/shared
drwxr-xr-x   - hive hdfs          0 2023-03-01 14:36 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/taskowned

其中255bdd01cee7486113feb1cbe8b45ee0为flink的jobid

将yarn任务kill

yarn app -kill application_1676855463066_0177

再看一下,发现checkpoint文件还在

hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314

重启任务验证checkpoint效果

需要先在hudi2mysql.sql,添加下面的配置

-- 从该checkpoint文件对应的状态恢复
set execution.savepoint.path=hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314;

重启flink sql任务

bin/sql-client.sh -f sql/hudi2mysql.sql

我们可以在新启动的yarn界面上看到,最新的恢复点,和我们设置的一样,这样代表我们设置恢复点生效

最后再造几条新的增量数据,在MySQL里看验证以下数据量是否一致

insert into hudi_source values(3,'hudi3',33.3,1000,'20230301');

MySQL数据量一致,且更新时间和插入时间一致,代表id=1、2的数据重启时没有重复消费,达到了预期效果。(也可以对MySQL表不设置主键,直接通过验证数据量验证效果)

这样我们通过一个简单的示例,了解了checkpoint的具体使用。大致过程
1、设置开启checkpoint和保存的路径,
2、任务运行时会根据设置的时间间隔不断生成新的ckp文件,
3、等任务挂掉后,重启任务时先设置execution.savepoint.path为我们最后一次保存的ckp文件
这样就达到了任务重启时继续从上次的运行状态恢复。

Checkpoint和Hudi

流任务写hudi时,必须设置checkpoint,不然不会生成commit,感觉像是卡住一样,具体表现为只生成.commit.requested.inflight,然后不写文件、不生成.commit也不报错,对于新手来说很费劲,很难找到解决方法。
大概原因是因为写文件、生成commit的动作是在coordinator里面,只有当checkpoint完成后才会调用coordinator,所以不设置checkpoint就不会生成commit,这里的逻辑是在Hudi源码里(具体没看),也就是说checkpoint和生成hudi commit是绑定一起的,这样才能保证流写Hudi的事务性,从而保证checkpoint的EXACTLY_ONCE。

StateBackend

在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend。

在学习Flink SQL Checkpoint时,发现网上的资料有下面的这个配置,本来以为这样设置后,就会将checkpoint文件保存到文件系统中,后来发现并不是这样。并且官网文档和源码描述的也不是很清楚,所以专门研究了一下这一块

set state.backend=filesystem;

从 Flink 1.13 版本开始,社区改进了 state backend 的公开类,进而帮助用户更好理解本地状态存储和 checkpoint 存储的区分。 这个变化并不会影响 state backend 和 checkpointing 过程的运行时实现和机制,仅仅是为了更好地传达设计意图。 用户可以将现有作业迁移到新的 API,同时不会损失原有 state。
旧版本的 MemoryStateBackend 等价于使用 HashMapStateBackend 和 JobManagerCheckpointStorage。

新版本的有两个参数state.backendstate.checkpoint-storage

state.backend可选参数:hashmap、roksdb,另外也支持filesystem(弃用)和jobmanager(弃用),官方文档并没有说明filesystem和jobmanager已经弃用

只设置state.backend:

state.backendCheckpoint StorageState Backend
默认JobManagerCheckpointStorageHashMapStateBackend
hashmapJobManagerCheckpointStorageHashMapStateBackend
filesystem(弃用)JobManagerCheckpointStorageHashMapStateBackend
roksdbJobManagerCheckpointStorageEmbeddedRocksDBStateBackend
jobmanager (弃用)MemoryStateBackend(弃用)MemoryStateBackend (弃用)

总结:对于State Backend,只有HashMapStateBackend和EmbeddedRocksDBStateBackend,另外还有一个弃用的MemoryStateBackend

state.checkpoint-storage可选参数:jobmanager、filesystem,当设置了state.checkpoints.dir,flink会自动使用filesystem对应的FileSystemCheckpointStorage

只设置state.checkpoint-storage:

state.checkpoint-storageCheckpoint StorageState Backend
默认JobManagerCheckpointStorageHashMapStateBackend
jobmanagerJobManagerCheckpointStorageHashMapStateBackend
filesystemFileSystemCheckpointStorageHashMapStateBackend
设置state.checkpoints.dirFileSystemCheckpointStorageHashMapStateBackend
总结:对于Checkpoint Storage只有JobManagerCheckpointStorage和FileSystemCheckpointStorage
另外,当设置state.checkpoint-storage=filesystem时,必须同时设置state.checkpoints.dir,否则会有异常:
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'

其实可以不设置state.checkpoint-storage,当设置了state.checkpoints.dir时Checkpoint Storage 自动使用FileSystemCheckpointStorage,不设置的话就使用默认的JobManagerCheckpointStorage

一开始对于默认的JobManagerCheckpointStorage、HashMapStateBackend不是很理解,不明白这样的checkpoint有啥用,因为是保存到内存中,不是保存到文件系统中,所以任务挂掉后就没办法恢复。
后来发现这种默认保存在内存中的checkpoint可以用于flink作业失败时自动恢复,而不是任务挂掉后手动恢复,另外默认情况下,程序取消时也不保存checkpoint

其他总结

  • 对于flink sql读取mysql,设置checkpoint恢复不生效(不是flink cdc)
  • checkpoint 一个时间间隔内只有一个批次,这样才能保证eos,时间间隔大小影响写入性能
  • 对于kafka2hudi的场景,checkpoint时间间隔如果比较小(1s),会因为时间不够导致第一个批次卡住,等超时(默认10分钟)后才会报错,所以需要间隔时间设置大一点,10s以上即可
  • 默认情况,只有全部任务running才会生成checkpoint,可以通过参数修改:execution.checkpointing.checkpoints-after-tasks-finish.enabled=true

pipeline.operator-chaining

set pipeline.operator-chaining=false;

将该参数设置为false,实现将多个算子拆分,利于观察每个任务的运行情况。对于上面说的kafka2hudi的场景,本来只是为了观察任务卡住的原因,但是发现设置了该参数后,任务不卡了
原因是虽然官方文档说的是将该参数设置为false后,会影响性能,但是我测试的kafka2hudi的场景反而提升了性能~,所以不卡了(不增加checkpoint时间间隔的情况)
下面是我测试的结果,总数据量:1000万,checkpoint间隔:10s

pipeline.operator-chaining第一个批次的数据量第二个批次的数据量第三个批次的数据量总用时
false7742701409025155219566s
true838610896459124514279s

相关文章:

Flink SQL Checkpoint 学习总结

前言 学习总结Flink SQL Checkpoint的使用,主要目的是为了验证Flink SQL流式任务挂掉后,重启时还可以继续从上次的运行状态恢复。 验证方式 Flink SQL流式增量读取Hudi表然后sink MySQL表,任务启动后处于running状态,先查看sin…...

2023年“楚怡杯“湖南省职业院校技能竞赛“网络安全”竞赛任务书

2023年“楚怡杯“湖南省职业院校技能竞赛“网络安全”竞赛任务书 一、竞赛时间 总计:360分钟 竞赛阶段竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A模块 A-1 登录安全加固 180分钟 200分 A-2 本地安全策略配置 A-3 流量完整性保护 A-4 事件监控 …...

MyBatis中主键回填的两种实现方式

主键回填其实是一个非常常见的需求,特别是在数据添加的过程中,我们经常需要添加完数据之后,需要获取刚刚添加的数据 id,无论是 Jdbc 还是各种各样的数据库框架都对此提供了相关的支持,本文我就来和和大家分享下数据库主…...

Windows11如何打开ie浏览器

目录1.背景:2.方法一:在 edge 中配置使用 ie 模式3.方法二:通过 Internet 选项 打开1.背景: 昨天电脑自动从win10升级为win11了,突然发现电脑找不到ie浏览器了,打开全都是直接跳转到 edge 浏览器&#xff0…...

Linux:进程间通信

目录 进程间通信目的 进程间通信分类 管道 System V IPC POSIX IPC 什么是管道 站在文件描述符角度-深度理解管道 管道使用 管道通信的四种情况 管道通信的特点 进程池管理 命名管道 创建一个命名管道 命名管道的打开规则 命名管道通信实例 匿名管道与命名管道的…...

【java】将LAC改造成Elasticsearch分词插件

目录 为什么要将LAC改造成ES插件? 怎么将LAC改造成ES插件? 确认LAC java接口能work 搭建ES插件开发调试环境 编写插件 生成插件 安装、运行插件 linux版本的动态链接库生成 总结 参考文档 为什么要将LAC改造成ES插件? ES是著名的非…...

TPM 2.0实例探索3 —— LUKS磁盘加密(5)

接前文:TPM 2.0实例探索3 —— LUKS磁盘加密(4) 本文大部分内容参考: Code Sample: Protecting secret data and keys using Intel Platform... 二、LUKS磁盘加密实例 4. 将密码存储于TPM的PCR 现在将TPM非易失性存储器中保护…...

mybatisplus复习(黑马)

学习目标能够基于MyBatisPlus完成标准Dao开发能够掌握MyBatisPlus的条件查询能够掌握MyBatisPlus的字段映射与表名映射能够掌握id生成策略控制能够理解代码生成器的相关配置一、MyBatisPlus简介MyBatisPlus(简称MP)是基于MyBatis框架基础上开发的增强型工…...

【数据聚类|深度聚类】Deep Comprehensive Correlation Mining for Image Clustering(DCCM)论文研读

Abstract 翻译 最近出现的深度无监督方法使我们能够联合学习表示和对未标记数据进行聚类。这些深度聚类方法主要关注样本之间的相关性,例如选择高精度对来逐步调整特征表示,而忽略了其他有用的相关性。本文提出了一种新的聚类框架,称为深度全面相关挖掘(DCCM),从三个方面…...

CE认证机构有哪些机构?

CE认证机构有哪些机构? 所有出口欧盟的产品都需要办理CE证明,而电子电器以及玩具是强制性要做CE认证。很多人以为只有办理欧盟NB公告机构的CE认证才可以被承认,实际上并不是。那么,除了NB公告上的机构,还有哪些认证机…...

MYSQL5.7:Access denied for user ‘root‘@‘localhost‘ (using password:YES)解决方法

一、打开MySQL目录下的my.ini文件,在文件的[mysqld]下面添加一行 skip-grant-tables,保存并关闭文件;skip-grant-tables :跳过密码登录,登录时无需密码。my.ini :一般在和bin同目录下,如果没有的话可自己创…...

单目运算符、双目运算符、三目运算符

单目运算符是什么 单目运算符是指运算所需变量为一个的运算符 又叫一元运算符,其中有逻辑非运算符:!、按位取 反运算符:~、自增自减运算符:,-等。 逻辑非运算符【!】、按位取反运算符【~】、 自…...

离线数据仓库项目搭建——准备篇

文章目录(一)什么是数据仓库(二)数据仓库基础知识(三)数据仓库建模方式(1)星行模型(2)雪花模型(3)星型模型 VS 雪花模型(四…...

十七、本地方法接口的理解

什么是本地方法? 1.简单来讲,一个Ntive method 就是一个Java调用非Java代码的接口.一个Native Method 是这样一个Java方法:该方法的实现由非Java语言实现,比如C,这个特征并非Java所特有,很多其他的编程语言都由这一机制,比如在C中…...

【halcon】模板匹配参数之金字塔级数

背景 今天,在使用模板匹配的时候,突然程序卡死,CPU直接飙到100%。最后排查发现是模板匹配其中一个参数 NumLevels 导致的: NumLevels: The number of pyramid levels used during the search is determined with numLevels. If n…...

jupyter lab安装和配置

jupyter lab 安装和配置 一、jupyter lab安装并配置 安装jupyterlab pip install jupyterlab启动 Jupyter lab默认会打开实验环境的,也可以自己在浏览器地址栏输入127.0.0.1:8888/lab 汉化 pip install jupyterlab-language-pack-zh-CN刷新一下网页&#xff0…...

用Docker搭建yolov5开发环境

拉取镜像 sudo docker pull pytorch/pytorch:latest 创建容器 sudo docker run -it -d --gpus "device0" pytorch/pytorch bash 查看所有容器 sudo docker ps -a 查看运行中的容器 sudo docker ps 进入容器 docker start -i 容器ID 将依赖包全都导入到requiremen…...

Apache Pulsar 云原生消息中间件之王

一、简介 pulsar,消息中间件,是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。 pulsar采用发布-订阅的设计模式,producer发布消息到topic,consumer订阅这些topic处理流入的消息,并当处理完…...

精选博客系列|公用事业中的VMware:在边缘重新定义价值

VMware 已经成为公用事业行业的核心。您可以在那里找到例如 VMware vSphere(包括基础 Hypervisor ESXi 和 VMware vCenter 建立的整体控制平面)的核心产品。来自软件定义的基础架构带来的诸多好处使 IT 团队将其先前基于硬件的系统转变为 VMware Cloud F…...

数字档案室测评的些许感悟

我是甲方,明明我家是档案“室”,为什么申请的是数字档案“馆”? 笔者正对着手里的一份方案苦笑,甲方爸爸是某机关单位档案室,方案最后的附件赫然写着几个大字:“申请国家级数字档案馆……“。这样的事屡见…...

Java 函数式编程实例

一、函数式编程概念 函数式编程是一种编程的范式和编程的方法论(programming paradigm),它属于结构化编程的一种,主要的思想是把运算的过程尽量通过一组嵌套的函数来实现。 函数式编程的几个特点: 函数可以作为变量、参数、返回值和数据类…...

Ant design Chart onReady函数使用外部变量问题

一、问题描述封装了一个Chart组件,它接收一个boolean类型的props,根据这个boolean的true或false执行不同的操作。经过console.log验证,onReady函数只会在组件初次渲染时取到props值,不管后面的props变化成什么都无法重新取值。二、…...

Unity使用webSocket与服务器通信(一)搭建一个简单地服务器和客户端

你想在unity WebGL里面使用TCP通信吗,那么你可以用一用webSocket。当然,桌面端也可以使用webSocket,这样Unity多平台发布的时候,业务层的通信代码可以使用一套,而不是桌面用socket,网页用http… 一、什么是…...

SpringCloud微服务实战——搭建企业级开发框架(四十九):数据字典注解的设计与实现

数据字典是系统中基本的必不可少的功能,在多种多样的系统中,数据字典表的设计都大同小异。但是使用方式确是多种多样,设计好一套易用的数据字典功能模块,可以使开发事半功倍。 常用的数据字典使用方式: 直接在SQL语句…...

mysql下,实现保存指定用户、ip、命令的查询日志

环境:mysql 8.0.14 社区版 阅读文本需要的背景知识:对数据库的基本概念(触发器、存储过程、事件),mysql下general log的配置指令 背景:因审计需要,对于数据库操作需要留痕。实际访问数据库的有…...

Vue 3.0 学习笔记之基础知识

系列文章目录 提示:阅读本章之前,请先阅读目录 文章目录系列文章目录前言Vue 3.0 创建与Vue2.0对比的变化关闭语法检查setup 组合式函数compositions响应式数据 refreactive 函数Vue3.0 响应原理ref 和 reactive 区别setup 注意点computed 计算函数watch…...

WebGIS行政区炫酷特效——流光特效教程

先来看下效果: 图片截图: 流光特效的思路是从行政区的边界中随着时间不断的取若干段线条换成另一种高亮颜色。 流光的第一步首先是发光,发光的教程在这里: GIS矢量图形多边形地块行政区发光,阴影发光特效实现_疯狂的GISer的博客-CSDN博客 学会发光以后,接下来需要做的…...

2023-3-3 刷题情况

保证文件名唯一 题目描述 给你一个长度为 n 的字符串数组 names 。你将会在文件系统中创建 n 个文件夹:在第 i 分钟,新建名为 names[i] 的文件夹。 由于两个文件 不能 共享相同的文件名,因此如果新建文件夹使用的文件名已经被占用&#xf…...

《青浦区加快发展跨境电子商务实施细则(审议稿)》

为进一步贯彻落实《中华人民共和国电子商务法》,上海市《关于促进本市跨境电子商务发展的若干意见》,切实做好青浦区跨境电子商务试点工作,探索和规范跨境电子商务管理,促进跨境电子商务健康快速发展,青浦商务委根据多…...

【React全家桶】React生命周期

React生命周期 1、初始化阶段 componentDidMount:render之前最后一次修改状态的机会 render:只能访问this.props和this.state,不允许修改状态和DOM输出 componentDidMount:成功render并渲染完成真实DOM之后触发 2、旧生命周期 👉👉👉加…...

珠海网站建设有限公司/北京优化推广

创建SpringBoot项目在线创建方式网址&#xff1a;https://start.spring.io/然后创建Controller、Mapper、Service包SpringBoot整合Redis引入Redis依赖<!--SpringBoot与Redis整合依赖--> <dependency><groupId>org.springframework.boot</groupId><a…...

开一个网站建设公司/小吃培训机构排名前十

1.老版微信支付,通过微信APP自带的浏览器中的WeixinJSBridge支付 这种方式无需引入任何js,但必须在微信中打开 wxPay(payInfo){ //老版微信支付,通过微信浏览器中的WeixinJSBridge支付function onBridgeReady() {WeixinJSBridge.invoke(getBrandWCPayRequest, {"appId&qu…...

青岛b2b网站建设/关键词优化公司推荐

Mybatis基础版 完结撒发 查询缓存 一级缓存 MyBatis 默认开启一级缓存&#xff0c;如果使用同一个的SqlSession对象执行相同的查询语句&#xff0c;则只会在第一次查询时向数据库发送SQL语句&#xff0c;并将查询结果放入到SqlSession中&#xff08;作为缓存 存在&#xff0…...

自己做的网站只能打开一个链接/公司建设网站哪家好

近几年&#xff0c;智能穿戴、运动健康领域的发展如火如荼、方心未艾&#xff0c;各路厂商都在倾力投入&#xff0c;如今已经成为智能手机行业顶级存在的华为也不例外&#xff0c;并且取得了不俗战绩。8月10日&#xff0c;华为消费者业务智能穿戴与运动健康产品线总裁张炜接受媒…...

多说评论插件对网站优化/yy直播

文/苏格兰折耳猫 图片来源于网络Social Listening可以帮助企业实现如下图所示的几个商业目标&#xff0c;这引起了一些新媒体、咨询从业者浓厚的兴趣&#xff0c;他们强烈要求笔者继续对Social Listening的分析方法和应用场景做进一步的阐述。作为回应&#xff0c;笔者将在本文…...

上海市工商网站官网/免费拓客软件

使用范围&#xff1a; OA、MIS、ERP等信息管理类的项目&#xff0c;暂时不考虑网站。 遇到的问题&#xff1a; 完成一个项目&#xff0c;往往需要引用很多js文件&#xff0c;比如jQuery.js、easyUI等。还有自己写的一些列js文件&#xff0c;那么这些文件如何方便的加载&#xf…...