当前位置: 首页 > news >正文

【Spark官方文档部分翻译】RDD编程指南(RDD Programming Guide)

写在前面

内容如何选择

本翻译只翻译本人认为精华的部分,本人认为的Spark的一些核心理念,编程思想。一些特别基础的操作包括但不限于搭建环境就不在此赘述了。

配套版本

本系列基于Spark 3.3.1,Scala 2.12.10,进行翻译总结

原文链接

https://spark.apache.org/docs/3.3.1/rdd-programming-guide.html

思维导图

整篇文章思维导图
在这里插入图片描述

翻译正文

RDD编程指南

概览

总的来说,每个 Spark 应用程序由一个驱动程序(driver program)组成,该程序运行用户的主函数并在集群上执行各种并行操作。
Spark 提供的主要抽象(基本核心概念)是弹性分布式数据集(RDD),这是一组元素,分布在集群的节点上,可以并行操作。RDD 可以通过从 Hadoop 文件系统(或任何其他支持 Hadoop 的文件系统)中的文件开始,或在驱动程序中使用现有的 Scala 集合,并对其进行转换来创建。用户还可以要求 Spark 在内存中持久化 RDD,从而使其能够在并行操作中高效地重用。最后,RDD 会自动从节点故障中恢复。
Spark 的第二个抽象(基本核心概念)是共享变量,可以在并行操作中使用。默认情况下,当 Spark 在不同节点上以一组任务并行运行一个函数时,它会将函数中使用的每个变量的副本发送到每个任务。有时,变量需要在任务之间或在任务与驱动程序之间共享。Spark 支持两种类型的共享变量:广播变量(broadcast variables),可以在所有节点的内存中缓存一个

弹性分布式数据集(RDD)

Spark 的核心概念是弹性分布式数据集(RDD),它是一种容错的元素集合,可以并行操作。创建 RDD 有两种方式:一种是在驱动程序中将现有集合并行化,另一种是引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。

并行化集合

并行化集合是通过在驱动程序中对现有集合(一个 Scala Seq)调用 SparkContext 的 parallelize 方法来创建的。集合的元素会被复制,以形成一个可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字 1 到 5 的并行化集合的示例:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一旦创建,分布式数据集(distData)就可以并行操作。例如,我们可以调用 distData.reduce((a, b) => a + b) 来对数组的元素进行求和。我们稍后会描述对分布式数据集的操作。

并行集合的一个重要参数是将数据集切分为的分区数量。Spark 会为集群的每个分区运行一个任务。通常,每个 CPU 需要 2 到 4 个分区。通常,Spark 会根据你的集群自动设置分区数量。不过,你也可以通过将其作为第二个参数传递给 parallelize 方法手动设置分区数量(例如,sc.parallelize(data, 10))。注意:代码中的某些地方使用“切片”(slices)这个术语(与分区同义)以保持向后兼容性。

外部数据集

Spark 可以从任何支持 Hadoop 的存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 以及任何其他 Hadoop InputFormat。

文本文件 RDD 可以使用 SparkContext 的 textFile 方法创建。此方法接受文件的 URI(可以是机器上的本地路径,也可以是 hdfs://、s3a:// 等 URI),并将其作为行的集合读取。以下是一个示例调用:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

一旦创建,distFile 可以通过数据集操作进行处理。例如,我们可以使用 map 和 reduce 操作来计算所有行的长度总和,如下所示:

distFile.map(s => s.length).reduce((a, b) => a + b)

关于使用 Spark 读取文件的一些注意事项:

  • 如果使用本地文件系统上的路径,则该文件在工作节点上也必须可以通过相同路径访问。要么将文件复制到所有工作节点,要么使用网络挂载的共享文件系统。

  • Spark 的所有基于文件的输入方法,包括 textFile,都支持在目录、压缩文件和通配符上运行。例如,您可以使用 textFile(“/my/directory”)、textFile(“/my/directory/.txt") 和 textFile("/my/directory/.gz”)。当读取多个文件时,分区的顺序取决于文件系统返回文件的顺序。它可能会或可能不会遵循文件路径的字典顺序。在一个分区内,元素的顺序根据它们在底层文件中的顺序进行排列。

  • textFile 方法还接受一个可选的第二个参数,用于控制文件的分区数量。默认情况下,Spark 为文件的每个块创建一个分区(在 HDFS 中,默认块大小为 128MB),但您也可以通过传递更大的值请求更多的分区。请注意,分区数量不能少于块的数量。
    除了文本文件,Spark 的 Scala API 还支持几种其他数据格式:

  • SparkContext.wholeTextFiles 允许您读取包含多个小文本文件的目录,并将每个文件作为 (filename, content) 对返回。这与 textFile 不同,后者会在每个文件中返回每行一个记录。分区由数据局部性决定,在某些情况下,这可能导致分区过少。在这些情况下,wholeTextFiles 提供了一个可选的第二个参数,用于控制最小分区数。

  • 对于 SequenceFiles,使用 SparkContext 的 sequenceFile[K, V] 方法,其中 K 和 V 是文件中键和值的类型。这些应该是 Hadoop 的 Writable 接口的子类,如 IntWritable 和 Text。此外,Spark 允许您为一些常见的 Writables 指定原生类型;例如,sequenceFile[Int, String] 将自动读取 IntWritables 和 Texts。

  • 对于其他 Hadoop InputFormats,您可以使用 SparkContext.hadoopRDD 方法,该方法接受任意 JobConf 和输入格式类、键类和值类。以与 Hadoop 作业相同的方式设置这些内容,以便处理您的输入源。您还可以使用 SparkContext.newAPIHadoopRDD 处理基于“新”MapReduce API(org.apache.hadoop.mapreduce)的 InputFormats。

  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支持以包含序列化 Java 对象的简单格式保存 RDD。虽然这不如 Avro 等专门格式高效,但它提供了一种轻松保存任何 RDD 的方法。

RDD 操作

RDD 支持两种类型的操作:

  • 转换(transformations),它们从现有数据集中创建一个新数据集;
  • 动作(actions),它们在数据集上运行计算后将值返回给驱动程序。

例如,map 是一种转换,它将每个数据集元素传递给一个函数,并返回一个新的 RDD,表示结果。另一方面,reduce 是一种动作,它使用某个函数聚合 RDD 中的所有元素,并将最终结果返回给驱动程序(尽管还有一个并行的 reduceByKey,它返回一个分布式数据集)。

Spark 中的所有转换都是惰性执行的,这意味着它们不会立即计算结果。相反,它们只是记住对某个基础数据集(例如文件)应用的转换。当一个动作需要将结果返回给驱动程序时,转换才会被计算。这种设计使 Spark 能够更高效地运行。例如,我们可以意识到通过 map 创建的数据集将用于 reduce,并仅将 reduce 的结果返回给驱动程序,而不是返回更大的映射数据集。

默认情况下,每次对 RDD 执行动作时,转换后的 RDD 可能会被重新计算。然而,您也可以使用 persist(或 cache)方法将 RDD 持久化到内存中,这样 Spark 将在集群中保留元素,以便在下次查询时更快地访问。Spark 还支持将 RDD 持久化到磁盘,或在多个节点之间进行复制。

基本操作

为了说明 RDD 的基础知识,考虑以下简单程序:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行从外部文件定义了一个基础 RDD。此数据集并未加载到内存中或以其他方式进行操作:lines 仅仅是指向文件的指针。第二行将 lineLengths 定义为 map 转换的结果。同样,由于惰性执行,lineLengths 也不会立即计算。最后,我们运行 reduce,这是一种动作。在这一点上,Spark 将计算拆分为任务(task),以便在不同的机器上运行,每台机器运行其部分的 map 和本地的 reduce,仅将结果返回给驱动程序。

如果我们还希望稍后再次使用 lineLengths,可以在 reduce 之前添加:

lineLengths.persist()

这将导致 lineLengths 在第一次计算后保存在内存中。这样,在后续的操作中,如果再使用 lineLengths,就不需要重新计算,而是可以直接从内存中获取结果,从而提高性能。

将函数传递给 Spark

Spark 的 API 在驱动程序中大量依赖于传递函数,以便在集群上运行。推荐有两种方法来实现这一点:

  1. 匿名函数语法:适用于简短的代码段。
  2. 全局单例对象中的静态方法:例如,您可以定义一个对象 MyFunctions,然后像下面这样传递 MyFunctions.func1:
object MyFunctions {def func1(s: String): String = { ... }
}myRdd.map(MyFunctions.func1)

需要注意的是,尽管也可以传递类实例中方法的引用(与单例对象相对),但这需要将包含该类的方法的对象一起发送。例如,考虑以下代码:

class MyClass {def func1(s: String): String = { ... }def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

在这里,如果我们创建一个新的 MyClass 实例并调用 doStuff,那么内部的 map 将引用该 MyClass 实例的 func1 方法,因此整个对象需要发送到集群。这类似于编写 rdd.map(x => this.func1(x))。

以类似的方式,访问外部对象的字段也会引用整个对象:

class MyClass {val field = "Hello"def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

这相当于编写 rdd.map(x => this.field + x),这会引用整个对象 this。为了避免这个问题,最简单的方法是将 field 复制到一个局部变量中,而不是外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {val field_ = this.fieldrdd.map(x => field_ + x)
}

这样,field_ 是一个局部变量,它只会被传递到集群,而不会发送整个对象,从而提高了效率并避免了不必要的开销。

理解闭包

在 Spark 中,理解在集群上执行代码时变量和方法的作用域和生命周期是较为困难的事情之一。RDD 操作如果修改其作用域外的变量,往往会引发混淆。以下示例展示了使用 foreach() 来递增计数器的代码,但类似的问题也可能出现在其他操作中。

例子

考虑下面的朴素RDD元素求和,它的行为可能会有所不同,具体取决于执行是否在同一个JVM中。一个常见的例子是在本地模式下运行Spark(–master = local[n])与将Spark应用程序部署到集群(例如,通过spark-submit到YARN)时:

var counter = 0
var rdd = sc.parallelize(data)// Wrong: Don't do this!!
rdd.foreach(x => counter += x)println("Counter value: " + counter)
本地模式 vs 集群模式

上述代码的行为是未定义的,并且可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务由一个执行器执行。在执行之前,Spark计算任务的闭包。闭包是那些必须对执行器可见的变量和方法,以便执行器能够对RDD(在这种情况下是foreach())执行计算。该闭包会被序列化并发送到每个执行器。

发送到每个执行器的闭包中的变量现在是副本,因此,当在foreach函数中引用counter时,它不再是驱动节点上的counter。驱动节点的内存中仍然有一个counter,但这对执行器来说不再可见!执行器只能看到来自序列化闭包的副本。因此,counter的最终值仍然是零,因为对counter的所有操作都引用了序列化闭包中的值。

在本地模式下,在某些情况下,foreach函数实际上会在与驱动相同的JVM中执行,并且会引用同一个原始counter,并可能会更新它。

为了确保在这些场景中行为明确,应使用累加器。Spark中的累加器专门用于提供一种机制,以便在执行分散到集群中的工作节点时安全地更新变量。本指南的累加器部分对此进行了更详细的讨论。

一般来说,闭包——像循环或局部定义的方法这样的构造,不应被用于改变某种全局状态。Spark不定义或保证从闭包外部引用的对象的突变行为。一些这样做的代码可能在本地模式下有效,但这只是偶然的,而这样的代码在分布式模式下不会按预期工作。如果需要某种全局聚合,请使用累加器。

打印RDD的元素

另一个常见的用法是尝试使用rdd.foreach(println)或rdd.map(println)打印RDD的元素。在单台机器上,这将生成预期的输出,并打印出所有RDD的元素。然而,在集群模式下,由执行器调用的stdout输出现在写入执行器的stdout,而不是驱动程序上的stdout,因此驱动程序上的stdout将不会显示这些内容!要在驱动程序上打印所有元素,可以使用collect()方法将RDD带到驱动节点,如下所示:rdd.collect().foreach(println)。不过,这可能导致驱动程序内存不足,因为collect()会将整个RDD提取到单台机器上;如果只需要打印RDD中的几个元素,更安全的方法是使用take():rdd.take(100).foreach(println)。

处理键值对

虽然大多数Spark操作适用于包含任何类型对象的RDD,但有一些特殊操作仅适用于键值对的RDD。最常见的操作是分布式“洗牌”操作,例如按键对元素进行分组或聚合。

在Scala中,这些操作会自动应用于包含Tuple2对象的RDD(语言中的内置元组,只需写作(a, b)即可创建)。键值对操作在PairRDDFunctions类中可用,该类会自动包装元组的RDD。

例如,以下代码使用reduceByKey操作对键值对进行处理,以计算文件中每行文本出现的次数:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我们还可以使用counts.sortByKey(),例如,按字母顺序对对进行排序,最后使用counts.collect()将它们作为对象数组带回驱动程序。

注意:当使用自定义对象作为键进行键值对操作时,必须确保自定义的equals()方法与匹配的hashCode()方法一起使用。有关详细信息,请参见Object.hashCode()文档中概述的契约。

转换算子

以下表格列出了一些Spark支持的常见转换操作。有关详细信息,请参考RDD API文档(Scala、Java、Python、R)和配对RDD函数文档(Scala、Java)。

方法说明
map(func)返回一个新的分布式数据集,通过将源中的每个元素传递给函数func生成。
filter(func)返回一个新的数据集,通过选择源中func返回true的那些元素生成。
flatMap(func)类似于map,但每个输入项可以映射到0个或多个输出项(因此func应返回一个Seq而不是单个项)。
mapPartitions(func)类似于map,但在RDD的每个分区(块)上单独运行,因此func在运行在类型为T的RDD时必须为Iterator => Iterator
mapPartitionsWithIndex(func)类似于mapPartitions,但还提供一个整数值表示分区的索引,因此func在运行在类型为T的RDD时必须为(Int, Iterator) => Iterator
sample(withReplacement, fraction, seed)以给定的随机数生成器seed对数据进行抽样,抽取数据的比例为fraction,可以选择是否有放回。
union(otherDataset)返回一个新的数据集,包含源数据集和参数中的元素的并集。
intersection(otherDataset)返回一个新的RDD,包含源数据集和参数中元素的交集。
distinct([numPartitions])返回一个新的数据集,包含源数据集的唯一元素。
groupByKey([numPartitions])当在(K, V)对的数据集上调用时,返回(K, Iterable)对的数据集。
注意:如果您是为了对每个键执行聚合(如求和或平均),使用reduceByKey或aggregateByKey将获得更好的性能。
注意:默认情况下,输出的并行度取决于父RDD的分区数量。您可以传递可选的numPartitions参数以设置不同数量的任务。
reduceByKey(func, [numPartitions])当在(K, V)对的数据集上调用时,返回(K, V)对的数据集,其中每个键的值使用给定的reduce函数func进行聚合,func必须为(V,V) => V。与groupByKey一样,reduce任务的数量可以通过可选的第二个参数进行配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])当在(K, V)对的数据集上调用时,返回(K, U)对的数据集,其中每个键的值使用给定的组合函数和中性“零”值进行聚合。允许聚合值的类型与输入值的类型不同,同时避免不必要的分配。与groupByKey一样,reduce任务的数量可以通过可选的第二个参数进行配置。
sortByKey([ascending], [numPartitions])当在(K, V)对的数据集上调用时,其中K实现了Ordered,返回按键升序或降序排序的(K, V)对的数据集,如布尔值ascending参数所指定。
join(otherDataset, [numPartitions])当在类型为(K, V)和(K, W)的数据集上调用时,返回(K, (V, W))对的数据集,其中每个键的所有元素对。通过leftOuterJoin、rightOuterJoin和fullOuterJoin支持外连接。
cogroup(otherDataset, [numPartitions])当在类型为(K, V)和(K, W)的数据集上调用时,返回(K, (Iterable, Iterable))元组的数据集。此操作也称为groupWith。
cartesian(otherDataset)当在类型为T和U的数据集上调用时,返回(T, U)对的数据集(所有元素对)。
pipe(command, [envVars])通过shell命令(例如Perl或bash脚本)对RDD的每个分区进行处理。RDD元素被写入进程的stdin,输出到其stdout的行作为字符串的RDD返回。
coalesce(numPartitions)将RDD中的分区数量减少到numPartitions。在对大数据集进行过滤后,更有效地运行操作。
repartition(numPartitions)随机重新洗牌RDD中的数据,以创建更多或更少的分区并在其间进行平衡。这始终会通过网络洗牌所有数据。
repartitionAndSortWithinPartitions(partitioner)根据给定的分区器对RDD进行重新分区,并在每个结果分区内按键对记录进行排序。这比先重新分区再在每个分区内排序更有效,因为它可以将排序操作下推到洗牌机制中。
行动算子

以下表格列出了一些Spark支持的常见操作。有关详细信息,请参考RDD API文档(Scala、Java、Python、R)和配对RDD函数文档(Scala、Java)。

方法说明
reduce(func)使用函数func(接受两个参数并返回一个结果)对数据集的元素进行聚合。该函数应具有交换性和结合性,以便能够正确地并行计算。
collect()将数据集的所有元素作为数组返回到驱动程序。这通常在过滤或其他返回足够小的数据子集的操作后很有用。
count()返回数据集中的元素数量。
first()返回数据集的第一个元素(类似于take(1))。
take(n)返回数据集的前n个元素的数组。
takeSample(withReplacement, num, [seed])返回数据集的随机样本,数量为num,可以选择是否有放回,选项上还可以预先指定随机数生成器的种子。
takeOrdered(n, [ordering])使用自然顺序或自定义比较器返回RDD的前n个元素。
saveAsTextFile(path)将数据集的元素写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统中指定目录的文本文件(或文本文件集)。Spark会对每个元素调用toString,将其转换为文件中的一行文本。
saveAsSequenceFile(path)(Java和Scala)将数据集的元素写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统中指定路径的Hadoop SequenceFile。仅适用于实现Hadoop的Writable接口的键值对RDD。在Scala中,它也适用于隐式可转换为Writable的类型(Spark包括对基本类型如Int、Double、String等的转换)。
saveAsObjectFile(path)(Java和Scala)使用Java序列化将数据集的元素写入简单格式,随后可以使用SparkContext.objectFile()加载。
countByKey()仅适用于类型为(K, V)的RDD。返回一个包含每个键计数的(K, Int)对的哈希映射。
foreach(func)对数据集的每个元素运行函数func。通常用于副作用,例如更新累加器或与外部存储系统交互。
注意:在foreach()之外修改累加器以外的变量可能会导致未定义行为。有关更多详细信息,请参见理解闭包。

Spark RDD API还提供了一些操作的异步版本,比如foreachAsync,它会立即返回一个FutureAction给调用者,而不是在操作完成时阻塞。这可以用于管理或等待操作的异步执行。

Shuffle 操作

在Spark中,某些操作会触发一个称为shuffle的事件。shuffle是Spark重新分配数据的机制,以便在分区中以不同的方式对数据进行分组。这通常涉及在executor和机器之间复制数据,因此洗牌是一项复杂且代价高昂的操作。

背景

为了理解洗牌期间发生的事情,我们可以考虑reduceByKey操作的例子。reduceByKey操作生成一个新的RDD,其中单个键的所有值会组合成一个元组——键和对与该键相关的所有值执行reduce函数的结果。挑战在于,单个键的所有值不一定都位于同一个分区,甚至同一台机器上,但它们必须在一起才能计算结果。

在Spark中,数据通常并不是分布在分区中以满足特定操作的要求。在计算过程中,单个任务将操作于单个分区——因此,为了组织单个reduceByKey减小任务要执行的所有数据,Spark需要执行一个全到全的操作。它必须从所有分区读取数据,以查找所有键的所有值,然后跨分区汇集这些值,以计算每个键的最终结果——这就是shuffle。

尽管新shuffle数据的每个分区中的元素集合是确定性的,分区本身的顺序也是如此,但这些元素的顺序却不是。如果希望在洗牌后获得可预测的有序数据,可以使用:

  • mapPartitions来对每个分区进行排序,例如使用.sorted
  • repartitionAndSortWithinPartitions来有效地对分区进行排序,同时重新分区
  • sortBy来生成一个全局有序的RDD
    可能导致shuffle的操作包括重新分区操作,如repartition和coalesce,基于键的操作(计数操作除外)如groupByKey和reduceByKey,以及连接操作如cogroup和join。
性能影响

shuffle是一项昂贵的操作,因为它涉及磁盘I/O、数据序列化和网络I/O。为了组织数据进行洗牌,Spark生成了一组任务——映射任务用于组织数据,以及一组减少任务用于聚合数据。这种命名源自MapReduce,并不直接与Spark的map和reduce操作相关。

在内部,来自单个映射任务的结果会保留在内存中,直到无法容纳为止。然后,这些结果根据目标分区进行排序,并写入单个文件。在减少阶段,任务读取相关的排序块。

某些shuffle操作可能会消耗大量的堆内存,因为它们使用内存中的数据结构来组织记录,在传输之前或之后。具体而言,reduceByKey和aggregateByKey在映射端创建这些结构,而基于键的操作则在减少端生成这些结构。当数据无法适应内存时,Spark会将这些表溢出到磁盘,从而造成额外的磁盘I/O开销和增加的垃圾回收。

shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始,这些文件会被保留,直到相应的RDD不再使用并被垃圾回收。这是为了避免在重新计算血缘时需要重新创建洗牌文件。垃圾回收可能会在很长一段时间后才发生,如果应用程序保留对这些RDD的引用,或者如果GC不频繁触发。这意味着长期运行的Spark作业可能会消耗大量的磁盘空间。临时存储目录由配置Spark上下文时的spark.local.dir配置参数指定。

shuffle行为可以通过调整各种配置参数进行调优。有关详细信息,请参阅Spark配置指南中的“Shuffle Behavior”部分。

RDD持久化

Spark中最重要的功能之一是在操作之间将数据集持久化(或缓存)到内存中。当您持久化一个RDD时,每个节点会将计算出的它的分区存储在内存中,并在对该数据集(或从中派生的数据集)的其他操作中重用它们。这使得未来的操作可以更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

您可以使用persist()或cache()方法标记一个RDD进行持久化。在操作中第一次计算时,它将被保留在节点的内存中。Spark的缓存是容错的 - 如果RDD的任何分区丢失,它将自动使用最初创建它的转换重新计算。

此外,每个持久化的RDD可以使用不同的存储级别进行存储,允许您例如将数据集持久化到磁盘,以序列化的Java对象的形式持久化到内存中(以节省空间),跨节点复制。这些级别通过将StorageLevel对象(Scala、Java、Python)传递给persist()来设置。cache()方法是一个使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。完整的存储级别如下:

存储级别含义
MEMORY_ONLY将RDD作为反序列化的Java对象存储在JVM中。如果RDD无法完全放入内存,一些分区将不会被缓存,每次需要时都会即时重新计算。这是默认级别。
MEMORY_AND_DISK将RDD作为反序列化的Java对象存储在JVM中。如果RDD无法完全放入内存,将不适合的分区存储在磁盘上,并在需要时从那里读取。
MEMORY_ONLY_SER(Java和Scala)将RDD作为序列化的Java对象(每个分区一个字节数组)存储。这通常比反序列化对象更节省空间,尤其是当使用快速序列化器时,但读取时更占用CPU。
MEMORY_AND_DISK_SER(Java和Scala)与MEMORY_ONLY_SER类似,但是将不适合放入内存的分区溢写到磁盘上,而不是每次需要时即时重新计算。
DISK_ONLY仅在磁盘上存储RDD分区。
MEMORY_ONLY_2, MEMORY_AND_DISK_2等与上述级别相同,但是在两个集群节点上复制每个分区。
OFF_HEAP (实验性)类似于MEMORY_ONLY_SER,但是将数据存储在非堆内存中。这需要启用非堆内存。

注意:在Python中,存储的对象总是使用Pickle库进行序列化,因此选择序列化级别并不重要。Python中可用的存储级别包括MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY、DISK_ONLY_2和DISK_ONLY_3。

即使用户没有调用persist,Spark也会自动持久化shuffle操作(例如reduceByKey)中的一些中间数据。这是为了避免在洗牌期间某个节点失败时重新计算整个输入。如果用户计划重用结果RDD,我们仍然建议调用persist。

选择哪个存储级别

Spark的存储级别旨在在内存使用和CPU效率之间提供不同的权衡。选择存储级别的建议过程如下:

  • 如果您的RDD在默认存储级别(MEMORY_ONLY)下能够舒适地适应,就保持这样。这是最节省CPU的选项,允许对RDD的操作尽可能快地运行。

  • 如果不适合,尝试使用MEMORY_ONLY_SER,并选择一个快速的序列化库,使对象更加节省空间,但访问速度仍然合理。(适用于Java和Scala)

  • 除非计算数据集的函数很昂贵,或者它们过滤了大量的数据,否则不要将数据溢写到磁盘上。否则,重新计算一个分区可能和从磁盘读取一样快。

  • 如果您想要快速的容错恢复(例如,使用Spark来服务Web应用程序中的请求),请使用复制的存储级别。所有存储级别都通过重新计算丢失的数据提供完整的容错能力,但复制的级别允许您在不等待重新计算丢失的分区的情况下继续运行RDD上的任务。

删除数据

Spark会自动监控每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式丢弃旧的数据分区。如果您想手动移除一个RDD而不是等待它从缓存中掉出,可以使用RDD.unpersist()方法。请注意,这个方法默认情况下不会阻塞。如果您希望在资源释放后阻塞等待,可以在调用这个方法时指定blocking=true。

共享变量

通常,当传递给Spark操作(如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,远程机器上对变量的任何更新都不会传播回驱动程序。支持跨任务的通用读写共享变量将是低效的。然而,Spark确实提供了两种有限类型的共享变量,用于两种常见的使用模式:广播变量和累加器。

广播变量

广播变量允许程序员在每台机器上保持一个只读变量的缓存,而不是随着任务传送它的副本。它们可以被用来,例如,以高效的方式给每个节点一份大型输入数据集的副本。Spark还尝试使用高效的广播算法分发广播变量,以减少通信成本。

Spark操作是通过一系列阶段执行的,这些阶段由分布式“洗牌”操作分隔。Spark自动广播每个阶段任务所需的公共数据。这样广播的数据以序列化形式缓存,并在运行每个任务前进行反序列化。这意味着显式创建广播变量仅在多个阶段的任务需要相同的数据时才有用,或者当以反序列化形式缓存数据很重要时。

广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是围绕v的包装器,其值可以通过调用value方法来访问。下面的代码展示了这一点:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

创建广播变量后,应在集群上运行的任何函数中使用它来替代值v,以避免多次将v传送到节点。此外,一旦广播了对象v,就不应再修改它,以确保所有节点获得相同的广播变量值(例如,如果稍后将变量传送到新节点)。

要释放广播变量复制到执行器上的资源,请调用.unpersist()。如果之后再次使用广播,它将被重新广播。要永久释放广播变量使用的所有资源,请调用.destroy()。此后,不能再使用广播变量。请注意,这些方法默认情况下不会阻塞。如果在调用它们时指定blocking=true,则会阻塞直到资源被释放。

累加器

累加器是通过一种结合律和交换律操作“加”到变量上的,因此可以高效地在并行中支持。它们可以用来实现计数器(如MapReduce中的)或求和。Spark原生支持数值类型的累加器,程序员也可以为新类型添加支持。

作为用户,您可以创建命名或未命名的累加器。如下图所见,命名的累加器(在此示例中为计数器)将在修改该累加器的阶段的Web UI中显示。Spark在“任务”表中显示每个任务修改的累加器的值。
在这里插入图片描述
在用户界面中跟踪累加器对于理解运行阶段的进度很有用(注意:Python中尚未支持此功能)。

可以通过调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()分别创建用于累加Long或Double类型的数值累加器。然后在集群上运行的任务可以使用add方法向其添加值。然而,它们不能读取累加器的值。只有驱动程序可以使用其value方法读取累加器的值。

以下代码展示了使用累加器累加数组元素的示例:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala> accum.value
res2: Long = 10

虽然这段代码使用了对Long类型的累加器的内置支持,但程序员也可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个必须重写的方法:reset用于将累加器重置为零,add用于向累加器中添加另一个值,merge用于将另一个相同类型的累加器合并到这个累加器中。其他必须重写的方法包含在API文档中。例如,假设我们有一个表示数学向量的MyVector类,我们可以这样编写:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {private val myVector: MyVector = MyVector.createZeroVectordef reset(): Unit = {myVector.reset()}def add(v: MyVector): Unit = {myVector.add(v)}...
}// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

请注意,当程序员定义自己的AccumulatorV2类型时,最终的类型可能与添加的元素的类型不同。

对于仅在行动中执行的累加器更新,Spark保证每个任务对累加器的更新只会应用一次,即重新启动的任务不会更新值。在转换中,用户应该注意,如果任务或作业阶段重新执行,每个任务的更新可能会被应用多次。

累加器不会改变Spark的延迟评估模型。如果它们在RDD上的操作中被更新,它们的值只有在作为行动的一部分计算RDD时才会更新一次。因此,当在像map()这样的延迟转换中进行更新时,不保证会执行累加器更新。以下代码片段演示了这个属性:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

相关文章:

【Spark官方文档部分翻译】RDD编程指南(RDD Programming Guide)

写在前面 内容如何选择 本翻译只翻译本人认为精华的部分&#xff0c;本人认为的Spark的一些核心理念&#xff0c;编程思想。一些特别基础的操作包括但不限于搭建环境就不在此赘述了。 配套版本 本系列基于Spark 3.3.1&#xff0c;Scala 2.12.10&#xff0c;进行翻译总结 原…...

前端八股文 $set

为什么会有$set vue2中对数组中新增的属性是监听不到的 如图 vue 插件中有但是 视图中没有刷新 解决方法 解决就是 $set() 就是在数组中新增属性的时候可以重新渲染视图 具体的写法 写法 就是 第一个 是在那个对象上新增 第二个参数 是新增的属性 第三个参数是 新增的属性…...

Connecting weaviate with langflow across docker containers

题意&#xff1a;在Docker容器之间连接Weaviate与Langflow 问题背景&#xff1a; I am trying to build a local RAG application using Langflow. For my vectore store, I want to use a local Weaviate instance, hosted in a separate docker container on the same netwo…...

【linux vim使用说明】

基本概念 提示&#xff1a;本文是网络资源整理 模式: vim 有多种模式&#xff0c;每种模式都有不同的功能。 普通模式 (Normal Mode): 默认模式&#xff0c;用于导航和执行命令。插入模式 (Insert Mode): 用于文本输入。可以通过按 i 进入。可视模式 (Visual Mode): 用于选择…...

cocos2d-x安装和项目

首先多方查找资料发现教程很简洁&#xff0c;发现对自己的操作方面没多大帮助&#xff0c;后来干脆去官网&#xff0c;好像也很简洁。基于这样一个原因&#xff0c;加上我首次碰cocos2d-x&#xff0c;决定记录一下整个流程&#xff0c;解决实际操作上的疑惑。 涉及的方面&…...

因果推断 | 双重机器学习(DML)算法原理和实例应用

文章目录 1 引言2 DML算法原理2.1 问题阐述2.2 DML算法 3 DML代码实现3.1 策略变量为0/1变量3.2 策略变量为连续变量 4 总结5 相关阅读 1 引言 小伙伴们&#xff0c;好久不见呀。 距离上次更新已经过去了一个半月&#xff0c;上次发文章时还信誓旦旦地表达自己后续目标是3周更…...

Flutter 开源库学习

网上看了好多歌词实现逻辑相关资料&#xff0c;封装比较的好的 就 flutter_lyric&#xff0c;核心类是LyricsReader&#xff0c;而且如果实现逐字逐句歌词编辑功能还需要自己实现很多细节 &#xff0c;网友原话是 &#xff1a;歌词的功能真的是不少&#xff0c;写起来也是挺难的…...

自主巡航,目标射击

中国机器人及人工智能大赛 参赛经验&#xff1a; 自主巡航赛道 【机器人和人工智能——自主巡航赛项】动手实践篇-CSDN博客 主要逻辑代码 #!/usr/bin/env python #coding: utf-8import rospy from geometry_msgs.msg import Point import threading import actionlib impor…...

MySQL中EXPLAIN关键字详解

昨天领导突然问到&#xff0c;MySQL中explain获取到的type字段中index和ref的区别是什么。 这两种状态都是在使用索引后产生的&#xff0c;但具体区别却了解不多&#xff0c;只知道ref相比于index效率更高。 因此&#xff0c;本文较为详细地记录了MySQL性能中返回字段的含义、状…...

如何理解ref toRef和toRefs

是什么 ref 生成值类型的响应式数据可用于模板和reactive通过.value修改值 ref也可以像vue2中的ref那样使用 toRef 针对一个响应式对象&#xff08;reactive&#xff09;的prop创建一个ref两者保持引用关系 toRefs 将响应式对象&#xff08;reactive封装&#xff09;转换…...

【linux】kernel-trace

文章目录 linux kernel trace配置trace内核配置trace接口使用通用配置Events配置Function配置Function graph配置Stack trace设置 跟踪器tracer功能描述 使用示例1.irqsoff2.preemptoff3.preemptirqsoff linux kernel trace 配置 源码路径&#xff1a; kernel/trace trace内…...

【Golang 面试基础题】每日 5 题(一)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…...

ETCD介绍以及Go语言中使用ETCD详解

ETCD介绍以及Go语言中使用ETCD详解 什么是etcd ETCD是一个分布式、可靠的key-value存储的分布式系统,用于存储分布式系统中的关键数据;当然,它不仅仅用于存储,还提供配置共享及服务发现;基于Go语言实现 。 etcd的特点 完全复制:集群中的每个节点都可以使用完整的存档高…...

03-用户画像+Elasticsearch

优点 es支持海量数据的写入和更新es可以和hadoop&#xff0c;hive及spark进行集成es支持hivesql的操作&#xff0c;可以通过hivesql将数据导入eses的在进行数据检索查询是速度比较快es是分布式存储 应用 全文检索 全文检索流程: 1-对文档数据(文本数据)进行分词 2-将分词…...

初学Mybatis之搭建项目环境

在连接 mysql 数据库时&#xff0c;遇到了个 bug&#xff0c;之前都能连上&#xff0c;但报错说换了个 OS 操作系统什么的 然后搜索怎么连接&#xff0c;找到了解决方法 MySQL MYSQL – 无法连接到本地MYSQL服务器 (10061)|极客教程 (geek-docs.com) 命令行输入 services.msc…...

JMeter使用小功能-(持续更新)

1、jmeter在同一个线程组内&#xff0c;uuid的复用 方式一&#xff1a; 方式二&#xff1a; 2、获得jMeter使用的线程总数 ctx.getThreadGroup().getNumberOfThreads()来表示活动线程总数 int threadNumctx.getThreadGroup().getNumThreads(); String threads Integer…...

科研绘图系列:R语言火山图(volcano plot)

介绍 火山图(Volcano Plot),也称为火山图分析,是一种在生物信息学和基因组学中常用的图形表示方法,主要用于展示基因表达数据的差异。它通常用于基因表达微阵列或RNA测序数据的可视化,帮助研究人员识别在不同条件下表达差异显著的基因。 火山图的基本构成 X轴:通常表示…...

docker firewalld 防火墙设置

1、环境 centos 7 firewalld docker-ce docker 默认会更改防护墙配置 导致添加的防火墙策略不生效&#xff0c;可以启用firewalld 重新设置策略 2、启用防火墙 systemctl start firewalld systemctl enable firewalld3、配置文件禁用docker 的iptables /etc/docker/daemon.js…...

《问题004:报错-JS问题-unknown: Invalid shorthand property initializer.》

问题描述&#xff1a; unknown: Invalid shorthand property initializer. (25:13) unknown:无效的简写属性初始化项 解决方法&#xff1a; “”应该写为“&#xff1a;”&#xff08;globalData 改成 globalData: &#xff09;...

什么是 MLPerf?

什么是 MLPerf&#xff1f; MLPerf 是一个用于衡量机器学习硬件、软件和服务性能的标准化基准测试平台。它由 MLCommons 组织开发&#xff0c;该组织是由多家领先的科技公司和学术机构组成的。MLPerf 的目标是通过一系列标准化的基准测试任务和数据集&#xff0c;提供一个统一…...

【SpringBoot】第3章 SpringBoot的系统配置

3.1 系统配置文件 3.1.1 application.properties SpringBoot支持两种不同格式的配置文件&#xff0c;一种是Properties&#xff0c;一种是YML。 SpringBoot默认使用application.properties作为系统配置文件&#xff0c;项目创建成功后会默认在resources目录下生成applicatio…...

ELK日志分析系统部署文档

一、ELK说明 ELK是Elasticsearch&#xff08;ES&#xff09; Logstash Kibana 这三个开源工具组成&#xff0c;官方网站: The Elastic Search AI Platform — Drive real-time insights | Elastic 简单的ELK架构 ES: 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它…...

ue5笔记

1 点光源 聚光源 矩形光源 参数比较好理解 &#xff08;窗口里面&#xff09;环境光混合器&#xff1a;快速创造关于环境光的组件 大气光源&#xff1a;太阳光&#xff0c;定向光源 天空大气&#xff1a;蓝色的天空和大气 高度雾&#xff1a;大气下面的高度感的雾气 体积…...

TCP重传机制详解

1.什么是TCP重传机制 在 TCP 中&#xff0c;当发送端的数据到达接收主机时&#xff0c;接收端主机会返回⼀个确认应答消息&#xff0c;表示已收到消息。 但是如果传输的过程中&#xff0c;数据包丢失了&#xff0c;就会使⽤重传机制来解决。TCP的重传机制是为了保证数据传输的…...

如何使用javascript将商品添加到购物车?

使用JavaScript将商品添加到购物车可以通过以下步骤实现&#xff1a; 创建一个购物车对象&#xff0c;可以是一个数组或者对象&#xff0c;用于存储添加的商品信息。在网页中的商品列表或详情页面&#xff0c;为每个商品添加一个“添加到购物车”的按钮&#xff0c;并为按钮绑…...

【MySQL】:想学好数据库,不知道这些还想咋学

客户端—服务器 客户端是一个“客户端—服务器”结构的程序 C&#xff08;client&#xff09;—S&#xff08;server&#xff09; 客户端和服务器是两个独立的程序&#xff0c;这两个程序之间通过“网络”进行通信&#xff08;相当于是两种角色&#xff09; 客户端 主动发起网…...

1.关于linux的命令

1.关于文件安装的问题 镜像站点服务器&#xff1a;cat /etc/apt/sources.list 索引文件&#xff1a;cd /var/lib/apt/lists 下载文件包存在的路径&#xff1a;cd /etc/cache/apt/archives/2.关于dpkg文件安装管理器的应用: 安装文件:sudo dpkg -i 文件名; 查找文件目录:sudo …...

【人工智能】机器学习 -- 决策树(乳腺肿瘤数)

目录 一、使用Python开发工具&#xff0c;运行对iris数据进行分类的例子程序dtree.py&#xff0c;熟悉sklearn机器实习开源库。 二、登录https://archive-beta.ics.uci.edu/ 三、使用sklearn机器学习开源库&#xff0c;使用决策树对breast-cancer-wisconsin.data进行分类。 …...

【proteus经典实战】LCD滚动显示汉字

一、简介 Proteus是一款功能丰富的电子设计和仿真软件&#xff0c;它允许用户设计电路图、进行PCB布局&#xff0c;并在虚拟环境中测试电路功能。这款软件广泛应用于教育和产品原型设计&#xff0c;特别适合于快速原型制作和电路设计教育。Proteus的3D可视化功能使得设计更加直…...

数据结构复习1

1、什么是集合&#xff1f; 就是一组数据的集合体&#xff0c;就像篮子装着苹果、香蕉等等&#xff0c;这些“水果”就代表数据&#xff0c;“篮子”就是这个集合。 集合的特点&#xff1a; 集合用于存储对象。 对象是确定的个数可以用数组&#xff0c;如果不确定可以用集合…...

北京网站开发工程师招聘网/百度指数数据下载

U1S1&#xff08;有一说一&#xff09;是声网主办的开发者互动交流活动&#xff0c;通过小范围的深入交流&#xff0c;关注开发者真实需求&#xff0c;收集来自开发者的体验反馈&#xff0c;与声网开发者共同碰撞声网产品及实时互动技术领域相关的话题&#xff0c;一起创造无限…...

酒店网站制作/天津百度推广排名

RocketMQ集群部署记录 #引用 https://cloud.tencent.com/developer/article/1147765一、RocketMQ基础知识介绍 Apache RocketMQ是阿里开源的一款高性能、高吞吐量、队列模型的消息中间件的分布式消息中间件。RocketMQ具有以下特点&#xff1a; 上图是一个典型的消息中间件收…...

文化投资的微网站怎么做/关键词排名技巧

小猪佩奇总是用侧脸示人&#xff0c;不论是转左边还是转右边&#xff0c;脸上总是有2只眼睛&#xff0c;2个鼻孔&#xff0c;1腮红&#xff0c;小手手不断挥呀挥&#xff0c;讲完一句话就要发出猪叫声&#xff0c;最爱在往泥巴坑里跳。 佩奇正脸也是相当神秘&#xff0c;从未用…...

深圳有做网站的公司660元/安康seo

快捷键&#xff1a;Ctrld&#xff1a;快速复制 Ctrl/&#xff1a;注释 ShiftEnter:直接跳转下一行注释&#xff1a; """多行注释多行注释多行注释"""# 单行注释# print(hello world)# print(hello world)# 这是一行。。。。代码print(hello…...

凡客v十商城还在吗/seo诊断专家

将一个字符串转换成一个整数&#xff0c;要求不能使用字符串转换整数的库函数。 数值为0或者字符串不是一个合法的数值则返回0&#xff1b; 思路&#xff1a;遍历查找非整数字符&#xff0c;第一个字符串位置进行判断&#xff0c; 采用-‘0’算出整数*10的多少次方&#xff1b…...

格尔木市住建和城乡建设局网站/佛山百度seo点击软件

正在学前台&#xff0c;出现了vertical-align: middle 这个属性怎么都不起作用的情况&#xff0c;解决过程如下&#xff1a; 刚开始是这样&#xff1a; .table_yht{ text-align: center; vertical-align: middle; } 发现文字左右的确居中&#xff0c;但是上下不能居中&#xff…...