Spark RDD sortBy算子执行时进行数据 “采样”是什么意思?
一、sortBy 和 RangePartitioner
sortBy
在 Spark 中会在执行排序时采用 rangePartitioner
进行分区,这会影响数据的分区方式,并且这一步骤是通过对数据进行 “采样” 来计算分区的范围。不过,重要的是,sortBy
本身仍然是一个 transformation,它不会立即触发计算,但在执行过程中会涉及到对数据的排序、分区和最终计算。
1. sortBy
和 RangePartitioner
sortBy
会利用 RangePartitioner
来决定数据如何进行分区。RangePartitioner
会在排序之前,首先对数据进行采样,从而得出每个分区的范围,然后根据这些范围进行数据的分区。这是因为数据排序是一个全局操作,而 RangePartitioner
提供了一个合理的划分策略,使得 Spark 在执行排序时能够并行化。
-
采样过程:
当调用sortBy
时,Spark 会对数据进行 采样,通常使用的是SampledRDD
,这种采样会用来估计数据的分布范围,并为后续的分区计算提供依据。 -
RangePartitioner 的使用:
RangePartitioner
会根据数据的值划分成不同的范围。通常在分布式环境中,我们需要将数据按某种方式划分为多个分区,这个过程会使用一个范围来决定数据分布。
2. 是否会触发 runJob
sortBy
作为 transformation 不会立即触发作业执行。它返回一个新的 RDD,并仅在后续执行 action 操作时才会触发实际的计算。因此,sortBy
不会直接导致 runJob
的执行。只有在你执行类似 collect()
, count()
, saveAsTextFile()
等行动算子时,整个作业才会执行。
但是,sortBy
内部会涉及到 采样 和 范围分区,这些过程是为了确保排序能够在多个分区上并行高效地完成,所有这些操作都在 Spark 内部的 task 中完成。runJob
会在行动算子执行时启动,但在执行过程中,rangePartitioner
的计算、数据的重新分区等步骤会被逐步执行。
3. 源码分析
我们可以通过查看 Spark 源码来更清楚地理解这些步骤。以下是关于 sortBy
和其内部处理的一些关键源码:
RDD.sortBy
源码
def sortBy[K: ClassTag, U: Ordering](f: T => K, ascending: Boolean = true, numPartitions: Int)(implicit ord: Ordering[K]): RDD[T] = {val partitioner = new RangePartitioner(numPartitions, this) // 使用 RangePartitionerval map = this.mapPartitionsWithIndex { (index, iter) =>// 计算分区内的排序val partitioned = iter.toArray.sortBy(f) partitioned.iterator}map
}
在这个方法中,RangePartitioner
被用来决定如何将数据分成多个分区。而在实际执行时,分区是通过 mapPartitionsWithIndex
来执行的。
RangePartitioner
源码
class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {def getPartition(key: Any): Int = {// 根据 key 的范围来决定在哪个分区val partitionIndex = rangePartition(key)partitionIndex}def rangePartition(key: Any): Int = {// 进行采样,并将数据按范围分到对应的分区}
}
4. 触发计算的条件
sortBy
是一个 transformation 操作,它会生成一个新的 RDD,并不会立即执行排序。- RangePartitioner 会在后台进行数据的分区计算和范围分割,但这一切都不会触发作业执行,直到 action 操作 被调用。
5. 总结
sortBy
会利用RangePartitioner
进行数据的分区和范围划分,这过程中会对数据进行采样以确定每个分区的范围。- 这个过程本身不会触发作业执行,只有当你执行一个 action 操作时(如
collect()
或saveAsTextFile()
),Spark 才会触发计算,并启动实际的作业执行,进行排序和分区。
二、 RangePartitioner 的 采样过程
在 Spark 中,RangePartitioner
的 采样过程 是其核心部分之一,它确保能够为数据分配适当的分区,并保证每个分区的数据范围在排序时能够合理地分布。这里我们将深入探讨 RangePartitioner
是如何通过采样来计算分区范围的。
1. RangePartitioner
概述
RangePartitioner
是 Spark 中的一个分区器,常用于按范围将数据进行分区。它通常用于类似 sortBy
这类需要全局排序的操作,目的是为了在分布式环境中进行高效的并行排序。
RangePartitioner
在执行分区时,会利用 采样 来估算每个分区的范围(即每个分区的边界)。这种采样过程通过从数据中提取一个小样本,帮助计算出数据在不同分区上的分布,从而保证数据能够均匀地分配到各个分区中。
2. RangePartitioner
采样过程
采样是 RangePartitioner
计算每个分区的范围的关键。这个过程涉及到以下步骤:
2.1 数据采样
RangePartitioner
会从数据中 随机采样 一部分元素,用来估算数据的分布和计算每个分区的边界。这个采样过程通常不会采用全部数据,而是通过一定比例的数据来进行推测。这是为了减少计算开销,同时确保分区的均衡性。
采样操作通常是在 分布式环境中并行执行 的,Spark 会在多个分区上并行地获取样本数据。
- 采样的比例:采样比例通常是一个相对较小的数值,目的是减少计算量。Spark 内部会在每个分区中执行采样,以确保最终分区的边界能够反映整个数据集的分布。
2.2 计算分区边界
一旦采样完成,RangePartitioner
就会使用这些采样数据来计算每个分区的边界。这个过程基于采样数据的排序:
- 排序样本数据:首先,对采样数据进行排序,确保数据可以按顺序进行分区。
- 计算分割点:然后,
RangePartitioner
会根据排序后的数据划分出多个边界点。这些边界点代表了每个分区的数据范围。例如,如果数据有 1000 个元素,并且要求将数据划分为 10 个分区,那么就会在排序后的数据中选取 9 个分割点。
2.3 创建分区
RangePartitioner
利用这些边界点来创建新的分区。数据根据其值所在的范围,决定落入哪个分区。具体来说,RangePartitioner
会为每个分区计算出一个边界值,然后将所有数据按这些边界值进行分配。
- 分区计算:对于每个数据元素,
RangePartitioner
会根据元素的值和这些边界值,决定该元素属于哪个分区。
3. 代码实现中的采样部分
在 Spark 的源码中,RangePartitioner
的采样过程是通过以下代码来实现的:
3.1 RangePartitioner
类中的采样
class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {// 进行数据的采样val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)val sortedSample = sample.map(_._1).sortBy(identity)// 计算每个分区的分割点val splits = sortedSample.zipWithIndex.map { case (key, index) =>if (index % (sampleCount / partitions) == 0) key else null}.filter(_ != null)def getPartition(key: Any): Int = {// 根据采样的分割点进行分区var low = 0var high = partitions - 1while (low < high) {val mid = (low + high) / 2if (key < splits(mid)) high = mid - 1 else low = mid + 1}low}
}
在上面的代码中,sample
操作会对 RDD 中的数据进行采样,并将其按值排序。然后,通过分割排序后的数据,计算出每个分区的边界点。这些边界点随后用于 getPartition
方法中来确定数据的分配。
3.2 采样与排序
rdd.sample(withReplacement = false, fraction = 0.1)
:从原始 RDD 中采样 10% 的数据(fraction = 0.1
),并且不进行重复采样。sortBy(identity)
:对采样的数据进行排序,确保采样数据的顺序正确,便于后续计算边界。
4. 触发计算
在执行 sortBy
操作时,Spark 会根据 RangePartitioner
对数据进行采样、排序和分区计算。这些操作会在你执行 action 操作(如 collect()
、saveAsTextFile()
)时触发,具体的分区计算会在计算过程中完成。直到行动算子触发,计算过程才会开始,RangePartitioner
会根据采样数据生成分区,并最终执行数据的排序。
5. 总结
- 采样:
RangePartitioner
会从数据中随机采样一部分元素(通常是 10% 或其他比例),用来估算数据的分布。 - 排序与计算分区边界:采样数据被排序,并根据排序后的数据计算出每个分区的边界。这样可以确保数据均匀分配到不同的分区。
- 数据分区:根据采样和计算出的边界,
RangePartitioner
会将数据分配到相应的分区中。
通过这种采样与分区机制,RangePartitioner
能够高效地支持 Spark 的排序操作,使得数据在分布式环境中能够有效地并行处理。
三、举例介绍RangePartitioner采样过程
理解 RangePartitioner
如何通过采样来获得数据分布、计算边界,并将数据分配到相应分区的过程,确实比较抽象。我会通过一个简单的例子来帮助你更直观地理解这个过程。
问题场景
假设你有一个数据集,包含了以下的 10 个整数:
[10, 23, 1, 9, 15, 37, 2, 16, 40, 3]
你想用 RangePartitioner
来将这些数据分为 3 个分区,并且根据它们的值进行排序。
1. 采样数据
首先,为了计算每个分区的边界,RangePartitioner
会对数据集进行采样。假设我们采样 30% 的数据(即随机选择 3 个数据点)。假设采样到的数据是:
[10, 23, 3]
2. 排序采样数据
然后,对采样的数据进行排序,确保它们按大小排列。对于这个例子,排序后的采样数据是:
[3, 10, 23]
3. 计算分区边界
通过对采样数据进行排序,RangePartitioner
可以计算出分区的边界。在我们的例子中,我们有 3 个分区,因此我们需要为数据计算 2 个边界(因为 n
个分区需要 n-1
个边界)。
根据排序后的采样数据 [3, 10, 23]
,RangePartitioner
可以选择分割点来确定边界:
- 第一个边界:选择采样数据的第一个元素(
3
)。 - 第二个边界:选择采样数据的最后一个元素(
23
)。
现在我们有了两个边界:
- 分区 1:所有小于 10 的数据
- 分区 2:所有大于等于 10 小于 23 的数据
- 分区 3:所有大于等于 23 的数据
4. 分配数据到分区
接下来,RangePartitioner
会根据这些边界将数据分配到相应的分区中。具体的分区规则是:
- 分区 1:所有小于
10
的元素 →[1, 2, 3, 9]
- 分区 2:所有大于等于
10
且小于23
的元素 →[10, 15, 16]
- 分区 3:所有大于等于
23
的元素 →[23, 37, 40]
所以最终的分区结果是:
- 分区 1:
[1, 2, 3, 9]
- 分区 2:
[10, 15, 16]
- 分区 3:
[23, 37, 40]
5. 总结过程
通过这个例子,我们可以看到 RangePartitioner
的整个过程:
- 采样数据:从整个数据集中随机抽取一部分数据(这里是 30%)。
- 排序采样数据:对采样数据进行排序,确保我们能根据数据的范围计算边界。
- 计算分区边界:根据排序后的采样数据,选择边界来划分数据(例如第一个和最后一个元素)。
- 分配数据到分区:根据边界将所有数据分配到相应的分区中。
6. 实际执行的情况
- 采样比例:在实际的 Spark 中,采样比例并不一定是 30%,通常是根据数据的大小和分区数量进行调整的。采样可以确保
RangePartitioner
在计算边界时不会消耗过多资源。 - 多个分区:如果数据集更大,分区数量更多,
RangePartitioner
会选择更多的采样点来划分分区。边界点会根据排序后的采样数据来动态选择。
7. 关键源码中的采样部分
在实际 Spark 的源码中,采样是通过 sample
方法实现的:
val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)
val sortedSample = sample.map(_._1).sortBy(identity)
然后通过这些采样的排序数据,计算每个分区的边界。例如,当分区数量是 3
时,RangePartitioner
会选取采样数据的前几个元素作为边界,并用这些边界来确定每个分区的范围。
8. 进一步优化
在实际使用中,Spark 的 RangePartitioner
会通过自适应调整采样的比例和算法来优化性能,确保在处理大型数据集时依然高效。在某些情况下,Spark 会使用更智能的策略来决定采样的方式,以便在并行处理中避免过多的计算开销。
总结
通过采样、排序和计算边界,RangePartitioner
确保了数据可以均匀地分配到不同的分区中,从而为排序等操作提供并行化的支持。这一过程使得 Spark 在处理大规模数据时能够有效地进行全局排序。
相关文章:
Spark RDD sortBy算子执行时进行数据 “采样”是什么意思?
一、sortBy 和 RangePartitioner sortBy 在 Spark 中会在执行排序时采用 rangePartitioner 进行分区,这会影响数据的分区方式,并且这一步骤是通过对数据进行 “采样” 来计算分区的范围。不过,重要的是,sortBy 本身仍然是一个 tr…...
React-useRef与DOM操作
#题引:我认为跟着官方文档学习不会走歪路 ref使用 组件重新渲染时,react组件函数里的代码会重新执行,返回新的JSX,当你希望组件“记住”某些信息,但又不想让这些信息触发新的渲染时,你可以使用ref&#x…...
Mistral AI 发布 Pixtral Large 模型:多模态时代的开源先锋
Mistral AI 最新推出的 Pixtral Large 模型,带来了更强的多模态能力。作为一款开源的多模态模型,它不仅在参数量上达到 1240 亿,更在文本和图像理解上实现了质的飞跃。 模型亮点 1. 多模态能力再升级 Pixtral Large 配备了 123B 参数的解码器…...
Windows、Linux多系统共享蓝牙设备
Windows、Linux多系统共享蓝牙设备 近来遇到一个新问题,就是双系统共享蓝牙鼠标。因为一直喜欢在Windows、Linux双系统之间来回切换,而每次切换系统蓝牙就必须重新配对,当然,通过网络成功解决了问题。 通过这个问题,稍…...
C语言 | Leetcode C语言题解之第564题寻找最近的回文数
题目: 题解: #define MAX_STR_LEN 32 typedef unsigned long long ULL;void reverseStr(char * str) {int n strlen(str);for (int l 0, r n-1; l < r; l, r--) {char c str[l];str[l] str[r];str[r] c;} }ULL * getCandidates(const char * n…...
wsl虚拟机中的dockers容器访问不了物理主机
1 首先保证wsl虚拟机能够访问宿主机IP地址,wsl虚拟机通过vEthernet (WSL)的地址访问,着意味着容器也要通过此IP地址访问物理主机。 2 遇到的问题:wsl虚拟机中安装了docker,用在用到docker容器内的开发环境,但是虚拟机…...
Spark RDD 的宽依赖和窄依赖
通俗地理解 Spark RDD 的 宽依赖 和 窄依赖,可以通过以下比喻和解释: 1. 日常生活比喻 假设你在管理多个团队完成工作任务: 窄依赖:每个团队只需要关注自己的分工,完成自己的任务。例如,一个人将纸张折好&…...
二进制转十进制
解题思路分析 二进制转十进制原理:二进制数转换为十进制数的基本原理是按位权展开相加。对于一个二进制数,从右往左每一位的位权依次是将每一位上的数字(0 或 1)乘以其对应的位权,然后把所有结果相加,就得…...
深度学习:神经网络中的非线性激活的使用
深度学习:神经网络中的非线性激活的使用 在神经网络中,非线性激活函数是至关重要的组件,它们使网络能够捕捉和模拟输入数据中的复杂非线性关系。这些激活函数的主要任务是帮助网络解决那些无法通过简单的线性操作(如权重相乘和偏…...
Python缓存:两个简单的方法
缓存是一种用于提高应用程序性能的技术,它通过临时存储程序获得的结果,以便在以后需要时重用它们。 在本文中,我们将学习Python中的不同缓存技术,包括functools模块中的 lru_cache和 cache装饰器。 简单示例:Python缓…...
原生微信小程序在顶部胶囊左侧水平设置自定义导航兼容各种手机模型
无论是在什么手机机型下,自定义的导航都和右侧的胶囊水平一条线上。如图下 以上图iphone12,13PRo 以上图是没有带黑色扇帘的机型 以下是调试器看的wxml的代码展示 注意:红色阔里的是自定义导航(或者其他的logo啊,返回之…...
经验笔记:远端仓库和本地仓库之间的连接(以Gitee为例)
经验笔记:远端仓库和本地仓库之间的连接 方法一:先创建远端仓库,再克隆到本地 创建远端仓库 登录到你的Git托管平台(如Gitee、GitHub、GitLab、Bitbucket等)。点击“New Repository”或类似按钮,创建一个新…...
利用RAGflow和LM Studio建立食品法规问答系统
前言 食品企业在管理标准、法规,特别是食品原料、特殊食品法规时,难以通过速查法规得到准确的结果。随着AI技术的发展,互联网上出现很多AI知识库的解决方案。 经过一轮测试,找到问题抓手、打通业务底层逻辑、对齐行业颗粒度、沉…...
ffplay音频SDL播放处理
1、从解码数组获取到解码后的数据 static int audio_decode_frame(VideoState *is) {int data_size, resampled_data_size;av_unused double audio_clock0;int wanted_nb_samples;Frame *af;if (is->paused)return -1;//音频数组队列获取数据do { #if defined(_WIN32)while …...
自动化仪表故障排除法
自动化仪表主要是指在企业的实际生产工程当中,开展检测、控制、执行以及显示等一系列仪表的总称。合理地利用自动化仪表能够及时地掌握企业生产的动态,并获取相应的数据,从而推动生产过程的有序运行。 在自动化控制系统中,自动化…...
WPF 中 MultiConverter ——XAML中复杂传参方式
1. XAML代码 <!-- 数据库表格 --> <!-- RowHeaderWidth"0": 把默认的行表头隐藏 --> <DataGridx:Name"xDataGrid"Grid.Row"2"hc:DataGridAttach.ShowRowNumber"True"ItemsSource"{Binding WaferInfos, ModeT…...
实验室管理现代化:Spring Boot技术方案
4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式,是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示: 图4-1系统工作原理…...
aws凭证(一)凭证存储
AWS 凭证用于验证身份,并授权对 DynamoDB 等等 AWS 服务的访问。配置了aws凭证后,才可以通过编程方式或从AWS CLI连接访问AWS资源。凭证存储在哪里呢?有以下几个方法: 一、使用文件存储 1、介绍 文件存储适用于长期和多账户配置。AWS SDK 也会自动读取配置文件中的凭证。…...
jmeter常用配置元件介绍总结之断言
系列文章目录 1.windows、linux安装jmeter及设置中文显示 2.jmeter常用配置元件介绍总结之安装插件 3.jmeter常用配置元件介绍总结之线程组 4.jmeter常用配置元件介绍总结之函数助手 5.jmeter常用配置元件介绍总结之取样器 6.jmeter常用配置元件介绍总结之jsr223执行pytho…...
JMeter监听器与压测监控之Grafana
Grafana 是一个开源的度量分析和可视化套件,通常用于监控和观察系统和应用的性能。本文将指导你如何在 Kali Linux 上使用 Docker 来部署 Grafana 性能监控平台。 前提条件 Kali Linux:确保你已经安装了 Kali Linux。Docker:确保你的系统已…...
MySQL8 安装教程
一、从官网下载mysql-8.0.18-winx64.zip安装文件( 从 https://dev.mysql.com/downloads/file/?id484900 下载zip版本安装包 mysql-8.0.18-winx64.zip 解压到本地磁盘中,例如解压到:D盘根目录,并改名为MySQL mysql-8.0.34-winx6…...
聚焦 NLP 和生成式 AI 的创新与未来 基础前置知识点
给学生们讲解的技术内容可以根据他们的背景、兴趣和教学目标来规划。以下是一些适合不同阶段和领域的技术主题建议,尤其是与大语言模型(如 ChatGPT)相关的内容: 1. 自然语言处理(NLP)基础 适合对 NLP 了解…...
23种设计模式-访问者(Visitor)设计模式
文章目录 一.什么是访问者模式?二.访问者模式的结构三.访问者模式的应用场景四.访问者模式的优缺点五.访问者模式的C实现六.访问者模式的JAVA实现七.代码解释八.总结 类图: 访问者设计模式类图 一.什么是访问者模式? 访问者模式(…...
ssm150旅游网站的设计与实现+jsp(论文+源码)_kaic
毕 业 设 计(论 文) 题目:旅游网站设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本旅游网站就是在这样的大…...
【SKFramework框架】一、框架介绍
推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享QQ群:398291828小红书小破站 大家好,我是佛系工程师☆恬静的小魔龙☆,不定时更新Unity开发技巧,觉得有用记得一键三连哦。 一、前言 【Unity3D框架】SKFramework框架完全教程《全…...
Arcgis地图实战三:自定义导航功能的实现
文章目录 1.最终效果预览2.计算两点之间的距离3.将点线画到地图上4.动态展示点线的变化5.动态画线6.动态画点 1.最终效果预览 2.计算两点之间的距离 let dis this.utilsTools.returnDisByCoorTrans(qdXYData, zdXYData, "4549")当距离小于我们在配置文件中预设置的…...
LLaMA-Factory 上手即用教程
LLaMA-Factory 是一个高效的大型语言模型微调工具,支持多种模型和训练方法,包括预训练、监督微调、强化学习等,同时提供量化技术和实验监控,旨在提高训练速度和模型性能。 官方开源地址:https://github.com/hiyouga/L…...
黑马点评 秒杀下单出现的问题:服务器异常---java.lang.NullPointerException: null(已解决)
前言: 在此之前找了好多资料,查了很多,都没有找到对应解决的方法,虽然知道是userid为空,但不知道要修改哪里,还是自己的debug能力不足,以后得多加练习。。。 问题如下: 点击限时抢…...
购物街项目TabBar的封装
1.TabBar介绍 在购物街项目中 不论页面如何滚动 始终存在一个TabBar固定在该项目的底部 他在该项目中 扮演者选项卡栏的角色 内部存在若干选项 而选项中 固定存在两部分(图片文本) 其中主要涉及到TabBar/TabBarItem这些和业务无关的共享组件(建议存放于components/common中)、…...
C++游戏开发面试题及参考答案
目录 在游戏开发中,为什么选择 C++ 作为编程语言? 为什么 C++ 语言更适合游戏开发? 描述游戏中的碰撞检测的基本原理。 解释游戏中的碰撞检测机制,并用 C++ 举例说明如何实现。 描述游戏中的物理模拟的基本原理。 阐述游戏中的物理模拟,如重力模拟在 C++ 中的实现方…...
西安网站建设设计的好公司哪家好/百度文库个人登录
继上篇博客说到头标签,我们这一次主要说HTML体部分中的标签。 一<div>标签 div标签定义文档中的一个分区,<div>元素是块级元素,它是可用于组合其他 HTML 元素的容器。<div> 元素没 有特定的含义。除此之外,由于…...
有个找人做任务赚返佣的网站/脑白金网络营销
PNG和JPG格式是众所周知的包含单层视觉信息的栅格图像文件格式。而Photoshop文档(PSD)文件包含几层来显示图片。您可以在.NET应用程序中使用C#以编程方式轻松地将PNG或JPG图像转换为PSD格式。本文涵盖以下与PNG和JPG图像转换有关的部分&#…...
自己做网赌网站/seo搜索引擎优化兴盛优选
这里的配置为vs2010和opencv2.4.8的配置 1、安装opencv 2.4.8 双击图标 ,弹出的对话框,建议填写放到D:\Program Files\下。 解压后 2、配置环境变量 计算机->(右键)属性->高级系统设置->高级(标签ÿ…...
黄岛开发区做网站的公司/百度收录快的发帖网站
这里写目录标题学习其他模型链接一、引言二、RNN本质三、RNN模型四、RNN的应用五、RNN训练的算法(一)随时间反向传播(backpropagation through time,BPTT)(二)实时循环学习(real-tim…...
网站建设管理工作总结报告/软文文案案例
1.Android 5.0 删除ActionBar下面的阴影 于Android 5.0假设你发现的ActionBar下面出现了阴影,例如,下面的设置,以消除阴影: getActionBar().setElevation(0); Android 5.0之前能够用以下代码消除阴影: <item name&q…...
网站建设商谈/网络服务器的作用
方法一:循环元素删除 (使用的方式FOR循环操作。不建议使用大数据量的转换。。n*n的循环量) // 删除ArrayList中重复元素 public static void removeDuplicate(List list) { for ( int i 0 ; i < list.size() - 1 ; i …...