Spark中多分区写文件前可以不排序么
背景
Spark 3.5.0
目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0
来避免排序。
分析
这部分主要分为三个部分:
一个是V1Writes规则的重改
;
另一个是FileFormatWriter中的dataWriter的选择
;
还有一个是Spark中为什么会加上Sort
这三部分是需要结合在一起分析讨论的
V1Writes规则的重改
直接转到代码部分:
object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan = {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>val newQuery = prepareQuery(write, write.query)val attrMap = AttributeMap(write.query.output.zip(newQuery.output))val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild = writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}
其中 prepareQuery
是对满足条件的计划前加上Sort
逻辑排序,其中prepareQuery关键的代码
如下:
val requiredOrdering = write.requiredOrdering.map(_.transform {case a: Attribute => attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering = empty2NullPlan.outputOrderingval orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global = false, empty2NullPlan)}
write.requiredOrdering
中涉及到的类为InsertIntoHadoopFsRelationCommand
和InsertIntoHiveTable
,且这两个物理计划中的requiredOrdering
实现都是:
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)
getSortOrder
方法关键代码如下:
val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns).map(SortOrder(_, Ascending))}
所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0
(默认值为0),则会加上Sort逻辑计划
,具体的实现可以参考SPARK-37287
如果spark.sql.maxConcurrentOutputFileWriters为0
(默认值为0)且 sortColumns为空
(大部分情况下为空,除非建表是partition加上bucket),则不会加上Sort逻辑计划
FileFormatWriter 中的dataWriter的选择
在 InsertIntoHadoopFsRelationCommand
和InsertIntoHiveTable
这两个物理计划中,最终写入文件/数据的时候,会调用到FileFormatWriter.write
方法,这里有个concurrentOutputWriterSpecFunc
函数变量的设置:
val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec = WriteFilesSpec(description = description,committer = committer,concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)
设置concurrentOutputWriterSpecFunc
的代码如下:
private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))} else {None}}
如果 spark.sql.maxConcurrentOutputFileWriters为0
(默认值为0),则ConcurrentOutputWriterSpec
为None
如果 spark.sql.maxConcurrentOutputFileWriters大于0
且 sortColumns为空
(大部分情况下为空,除非建表是partition加上bucket),则为Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())
其中executeWrite
会调用WriteFilesExec.doExecuteWrite
方法,从而调用FileFormatWriter.executeTask
,这里就涉及到dataWriter
选择:
val dataWriter =if (sparkPartitionId != 0 && !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) =>new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ =>new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}
这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter
,默认情况下为DynamicPartitionDataSingleWriter
否则就会为DynamicPartitionDataConcurrentWriter
这两者的区别,见下文
Spark中为什么会加上Sort
至于Spark在写入文件的时候会加上Sort,这个是跟写入的实现有关的,也就是DynamicPartitionDataSingleWriter
和DynamicPartitionDataConcurrentWriter
的区别:
- DynamicPartitionDataSingleWriter 在任何时刻,只有一个writer在写文件,这能保证写入的稳定性,不会在写入文件的时候消耗大量的内存,但是速度会慢
- DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件,能加快写入文件的速度,但是因为多个文件的同时写入,可能会导致OOM
对于DynamicPartitionDataSingleWriter
会根据partition或者bucket作为最细粒度来作为writer的标准,如果相邻的两条记录所属不同的partition或者bucket,则会切换writer,所以说如果不根据partition或者bucket排序的话,会导致writer
频繁的切换,这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。
参考
- [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
- [SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort
相关文章:
Spark中多分区写文件前可以不排序么
背景 Spark 3.5.0 目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。 分析 这部分主要分为三个部分: 一个是V1Writes规则的重改; 另一个是FileFormatWriter中…...
突破编程_C++_面试(变量与常量)
面试题 1 : C 中的变量存储类别有哪些,并简要描述它们的特点? 在C中,变量的存储类别决定了变量的生命周期和可见性。以下是C中的几种变量存储类别及其特点: 自动存储期 也称为局部存储类别。这类变量在函数或代码块…...
k8s的一些关键信息(归类摘抄,非提炼)
零:举例说明 当用户提交一个 Deployment 对象到 Kubernetes 集群时,控制平面的 API Server 接收到该请求,并将其转发给 Controller Manager。Controller Manager 中的 Deployment Controller 监听到该请求,并根据用户定义的配置信…...
海外媒体发稿:8个提升影响力的日韩地区媒体发稿推广策略-华媒舍
在今天的数字化时代,媒体发稿推广成为企业和个人增加影响力的重要方式。特别是在日韩地区,这个拥有庞大媒体市场和活跃社交媒体用户的地区,正确的推广策略将对影响力的提升起到关键作用。我们将介绍8个提升影响力的日韩地区媒体发稿推广策略。…...
面试官:能不能给 Promise 增加取消功能和进度通知功能... 我:???
扯皮 这段时间闲着没事就去翻翻红宝书,已经看到 Promise 篇了,今天又让我翻到两个陌生的知识点。 因为 Promise 业务场景太多了自我感觉掌握的也比较透彻,之前也跟着 Promise A 的规范手写过完整的 Promise,所以这部分内容基本上…...
详解MySQL增删查改
众所周知,MySQL是非常重要的数据库语言,下面我们来回顾一下mysql的增删查改吧 MySQL创建数据库: CREATE DATABASE 数据库名;MySQL删除数据库: DROP DATABASE <database_name>; --直接删除,不检查是否存在 DROP…...
Mysql开启bin-log日志
目录 一、安装配置 二、mysqlbinlog命令 一、安装配置 yum -y install mariadb mariadb-server#安装mysql数据库#默认配置文件/etc/my.cnfvim /etc/my.cnflog-binmariadb-bin #开启二进制日志 systemctl restart mariadb#会在/car/lib/mysql/产生二进制日志文件࿰…...
Java:性能优化细节01-10
Java:性能优化细节01-10 在Java程序开发过程中,性能优化是一个重要的考虑因素。常见的误解是将性能问题归咎于Java语言本身,然而实际上,性能瓶颈更多地源于程序设计和代码实现方式的不当。因此,培养良好的编码习惯不仅…...
CVE-2022-24652 漏洞复现
CVE-2022-24652 开题 后台管理是thinkphp的,但是工具没检测出漏洞。 登陆后界面如下,上传头像功能值得引起注意 这其实就是CVE-2022-24652,危险类型文件的不加限制上传,是文件上传漏洞。漏洞路由/user/upload/upload 参考文章&a…...
LeetCode、338. 比特位计数【简单,位运算】
文章目录 前言LeetCode、338. 比特位计数【中等,位运算】题目链接与分类思路位运算移位处理前缀思想实现 资料获取 前言 博主介绍:✌目前全网粉丝2W,csdn博客专家、Java领域优质创作者,博客之星、阿里云平台优质作者、专注于Java…...
借助Aspose.BarCode条码控件,C# 中的文本转 QR 码生成器
二维码用于在较小的空间内存储大量数据。它们易于使用,可以通过智能手机或其他设备扫描来打开网站、观看视频或访问其他编码信息。在这篇博文中,我们将学习如何使用 C# 以编程方式生成基于文本的 QR 码。我们将提供分步指南和代码片段,帮助您…...
vue打包优化,webpack的8大配置方案
vue-cli 生成的项目通常集成Webpack ,在打包的时候,需要webpack来做一些事情。这里我们希望它可以压缩代码体积,提高运行效率。 文章目录 (1)代码压缩:(2)图片压缩:&…...
B端系统从0到1:有几步,其中需求分析要做啥?
一款B系统从无到有都经历了啥,而其中的需求分析又要做什么?贝格前端工场给老铁们做一下分析,文章写作不易,如果咱们有界面设计和前端开发需求,别忘了私信我呦,开始了。 一、B端系统从0到1都有哪些要走的步骤…...
django中查询优化
在Django中,查询优化是一个重要的主题,因为不正确的查询可能会导致性能问题,尤其是在处理大量数据时。以下是一些在Django中进行查询优化的建议: 一:使用select_related和prefetch_related: select_related用于优化一…...
【JavaScript】输入输出语法
目录 一、输出语法 二、输入语法 一、输出语法 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>D…...
多模态基础--- word Embedding
1 word Embedding 原始的单词编码方式: one-hot,维度太大,不同单词之间相互独立,没有远近关系区分。 wordclass,将同一类单词编码在一起,此时丢失了类别和类别间的相关信息,比如class1和class3…...
Mysql 日志
0 引言 MySQL日志主要分为4类,使用这些日志文件,可以查看MySQL内部发生的事情。这4类日志分别是: ● 错误日志:记录MySQL服务的启动、运行或停止MySQL服务时出现的问题。 ● 查询日志:记录建立的客户端连接和执行的…...
【开源】SpringBoot框架开发服装店库存管理系统
目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 角色管理模块2.3 服装档案模块2.4 服装入库模块2.5 服装出库模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 角色表3.2.2 服装档案表3.2.3 服装入库表3.2.4 服装出库表 四、系统展示五、核心代码5.…...
云原生之容器编排实践-在K8S集群中使用Registry2搭建私有镜像仓库
背景 基于前面搭建的3节点 Kubernetes 集群,今天我们使用 Registry2 搭建私有镜像仓库,这在镜像安全性以及离线环境下运维等方面具有重要意义。 Note: 由于是测试环境,以下创建了一个 local-storage 的 StorageClass ,并使用本地…...
标准IO 2月4日学习笔记
IO输入输出,操作对象是文件 Linux文件类型: b block 块设备文件 按块扫描设备信息的文件 存储设备 c character 字符设备文件 按字符扫描设备信息的文件 d direct…...
如何在1Panel上偷渡HTTP/3
本文 首发于 Anyeの小站,转载请取得作者同意。 前言 简介 HTTP/3 的基础即谷歌多年探索的基于 UDP 的 QUIC 协议。与 TCP 相比,使用 UDP 可以提供更大的灵活性,并且可以使 QUIC 完全于用户空间中实现——对协议实现的更新不像 TCP 那样需要绑…...
Qt实用技巧:QCustomPlot做北斗GPS显示绝对位置运动轨迹和相对位置运动轨迹图的时,使图按照输入点顺序连曲线
若该文为原创文章,转载请注明原文出处 本文章博客地址:https://hpzwl.blog.csdn.net/article/details/136131310 红胖子网络科技博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…...
116 C++ 可变参数函数,initializer_list (初始化列表), 省略号形参
一 可变参数函数 有时候我们传递的参数是不固定的。 这种能接受非固定个数参数的函数就是可变参数函数 怎么实现呢?就要用到 initializer_list 标准库类型 该类型能够使用的前提条件是:所有的实参类型相同。 二,initializer_list(初始化列…...
强国有我社会实践公益活动在合肥市庐阳区开展
2月18日是开工第一天,阳光灿烂、春光明媚。合肥市四十五中2022级星辰(5)班部分同学在监护人的陪伴下来到庐阳区双岗街道万小店社区残疾人工作站,和工作站兄弟姐妹们共同开展“强国复兴有我”社会实践公益活动。合肥市庐阳区为民社…...
Nginx 正向代理、反向代理
文章目录 前言1. 正向代理1.1 概念1.2 逻辑图1.3 使用场景 2. 反向代理2.1 概念2.2 逻辑图2.3 使用场景 前言 正向代理主要是用来解决访问限制问题;反向代理则是提供负载均衡、安全防护等作用 1. 正向代理 1.1 概念 正向代理是一个位于客户端和目标服务器之间的代理…...
软考学习--计算机组成原理与体系结构
计算机组成原理与体系结构 数据的表示 进制转换 R 进制转换为 10 进制–按权展开法 10进制转换为2进制 原码 反码 补码 移码 原码 :数字的二进制表示反码 : 正数的反码等于原码,负数的反码等于原码取反补码: 正数的补码等…...
fish终端下conda activate失败
【问题】fish终端下激活conda环境报错: >> conda activate base CondaError: Run conda init before conda activate ## 然而运行 conda init fish 仍旧无法解决【解决】 参考:https://github.com/conda/conda/issues/11079 方法一…...
FPGA之移位寄存器
SLICEM中的LUT可以配置为32位移位寄存器,而无需使用slice中可用的触发器。以这种方式使用,每个LUT 可以将串 行数据延迟 1 到 32 个时钟周期。移入D (DI1 LUT 引脚)和移出 Q31(MC31 LUT 引脚)线路将LUT级联,以形成更大…...
Android Compose Material3 ModalNavigationDrawer 抽屉的使用(处理了一些坑)
Android Compose Material3 ModalNavigationDrawer 抽屉的使用(处理了一些坑) val drawerState rememberDrawerState(initialValue DrawerValue.Closed) val scope rememberCoroutineScope()ModalNavigationDrawer(drawerState drawerState,drawerC…...
golang select两个channel性能稳定,三个channel时性能会发生抖动,为什么?
golang select两个channel性能稳定,三个channel时性能会发生抖动,为什么? 答题思路 select —> 让 Goroutine同时等待多个 Channel 可读或者可写 —> Goroutine —> 调度器调度 —> 资源竞争 —> 不稳定、抖动 在 Go 中&#…...
VSCODE上使用python_Django
接上篇 https://blog.csdn.net/weixin_44741835/article/details/136135996?csdn_share_tail%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%22136135996%22%2C%22source%22%3A%22weixin_44741835%22%7D VSCODE官网: Editing Python …...
探索IDE的世界:什么是IDE?以及适合新手的IDE推荐
引言 在编程的世界里,集成开发环境(IDE)是我们日常工作的重要工具。无论是初学者还是经验丰富的开发者,一个好的IDE都能极大地提高我们的编程效率。那么,什么是IDE呢?对于新手来说,又应该选择哪…...
DoRA(权重分解低秩适应):一种新颖的模型微调方法
来自:小互 DoRA(权重分解低秩适应):一种新颖的模型微调方法 DoRA在LoRA的基础上进一步发展,通过将预训练权重分解为“幅度”和“方向”两个部分进行微调。 这种权重分解方法允许DoRA更精细地控制模型的学习过程&…...
centos7.9 搭建k8s
K3s -轻量级Kubernetes K3s 是轻量级的 Kubernetes。K3s 易于安装,仅需要 Kubernetes 内存的一半,所有组件都在一个小于 100 MB 的二进制文件中。 为什么叫 K3s? 我们希望安装的 Kubernetes 只占用一半的内存。Kubernetes 是一个 10 个字母的单词&am…...
使用vite创建项目
NPM npm create vitelatest Yarn yarn create vite PNPM pnpm create vite Bun bunx create-vite 安装sass npm add -D sass 安装less npm add -D less vite官方中文文档:Vite | 下一代的前端工具链 (vitejs.dev)...
EXTI外部中断
? 难点:中断向量表、看门狗、NVIC的优先级位?EXTI框图? ------------------------ 中断系统 中断:在主程序运行过程中,出现了特定的中断触发条件(中断源)--->例如:…...
小肥柴慢慢手写数据结构(C篇)(5-4 中场小结)
小肥柴慢慢学习数据结构笔记(C篇)(5-4 中场小结) 目录5-14 再看数据结构的基础问题5-15 接下来关于Tree你还需要学习和了解的内容参考文献和资料 目录 5-14 再看数据结构的基础问题 假设前面讨论的所有内容大家都已经自己编码实…...
flutter 功能
flutter功能 带缓存的tab切换功能 使用PageController进行对应tab的widget缓存 late PageController _keepActiveVC;///当前使用的视图索引late int _index;late PageController _keepActiveVC;/// 所有视图final List<Widget> _bodys [];overridevoid initState() {…...
Sql Server 存储过程
一、创建存储过程 USE [数据库名称] GO /****** Object: StoredProcedure [dbo].[存储过程名称] Script Date: 2024/2/19 9:47:49 ******/ SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO -- -- Author: <Author,,Name> -- Create date: <Create Dat…...
二.重新回炉Spring Framework:Spring Framework主要组件概览
1.写在前面的话 这里主要简单说一下Spring Framework的几个核心组件的总体情况。为了比较直观,这里使用了ClassPathXmlApplicationContext的类图来进行说明。它基本上包含了 IoC 体系中大部分的核心类和接口。类图如下图所示: 2.Resource 组件体系 R…...
Open CASCADE学习|曲线向曲面投影
在三维空间中,将曲线向曲面投影通常涉及复杂的几何计算。这个过程可以通过多种方法实现,但最常见的是使用数学和几何库,如OpenCASCADE,来处理这些计算。 在OpenCASCADE中,投影曲线到曲面通常涉及以下步骤:…...
怎样连接局域网?
局域网(Local Area Network,缩写为LAN)是建立在小范围内的计算机网络,用于连接同一建筑物或者办公场所内的设备。连接局域网可以实现设备之间的信息共享和远程通信。本文将介绍如何连接局域网,并介绍了天联组网天联的使…...
OpenAI 发布文生视频大模型 Sora,AI 视频要变天了,视频创作重新洗牌!AGI 还远吗?
一、一觉醒来,AI 视频已变天 早上一觉醒来,群里和朋友圈又被刷屏了。 今年开年 AI 界最大的震撼事件:OpenAI 发布了他们的文生视频大模型 Sora。 OpenAI 文生视频大模型 Sora 的横空出世,预示着 AI 视频要变天了,视…...
java基础day01
1.什么是Java Java是一门编程语言 思考问题: 人和人沟通? 中文 英文 人和计算机沟通? 计算机语言: C C C# php python 2. Java诞生 前身叫Oak(橡树)…...
读十堂极简人工智能课笔记06_自然语言处理
1. 聊天机器人 1.1. 人工智能往往掌握不了跨越几段对话语境的讨论 1.1.1. 抓不住连贯的主题,只能单独处理每个句子 1.1.2. 不能将其答案与现实联系起来 1.1.3. 可能会遵循语言规则、统计相关性,甚至查找有关事实来为每个新句子提供答复 1.2. 聊天机…...
Linux文件信息,drwxr-xr-x. 2 root root 6 Jan 30 17:42 Desktop
drwxr-xr-x. 2 root root 6 Jan 30 17:42 Desktop drwxr-xr-x. drwxr-xr-x.d是文件类型rwx r-x r-x9位,每3位一组,一共3组,代表基本权限第一组 文件的创建者 | 拥有者第二组 和拥有者在一个组中第三组 其他用户rread,读的权限ww…...
深入理解Promise:用法和面试问题解析
引言 在现代的异步JavaScript编程中,Promise是一个强大的工具,用于更优雅地处理异步操作。本文将深入探讨Promise的具体用法,并提供一些在面试中可能遇到的问题及其答案。 Promise的基本用法 Promise是一个代表异步操作最终完成或失败的对…...
css2背景
css2背景 一.背景颜色二.背景图片三.背景平铺四.背景图片位置五.背景图像固定六.复合型写法七.背景颜色半透明八.总结 一.背景颜色 默认是transparent(透明) 二.背景图片 默认是none 三.背景平铺 默认是background-repeat(平铺) 四.背景图片位置…...
KUKA库卡机器人编程语言是什么?
KUKA库卡机器人的编程语言主要是KUKA Robot Language(简称KRL)。KRL是库卡机器人专门为其机器人系统设计的编程语言,用于编写和控制KUKA工业机器人的运动和操作。KRL结合了指令式编程和结构化编程的特点,具有一定的易学性和灵活性…...
Django学习全纪录:Django视图和路由的配置,应用的创建以及注册
导言 在之前的文章中,我们已经将Django的环境部署完成,包括一些注意事项以及前期工作,都已经完成。这篇文章,我们就可以正式开始干活了。 学习目标 1、学习创建应用以及注册APP 2、初步认识视图和路由,以及编写简单的代码 3、启动应用观察变化 创建第一个应用(APP) …...