当前位置: 首页 > news >正文

Apache Hudi初探(五)(与flink的结合)--Flink 中hudi clean操作

背景

本文主要是具体说说Flink中的clean操作的实现

杂说闲谈

在flink中主要是CleanFunction函数:

  @Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();String instantTime = HoodieActiveTimeline.createNewInstantTime();LOG.info(String.format("exec clean with instant time %s...", instantTime));executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");}@Overridepublic void notifyCheckpointComplete(long l) throws Exception {if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {executor.execute(() -> {try {this.writeClient.waitForCleaningFinish();} finally {// ensure to switch the isCleaning flagthis.isCleaning = false;}}, "wait for cleaning finish");}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {try {this.writeClient.startAsyncCleaning();this.isCleaning = true;} catch (Throwable throwable) {// catch the exception to not affect the normal checkpointingLOG.warn("Error while start async cleaning", throwable);}}}
  • open函数

    • writeClient =FlinkWriteClients.createWriteClient(conf, getRuntimeContext())
      创建FlinkWriteClient,用于写hudi数据

    • this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
      创建一个只有一个线程的线程池,改线程池的主要作用来异步执行hudi写操作

    • executor.execute(() -> writeClient.clean(instantTime)
      异步执行hudi的清理操作,该clean函数的主要代码如下:

         if (!tableServicesEnabled(config)) {return null;}final Timer.Context timerContext = metrics.getCleanCtx();CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));HoodieTable table = createTable(config, hadoopConf);if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {LOG.info("Cleaner started");// proceed only if multiple clean schedules are enabled or if there are no pending cleans.if (scheduleInline) {scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);table.getMetaClient().reloadActiveTimeline();}}// Proceeds to execute any requested or inflight clean instances in the timelineHoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);if (timerContext != null && metadata != null) {long durationMs = metrics.getDurationInMs(timerContext.stop());metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()+ " cleanerElapsedMs" + durationMs);}return metadata;
      
      • CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION,() -> rollbackFailedWrites *
        根据配置
        hoodie.cleaner.policy.failed.writes* 默认是EAGER,也就是在写数据失败的时候,会立即进行这次写失败的数据的清理,在这种情况下,
        就不会执行rollbackFailedWrites操作,也就是回滚写失败文件的操作

      • HoodieTable table = createTable *
        创建
        HoodieFlinkMergeOnReadTable*类型的hudi表,用来做clean等操作

      • scheduleTableServiceInternal
        如果hoodie.clean.allow.multiple为true(默认为true)或者没有正在运行中clean操作,则会生成Clean计划
        这里最终调用的是FlinkWriteClient.scheduleCleaning方法,即CleanPlanActionExecutor.execute方法

        这里最重要的就是requestClean方法:

        CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
        Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
        List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant)    
        int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
        Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism).stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue))    
        Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())))    
        List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey).collect(Collectors.toList())    
        return new HoodieCleanerPlan(earliestInstant.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),planner.getLastCompletedCommitTimestamp(),config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete)    
        
        • planner.getEarliestCommitToRetain();
          根据保留策略,获取到最早需要保留的commit的HoodieInstant,在这里会兼顾考虑到hoodie.cleaner.commits.retained(默认是10)以及hoodie.cleaner.hours.retained默认是24小时以及hoodie.cleaner.policy策略(默认是KEEP_LATEST_COMMITS)
        • planner.getPartitionPathsToClean(earliestInstant);
          根据保留的最新commit的HoodieInstant,得到要删除的分区,这里会根据配置hoodie.cleaner.incremental.mode(默认是true)来进行增量清理,
          这个时候就会根据上一次已经clean的信息,只需要删除差量的分区数据就行
        • cleanOpsWithPartitionMeta = context
          根据上面得到的需要删除的分区信息,获取需要删除的文件信息,具体的实现可以参考CleanPlanner.getFilesToCleanKeepingLatestCommits
          这里的操作主要是先通过fileSystemView获取分区下所有的FileGroup,之后再获取每个FileGroup下的所有的FileSlice(这里的FileSlice就有版本的概念,也就是commit的版本),之后再与最新保留的commit的时间戳进行比较得到需要删除的文件信息
        • new HoodieCleanerPlan
          最后组装成HoodieCleanPlan的计划,并且在外层调用table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); 方法把clean request的状态存储到对应的.hoodie目录下,并建立一个xxxx.clean.requested的元数据文件
      • table.getMetaClient().reloadActiveTimeline()
        重新加载timeline,便于过滤出来刚才scheduleTableServiceInternal操作生成的xxxxxxxxxxxxxx.clean.requested的元数据文件

      • table.clean(context, cleanInstantTime, skipLocking)
        真正执行clean的部分,主要是调用CleanActionExecutor.execute的方法,最终调用的是*runPendingClean(table, hoodieInstant)*方法:

         HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);return runClean(table, cleanInstant, cleanerPlan);
        

        首先是反序列化CleanPlan,然后在进行清理,主要是删除1. 如果没有满足的分区,直接删除该分区,2. 否则删除该分区下的满足条件的文件,最后返回HoodieCleanStat包含删除的文件信息等。

  • snapshotState方法

    • 如果clean.async.enabled是true(默认是true),并且不是正在进行clean动作,则会进行异步清理
      this.writeClient.startAsyncCleaning(); 这里最终也是调用的writeClient.clean方法。
    • this.isCleaning = true;
      设置标志位,用来保证clean操作的有序性
  • notifyCheckpointComplete方法

    • 如果clean.async.enabled是true(默认是true),并且正在进行clean动作,则等待clean操作完成,
      并且设置清理标识位,用来和snapshotState方法进行呼应以保证clean操作的有序性

相关文章:

Apache Hudi初探(五)(与flink的结合)--Flink 中hudi clean操作

背景 本文主要是具体说说Flink中的clean操作的实现 杂说闲谈 在flink中主要是CleanFunction函数&#xff1a; Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);this.writeClient FlinkWriteClients.createWriteClient(conf,…...

stream对list数据进行多字段去重

方法一&#xff1a; //根据sj和name去重 List<NursingHandover> testList list.stream().collect(Collectors.collectingAndThen(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getj() ";" o.getName() ";&…...

一种基于体素的射线检测

效果 基于体素的射线检测 一个漏检的射线检测 从起点一直递增指定步长即可得到一个稀疏的检测 bool Raycast(Vector3 from, Vector3 forword, float maxDistance){int loop 6666;Vector3 pos from;Debug.DrawLine(from, from forword * maxDistance, Color.red);while (loo…...

利用Docker安装Protostar

文章目录 一、Protostar介绍二、Ubuntu下安装docker三、安装Protostar 一、Protostar介绍 Protostar是一个免费的Linux镜像演练环境&#xff0c;包含五个系列共23道漏洞分析和利用实战题目。 Protostar的安装有两种方式 第一种是下载镜像并安装虚拟机https://github.com/Exp…...

go基础语法10问

1.使用值为 nil 的 slice、map会发生啥 允许对值为 nil 的 slice 添加元素&#xff0c;但对值为 nil 的 map 添加元素&#xff0c;则会造成运行时 panic。 // map 错误示例 func main() {var m map[string]intm["one"] 1 // error: panic: assignment to entry i…...

SpringCloud + SpringGateway 解决Get请求传参为特殊字符导致400无法通过网关转发的问题

title: “SpringCloud SpringGateway 解决Get请求传参为特殊字符导致400无法通过网关转发的问题” createTime: 2021-11-24T10:27:5708:00 updateTime: 2021-11-24T10:27:5708:00 draft: false author: “Atomicyo” tags: [“tomcat”] categories: [“java”] description: …...

vim基本操作

功能&#xff1a; 命令行模式下的文本编辑器。根据文件扩展名自动判别编程语言。支持代码缩进、代码高亮等功能。使用方式&#xff1a;vim filename 如果已有该文件&#xff0c;则打开它。 如果没有该文件&#xff0c;则打开个一个新的文件&#xff0c;并命名为filename 模式…...

Drift plus penalty 漂移加惩罚Part1——介绍和工作原理

文章目录 正文Methodology 方法论Origins and applications 起源和应用How it works 它是怎样工作的The stochastic optimization problem 随机优化问题Virtual queues 虚拟队列The drift-plus-penalty expression 漂移加惩罚表达式Drift-plus-penalty algorithmApproximate sc…...

(四)动态阈值分割

文章目录 一、基本概念二、实例解析 一、基本概念 基于局部阈值分割的dyn_threshold()算子&#xff0c;适用于一些无法用单一灰度进行分割的情况&#xff0c;如背景比较复杂&#xff0c;有的部分比前景目标亮&#xff0c;或者有的部分比前景目标暗&#xff1b;又比如前景目标包…...

jvm介绍

1. JVM是什么 JVM是Java Virtual Machine的缩写&#xff0c;即咱们经常提到的Java虚拟机。虚拟机是一种抽象化的计算机&#xff0c;有着自己完善的硬件架构&#xff0c;如处理器、堆栈等&#xff0c;具体有什么咱们不做了解。目前我们只需要知道想要运行Java文件&#xff0c;必…...

数据结构与算法课后题-第三章(顺序队和链队)

#include <iostream> //引入头文件 using namespace std;typedef int Elemtype;#define Maxsize 5 #define ERROR 0 #define OK 1typedef struct {Elemtype data[Maxsize];int front, rear;int tag; }SqQueue;void InitQueue(SqQueue& Q) //初始化队列 {Q.rear …...

SSM - Springboot - MyBatis-Plus 全栈体系(十六)

第三章 MyBatis 三、MyBatis 多表映射 2. 对一映射 2.1 需求说明 根据 ID 查询订单&#xff0c;以及订单关联的用户的信息&#xff01; 2.2 OrderMapper 接口 public interface OrderMapper {Order selectOrderWithCustomer(Integer orderId); }2.3 OrderMapper.xml 配置…...

k8s--storageClass自动创建PV

文章目录 一、storageClass自动创建PV1.1 安装NFS1.2 创建nfs storageClass1.3 测试自动创建pv 一、storageClass自动创建PV 这里使用NFS实现 1.1 安装NFS 安装nfs-server&#xff1a; sh nfs_install.sh /mnt/data03 10.60.41.0/24nfs_install.sh #!/bin/bash### How to i…...

7.3 调用函数

前言&#xff1a; 思维导图&#xff1a; 7.3.1 函数调用的形式 我的笔记&#xff1a; 函数调用的形式 在C语言中&#xff0c;调用函数是一种常见的操作&#xff0c;主要有以下几种调用方式&#xff1a; 1. 函数调用语句 此时&#xff0c;函数调用独立存在&#xff0c;作为…...

如果使用pprof来进行性能的观测和优化

1. 分析性能瓶颈 在开始优化之前&#xff0c;首先需要确定你的程序的性能瓶颈在哪里。使用性能分析工具&#xff08;例如 Go 的内置 pprof 包&#xff09;来检测程序中消耗时间和内存的地方。这可以帮助你确定需要优化的具体部分。 2. 选择适当的数据结构和算法 选择正确的数…...

在移动固态硬盘上安装Ubuntu系统和ROS2

目录 原视频准备烧录 原视频 b站鱼香ros 准备 1.在某宝上买一个usb移动固态硬盘或固态U盘&#xff0c;至少64G 2.下载鱼香ros烧录工具 下载第二个就行了&#xff0c;不然某网盘的速度下载全部要一天 下载后&#xff0c;选择FishROS2OS制作工具压缩包&#xff0c;进行解压…...

【iptables 实战】02 iptables常用命令

一、iptables中基本的命令参数 -P 设置默认策略-F 清空规则链-L 查看规则链-A 在规则链的末尾加入新规则-I num 在规则链的头部加入新规则-D num 删除某一条规则-s 匹配来源地址IP/MASK&#xff0c;加叹号“&#xff01;”表示除这个IP外-d 匹配目标地址-i 网卡名称 匹配从这块…...

webview_flutter

查看webview内核 ​https://liulanmi.com/labs/core.html​ h5中获取设备 https://cloud.tencent.com/developer/ask/sof/105938013 https://developer.mozilla.org/zh-CN/docs/Web/API/Navigator/mediaDevices web资源部署后navigator获取不到mediaDevices实例的解决方案&…...

【GESP考级C++】1级样题 闰年统计

GSEP 1级样题 闰年统计 题目描述 小明刚刚学习了如何判断平年和闰年&#xff0c;他想知道两个年份之间&#xff08;包含起始年份和终止年份&#xff09;有几个闰年。你能帮帮他吗&#xff1f; 输入格式 输入一行&#xff0c;包含两个整数&#xff0c;分别表示起始年份和终止…...

CentOS密码重置

背景&#xff1a; 我有一个CentOS虚拟机&#xff0c;但是密码忘记了&#xff0c;偶尔记起可以重置密码&#xff0c;于是今天尝试记录一下&#xff0c;又因为我最近记性比较差&#xff0c;所以必须要记录一下。 过程&#xff1a; 1、在引导菜单界面&#xff08;grub&#xff…...

Tomcat Servlet

Tomcat & Servlet 一、What is “Tomcat”?二、 What is “Servlet”?1、HttpServlet2、HttpServletRequest3、HttpServletResponse 一、What is “Tomcat”? Tomcat 本质上是一个基于 TCP 协议的 HTTP 服务器。我们知道HTTP是一种应用层协议&#xff0c;是 HTTP 客户端…...

国庆day2---select实现服务器并发

select.c&#xff1a; #include <myhead.h>#define ERR_MSG(msg) do{\fprintf(stderr,"__%d__:",__LINE__);\perror(msg);\ }while(0)#define IP "192.168.1.3" #define PORT 8888int main(int argc, const char *argv[]) {//创建报式套接字socketi…...

Grafana 开源了一款 eBPF 采集器 Beyla

eBPF 的发展如火如荼&#xff0c;在可观测性领域大放异彩&#xff0c;Grafana 近期也发布了一款 eBPF 采集器&#xff0c;可以采集服务的 RED 指标&#xff0c;本文做一个尝鲜介绍&#xff0c;让读者有个大概了解。 eBPF 基础介绍可以参考我之前的文章《eBPF Hello world》。理…...

亲测可用国产GPT人工智能

分享一些靠谱、可用、可以白嫖的GPT大模型。配合大模型&#xff0c;工作效率都会极大提升。 清华大学ChatGLM 官网&#xff1a; 智谱清言中国版对话语言模型&#xff0c;与GLM大模型进行对话。https://chatglm.cn/开源的、支持中英双语的1300亿参数的对话语言模型&#xff0…...

适配器模式详解和实现(设计模式 四)

适配器模式将一个类的接口转换成客户端所期望的另一个接口&#xff0c;解决由于接口不兼容而无法进行合作的问题。 设计基本步骤 1. 创建目标接口&#xff08;Target Interface&#xff09;&#xff0c;该接口定义了客户端所期望的方法。 2.创建被适配类&#xff08;Adaptee…...

IDEA的使用

文章目录 1.IDEA配置1.1 idea界面说明1.2 git1.3 JDK1.4 maven1.5 Tomcat1.6 idea设置编码格式1.7 vscodenodejs1.8 windows下安装redis 2. IDEA问题2.1 setAttribute方法爆红2.2 idea cannot download sources解决办法2.3 springboot项目跑起来不停run 3. vscode3.1 vscode显示…...

CSS详细基础(二)文本样式

插播一条CSS的工作原理&#xff1a; CSS是一种定义样式结构如字体、颜色、位置等的语言&#xff0c;被用于描述网页上的信息格式化和显示的方式。CSS样式可以直接存储于HTML网页或者单独的样式单文件。无论哪一种方式&#xff0c;样式单包含将样式应用到指定类型的元素的规则。…...

win10系统任务栏图标变成白色的解决办法

我平时都是用滴答清单进行管理这个自己的日程代办的&#xff0c;但是今天打开的时候发现这个快捷方式突然变成纯白色的了&#xff0c;重启电脑之后&#xff0c;这个图标的样式仍然没有变化。上网查找解决办法之后&#xff0c;终于搞好了&#xff0c;于是就有了下面的教程。 为什…...

hadoop生态现状、介绍、部署

一、引出hadoop 1、hadoop的高薪现状 各招聘平台都有许多hadoop高薪职位&#xff0c;可以看看职位所需求的技能 ----> hadoop是什么&#xff0c;为什么会这么高薪&#xff1f;引出大数据&#xff0c;大数据时代&#xff0c;大数据与云计算 2、大数据时代的介绍 大数据的故事…...

二、EFCore 数据库表的创建和迁移

文章目录 一、数据库连接二、数据库表迁移一、数据库连接 在NuGet上安装EntityFramework 代码如下: Microsoft.EntityFrameworkCoreMicrosoft.EntityFrameworkCore.SqlServerMicrosoft.Extensions.Configuration.Json配置数据连接 appsettings.json 增加数据库连接配置 &quo…...

手机网站模版 优帮云/推广赚钱软件排行

如果你想从头学习Jmeter&#xff0c;可以看看这个系列的文章哦 https://www.cnblogs.com/poloyy/category/1746599.html 简单介绍 约定响应时间&#xff0c;响应时间如果超出约定&#xff0c;则断言为失败 断言持续时间 断言持续时间界面介绍 只需要填写预期的运行时间就行了 结…...

wordpress去除/seo搜索引擎优化是什么意思

一、头插法新建链表 1、先让新结点指向头结点所指向的结点 2、再让头结点指向新结点 //头插法新建链表 void head_insert_list(Linklist &L) //Linlilist 与Lnode*等价 {Elemtype e;scanf("%d", &e);Linklist s; //s指针用来记录新申请的结点while(e!999…...

前海艾爻网站 建设/用html制作淘宝网页

文章目录Java网络调用ArcGIS REST API一、说明二、记录1. 添加 httpclient 依赖2.网络请求工具类3.ArcGIS REST API 调用工具类Java网络调用ArcGIS REST API 一、说明 ArcGIS REST API&#xff08;图层要素查询、叠加分析&#xff09;Java 网络请求调用&#xff08;HttpClien…...

中学生怎么做网站/网站seo排名优化软件

1.static-静态属性 &#xff08;1&#xff09;static代表“全局”或“静态”&#xff0c;可以理解为&#xff1a;方便在没有创建对象的情况下来进行某些操作。通常用于修饰成员变量和方法&#xff0c;也可以形成静态代码块。 实际应用中&#xff0c;可将需频繁操作、通用…...

盘锦如何做百度的网站/网站优化招商

Docker 修改容器端口 本文介绍如何修改容器端口 对于已经创建的容器&#xff0c;可以通过下面的方法修改端口&#xff1a; 将容器提交为镜像&#xff0c;重新运行修改容器配置文件 将容器提交为镜像&#xff0c;重新运行 略 修改容器配置文件 修改前需要关闭docker&…...

信息技术用C 做登录界面网站 csdn/建网站建设

影调&#xff1a; 对摄影作品而言&#xff0c;“影调”&#xff0c;又称为照片的基调或调子。指画面的明暗层次、虚实对比和色彩的色相明暗等之间的关系。通过这些关系&#xff0c;使欣赏者感到光的流动与变化。摄影画面中的线条、形状、色彩等元素是由影调来体现的&#xff0…...