当前位置: 首页 > news >正文

Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式

背景

之前讨论的都是’hoodie.datasource.write.operation’:'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件:

/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet

因为是"bulk insert"操作,所以没有去重的需要,所以直接采用spark原生的方式,
以下我们讨论非spark原生的方式,

闲说杂谈

继续Apache Hudi初探(二)(与spark的结合)
剩下的代码:

 val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBooleanval (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =...case _ => { // any other operation// register classes & schemasval (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)sparkContext.getConf.registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData],classOf[org.apache.avro.Schema]))// TODO(HUDI-4472) revisit and simplify schema handlingval sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)val latestTableSchema = getLatestTableSchema(sqlContext.sparkSession, tableMetaClient).getOrElse(sourceSchema)val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBooleanvar internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)val writerSchema: Schema =if (reconcileSchema) {// In case we need to reconcile the schema and schema evolution is enabled,// we will force-apply schema evolution to the writer's schemaif (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))}if (internalSchemaOpt.isDefined) {...// Convert to RDD[HoodieRecord]val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,org.apache.hudi.common.util.Option.of(writerSchema))val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||operation.equals(WriteOperationType.UPSERT) ||parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBooleanval hoodieAllIncomingRecords = genericRecords.map(gr => {val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)val hoodieRecord = if (shouldCombine) {val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean).asInstanceOf[Comparable[_]]DataSourceUtils.createHoodieRecord(processedRecord,orderingVal,keyGenerator.getKey(gr),hoodieConfig.getString(PAYLOAD_CLASS_NAME))} else {DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))}hoodieRecord}).toJavaRDD()val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema// Create a HoodieWriteClient & issue the write.val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {asyncCompactionTriggerFn.get.apply(client)}if (isAsyncClusteringEnabled(client, parameters)) {asyncClusteringTriggerFn.get.apply(client)}val hoodieRecords =if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))} else {hoodieAllIncomingRecords}client.startCommitWithTime(instantTime, commitActionType)val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)(writeResult, client)}
  • 如果开启了Schema Evolution,也就是hoodie.datasource.write.reconcile.schematrue,默认是false,就会进行schema的合并
    convertStructTypeToAvroSchema 把df的schema转换成avro的schema
    并且从*.hoodie/20230530073115535.deltacommit* 获取internalSchemaOpt,具体的合并就是把即将写入的schema和internalSchemaOpt进行合并
    最后赋值给writerSchema,有可能还需要hoodie.schema.on.read.enable,默认是false

  • HoodieSparkUtils.createRdd 创建RDD
    把df转换为了RDD[GenericRecord]类型,赋值给genericRecords

  • val hoodieAllIncomingRecords = genericRecords.map(gr => {

    • 首先如果是hoodie.datasource.write.drop.partition.columnstrue(默认是false),则会从schema中删除hoodie.datasource.write.
      partitionpath.field
      字段
    • 如果hoodie.datasource.write.insert.drop.duplicatestrue(默认是false)或者hoodie.datasource.write.operationupsert(默认
      upsert),或者hoodie.combine.before.inserttrue(默认是false),
      则会创建HoodieAvroRecord<>(hKey, payload)类型的实例,其中HoodieKeyrecordkey和partitionpath组成,playloadOverwriteWithLatestAvroPayload实例
    • hoodieAllIncomingRecords就变成了RDD[HoodieAvroRecord]
  • writerDataSchema= client 这些就是创建SparkRDDWriteClient 客户端

  • isAsyncCompactionEnabled
    默认asyncCompactionTriggerFnDefined是没有的,所以不会开启异步的CompactionisAsyncClusteringEnabled同理也是

  • val hoodieRecords =
    如果配置了hoodie.datasource.write.insert.drop.duplicatestrue(默认是false),则会进行去重处理,具体是调用DataSourceUtils.dropDuplicates方法:

    SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);return client.tagLocation(incomingHoodieRecords).filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
    
    • SparkRDDReadClient client 在创建Client的时候,会进行索引的创建this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
      如果有hoodie.index.class设置,则实例化对象,否则根据hoodie.index.type的值来建立索引(默认是HoodieSimpleIndex,适合做测试用)
    • client.tagLocation(incomingHoodieRecords)…
      从要插入的记录中过滤出在index中不存在的记录,最终调用的是index.tagLocation方法
      如果hoodie.datasource.write.insert.drop.duplicatesfalse,则保留所有的记录
  • client.startCommitWithTime 开始写操作,这涉及到回滚的操作

    • 会先过滤出需要回滚的的的写失败的文件,如果hoodie.cleaner.policy.failed.writesEAGER(默认是EAGER),就会在这次提交中回滚失败的文件
    • 然后创建一个后缀为deltacommit.requested的文件,此时没有真正的写
  • val writeResult = DataSourceUtils.doWriteOperation
    真正的写操作

相关文章:

Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式

背景 之前讨论的都是’hoodie.datasource.write.operation’:bulk_insert’的前提下&#xff0c;在这种模式下&#xff0c;是没有json文件的已形成如下的文件&#xff1a; /dt1/.hoodie_partition_metadata /dt1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_202305282…...

Java之旅(九)

Java 循环语句 Java 中的循环语句包括 for、while 和 do-while&#xff0c;它们都可以用于实现循环结构。 for 语句用于循环执行一段代码块&#xff0c;直到给定的条件表达式的布尔值为 false。 for 语句的一般格式如下&#xff1a; for (initialization; condition; update…...

6年测试经验之谈,为什么要做自动化测试?

一、自动化测试 自动化测试是把以人为驱动的测试行为转化为机器执行的一种过程。 个人认为&#xff0c;只要能服务于测试工作&#xff0c;能够帮助我们提升工作效率的&#xff0c;不管是所谓的自动化工具&#xff0c;还是简单的SQL 脚本、批处理脚本&#xff0c;还是自己编写…...

二分法的边界条件 2517. 礼盒的最大甜蜜度

2517. 礼盒的最大甜蜜度 给你一个正整数数组 price &#xff0c;其中 price[i] 表示第 i 类糖果的价格&#xff0c;另给你一个正整数 k 。 商店组合 k 类 不同 糖果打包成礼盒出售。礼盒的 甜蜜度 是礼盒中任意两种糖果 价格 绝对差的最小值。 返回礼盒的 最大 甜蜜度。 记录一…...

java设计模式(十六)命令模式

目录 定义模式结构角色职责代码实现适用场景优缺点 定义 命令模式&#xff08;Command Pattern&#xff09; 又叫动作模式或事务模式。指的是将一个请求封装成一个对象&#xff0c;使发出请求的责任和执行请求的责任分割开&#xff0c;然后可以使用不同的请求把客户端参数化&a…...

[运维] iptables限制指定ip访问指定端口和只允许指定ip访问指定端口

iptables限制指定ip访问指定端口 要使用iptables限制特定IP地址访问特定端口&#xff0c;您可以使用以下命令&#xff1a; iptables -A INPUT -p tcp -s <IP地址> --dport <端口号> -j DROP请将 <IP地址> 替换为要限制的IP地址&#xff0c;将 <端口号&g…...

JS学习笔记(3. 流程控制)

1. 分歧 1.1 if条件 if (条件) {...} // 为真则执行&#xff0c;单条语句可省略大括号 if (条件) {...} else {...}// 为真则执行if&#xff0c;否则执行else if (条件1) {...} else if (条件2) {...} else {...} // 条件1为真则&#xff0c;条件2为真则&#xff0c;否则执…...

遥感云大数据在灾害、水体与湿地领域典型案例及GPT模型教程

详情点击链接&#xff1a;遥感云大数据在灾害、水体与湿地领域典型案例及GPT模型教程 一&#xff1a;平台及基础开发平台 GEE平台及典型应用案例&#xff1b; GEE开发环境及常用数据资源&#xff1b; ChatGPT、文心一言等GPT模型 JavaScript基础&#xff1b; GEE遥感云重…...

什么是文件描述符以及重定向的本质和软硬链接(Linux)

目录 1 什么是文件&#xff1f;什么是文件操作&#xff1f;认识系统接口open 什么是文件描述符认识Linux底层进程如何打开的文件映射关系重定向的本质理解软硬链接扩展问题 1 什么是文件&#xff1f;什么是文件操作&#xff1f; 文件 文件内容 文件属性&#xff08;文件属性…...

LVM逻辑卷元数据丢失恢复案例 —— 筑梦之路

Lvm常见的故障主要是pv出现异常&#xff0c;有以下几种情况 一个是pv所在的磁盘发生了lvm的元数据损坏一个是系统无法识别到pv所在的磁盘一个是系统异常&#xff0c;断电等导致重启后盘符发生变化&#xff0c;也就是系统识别的磁盘uuid发生变化&#xff0c;但是wwid还是可以对应…...

Java技术规范概览

Java技术规范 目录概述需求&#xff1a; 设计思路实现思路分析1.Java JSR的部分2.JSR-000373.JSR-0000394.JSR-000337 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a bet…...

【OpenMMLab AI实战营第二期】二十分钟入门OpenMMLab笔记

OpenMMlab 主页&#xff1a;openmmlab.com 开源地址&#xff1a;https://github.com/open-mmlab 学习视频地址&#xff1a;https://www.bilibili.com/video/BV1js4y1i72P/ 概述 开源成为人工智能行业发展引擎 时间轴 theano&#xff1a;2007 Caffe&#xff1a;2013 Ten…...

docker-compose单机容器集群编排

docker-compose dockerfile模板文件可以定义一个独立的应用容器&#xff0c;如果需要多个容器就需要服务编排。服务编排有很多技术方案 docker-compose开源的项目实现对容器集群的快速编排 docker-compose将所管理的容器分为三层&#xff0c;分别为工程&#xff0c;服务&#…...

CentOS7 安装Gitlab

1、安装依赖 sudo yum install -y curl openssh-server ca-certificates tzdata perl libsemanage-devel 2、安装邮件服务工具 sudo yum install -y postfix 3、配置GitLab 软件源镜像 curl -fsSL https://packages.gitlab.cn/repository/raw/scripts/setup.sh | /bin/bash …...

Mysql InnoDB的Buffer Pool

Buffer Pool 在MySQL服务器启动的时候就向操作系统申请了⼀⽚连续的内存&#xff0c;他们给这⽚内存起了个名&#xff0c;叫做Buffer Pool&#xff08;中⽂名 是缓冲池&#xff09;。 默认情况下Buffer Pool只有128M⼤⼩&#xff0c;最⼩值为5M&#xff0c;通过修改配置文件设…...

SMTP简单邮件传输协议(C/C++ 发送电子邮件)

SMTP是用于通过Internet发送电子邮件的协议。电子邮件客户端&#xff08;如Microsoft Outlook或macOS Mail应用程序&#xff09;使用SMTP连接到邮件服务器并发送电子邮件。邮件服务器还使用SMTP将邮件从一个邮件服务器交换到另一个。它不用于从服务器下载电子邮件&#xff1b;相…...

uploads靶场通关(1-11关)

Pass-01&#xff08;JS校验&#xff09; 看题目我们准备好我们的php脚本文件&#xff0c;命名为1.php 上传该php文件&#xff0c;发现上传失败 方法一&#xff1a;将浏览器的JavaScript禁用 然后就能上传了 方法二&#xff1a; 查看源码&#xff0c;发现只能上传以下形式的文…...

6.1黄金探底回升是否到顶,今日多空如何布局

近期有哪些消息面影响黄金走势&#xff1f;今日黄金多空该如何研判&#xff1f; ​黄金消息面解析&#xff1a;周三(5月31日)黄金期货价格攀升&#xff0c;美国国债收益率下降推动金价升至一周最高收盘位。美市尾盘&#xff0c;现货黄金收报1962.42美元/盎司&#xff0c;上升3…...

自定义ViewGroup实现流式布局

目录 1、View的绘制流程 2、自定义ViewGroup构造函数的作用 3、onMeasure 方法 3.1、View的度量方式 3.2、onMeasure方法参数的介绍 3.3、自定义ViewGroup onMeasure 方法的实现 4、onLayout方法 5、onDraw方法 6、自定义View的生命周期 7、自定义流式布局的实现 扩展&#xff…...

Git版本控制

目录 版本控制 概念 为什么需要版本控制&#xff1f; 常见的版本控制工具 Git 1、安装 2、了解基本的Linux命令 3、配置git 用户名和邮箱 4、git 工作模式 5、git 项目管理 6、git 分支 托管平台 远程仓库 Gitee 关联多个远程库 Git服务器 Git GUI 版本控制 概…...

全球首个30米分辨率湿地数据集(2000—2022)

数据简介 今天我们分享的数据是全球30米分辨率湿地数据集&#xff0c;包含8种湿地亚类&#xff0c;该数据以0.5X0.5的瓦片存储&#xff0c;我们整理了所有属于中国的瓦片名称与其对应省份&#xff0c;方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

代理篇12|深入理解 Vite中的Proxy接口代理配置

在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论

路径问题的革命性重构&#xff1a;基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中&#xff08;图1&#xff09;&#xff1a; mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

Razor编程中@Html的方法使用大全

文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...

tomcat入门

1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效&#xff0c;稳定&#xff0c;易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...

MyBatis中关于缓存的理解

MyBatis缓存 MyBatis系统当中默认定义两级缓存&#xff1a;一级缓存、二级缓存 默认情况下&#xff0c;只有一级缓存开启&#xff08;sqlSession级别的缓存&#xff09;二级缓存需要手动开启配置&#xff0c;需要局域namespace级别的缓存 一级缓存&#xff08;本地缓存&#…...

Unity中的transform.up

2025年6月8日&#xff0c;周日下午 在Unity中&#xff0c;transform.up是Transform组件的一个属性&#xff0c;表示游戏对象在世界空间中的“上”方向&#xff08;Y轴正方向&#xff09;&#xff0c;且会随对象旋转动态变化。以下是关键点解析&#xff1a; 基本定义 transfor…...

沙箱虚拟化技术虚拟机容器之间的关系详解

问题 沙箱、虚拟化、容器三者分开一一介绍的话我知道他们各自都是什么东西&#xff0c;但是如果把三者放在一起&#xff0c;它们之间到底什么关系&#xff1f;又有什么联系呢&#xff1f;我不是很明白&#xff01;&#xff01;&#xff01; 就比如说&#xff1a; 沙箱&#…...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题&#xff1a;安全。文章将详细阐述认证&#xff08;Authentication) 与授权&#xff08;Authorization的核心概念&#xff0c;对比传统 Session-Cookie 与现代 JWT&#xff08;JS…...