网站做代理需要空间是多少钱/友情链接你会回来感谢我
背景
在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到写hudi真实数据以及写hudi元数据,这篇文章来说一下具体的实现
写hudi真实数据
这里的操作就是在HoodieFlinkWriteClient.upsert方法:
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));table.validateUpsertSchema();preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),instantTime, table, records.listIterator());HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);if (result.getIndexLookupDuration().isPresent()) {metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());}return postWrite(result, instantTime, table);}
- initTable
初始化HoodieFlinkTable - preWrite
在这里几乎没什么操作 - getOrCreateWriteHandle
创建一个写文件的handle(假如这里创建的是FlinkMergeAndReplaceHandle),这里会记录已有的文件路径,后续FlinkMergeHelper.runMerge会从这里读取数
注意该构造函数中的init方法,会创建一个ExternalSpillableMap类型的map来存储即将插入的记录,这在后续upsert中会用到 - HoodieFlinkTable.upsert
这里进行真正的upsert操作,会调用FlinkUpsertDeltaCommitActionExecutor.execute,最终会调用到BaseFlinkCommitActionExecutor.execute,从而调用到FlinkMergeHelper.newInstance().runMergepublic void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,..) {final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {readSchema = baseFileReader.getSchema();gWriter = new GenericDatumWriter<>(readSchema);gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());} else {gReader = null;gWriter = null;readSchema = mergeHandle.getWriterSchemaWithMetaFields();}wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),Option.of(new UpdateHandler(mergeHandle)), record -> {if (!externalSchemaTransformation) {return record;}return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);});wrapper.execute();。。。mergeHandle.close();}
- externalSchemaTransformation=
这里有hoodie.avro.schema.external.transformation配置(默认是false)用来把在之前schame下的数据转换为新的schema下的数据 - wrapper.execute()
这里会最终调用到upsertHandle.write(record),也就是UpdateHandler.consumeOneRecord方法被调用的地方
如果keyToNewRecords报班了对应的记录,也就是说会有uodate的操作的话,就插入新的数据,public void write(GenericRecord oldRecord) {...if (keyToNewRecords.containsKey(key)) {if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {copyOldRecord = true;} else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {copyOldRecord = false;}writtenRecordKeys.add(key); }}
writeUpdateRecord 这里进行数据的更新,并用writtenRecordKeys记录插入的记录 - mergeHandle.close()
这里的writeIncomingRecords会判断如果writtenRecordKeys没有包含该记录的话,就直接插入数据,而不是更新public List<WriteStatus> close() {writeIncomingRecords();...}...protected void writeIncomingRecords() throws IOException {// write out any pending records (this can happen when inserts are turned into updates)Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();while (newRecordsItr.hasNext()) {HoodieRecord<T> hoodieRecord = newRecordsItr.next();if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {writeInsertRecord(hoodieRecord);}}}
- externalSchemaTransformation=
总结一下upsert的关键点:
mergeHandle.close()才是真正的写数据(insert)的时候,在初始化handle的时候会把记录传导writtenRecordKeys中(在HoodieMergeHandle中的init方法)mergeHandle的write() 方法会在写入数据的时候,如果发现有新的数据,则会写入新的数据(update)
写hudi元数据
这里的操作是StreamWriteOperatorCoordinator.notifyCheckpointComplete
方法
public void notifyCheckpointComplete(long checkpointId) {...final boolean committed = commitInstant(this.instant, checkpointId);...
}...
private boolean commitInstant(String instant, long checkpointId){...doCommit(instant, writeResults);...
}...
private void doCommit(String instant, List<WriteStatus> writeResults) {// commit or rollbacklong totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);boolean hasErrors = totalErrorRecords > 0;if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {HashMap<String, String> checkpointCommitMetadata = new HashMap<>();if (hasErrors) {LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="+ totalErrorRecords + "/" + totalRecords);}final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults): Collections.emptyMap();boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),tableState.commitAction, partitionToReplacedFileIds);if (success) {reset();this.ckpMetadata.commitInstant(instant);LOG.info("Commit instant [{}] success!", instant);} else {throw new HoodieException(String.format("Commit instant [%s] failed!", instant));}} else {LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);LOG.error("The first 100 error messages");writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {LOG.error("Global error for partition path {} and fileID {}: {}",ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());if (ws.getErrors().size() > 0) {ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));}});// Rolls back instantwriteClient.rollback(instant);throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));}
}
主要在commitInstant涉及动的方法doCommit(instant, writeResults)
如果说没有错误发生的话,就继续下一步:
这里的提交过程和spark中一样,具体参考Apache Hudi初探(五)(与spark的结合)
其他
在flink和spark中新写入的文件是在哪里分配对一个的fieldId:
//Flink中
BucketAssignFunction 中processRecord getNewRecordLocation 分配新的 fieldId//Spark中
BaseSparkCommitActionExecutor 中execute方法 中 handleUpsertPartition 涉及到的UpsertPartitioner getBucketInfo方法
其中UpsertPartitioner构造函数中 assignInserts 方法涉及到分配新的 fieldId
相关文章:

Apache Hudi初探(三)(与flink的结合)--flink写hudi的操作(真正的写数据)
背景 在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到写hudi真实数据以及写hudi元数据,这篇文章来说一下具体的实现 写hudi真实数据 这里的操作就是在HoodieFlinkWriteClient.upsert方法: public …...

解释 Git 的基本概念和使用方式(InsCode AI 创作助手)
Git 是一种分布式版本控制系统,它允许多个用户协同工作并对项目进行版本控制。下面是 Git 的基本概念和使用方式: 基本概念: 仓库(Repository):存储代码和版本历史记录的地方。 提交(Commit&a…...

【QT】信号和槽(15)
前面的内容说了很多不同的控件如何使用,今天来看下QT的核心,信号与槽(Signals and slots)! 简单理解一下,就是我们的信号与槽连接上了之后,发射一个信号给到槽,槽函数接收到了这个信…...

EFLK日志平台(filebeat-->kafka-->logstash-->es-->kiabana)
ELK平台是一套完整的日志集中处理解决方案,将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用, 完成更强大的用户对日志的查询、排序、统计需求。 安装顺序 1.安装es 7.17.12 2.安装kibana 7.17.12 3.安装x-pack 保证以上调试成功后开始下面…...

C盘扩容遇到的问题(BitLocker解密、)
120G的C盘不知不觉的就满了,忍了好久终于要动手了。 尽管电脑-管理--磁盘管理里可以进行磁盘大小调整,但由于各盘都在用,不能够连续调整,所以选用DiskGenius。 # DiskGenius调整分区大小遇到“您选择的分区不支持无损调整容量” …...

ShardingSphere——柔性事务SEATA原理
摘要 Apache ShardingSphere集成了 SEATA 作为柔性事务的使用方案,本文主要介绍其实现ShardingSphere中柔性事务SEATA原理原理。帮助你更好的理解ShardingSphere原理。同时帮助大家更好的使用柔性事务SEATA原理。 一、Seata柔性事务 Apache ShardingSphere 集成了…...

Introducing GlobalPlatform(一篇了解GP)
安全之安全(security)博客目录导读 TEE之GP(Global Platform)认证汇总 目录 一、GP简介 二、GP新的重点领域是什么? 三、认证程序和培训<...

Ubuntu 18.04上无法播放MP4格式视频解决办法
ubuntu18.04系统无法播放MP4格式视频,提示如下图所示: 解决办法: 1、首先,确保ubuntu系统已完全更新。可使用以下命令更新软件包列表:sudo apt update,然后使用以下命令升级所有已安装的软件包:…...

科技驱动产业升级:浅谈制造型企业对MES系统的应用
在科技不断进步的背景下,制造型行业也在持续发展,但随之而来的挑战也不断增加。传统的管理方式已经无法满足企业的需求,因此许多制造型企业开始寻找新的管理模式。制造执行系统(MES)作为先进的制造信息技术之一&#x…...

智能化新十年,“全栈智能”定义行业“Copilot智能助手”
“智能化转型是未来十年中国企业穿越经济周期的利器”,这是联想集团执行副总裁兼中国区总裁刘军在去年联想创新科技大会上做出的判断,而2023年正值第四次工业革命第二个十年的开端,智能化是第四次工业革命的主题。2023年初,基于谷…...

Docker资源控制cgroups
文章目录 一、docker资源控制1、资源控制工具2、Cgroups四大功能 二、CPU 资源控制1、设置CPU使用率上限2、CPU压力测试3、Cgroups限制cpu使用率4、设置CPU资源占用比(设置多个容器时才有效)5、设置容器绑定指定的CPU 三、对内存使用的限制四、对磁盘IO配…...

通过python 获取当前局域网内存在的IP和MAC
通过python 获取当前局域网内存在的ip 通过ipconfig /all 命令获取局域网所在的网段 通过arp -d *命令清空当前所有的arp映射表 循环遍历当前网段所有可能的ip与其ping一遍建立arp映射表 for /L %i IN (1,1,254) DO ping -w 1 -n 1 192.168.3.%i 通过arp -a命令读取缓存的映射表…...

解决D盘的类型不是基本,而是动态的问题
一、正确的图片 1.1图片 1.2本人遇到的问题 二、将动态磁盘 转为基本盘 2.1 基本概念,动态无法转化为基本,不是双向的,借助软件 网址:转换动态磁盘到普通磁盘_检测到计算机本地磁盘为动态分区_卫水金波的博客-CSDN博客 2.2分区…...

如何判断自己的qt版本呢?
如何判断自己的qt版本呢? 前情提要很简单,按照如下图所示,即可查看当前打开的qtCreator的版本如何打开5.15.2版本的qtCreator呢?安装教程 前情提要 我的电脑已经安装了qt5.14.1,然后我又安装了qt5.15.2,我想尝试一下同一台电脑能否适应两个版本的qt? 当我安装完成qt5.15.2后…...

【文心一言大模型插件制作初体验】制作面试错题本大模型插件
文心一言插件开发初体验 效果图 注意:目前插件仅支持在本地运行,虽然只能自用,但仍然是一个不错的选择。(什么?你说没有用?这不可能!文心一言app可以支持语音,网页端结合手机端就可…...

ROS 2官方文档(基于humble版本)学习笔记(二)
ROS 2官方文档(基于humble版本)学习笔记(二) 理解节点(node)ros2 runros2 node list重映射(remap)ros2 node info 理解话题(topic)rqt_graphros2 topic listr…...

excel中公式结合实际的数据提取出公式计算的分支
要在Excel中使用公式结合实际数据提取分支信息,您可以使用一些文本函数和条件函数来实现这个目标。以下是一个示例,假设您有一个包含银行交易描述的列A,想要从中提取分支信息: 假设交易描述的格式是"分行名称-交易类型"…...

3D模型优化实战:LowPoly、纹理烘焙及格式转换
在快节奏的游戏和虚拟/增强现实 (VR/AR) 世界中,3D 模型的优化在提供引人入胜的体验方面发挥着关键作用。 这门学科不仅仅是创造令人着迷的图形结构; 这是视觉质量和游戏流畅性之间的平衡问题,确保细致而流畅的游戏环境。 通过低多边形建模等…...

genome comparison commend 2 MCMCtree
仅本人练习使用!!后续会逐渐修改!! mcmctree估算物种分歧时间 - 简书 https://www.cnblogs.com/bio-mary/p/12818888.html 估算系统树分歧时间 —— paml.mcmctree,r8s | 生信技工 http://www.chenlianfu.com/?p2948 4. 使用PAM…...

Linux安装JenkinsCLI
项目简介安装目录 mkdir -p /opt/jenkinscli && cd /opt/jenkinscli JenkinsCLI下载 wget http://<your-jenkins-server>/jnlpJars/jenkins-cli.jar # <your-jenkins-server> 替换为你的 Jenkins 服务器地址 JenkinsCLI授权 Dashboard-->Configure Glob…...

Midjourney学习(一)prompt的基础
prompt目录 sd和mj的比较prompt组成风格表现风格时代描述表情色彩情绪环境 sd和mj的比较 自从去年9月份开始,sd就变得非常或火,跟它一起的还有一个midjourney。 他们就像是程序界的两种模式,sd是开源的,有更多的可能性更可控。但是…...

12 权重衰退
过拟合的应对方法——weight_decay 权重衰退是最广泛使用的正则化方法之一。 模型容量受参数个数和参数范围影响,通过L2正则项限制w的取值范围,权重w每次更新乘以小于1的数,w的数值范围不会太大,从而降低模型复杂度,…...

简化测试流程,提供卓越服务:TestComplete+Salesforce满足不断发展的企业的需求
2015年,一群前Salesforce员工发现了病毒防护市场中的一个空白:Salesforce不会对文档进行威胁扫描。为了填补这一空白,他们创建了一个平台,并以该平台作为中心帮助公司保护所有的企业云SaaS系统,使其免受威胁。这个平台…...

kafka 命令脚本说明以及在java中使用
一、命令行使用 1.1、topic 命令 1、关于topic,这里用window 来示例 bin\windows\kafka-topics.bat2、创建 first topic,五个分区,1个副本 bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 -…...

Qt应用开发(基础篇)——文件选择对话框 QFileDialog
一、前言 QFileDialog类继承于QDialog,提供了一个允许用户选择文件或目录的对话框。 对话框窗口 QDialog QFileDialog文件选择对话框允许用户在当前文件系统中选择一个或者多个文件或者文件路径,使用静态函数创建是很简便的方式,比如…...

图像OCR转文字,验证码识别技术太疯狂-UI软件自动化
现在用PYTHON识别图片文字,PaddleOCR,Tesseract,Opencv等很多开源技术。知识大爆炸年代,几年不学习就跟不上时代了。 以前早的时候一个验证码图片上有4个不同颜色字符,带一些杂点,我写点代码按颜色最多的进行提取&…...

Docker:自定义镜像
(总结自b站黑马程序员课程) 环环相扣,跳过部分章节和知识点是不可取的。 一、镜像结构 镜像是分层结构,每一层称为一个Layer。 ①BaseImage层:包含基本的系统函数库、环境变量、文件系统。 ②Entrypoint࿱…...

【Nginx22】Nginx学习:FastCGI模块(四)错误处理及其它
Nginx学习:FastCGI模块(四)错误处理及其它 FastCGI 最后一篇,我们将学习完剩下的所有配置指令。在这里,错误处理还是单独拿出来成为一个小节了,而剩下的内容都放到其它中进行学习。不要感觉是其它的就没用了…...

轮毂电机单位换算-米每秒/转每分
先前写了一篇度/S和RPM的关系 这次补全一点 假设轮毂电机直径20CM 0.2M 周长为0.628M 0.2*3.14 轮子转一圈走0.628M 1RPM的单位是转/分 换成转/S 就除以60 也就是轮子转一圈的速度0.628/60 m/S 0.010467m/S 所以换算如下: 1RPM0.010467 m/S 那么1m/S1/(0.010467) RPM95.5RPM 如…...

博流RISC-V芯片BL616开发环境搭建
文章目录 1、工具安装2、代码下载3、环境变量配置4、下载交叉编译器5、编译与下载运行6、使用ninja编译 本文分别介绍博流RISC-V芯片 BL616 在 Windows和Linux 下开发环境搭建,本文同时适用BL618,BL602,BL702,BL808系列芯片。 1、…...