Spark RDD优化
Spark RDD优化
- 一、分区优化
- 二、持久化优化
- 三、依赖优化
- 四、共享变量优化
- 五、提交模式与运行模式优化
- 六、其他优化
一、分区优化
-
分区数调整:RDD的分区数可以通过
repartition
和coalesce
方法进行调整。合理的分区数可以提高并行度,但过多的分区会增加管理开销。通常,分区数应根据数据规模和集群资源进行调整。val rdd: RDD[String] = rdd.coalesce(numPartitions:Int, shuffle:Boolean) val rdd: RDD[String] = rdd.repartition(numPartitions:Int) // repartition(numPartitions: Int) 等价于 coalesce(numPartitions, true)
-
缩小分区
存在过多的小任务的时候收缩合并分区,减少分区的个数,减少任务调度成本
默认情况下,不会对数据重组,比如:3个合成2个,采用 {1+2},{3},容易导致数据倾斜
若需数据均衡,则将 shuffle 参数设置为 true 即可 -
扩大分区
若需要扩大分区,shuffle 参数必须设置为 true
若将2个分区拆分成3个,必须打乱重新分区,否则数据还是在两个分区(有一个分区为空),{1},{2},{空}
-
-
数据本地性:Spark会尽量将数据分配给与数据源相同的计算节点上,以减少数据移动的开销。在创建RDD时,可以通过设置分区偏好(如
preferredLocations
)或自定义分区来优化数据本地性,以最小化网络传输并最大化计算效率。自定义分区
// 自定义分区器 class MyPartitioner(numPartitions: Int) extends Partitioner {override def numPartitions: Int = numPartitions // 返回分区器的分区数量override def getPartition(key: Any): Int = {// 这里需要实现分区逻辑// 返回值是一个整数,表示该键应该被分配到哪个分区} }
// 使用自定义分区器重新分区 val partitionedRDD = rdd.partitionBy(new MyPartitioner(2)) // 传入分区个数
-
处理数据倾斜:数据倾斜是指某些分区包含的数据远远多于其他分区,导致计算资源分配不均。可以使用
repartition
或coalesce
方法重新分区RDD,或使用reduceByKey
、groupByKey
的变体等特定操作来减轻数据倾斜的影响。
二、持久化优化
-
持久化策略:对于需要多次使用的RDD,应该进行持久化操作,以避免重复计算。持久化策略包括内存持久化(如
MEMORY_ONLY
)、磁盘持久化(如DISK_ONLY
)以及内存和磁盘混合持久化(如MEMORY_AND_DISK
)等。 -
序列化:使用序列化可以进一步减少内存消耗,并提高持久化效率。Spark支持多种序列化框架,如Java序列化、Kryo序列化等。Kryo序列化通常比Java序列化更快,且占用空间更小。
// 临时存储于【xx】重用,job结束后自动删除 val rddCache: RDD[T] = rdd.cache() // 到内存上 val rdd: RDD[T] = rdd.persist(level:StorageLevel) // cache() 等价于persist(StorageLevel.MEMORY_ONLY) // persisit() 参数如下
StorageLevel.MEMORY_ONLY 只写到内存上 StorageLevel.DISK_ONLY 只写到磁盘上 StorageLevel.OFF_HEAP 使用堆外内存 StorageLevel.MEMORY_AND_DISK 先内存,后磁盘 StorageLevel.MEMORY_AND_DISK_SER 先内存,后磁盘,采取序列化方式 StorageLevel.MEMORY_AND_DISK_SER_2 先内存,后磁盘,采取二代序列化方式
-
检查点:对于需要长时间运行或可能遭受故障的应用,设置检查点(Checkpoint)可以将RDD的状态保存到稳定存储中,以便在故障后恢复。检查点会切断RDD的血统关系,从而避免重新计算整个血统链。
// checkpoint 长久存储于【磁盘】重用,job结束后不会删除,涉及IO性能较差,安全且一般和cache组合使用 val conf = new SparkConf().setAppName("spark_rdd").setMaster("local[4]") val sc = SparkContext.getOrCreate(conf) // 设置检查点路径 sc.setCheckpointDir("hdfs://ip:9000/spark/checkpoint") // ... rdd.checkpoint() // 将该 RDD 的内容写入到设置的路径,并在该 RDD 的计算图中插入一个检查点(Checkpoint)节点
三、依赖优化
-
宽依赖与窄依赖:RDD之间的依赖关系分为宽依赖和窄依赖。窄依赖有助于实现数据本地性,而宽依赖则可能导致数据移动和网络开销。在设计RDD转换操作时,应尽量避免不必要的宽依赖。
1、Driver程序提交后
1、Spark调度器将所有的RDD看成是一个Stage
2、然后对此Stage进行逆向回溯,遇到Shuffle就断开,形成一个新的Stage
3、遇到窄依赖,则归并到同一个Stage
4、等到所有的步骤回溯完成,便生成一个DAG图2、为什么要划分阶段
1、基于数据的分区,本着传递计算的性能远高于传递数据,所以数据本地化是提升性能的重要途径之一
2、一组串行的算子,无需 Shuffle,基于数据本地化可以作为一个独立的阶段连续并行执行
3、经过一组串行算子计算,遇到 Shuffle 操作,默认情况下 Shuffle 不会改变分区数量,但会因为 numPartitions:Int, partitioner:Partitioner 等参数重新分配,过程数据会【写盘供子RDD拉取(类MapReduce)】3、RDD依赖关系
-
Lineage:血统、遗传
RDD最重要的特性之一,保存了RDD的依赖关系
RDD实现了基于Lineage的容错机制
-
依赖关系 org.apache.spark.Dependency
窄依赖 NarrowDependency,1V1 OneToOneDependency,1VN RangeDependency
宽依赖 ShuffleDependency -
当RDD分区丢失时
对于窄依赖,Spark只需要重新计算丢失分区的父RDD分区即可。
对于宽依赖,Spark需要重新执行整个shuffle过程,以重新生成丢失的数据。
若配合持久化更佳:cache, persist, checkpoint
类型 窄依赖 map,flatMap,mapPartitions,mapPartitionsWithIndex,glom,filter,distinct,intersection,sample,union,subtract,zip…,cogroup 宽依赖 sortBy,sortByKey,groupByKey,reduceByKey,cogroup,join,partitionBy,repartition 不一定的情况 在Spark中,并非所有操作都可以明确地归类为宽依赖或窄依赖。有些操作可能根据具体的实现或上下文而有所不同。然而,在大多数情况下,上述提到的算子可以清晰地划分为宽依赖或窄依赖。 如:reduceByKey(【partitioner: Partitioner】, func: (V, V) => V) 若使用的是带 partitioner 的重载且 Partitioner 和父RDD的 Partitioner一致 则为窄依赖RDD,否则为宽依赖ShuffledRDD
-
-
优化转换操作:在可能的情况下,使用能够减少shuffle操作的转换函数,如
mapPartitions
代替map
,reduceByKey
代替groupByKey
等。这些操作可以减少数据在网络中的传输量,从而提高性能。shuffle性能较差:因为shuffle必须落盘,内存中等数据会OOM groupByKey只分组(存在Shuffle) + reduce只聚合<=结果同,性能不同=> reduceByKey先分组、预聚合、再聚合(存在Shuffle)
四、共享变量优化
-
广播大变量:当Spark作业中需要使用到较大的外部变量时,可以将这些变量广播到每个节点的Executor上,而不是每个Task都复制一份。这样可以减少网络传输开销和内存消耗。
val bc:Broadcast[T] = sc.broadcast(value:T) // 创建广播变量 rdd.mapPartitions(itPar=>{val v:T = bc.value // 在每个分区内部,通过bc.value获取广播变量的值 ... // 使用v进行计算... })
-
累加器(Accumulators):累加器提供了一种有效的手段来进行分布式计算中的统计和计数操作,减少通信开销,并简化聚合操作。
累加器:accumulate:只能 add 操作,常用于计数
1、定义在Driver端的一个变量,Cluster中每一个Task都会有一份Copy
2、所有的Task都计算完成后,将所有Task中的Copy合并到驱动程序中的变量中
非累加器:在所有Task中的都会是独立Copy,不会有合并val accLong: LongAccumulator = sc.longAccumulator("longAcc") // 定义累加器 val accDouble: DoubleAccumulator = sc.doubleAccumulator("doubleAcc") rdd.mapPartitions(itPar=>{...accLong.add(v:Long) // 将值添加到累加器中accDouble.add(v:Double)... }) accXxx.reset() // 重置累加器 val isZero:Boolean = accXxx.isZero // 检查累加器是否为零值 val num:Long|Double = accXxx.value|sum|count|avg // 获取累加器的值、总和、计数或平均值
// 定义一个累加器,用于统计 "bad" 记录的数量 val errorCount = sc.longAccumulator("Error Count") val data = sc.parallelize(Array("good", "bad", "good", "bad", "good")) data.foreach(record => if (record == "bad") errorCount.add(1)) // 打印累加器的值,即 "bad" 记录的总数println(s"Total errors: ${errorCount.value}")
自定义累加器:
写一个类继承 import org.apache.spark.util.AccumulatorV2[IN, OUT]
abstract class AccumulatorV2[IN, OUT] extends Serializable {// 返回是否为零值累加器def isZero: Boolean// 创建此累加器的新副本,其为零值def copyAndReset(): AccumulatorV2[IN, OUT] = {...}// 创建此累加器的新副本def copy(): AccumulatorV2[IN, OUT]// 重置此累加器为零值def reset(): Unit// 添加:接收输入并累加def add(v: IN): Unit// 合并:合并另一个相同类型的累加器并更新其状态def merge(other: AccumulatorV2[IN, OUT]): Unit// 当前累加器的值def value: OUT }
-
自定义计量器优化(Custom Metrics):自定义计量器允许用户定义和收集特定的性能指标,提供更细粒度的作业监控和调优能力。通过
SparkListener
接口,可以实现自定义的监听器来监控和记录所需的指标。
五、提交模式与运行模式优化
-
提交模式:Spark支持Client模式和Cluster模式两种提交方式。Client模式便于查看日志和结果,但可能消耗较多资源;Cluster模式则更适合大规模作业,但查看日志和结果可能不太方便。应根据实际情况选择合适的提交模式。
spark-submit --class <MainClass> --master <MasterURL> --deploy-mode <DeployMode> <PathToJar>
<MainClass>
:包含main
方法的主类的名称。<MasterURL>
:指定集群的 Master URL。<DeployMode>
:指定提交模式,可以是client
或cluster
。<PathToJar>
:包含 Spark 应用程序的 JAR 文件的路径。spark-submit --class SparkClientModeApp --master yarn --deploy-mode client /path/to/your/jarfile.jar spark-submit --class SparkClientModeApp --master yarn --deploy-mode cluster /path/to/your/jarfile.jar
-
运行模式:Spark支持多种运行模式,如Local模式、Standalone模式、YARN模式等。不同的运行模式适用于不同的场景和需求。例如,Local模式适用于本地开发和测试;Standalone模式适用于构建独立的Spark集群;YARN模式则适用于与Hadoop生态系统集成。
local
: 在单核上运行
local[N]
: 在指定数量的 N 个核上运行,如 “local[4]”
local[*]
: 使用所有可用的核
spark://HOST:PORT
: 连接到指定的 Spark standalone cluster
yarn
: 连接到 YARN 集群
mesos://HOST:PORT
: 连接到 Mesos 集群
六、其他优化
- 序列化框架选择:除了Kryo序列化外,还可以考虑使用其他高效的序列化框架来优化Spark作业的性能。
- 监控与调优:使用Spark提供的监控工具和API(如Spark UI、getStorageLevel方法等)来监控作业的运行状态和性能瓶颈,并根据监控结果进行调优。
相关文章:

Spark RDD优化
Spark RDD优化 一、分区优化二、持久化优化三、依赖优化四、共享变量优化五、提交模式与运行模式优化六、其他优化 一、分区优化 分区数调整:RDD的分区数可以通过repartition和coalesce方法进行调整。合理的分区数可以提高并行度,但过多的分区会增加管…...

idea:解决Maven报错 Properties in parent definition are prohibited
在父pom文件中定义了 <dhversion>1.0-SNAPSHOT</dhversion> 在子模块中引用 <parent><groupId>com.douhuang</groupId><artifactId>douhuang-springcloud</artifactId><version>${dhversion}</version> </parent&…...

代理IP池:解析与应用
代理IP大家都了解不少了,代理IP池又是什么呢?下面简单介绍一下吧! 1. 概述 代理IP池就是由多个代理IP地址组成的集合,用于实现更高效的网络访问和数据获取。这些IP地址通常来自不同的地理位置和网络提供商,经过动态管…...

MQTT是什么,物联网
写文思路: 以下从几个方面介绍MQTT,包括:MQTT是什么,MQTT和webSocket的结合,以及使用场景, 一、MQTT是什么 MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息…...

分布式训练
一、分布式计算 跟多GPU不同是:数据不是从主存拿的,是在分布式文件系统拿的,有多个工作站,工作站中有多个GPU,通过网络读取数据到GPU中,GPU通过网络接收到来自参数服务器的参数进行运算计算梯度,…...

day10:04一文搞懂decode和decoding的区别
在Python 3中,decode()方法和decoding概念同样与字符串的编码和解码紧密相关,但它们的应用场景和上下文有所不同。下面通过案例来解释它们的关系和区别。 1. decode() 方法 decode()方法是字节串(bytes)类型的一个方法ÿ…...

MechMind结构光相机 采图SDK python调用
测试效果 Mech-Mind结构光相机 Mech Mind(梅卡曼德)的结构光相机,特别是Mech-Eye系列,是工业级的高精度3D相机,广泛应用于工业自动化、机器人导航、质量检测等多个领域。以下是对Mech Mind结构光相机的详细解析: 一、产品概述 Mech Mind的结构光相机,如Mech-Eye PRO,…...

“学习Pandas中时间序列的基本操作“
目录 # 开篇 1. 创建和操作时间序列对象 2. 时间序列数据的读取和存储 3. 时间序列数据的索引和切片 4. 时间序列数据的操作和转换 5. 时间序列数据的可视化 6. 处理时间序列中的缺失值 7. 时间序列数据的聚合和分组 8. 时间序列的时间区间和偏移量操作 示例代码&…...

常用知识碎片 分页组件的使用(arco-design组件库)
目录 分页组件使用 API 组件代码示例 使用思路: 前端示例代码 html script 后端示例代码 Controller Impl xml 总结 分页组件使用 使用Arco Design之前需要配置好搭建前端环境可以看我另外一篇文章: 手把手教你 创建Vue项目并引入Arco Desi…...

WPF 制作一个文字漂浮提示框
WPF好像没有自带的文字提示漂浮,我们可以定制一个。 效果如下: xaml xaml如下: <Window x:Class"GroupServer.MsgTip"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://sc…...

Node.js_fs模块
文件删除 文件重命名和移动(本质都是修改路径) 文件夹操作 创建文件夹(mkdir) 读取文件夹(readdir) (打印出来是该文件夹下名称的数组形式) 读取当前的文件夹(readdir) 删除文件夹 (rmdir) 查看资源状态…...

使用 Vue 3 实现打字机效果
在现代前端开发中,添加一些视觉效果可以提升用户体验。其中,打字机效果是一种常见且吸引人的效果,可以用于展示动态文本。本文将介绍如何在 Vue 3 中实现打字机效果。 实现步骤 1. 创建自定义指令 我们首先创建一个自定义指令 v-typewriter…...

unordered_map和set
前言:本篇文章继续分享新的容器unordered_map和set。前边我们分享过map和set,其底层为红黑树,而unordered_map和set的底层则为哈希表,因此在unordered_map和set的实现中,我们可以效仿许多在map和set的中就分享过的一些…...

java:运用字节缓冲输入流将文件中的数据写到集合中
代码主要是将文本文件中的数据写到集合中,运用到的是java字节缓冲输入流的知识点。 public static void main(String[] args) throws IOException {//创建字符缓冲流输入对象BufferedReader bufferedReader new BufferedReader(new FileReader("student.txt&q…...

【机器学习】支持向量机与主成分分析在机器学习中的应用
文章目录 一、支持向量机概述什么是支持向量机?超平面和支持向量大边距直觉 二、数据预处理与可视化数据集的基本信息导入必要的库加载数据集数据概况数据可视化特征对的散点图矩阵类别分布条形图平均面积与平均光滑度的散点图变量之间的相关性热图 三、模型训练&am…...
SpringBoot项目架构实战之“网关zuul搭建“
第三章 网关zuul搭建 前言: 1、主要功能 zuul主要提供动态路由(内置ribbon实现)和过滤(可以做统一鉴权过滤器、灰度发布过滤器、黑白名单IP过滤器、服务限流过滤器(可以配合Sentinel实现))功能…...

发挥储能系统领域优势,海博思创坚定不移推动能源消费革命
随着新发展理念的深入贯彻,我国正全面落实“双碳”目标任务,通过积极转变能源消费方式,大幅提升能源利用效率,实现了以年均约3.3%的能源消费增长支撑了年均超过6%的国民经济增长。这一成就的背后,是我国能源结构的持续…...

matlab R2016b安装cplex12.6,测试时cplex出现出现内部错误的解决方法
问题场景 网上搜索matlabyalmipcplex的安装教程,跟着步骤操作即可,假如都安装好了,在matlab中测试安装是否成功,出现以下问题: 1、matlab中设置路径中添加了yalmip和cplex路径,在命令窗口中输入yalmiptest…...

C#中的Dictionary
Dictionary<TKey, TValue> 是一个泛型集合,它存储键值对(key-value pairs),其中每个键(key)都是唯一的。这个集合类提供了快速的数据插入和检索功能,因为它是基于哈希表实现的。 注意 ke…...

VSCode中多行文本的快速前后缩进
快捷键 VSCode提供了一组快捷键,用于快速调整选中文本行的缩进。 增加缩进(向前缩进):在Windows和Linux上按 Tab 键,在Mac上按 ⇧⇥(Shift Tab)。减少缩进(向后缩进)&…...

C# 8.0 新语法的学习和使用
C# 8.0 是微软在 2019 年 9 月 23 日随 .NET Core 3.0 一同发布的一个重要版本更新,带来了许多新的语言特性和改进。本文将详细介绍 C# 8.0 的新语法,并通过实际应用案例展示这些新特性的使用方法。 目录 1. 可空引用类型 2. 异步流 3. 默认接口方…...

数据结构——约瑟夫环C语言链表实现
约瑟夫环问题由古罗马史学家约瑟夫(Josephus)提出,他参加并记录了公元66—70年犹太人反抗罗马的起义。在城市沦陷之后,他和40名死硬的将士在附近的一个洞穴中避难。起义者表示“宁为玉碎不为瓦全”,约瑟夫则想“留得青…...

【MyBatis】——入门基础知识必会内容
🎼个人主页:【Y小夜】 😎作者简介:一位双非学校的大二学生,编程爱好者, 专注于基础和实战分享,欢迎私信咨询! 🎆入门专栏:🎇【MySQL࿰…...

react父调用子的方法,子调用父的方法
父调用子的方法 // 子组件 import React, { useRef, useEffect } from react;const ChildComponent ({ childMethodRef }) > {const childMethod useRef(null);useEffect(() > {childMethodRef.current childMethod;}, []);const someMethod () > {console.log(子…...

C#知识|账号管理系统:UI层-添加账号窗体设计思路及流程。
哈喽,你好啊,我是雷工! 前边练习过详情页窗体的设计思路及流程: 《C#知识|上位机UI设计-详情窗体设计思路及流程(实例)》 本节练习添加账号窗体的UI设计,以下为学习笔记。 01 效果展示 02 添加窗体 在UI层添加Windows窗体,设置名称为:FrmAddAcount.cs 设置窗体属…...

【机器学习】初学者经典案例(随记)
🎈边走、边悟🎈迟早会好 一、概念 机器学习是一种利用数据来改进模型性能的计算方法,属于人工智能的一个分支。它旨在让计算机系统通过经验自动改进,而不需要明确编程。 类型 监督学习:使用带标签的数据进行训练&…...

进阶版智能家居系统Demo[C#]:整合AI和自动化
引言 在基础智能家居系统的基础上,我们将引入更多高级功能,包括AI驱动的自动化控制、数据分析和预测。这些进阶功能将使智能家居系统更加智能和高效。 目录 高级智能家居功能概述使用C#和AI实现智能家居自动化实现智能照明系统的高级功能 自动调节亮度…...

IC后端设计中的shrink系数设置方法
我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 在一些成熟的工艺节点通过shrink的方式(光照过程中缩小特征尺寸比例)得到了半节点,比如40nm从45nm shrink得到,28nm从32nm shrink得到,由于半节点的性能更优异,成本又低,漏电等不利因素也可以…...

在NVIDIA Jetson平台离线部署大模型
在NVIDIA Jetson平台离线部署大模型,开启离线具身智能新纪元。 本项目提供一种将LMDeploy移植到NVIDIA Jetson系列边缘计算卡的方法,并在Jetson计算卡上运行InternLM系列大模型,为离线具身智能提供可能。 最新新闻🎉 [2024/3/1…...

51单片机嵌入式开发:8、 STC89C52RC 操作LCD1602原理
STC89C52RC 操作LCD1602原理 1 LCD1602概述1.1 LCD1602介绍1.2 LCD1602引脚说明1.3 LCD1602指令介绍 2 LCD1602外围电路2.1 LCD1602接线方法2.2 LCD1602电路原理 3 LCD1602软件操作3.1 LCD1602显示3.2 LCD1602 protues仿真 4 总结 1 LCD1602概述 1.1 LCD1602介绍 LCD1602是一种…...