PySpark-核心编程
2. PySpark——RDD编程入门
文章目录
- 2. PySpark——RDD编程入门
- 2.1 程序执行入口SparkContext对象
- 2.2 RDD的创建
- 2.2.1 并行化创建
- 2.2.2 获取RDD分区数
- 2.2.3 读取文件创建
- 2.3 RDD算子
- 2.4 常用Transformation算子
- 2.4.1 map算子
- 2.4.2 flatMap算子
- 2.4.3 reduceByKey算子
- 2.4.4 WordCount回顾
- 2.4.5 groupBy算子
- 2.4.6 Filter算子
- 2.4.7 distinct算子
- 2.4.8 union算子
- 2.4.9 join算子
- 2.4.10 intersection 算子
- 2.4.11 glom算子
- 2.4.12 groupByKey算子
- 2.4.13 sortBy算子
- 2.4.14 sortByKey
- 2.4.15 综合案例
- 2.4.16 将案例提交到yarn运行
- 2.5 常用Action算子
- 2.5.1 countByKey算子
- 2.5.2 collect算子
- 2.5.3 reduce算子
- 2.5.4 fold算子
- 2.5.5 first算子
- 2.5.6 take算子
- 2.5.7 top算子
- 2.5.8 count算子
- 2.5.9 takeSample算子
- 2.5.10 takeOrdered
- 2.5.11 foreach算子
- 2.5.12 saveAsTextFile
- 2.5.13 注意点
- 2.6 分区操作算子
- 2.6.1 mapPartitions算子
- 2.6.2 foreachPartition算子
- 2.6.3 partitionBy算子
- 2.6.4 repartition算子
- 2.6.5 coalesce算子
- 2.6.6 mapValues算子
- 2.6.7 join算子
- 2.7 面试题
- 2.8 总结
- 3. RDD的持久化
- 3.1 RDD的数据是过程数据
- 3.2 RDD的缓存
- 3.2.1 缓存
- 3.2.2 缓存特点
- 3.2.3 缓存是如何保存的
- 3.3 RDD的CheckPoint
- 3.3.1 RDD CheckPoint
- 3.3.2 CheckPoint是如何保存数据的
- 3.3.3 缓存和CheckPoint的对比
- 3.3.4 代码
- 3.3.5 注意
- 3.3.6 总结
- 4. Spark案例练习
- 4.1 搜索引擎日志分析案例
- 4.2 提交到集群运行
- 4.3 作业
- 5. 共享变量
- 5.1 广播变量
- 5.1.1 问题引出
- 5.1.2 解决方案-广播变量
- 5.2 累加器
- 5.2.1 需求
- 5.2.2 没有累加器的代码演示
- 5.2.3 解决方法-累加器
- 5.2.4 累加器的注意事项
- 5.3 综合案例
- 5.3.1 需求
- 5.4 总结
- 6.Spark内核调度(重点理解)
- 6.1 DAG
- 6.1.1 DAG
- 6.1.2 Job和Action
- 6.1.3 DAG和分区
- 6.2 DAG的宽窄依赖和阶段划分
- 6.2.1 窄依赖
- 6.2.2 宽依赖
- 6.2.3 阶段划分
- 6.3 内存迭代计算
- 6.3.1 面试题
- 6.4 Spark并行度
- 6.4.1 如何设置并行度
- 6.4.2 全局并行度-推荐
- 6.4.3 针对RDD的并行度设置-不推荐
- 6.4.4 集群中如何规划并行度
- 6.5 Spark任务调度
- 6.5.1 Drivcer内的两个组件
- 6.6 拓展-Spark概念名词大全
- 6.6.1 Spark运行中的概念名词大全
- 6.7 SparkShuffle
- 6.7.1MR Shuffle回顾
- 6.7.2 简介
- 6.7.3 Sort Shuffle bypass机制
- 6.7.4 Shuflle的配置选项
- 6.8 总结
gitee仓库:gitee仓库
觉得有用的话,点个赞,点个收藏呗
给人点赞,手留余香
2.1 程序执行入口SparkContext对象
Spark RDD 编程的程序入口对象是SparkContext
对象(不论何种编程语言)
只有构建出SparkContext
, 基于它才能执行后续的API调用和计算
本质上, SparkContext
对编程来说, 主要功能就是创建第一个RDD出来
代码演示:
# coding:utf8# 导入Spark相关包
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':# 构建SparkConf对象conf = SparkConf().setAppName ("helloSpark").setMaster("local[*]")# 构建SparkContext执行环境入口对象sc = SparkContext(conf=conf)
master的种类:
- local:local[N]:表示以N核CPU执行,local[*]:给予local进程 所有CPU核心的使用权
- standlone:spark://node1:7077
- yarn 模式
2.2 RDD的创建
RDD的创建主要有2种方式:
• 通过并行化集合创建 ( 本地对象 转 分布式RDD )
• 读取外部数据源 ( 读取文件 )
2.2.1 并行化创建
概念:并行化创建,是指将本地集合转向分布式RDD,这一步就是分布式的开端:本地转分布式
API:
rdd = spakcontext.parallelize(参数1,参数2)
参数1 集合对象即可,比如list
参数2 分区数
完整代码:
# coding:utf8
from pyspark import SparkConf,SparkContextif __name__ == '__main__':# 0. 构建Spark执行环境conf = SparkConf().setAppName("create rdd").setMaster("local[*]")sc = SparkContext(conf=conf)# sc 对象的parallelize方法,可以将本地集合转换成RDD返回给你data = [1,2,3,4,5,6,7,8,9]rdd = sc.parallelize(data,numSlices=3)print(rdd.collect())
执行结果:
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[1, 2, 3, 4, 5, 6, 7, 8, 9]Process finished with exit code 0
2.2.2 获取RDD分区数
getNumPartitions
API :获取RDD分区数量,返回值是Int
数字
用法:rdd.getNumPartitions()
例如,基于上述代码设置了3为分区数,调用以下代码
print(rdd.getNumPartitions())
则会输出结果:3
完整案例代码:01_create_parallelize.py
# coding:utf8# 导入Spark相关包
from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象,本地集合--> 分布式对象(RDD)conf = SparkConf().setAppName ("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 演示通过并行化集合的方式去创建RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])# parallelize方法,没有给定分区数,默认分区数是多少? 根据CPU核心来定print("默认分区数:", rdd.getNumPartitions())rdd = sc.parallelize([1, 2, 3], 3)print("分区数:", rdd.getNumPartitions())# collect方法,是将RDD(分布式对象)中每个分区的数据,都发送到Driver中,形成一个Python List对象# collect:分布式 转--> 本地集合print("rdd的内容是:", rdd.collect())print(type(rdd.collect()))
输出结果:
默认分区数: 8
分区数: 3
rdd的内容是: [1, 2, 3]
<class 'list'>
2.2.3 读取文件创建
textFile
API
这个API可以读取本地数据,也可以读取hdfs数据
使用方法 :
sparkcontext.textFile(参数1,参数2)
参数1,必填,文件路径 支持本地文件 支持HDFS 也支持一些比如S3协议
参数2 可选,表示最小分区数量
注意:参数2 话语权不足,spark有自己的判断,在它允许的范围内,参数2有效果,超出spark允许的范围,参数2失效
案例代码:02_create_textFile.py
# coding : utf8
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("02_create_textFile").setMaster("local[*]")sc = SparkContext(conf=conf)# 通过textFile API 读取数据# 读取本地文件数据file_rdd1 = sc.textFile("../data/input/words.txt")print("默认读取分区数:", file_rdd1.getNumPartitions())print("file_rdd1 内容:", file_rdd1.collect())## # 加最小分区数的测试file_rdd2 = sc.textFile("../data/input/words.txt",3)file_rdd3 = sc.textFile("../data/input/words.txt",100)print("file_rdd2 分区数:", file_rdd2.getNumPartitions())print("file_rdd3 分区数:", file_rdd3.getNumPartitions())# 读取hdfs文件数据测试hdfs_rdd = sc.textFile("hdfs://Tnode1:8020/input/words.txt")print("hdfs_rdd 分区数:", hdfs_rdd.getNumPartitions())print("hdfs_rdd 内容:", hdfs_rdd.collect())
输出结果:
默认读取分区数: 2
file_rdd1 内容: ['hello spark', 'hello hadoop', 'hello flink']
file_rdd2 内容: 4
file_rdd3 内容: 38
hdfs_rdd 分区: 2
hdfs_rdd 内容: ['hello spark', 'hello hadoop', 'hello flink']
wholeTextFile
读取文件的API,有个适用场景:适合读取一堆小文件
这个API是小文件读取专用
用法:
sparkcontext.textFile(参数1,参数2)# 参数1,必填,文件路径 支持本地文件 支持HDFS 也支持一些比如S3协议# 参数2 可选,表示最小分区数量# 注意:参数2 话语权不足,spark有自己的判断,在它允许的范围内,参数2有效果,超出spark允许的范围,参数2失效
这个API偏向于少量分区读取数据
因为,这个API表明了自己是小文件读取专用,那么文件的数据很小、分区很多,
导致shuffle的几率更高,所以尽量少分区读取数据
案例代码:03_create_wholeTextFile.py
# coding:utf8
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 读取小文件文件夹rdd = sc.wholeTextFiles("../data/input/tiny_files")print(rdd.collect())print(rdd.map(lambda x: x[1]).collect())
输出结果:
[('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/1.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/2.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/3.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/4.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/5.txt', 'hello spark\r\nhello hadoop\r\nhello flink')]
['hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink']
2.3 RDD算子
算子是什么?
算子:分布式集合对象上的API称之为算子
方法、函数:本地对象的API,叫做方法、函数
算子:分布式对象的API,叫做算子
算子分类
RDD的算子 分成2类
- Transformation:转换算子
- Action:动作(行动)算子
Transformation 算子:
定义:RDD的算子,返回值任然是一个RDD的,称之为转换算子
特性:这类算子lazy 懒加载的,如果没有action算子,Transformation算子是不工作的
Action算子
定义:返回值不是rdd的就是action算子
对于这两类算子来说,
Transformation
算子,相当于在构建执行计划,action
是一个指令让这个执行计划开始工作。如果没有
action
,Transformation
算子之间的迭代关系,就是一个没有通电的流水线,只有
action
到来,这个数据处理的流水线才开始工作
2.4 常用Transformation算子
2.4.1 map算子
演示代码:04_operators_map.py
# coding:utf8
from pyspark import SparkConf, SparkContextdef addNum(data):return data * 10if __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 4)rdd2 = rdd1.map(lambda x: x * 10)rdd3 = rdd1.map(addNum)result = rdd2.collect()print(result)print(rdd3.collect())
输出结果:
[10, 20, 30, 40, 50, 60, 70, 80, 90]
[10, 20, 30, 40, 50, 60, 70, 80, 90]
对于传入参数的lambda表达式
传入方法作为传参的时候,可以选择
- 定义方法,传入其方法名
- 使用lambda 匿名方法的方式
一般,如果方法体可以一行写完,用lambda方便。
如果方法体复杂,就直接定义方法更方便
2.4.2 flatMap算子
功能:对rdd执行map操作,然后进行解除嵌套
操作
解除嵌套
:
演示代码:05_operators_flatMap.py
# coding:utf8
from pyspark import SparkConf, SparkContextdef addNum(data):return data * 10if __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "hadoop flink spark"])# 得到所有的单词,组成rdd,flatMap的传入参数和map一致,就是给map逻辑用的,解除嵌套无需逻辑(传参)rdd2 = rdd1.flatMap(lambda line: line.split(" "))print(rdd2.collect())
输出结果:
['hadoop', 'spark', 'hadoop', 'spark', 'hadoop', 'hadoop', 'hadoop', 'flink', 'spark']
注意:flatMap只适合用于有“嵌套”的rdd,直接用于没有嵌套的rdd会报错
2.4.3 reduceByKey算子
功能:针对KV型的RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。
用法:
rdd.reduceByKey(func)
# func:(V,V) ——>V
# 接收2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。
reduceByKey
的聚合逻辑是:
比如,有[1,2,3,4,5]
,然后聚合函数是:lambda a,b: a+ b
注意:reduceByKey中接收的函数,只负责聚合,不理会分组
分组是自动
byKey
来分组的。
代码演示:06_operators_reduceByKey.py
# coding:utf8
from pyspark import SparkConf, SparkContextdef addNum(data):return data * 10if __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])rdd2 = sc.parallelize([('a', 1), ('a', 11), ('b', 3), ('b', 1), ('a', 5)])rdd3 = sc.parallelize([('a', 1), ('a', 11), ('b', 3), ('b', 1), ('a', 5)])rdd = rdd.reduceByKey(lambda a, b: a + b)rdd2 = rdd2.map(lambda x: (x[0], x[1] * 10))# 只操作value的算子rdd3 = rdd3.mapValues(lambda value: value * 10)# recudeByKey 对相同key的数据执行聚合相加print(rdd.collect())print(rdd2.collect())print(rdd3.collect())
输出结果:
[('a', 3), ('b', 2)]
[('a', 10), ('a', 110), ('b', 30), ('b', 10), ('a', 50)]
[('a', 10), ('a', 110), ('b', 30), ('b', 10), ('a', 50)]
2.4.4 WordCount回顾
代码演示:07_wordcount_example.py
# coding:utf8from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 构建SparkConf对象conf = SparkConf().setAppName("test").setMaster("local[*]")# 构建SparkContext执行环境入口对象sc = SparkContext(conf=conf)# 1.读取文件获取数据 构建RDDfile_rdd = sc.textFile(r"../data/input/words.txt")# 2. 通过flatMap API取出所有的单词word_rdd = file_rdd.flatMap(lambda x: x.split(" "))# 3.将单词转换成元组,key是单词,value是1word_with_one_rdd = word_rdd.map(lambda word:(word,1))# 4. 用reduceByKey 对单词进行分组并进行value的聚合result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)# 5. 通过collect算子,将rdd的数据收集到Driver中,打印输出print(result_rdd.collect())
输出结果:
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
2.4.5 groupBy算子
功能:将rdd的数据进行分组
语法:
rdd.groupBy(func)
# func 函数
# func:(T)——>k
# 函数要求传入一个参数,返回一个返回值,类型无所谓
# 这个函数是 拿到你返回值后,将所有相同返回值的放入一个组中
# 分组完成后,每一个组是一个二元元组,key就是返回值,所有同组的数据放入一个迭代器对象中作为value
代码演示:08_oprators_groupBy.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])# 通过groupBy对数据进行分组# groupBy传入的函数的意思是:通过这个函数,确定按照谁来分组(返回谁即可)# 分组规则和SQL是一致的,也就是相同的在一个组(Hash分组)result = rdd.groupBy(lambda t: t[0])print(result.collect())print("hello")print(result.map(lambda t: (t[0], list(t[1]))).collect())
输出结果:
[('a', <pyspark.resultiterable.ResultIterable object at 0x7f85fa80eca0>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7f85fa80ebb0>)]
hello
[('a', [('a', 1), ('a', 1)]), ('b', [('b', 1), ('b', 1), ('b', 1)])]
2.4.6 Filter算子
功能:过滤,把想要的数据进行保留
语法:
rdd.filter(func)
# func:(T)——>bool 传入1个随意类型参数进来,返回值必须是True or False
返回值是True的数据被保留,False的数据被丢弃
代码演示:09_operators_filter.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])# 通过Filter算子,过滤奇数,filter 只返回true的值result = rdd.filter(lambda x: x % 2 == 1)print(result.collect())
输出结果:
[1, 3, 5]
2.4.7 distinct算子
功能:对RDD数据进行去重,返回新的RDD
语法:
rdd.distinct(参数1)
# 参数1,去重分区数量,一般不用传
演示代码:10_operators_distinct.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 1, 1, 2, 2, 2, 3, 3, 3])# distinct 进行RDD数据去重操作print(rdd.distinct().collect())rdd2 = sc.parallelize([('a', 1), ('a', 1), ('a', 3)])print(rdd2.distinct().collect())
输出结果:
[1, 2, 3]
[('a', 3), ('a', 1)]
2.4.8 union算子
功能:2个rdd合并成1个rdd返回
用法:rdd.union(other_rdd)
注意:只合并,不会去重
代码演示:11_operators_union.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1, 1, 3, 3])rdd2 = sc.parallelize(["a","b","a"])rdd3 = rdd1.union(rdd2)print(rdd3.collect())print(rdd3.distinct().collect())"""
1. 可以看到union算子是不会去重的
2. RDD的类型不同也是可以合并的
"""
输出结果:
[1, 1, 3, 3, 'a', 'b', 'a']
[1, 3, 'b', 'a']
2.4.9 join算子
功能:对两个RDD执行JOIN操作(可实现SQL的内、外连接)
注意:join算子只能用于二元元组
语法:
rdd.join(other_rdd) #内连接
rdd.leftOuterJoin(other_rdd) # 左外
rdd.rightOuterJoin(other_rdd) # 右外
代码演示:12_operators_join.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([(1001, "张三"), (1002, '李四'), (1003, '王五'), (1004, '赵六')])rdd2 = sc.parallelize([(1001, "销售部"), (1002, '科技部')])# 通过join算子来进行rdd之间的关联# 对于join算子来说 关联条件 按照二元元组的key来进行关联# 内连接print(rdd1.join(rdd2).collect())# 左外连接print(rdd1.leftOuterJoin(rdd2).collect())# 右外连接print(rdd1.rightOuterJoin(rdd2).collect())
输出结果:
[(1001, ('张三', '销售部')), (1002, ('李四', '科技部'))]
[(1001, ('张三', '销售部')), (1002, ('李四', '科技部')), (1003, ('王五', None)), (1004, ('赵六', None))]
[(1001, ('张三', '销售部')), (1002, ('李四', '科技部'))]
2.4.10 intersection 算子
功能:求2个rdd的交集,返回一个新rdd
用法:rdd.intersection(other_rdd)
代码演示:13_operators_intersection.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([('a',1),('a',3)])rdd2 = sc.parallelize([('a',1),('b',3)])# 通过intersection算子求RDD之间的交集,将交集取出,返回新RDDrdd3 = rdd1.intersection(rdd2)print(rdd3.collect())
输出结果:
[('a', 1)]
2.4.11 glom算子
功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行
比如RDD数据[1,2,3,4,5]
有两个分区
那么,被glom后,数据变成:[[1,2,3],[4,5]]
使用方法:rdd.glom()
代码演示:14_operators_glom.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)rdd2 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])print(rdd.glom().collect())print(rdd.glom().flatMap(lambda x:x).collect()) # 用flatMap解嵌套print(rdd2.glom().collect())
输出结果:
[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[[1], [2], [3], [4, 5], [6], [7], [8], [9, 10]]
2.4.12 groupByKey算子
功能:针对KV型RDD,自动按照key分组
用法:rdd.groupByKey()
自动按照key分组
代码演示:15_operators_groupByKey.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 1), ('a', 1,), ('b', 1), ('b', 1)])rdd2 = rdd.groupByKey()print(rdd2.map(lambda x:(x[0],list(x[1]))).collect())
输出结果:
[('a', [1, 1, 1]), ('b', [1, 1])]
2.4.13 sortBy算子
功能:对RDD数据进行排序,基于你指定的排序依据
语法:
rdd.sortBy(func,ascending=False,numPartitions=1)
# func:(T)——>U:告知按照rdd中的哪个数据进行排序,比如lambda x:x[1] 表示按照rdd中的第二列元素进行排序
# ascending = True升序;False 降序
# numPartition:用多少分区来排序
注意:如果要全局有序,排序分区数请设置为1,因为生产环境下,分区数大于1,很可能只得到局部有序的结果
代码演示:16_operators_sortBy.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('g', 3), ('c', 1), ('b', 2,), ('a', 9), ('h', 10), ('i', 4), ('l', 26,), ('o', 1), ('d', 7)])# 使用sortBy对rdd进行排序# 参数1 函数,表示的是,告诉spark按照数据的哪个列进行排序# 参数2 bool,True表示升序,False表示降序# 参数3 分区数设置"""注意:如果要全局有序,排序分区数请设置为1,因为生产环境下,分区数大于1,很可能只得到局部有序的结果"""rdd2 = rdd.sortBy(lambda x:x[1],ascending=True,numPartitions=3)rdd3 = rdd.sortBy(lambda x:x[0],ascending=True,numPartitions=8)print(rdd2.collect())print(rdd3.collect())
输出结果:
[('c', 1), ('o', 1), ('b', 2), ('g', 3), ('i', 4), ('d', 7), ('a', 9), ('h', 10), ('l', 26)]
[('a', 9), ('b', 2), ('c', 1), ('d', 7), ('g', 3), ('h', 10), ('i', 4), ('l', 26), ('o', 1)]
2.4.14 sortByKey
功能:针对KV型RDD,按照key进行排序
语法:
sortByKey(ascending=True,numPartitions=None,keyfunc=<function RDD,<lambda>>)
- ascending:升序或降序,True升序,False降序,默认是升序
- numPartitions:按照几个分区进行排序,如果全局有序,设置为1
- keyfunc:在排序前对key进行处理,语法是:(k)——>U,一个参数传入,返回一个值
代码演示:17_operators_sortByKey.py
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('g', 3), ('A', 1), ('B', 2,), ('A', 9), ('h', 10), ('i', 4), ('l', 26,), ('o', 1), ('d', 7)])# 调用了忽略大小写的函数print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
输出结果:
[('A', 1), ('A', 9), ('B', 2), ('d', 7), ('g', 3), ('h', 10), ('i', 4), ('l', 26), ('o', 1)]
2.4.15 综合案例
代码演示:18_operators_demo.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 读取数据文件file_rdd = sc.textFile("../data/input/order.text")# 进行rdd数据的split 按照|符号进行,得到一个json数据jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))# 通过python内置的json库,完成json字符串到字典对象的转换dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))# 过滤数据,只保留北京的数据beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')# 组合北京和商品类型形成的字符串category_rdd = beijing_rdd.map(lambda x: x['areaName'] + '_' + x['category'])# 对结果集进行去重操作result_rdd = category_rdd.distinct()# 输出print(result_rdd.collect())
输出结果:
['北京_平板电脑', '北京_家具', '北京_书籍', '北京_食品', '北京_服饰', '北京_手机', '北京_家电', '北京_电脑']
2.4.16 将案例提交到yarn运行
改动1:加入环境变量,让pycharm运行yarn的时候,知道hadoop的配置在哪,可以去读取yarn的信息
import os
from defs_19 import city_with_category
# 导入自己写的函数时,把文件夹设置为SourceRoot就不会报错了
os.environ['HADOOP_CONF_DIR']= "/export/server/hadoop/etc/hadoop"
改动2:在集群上运行,本地文件就不可以用了,需要用hdfs文件
# 在集群中运行,我们需要用HDFS路径,不能用本地路径file_rdd = sc.textFile("hdfs://Tnode1:8020/input/order.text")
改动3:
"""如果提交到集群运行,除了主代码以外,还依赖了其它的代码文件需要设置一个参数,来告知spark,还有依赖文件要同步上传到集群中参数叫做:spark.submit.pyFiles参数的值可以是单个.py文件,也可以是.zip压缩包(有多个依赖文件的时候可以用zip压缩后上传)"""conf.set("spark.submit.pyFiles","defs_19.py")
完整代码:19_operators_runOnYarn.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContext
import os
from defs_19 import city_with_category
# 导入自己写的函数时,把文件夹设置为SourceRoot就不会报错了
os.environ['HADOOP_CONF_DIR']= "/export/server/hadoop/etc/hadoop"
if __name__ == '__main__':# 提交到yarn集群,master设置为yarnconf = SparkConf().setAppName("SparkDemo01").setMaster("yarn")"""如果提交到集群运行,除了主代码以外,还依赖了其它的代码文件需要设置一个参数,来告知spark,还有依赖文件要同步上传到集群中参数叫做:spark.submit.pyFiles参数的值可以是单个.py文件,也可以是.zip压缩包(有多个依赖文件的时候可以用zip压缩后上传)"""conf.set("spark.submit.pyFiles","defs_19.py")sc = SparkContext(conf=conf)# 在集群中运行,我们需要用HDFS路径,不能用本地路径file_rdd = sc.textFile("hdfs://Tnode1:8020/input/order.text")# 进行rdd数据的split 按照|符号进行,得到一个json数据jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))# 通过python内置的json库,完成json字符串到字典对象的转换dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))# 过滤数据,只保留北京的数据beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')# 组合北京和商品类型形成的字符串category_rdd = beijing_rdd.map(city_with_category)# 对结果集进行去重操作result_rdd = category_rdd.distinct()# 输出print(result_rdd.collect())
依赖代码:defs_19.py
# coding:utf8def city_with_category(data):return data['areaName'] + '_' +data['category']
输出结果:
['北京_书籍', '北京_食品', '北京_服饰', '北京_平板电脑', '北京_家具', '北京_手机', '北京_家电', '北京_电脑']
在服务器上通过spark-submit 提交到集群运行
# --py-files 可以帮你指定你依赖的其它python代码,支持.zip(一堆),也可以单个.py文件都行。
/export/server/spark/bin/spark-submit --master yarn --py-files ./defs.py ./main.py
服务器上程序运行结果:
注意,在服务器上跑时,需要把conf中的setMaster去掉
即conf = SparkConf().setAppName(“SparkDemo01”).setMaster(“yarn”)改为:
conf = SparkConf().setAppName(“SparkDemo01”)
2.5 常用Action算子
2.5.1 countByKey算子
功能:统计key出现的次数(一般适用于KV型的RDD)
代码演示:20_operators_countByKey.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.textFile("../data/input/words.txt")rdd2 = rdd.flatMap(lambda x:x.split(" ")).map(lambda x: (x, 1))# 通过countByKey来对key进行计数,这是一个Action算子result = rdd2.countByKey()print(result)print(list(result))print(result["hello"])print(type(result))
输出结果:
defaultdict(<class 'int'>, {'hello': 3, 'spark': 1, 'hadoop': 1, 'flink': 1})
['hello', 'spark', 'hadoop', 'flink']
3
<class 'collections.defaultdict'>
2.5.2 collect算子
功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:rdd.collect()
这个算子,是将RDD各个分区数据都拉取到Driver
注意的是,RDD是分布式对象,其数据量可以很大,
所以用这个算子之前要心知肚明地了解 结果数据集不会太大。
不然,会把Driver内存撑爆
2.5.3 reduce算子
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:
rdd.reduce(func)
# func:(T,T)——>T
# 2参数传入1个返回值,返回值要和参数要求类型一致
代码演示:21_operators_reduce.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5])print(rdd.reduce(lambda a, b: a + b))
输出结果:
15
2.5.4 fold算子
功能:和reduce一样,接收传入逻辑进行聚合,聚合是带有初始值的,
这个初始值聚合会作用在:
- 分区内聚合
- 分区间聚合
比如:[[1,2,3],[4,5,6],[7,8,9]]
数据量分布在3个分区
分区1: 1、2、3 聚合的时候带上10作为初始值得到16
分区3: 4、5、6 聚合的时候带上10作为初始值得到25
分区4: 7、8、9 聚合的时候带上10作为初始值得到34
3个分区的结果做聚合也带上初始值10,所以结果是10+16+25+34 = 85
代码演示:22_operators_fold.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)print(rdd.glom().collect())print(rdd.fold(10, lambda a, b: a + b))
输出结果:
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
85
2.5.5 first算子
功能:取出RDD的第一个元素
用法
sc.parallelize([3,2,1]).first()
输出:3
2.5.6 take算子
功能:取RDD的前N个元素。组合成list返回给你
用法:
>>> sc.parallelize([3,2,1,4,5,6]).take(5)
[3, 2, 1, 4, 5]
2.5.7 top算子
功能:对RDD数据集进行降序排序,取前N个
用法:
>>> sc.parallelize([3,2,1,4,5,6]).top(3) # 表示取降序前3个
[6, 5, 4]
2.5.8 count算子
功能:计算RDD有多少条数据,返回值是一个数字
用法:
>>> sc.parallelize([3,2,1,4,5,6]).count()
6
2.5.9 takeSample算子
功能:随机抽样RDD的数据
用法:
takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
- 参数1:True表示允许取同一个数据,False表示不允许取同一个数据,和数据内容无关,是否重复表示的是同一个位置的数据(有、无放回抽样)
- 参数2:抽样要几个
- 参数3:随机数种子,这个参数传入一个数字即可,随意给
随机数种子 数字可以随便传,如果传同一个数字 那么取出的结果是一致的。
一般参数3 我们不传,Spark会自动给与随机的种子。
代码演示:23_operators_takeSample.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 3, 5, 3, 1, 3, 2, 6, 7, 8, 6],1)result = rdd.takeSample(False,5,1)# 随机抽样可以抽出相同的数据,只是位置不同而已# 随机数种子能让随机数不再继续发生变化print(result)
输出结果:
[2, 7, 6, 6, 3]
注意:
随机抽样可以抽出相同的数据,只是位置不同而已
随机数种子能让随机数不再继续发生变化
2.5.10 takeOrdered
功能:对RDD进行排序取前N个
用法:
rdd.takeOrdered(参数1,参数2)
- 参数1 要几个数据
- 参数2 对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
这个方法按照元素自然顺序升序排序,如果你想玩倒叙,需要参数2 来对排序的数据进行处理
代码演示:24_operators_takeOrdered.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)print(rdd.takeOrdered(3))print(rdd.takeOrdered(3,lambda x:-x))
输出结果:
[1, 2, 3]
[9, 7, 6]
2.5.11 foreach算子
功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个思想),但是这个方法没有返回值
用法:
rdd.foreach(func)
# func:(T) ——> None
代码演示:25_operators_foreach.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)rdd.foreach(lambda x: print(x * 10))
输出结果:
10
30
20
40
70
90
60
2.5.12 saveAsTextFile
功能:将RDD的数据写入文本文件中
支持本地写出,hdfs等文件系统
代码演示:
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,3,2,4,7,9,6],3)rdd.saveAsTextFile("hdfs://Tnode1:8020/test/output/out1")
运行结果:
注意:保存文件API,是分布式执行的
这个API的执行数据是不经过driver的
如图,写出的时候,每个分区所在的Executor直接控制数据写出到目标文件系统中
所有才会一个分区产生一个结果文件
2.5.13 注意点
我们学习的action中:
- foreach
- saveAsTextFile
这两个算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行
反之:其余的Action算子都会将结果发送至Driver
2.6 分区操作算子
2.6.1 mapPartitions算子
transformation算子
图解:
如图,mapPartition一次被传递的是一整个分区的数据
作为一个迭代器(一次性list)对象传入过来。
代码演示:27_operators_mapPartitions.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__':start_time = time.time()conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 效果和map一样,但是性能比map好,cpu计算没有省,但是网络IO少很多rdd = sc.parallelize([1,3,2,4,7,9,6],3)def process(iter):result = []for it in iter:result.append(it*10)return resultprint(rdd.mapPartitions(process).collect())# print(rdd.map(lambda x:x*10).collect())end_time = time.time()gap_time = (end_time - start_time)gap_time = round(gap_time, 4) # 保留四位小数print("执行本程序共耗时:" + str(gap_time) + "s")
输出结果:
[10, 30, 20, 40, 70, 90, 60]
执行本程序共耗时:8.0515s
注意:效果和map一样,但是性能比map好,cpu计算没有省,但是网络IO少很多
2.6.2 foreachPartition算子
Action算子
功能:和普通foreach一致,一次处理的是一整个分区数据
代码演示:28_operators_foreachPartitions.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,3,2,4,7,9,6],3)def process(iter):result = []for it in iter:result.append(it*10)print(result)rdd.foreachPartition(process)
输出结果:
[70, 90, 60]
[10, 30]
[20, 40]
foreachPartition 就是一个没有返回值的mapPartitions
2.6.3 partitionBy算子
transformation算子
功能:对RDD进行自定义分区操作
用法:
rdd.partitionBy(参数1,参数2)
- 参数1 重新分区后有几个分区
- 参数2 自定义分区规则,函数传入参数2:(K)——>int
一个传入参数进来,类型无所谓,但是返回值一定是int类型,
将key传给这个函数,你自己写逻辑,决定返回一个分区编号分区编号从0开始,不要超出分区数-1
代码演示:29_operators_partitionBy.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('hadoop', 1), ('spark', 1), ("hello", 1), ("flink", 1), ("hadoop", 1), ("spark", 1)])# 使用partitionBy 自定义 分区def process(k):if 'hadoop' == k or 'hello' == k: return 0if 'spark' == k: return 1return 2print(rdd.partitionBy(3, process).glom().collect())
输出结果:分区依次为0、1、2
[[('hadoop', 1), ('hello', 1), ('hadoop', 1)], [('spark', 1), ('spark', 1)], [('flink', 1)]]
分区号不要超标,你设置3个分区,分区号只能是0 1 2
设置5个分区 分区号只能是0 1 2 3 4
2.6.4 repartition算子
transformation算子
功能:对RDD的分区执行重新分区(仅数量)
用法:
rdd.repartition(N)
传入N 决定新的分区数
代码演示:30_operators_repartition_and_coalesce.py
# coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5], 3)# repartition 修改分区print(rdd.repartition(1).getNumPartitions())print(rdd.repartition(5).getNumPartitions())# coalesce 修改分区print(rdd.coalesce(1).getNumPartitions())print(rdd.coalesce(5,shuffle=True).getNumPartitions())
输出结果:
1
5
1
5
注意:对分区的数量进行操作,一定要慎重
一般情况下,我们写spark代码除了要求全局排序设置为1个分区外,
多数时候,所有API中关于分区相关的代码我们都不太理会
因为,如果你改分区了
- 会影响并行计算(内存迭代的并行管道数量)
后面学
- 分区如果增加,极大可能导致shuffle
2.6.5 coalesce算子
transformation算子
功能:对分区进行数量增减
用法:
rdd.coalesce(参数1,参数2)
- 参数1,分区数
- 参数2,True or False
True表示允许shuffle,也就是可以加分区
False表示不允许shuffle,也就是不能加分区,False是默认
代码见2.6.4
对比repartition,一般使用coalesce较多,因为加分区要写参数2
这样避免写repartition的时候手抖了加分区了
2.6.6 mapValues算子
Transformation算子
功能:针对二元元组RDD,对其内部的二元元组的Value执行map操作
语法:
rdd.mapValues(func)
# func: (V)——> U
# 注意,传入的参数,是二元元组的 value值
# 我们这个传入的方法,只对value进行处理
代码演示:
# coding:utf8
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("create rdd").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 11), ('a', 6), ('b', 3), ('b', 5)])# rdd.map(lambda x:(x[0],x[1]*10))# 将二元元组的所有value都乘以10进行处理print(rdd.mapValues(lambda x: x * 10).collect())
输出结果:
[('a', 10), ('a', 110), ('a', 60), ('b', 30), ('b', 50)]
2.6.7 join算子
Transformation算子
功能:对两个RDD执行join操作(可以实现SQL的内、外连接)
注意:join算子只能用于二元元组
代码见 2.4.9
2.7 面试题
groupByKey和reduceByKey的区别
在功能上的区别:
groupByKey
仅仅只有分组功能而已reduceByKey
除了有ByKey
的分组功能外,还有reduce
聚合功能,所以是一个分组+聚合一体化的算子
如果对数据执行分组+聚合,那么使用这2个算子的性能差别是很大的
reduceByKey
的性能是远大于:groupByKey
+聚合逻辑的
因为:
如图,这是groupByKey
+聚合逻辑的执行流程。
因为,groupByKey
只能分组,所以,执行上是先分组(shuffle)后聚合
再来看reduceByKey
:
如图,reduceByKey由于自带聚合逻辑,所以可以完成:
- 先在分区内做预聚合
- 然后再走分组流程(shuffle)
- 分组后再做最终聚合
对于groupByKey,reduceByKey最大的提升在于,分组前进行了预聚合,那么在shuffle分组节点,被shuffle的数据可以极大地减少
这就极大地提升了性能
分组+聚合,首选
reduceByKey
,数据越大,对groupByKey的优势就越高
2.8 总结
- RDD创建方式有哪几种方法?
通过并行化集合的方式(本地集合转分布式集合)
或者读取数据的方式创建(TextFile、WholeTextFile)
- RDD分区数如何查看?
通过getNumPartitions API查看,返回值Int
-
Transformation和Action的区别?
转换算子的返回值100%是RDD,而Action算子的返回值100%不是RDD转换算子是懒加载的,只有遇到Action才会执行,Action就是转换算子处理链条的开关。
-
哪两个Action算子的结果不经过Driver,直接输出?
foreach
和saveAsTextFile
直接由Executor执行后输出,不会将结果发送到Driver上去
-
reduceByKey和groupByKey的区别?
reduceByKey自带聚合逻辑,groupByKey不带
如果做数据聚合reduceByKey的效率更好,因为可以先聚合后shuffle在最终聚合,传输的IO小
-
mapPartitions和foreachPartition的区别?
mapPartitions带有返回值 foreachPartition不带
-
对于分区操作有什么要注意的地方?
尽量不要增加分区,可能破坏内存迭代的计算管道
3. RDD的持久化
3.1 RDD的数据是过程数据
RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新的RDD生成,代表老RDD的消失。
RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。
这个特性可以最大化地利用资源,老旧RDD没用了 就从内存中清理,给后续的计算腾出内存空间。
如上图,rdd3被2次使用,第一次使用之后,其实RDD3就不存在了。
第二次使用的时候,只能基于RDD的血缘关系,从RDD1重新执行,构建出来RDD3,供RDD5使用。
3.2 RDD的缓存
3.2.1 缓存
对于上述的场景,肯定要执行优化,优化就是:
RDD3如果不消失,那么RDD1——>RDD2——>RDD3这个链条就不会执行2次,或者更多次
RDD的缓存技术:Spark提供了缓存API,可以让我们通过调用APi,将指定的RDD数据保留在内存或者硬盘
上
缓存的API
# RDD3 被2次使用,可以加入缓存进行优化
rdd3.cache() # 缓存到内存中
rdd3.persist(StorageLevel.MEMORY_ONLY) # 仅内存缓存
rdd3.persist(StorageLevel.MEMORY_ONLY_2) # 仅内存缓存,2个副本
rdd3.persist(StorageLevel.DISK_ONLY) # 仅缓存硬盘上
rdd3.persist(StorageLevel.DISK_ONLY_2) # 仅缓存硬盘上,2个副本
rdd3.persist(StorageLevel.DISK_ONLY_3) # 仅缓存硬盘上,3个副本
rdd3.persist(StorageLevel.MEMORY_AND_DISK) # 先放内存,不够放硬盘
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) # 先放内存,不够放硬盘,2个副本
rdd3.persist(StorageLevel.OFF_HEAP) # 堆外内存(系统内存)# 如上API,自行选择使用即可
# 一般建议使用rdd3.persist(StorageLevel.MEMORY_AND_DISK)
# 如果内存比较小的集群,建议使用rdd3.persist(StorageLevel.DISK_ONLY)或者就别用缓存了 用CheckPoint# 主动清理缓存的API
rdd.unpersist()
3.2.2 缓存特点
- 缓存技术可以将过程RDD数据,持久化保存到内存或者硬盘上
- 但是,这个保存在设定上是认为不安全的。
缓存的数据在设计上是认为有丢失风险的。
所以,缓存有一个特点就是:其保留RDD之间的血缘(依赖)关系
一旦缓存丢失,可以基于血缘关系的记录,重新计算这个RDD的数据
缓存如何丢失:
- 在内存中的缓存是不安全的,比如断电、计算任务内存不足,把缓存清理给计算让路
- 硬盘中因为硬盘损坏也是可能丢失的。
代码演示:31_cache.py
# coding:utf8from pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 给rdd3加缓存# rdd3.cache()rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) # 设置缓存级别rdd4 = rdd3.reduceByKey(lambda a, b: a + b)result = rdd4.collect()print(result)rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x:sum(x))print(rdd6.collect())# 取消缓存rdd3.unpersist()time.sleep(10000000)
输出结果:
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
3.2.3 缓存是如何保存的
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EP3ykOn3-1692435339403)(https://cdn.jsdelivr.net/gh/Sql88/BlogImg@main/img/%E7%BC%93%E5%AD%98%E6%98%AF%E5%A6%82%E4%BD%95%E4%BF%9D%E5%AD%98%E7%9A%842.png)]
如图,RDD是将自己分区的数据,每个分区自行将其数据保存在其所在的Executor内存和硬盘上。
这是分散存储
3.3 RDD的CheckPoint
3.3.1 RDD CheckPoint
CheckPoint技术,也是将RDD的数据,保存起来。
但是它仅支持硬盘存储
并且:
- 它被设计认为是安全的
- 不保留
血缘关系
3.3.2 CheckPoint是如何保存数据的
如图:CheckPoint存储RDD数据,是集中收集各个分区数据进行存储
。而缓存是分散存储
3.3.3 缓存和CheckPoint的对比
- CheckPoint不管分区数量多少,风险是一样的,缓存分区越多,风险越高
- CheckPoint支持写入HDFS,缓存不行,HDFS是高可靠存储,CheckPoint被认为是安全的
- CheckPoint不支持内存,缓存可以,缓存如果写内存,性能比CheckPoint要好一些
- CheckPoint因为设计是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全,所以保留
3.3.4 代码
# 设置CheckPoint第一件事情,选择CP的保存路径
# 如果是Local模式,可以支持本地文件系统,如果在集群运行,千万要用HDFS
sc.setCheckpointDir("hdfs://node1:8020/output/bj52ckp")
# 用的时候,直接调用checkPoint算子即可。
rdd.checkpoint()
完整代码演示:32_checkPoint.py
# coding:utf8import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1.告知spark,开启checkPoint功能sc.setCheckpointDir("hdfs://Tnode1:8020/output/ckp")rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 调用checkPoint API 保存数据即可rdd3.checkpoint()rdd4 = rdd3.reduceByKey(lambda a, b: a + b)result = rdd4.collect()print(result)rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x:sum(x))print(rdd6.collect())# 取消缓存rdd3.unpersist()time.sleep(10000000)
输出结果:
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
3.3.5 注意
CheckPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用CheckPoint比较合适。
或者数据量很大,用CheckPoint比较合适。
如果数据量小,或者RDD重新计算是非常快的,用CheckPoint没啥必要
Cache和CheckPoint两个API都不是Action类型
所以,想要它俩工作,必须在后面接上Action
接上Action的目的,是让RDD有数据,而不是为了CheckPoint和Cache工作。
3.3.6 总结
1.Cache和CheckPoint的区别
- Cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)
- CheckPoint是重量级保存RDD数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留RDD血缘关系)
2.Cache和CheckPoint的性能对比?
- Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快
- CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本)
4. Spark案例练习
4.1 搜索引擎日志分析案例
数据格式:
需求:
- 用户搜索的关键词分析
- 用户和关键词组合分析
- 热门搜索时间段分析
案例实现代码:
# coding:utf8from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import jiebafrom operator import adddef context_jieba(data):"""通过jieba分词工具 进行分词操作"""seg = jieba.cut_for_search(data)l = []for word in seg:l.append(word)return ldef filter_words(data):"""过滤不要的 谷、帮、客 湖"""return data not in ['谷', '帮', '客', '湖']def append_words(data):"""修订某些关键词的内容"""if data == '传智播': data = '传智播客'if data == '院校': data = '院校帮'if data == '博学': data = '博学谷'if data == '数据': data = '数据湖'return (data, 1)def extract_user_and_word(data):"""传入数据是 元组(1,我喜欢传智播客)"""user_id = data[0]content = data[1]# 对content进行分词words = context_jieba(content)return_list = []for word in words:# 不要忘记过滤 \谷\帮\客\湖if filter_words(word):return_list.append((user_id + '_' + append_words(word)[0], 1))return return_listif __name__ == '__main__':conf = SparkConf().setAppName("SparkDemo2")sc = SparkContext(conf=conf)# 1.读取文件file_rdd = sc.textFile("hdfs://Tnode1/input/SogouQ.txt")# 2. 对数据进行切分 \tsplit_rdd = file_rdd.map(lambda x: x.split("\t"))# 3. 因为要做多个需求,split_rdd 作为基础的rdd 会被多次使用split_rdd.persist(StorageLevel.DISK_ONLY)# TODO:需求1:用户搜索的关键‘词’分析# 主要分析热点词# 将所有的搜索内容取出# print(split_rdd.takeSample(True, 3))context_rdd = split_rdd.map(lambda x: x[2])# 对搜索的内容进行分词分析words_rdd = context_rdd.flatMap(context_jieba)# print(words_rdd.collect())# 异常的数据:# 数据 湖 ——> 数据湖# 院校 帮 ——> 院校帮# 博学 谷 ——> 博学谷# 传智播 客——> 传智播客filtered_rdd = words_rdd.filter(filter_words)# 将关键词转换:传智播 --> 传智播客final_words_rdd = filtered_rdd.map(append_words)# 对单词进行分组、聚合、排序 求出前五名result1 = final_words_rdd.reduceByKey(lambda a, b: a + b). \sortBy(lambda x: x[1], ascending=False, numPartitions=1). \take(5)print("需求1结果:", result1)# TODO:需求2:用户和关键词组合分析# 1,我喜欢传智播客# 1 + 我 1+喜欢 1+传智播客user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))# 对用户的搜索内容进行分词,分词后和用户ID再次组合user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)# 对内容进行分组、聚合、排序、求前5result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b). \sortBy(lambda x: x[1], ascending=False, numPartitions=1). \take(5)print("需求2结果:", result2)# TODO:需求3:热门搜索时间段分析# 取出来所有的时间time_rdd = split_rdd.map(lambda x: x[0])# 对时间进行处理,只保留小时精度即可hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))# 分组、聚合、排序result3 = hour_with_one_rdd.reduceByKey(add). \sortBy(lambda x: x[1], ascending=False, numPartitions=1). \collect()print("需求3结果:", result3)
输出结果:
需求1结果: [('scala', 2310), ('hadoop', 2268), ('博学谷', 2002), ('传智汇', 1918), ('itheima', 1680)]
需求2结果: [('6185822016522959_scala', 2016), ('41641664258866384_博学谷', 1372), ('44801909258572364_hadoop', 1260), ('7044693659960919_仓库', 1120), ('15984948747597305_传智汇', 1120)]
需求3结果: [('20', 3479), ('23', 3087), ('21', 2989), ('22', 2499), ('01', 1365)
4.2 提交到集群运行
# 普通提交
/export/server/spark/bin/spark-submit --master yarn SparkDemo2.py# 压榨集群式提交
# 每个executor吃14g内存,8核cpu,总共3个executor
/export/server/spark/bin/spark-submit --master yarn --executor-memory 14g --executor-cores 8 --num-executors 3 ./SparkDemo2.py
输出结果:
要注意代码中:
- master部分删除
- 读取的文件路径改为hdfs才可以
4.3 作业
代码演示:
# coding:utf8from pyspark import SparkContext, StorageLevel
from pyspark import SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("sparkHomeWork01").setMaster("local[*]")sc = SparkContext(conf=conf)file_rdd = sc.textFile("../../data/input/apache.log")file_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)# 需求1:TODO:计算当前网站访问的PV(被访问次数)visit_Num = file_rdd.count()print("当前网站的被访问次数:", visit_Num) # 14# 需求2:TODO:当前网站访问的用户数userNum = file_rdd.distinct().count()print("当前网站的访问用户数:", userNum) ## 需求3:TODO:有哪些IP访问了本网站?Ip_rdd1 = file_rdd.map(lambda x: x.split(" "))Ip_rdd1.cache()Ip_rdd2 = Ip_rdd1.map(lambda x: x[0]).distinct()# print(IP_rdd2.collect())print("有哪些IP访问了本网站:", Ip_rdd2.collect())# 需求4 TODO:哪个页面访问量最高page_rdd1 = Ip_rdd1.map(lambda x:x[-1])page_rdd2 = page_rdd1.map(lambda x:(x,1))page_rdd3 = page_rdd2.reduceByKey(lambda a,b:a+b)# page = page_rdd3.sortBy(lambda x:x[1],ascending=False,numPartitions=1).take(1)page = page_rdd3.takeOrdered(1,lambda x:-x[1])page = page[0]print(page)print("访问量最高的页面是:",page[0],"共被访问:",page[1],"次")
输出结果:sparkHomeWork01.py
当前网站的被访问次数: 14
当前网站的访问用户数: 9
有哪些IP访问了本网站: ['83.149.9.216', '10.0.0.1', '86.149.9.216']
('/presentations/logstash-monitorama-2013/css/print/paper.css', 13)
访问量最高的页面是: /presentations/logstash-monitorama-2013/css/print/paper.css 共被访问: 13 次
5. 共享变量
5.1 广播变量
5.1.1 问题引出
有如下代码:
上述代码,本地list对象和分布式对象RDD有了关联。如下图:
本地list对象,被发送到每个分区的处理线程上使用,也就是一个executor内,其实存放了2份一样的数据。
executor是进程,进程内资源共享,这2份数据没有必要,造成了内存浪费。
5.1.2 解决方案-广播变量
如果本地list对象标记为广播变量对象
,那么
当上述场景出现的时候,Spark只会:
- 给每个Executor来一份数据,而不像原本那样,每一个分区的处理线程都来一份,节省内存。
如图,使用广播变量后,每个Executor只会收到一份数据集。
内部的各个线程(分区)共享这一份数据集。
使用方式:
# 1. 将本地list 标记成广播变量即可
broadcast = sc.broadcast(stu_info_list)# 2. 使用广播变量,从broadcast对象中取出本地list对象即可
value = broadcast.value# 也就是 先放进去broadcast内部,然后从broadcast内部在取出来用,中间传输的是broadcast这个对象了
# 只要中间传输的是broadcast对象,spark就会留意,只会给每个Executor发一份了,而不是傻傻的哪个分区都要给
代码演示:33_broadcast.py
# coding:utf8import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__':conf = SparkConf().setAppName("33_broadcast.py").setMaster("local[*]")sc = SparkContext(conf=conf)stu_info_list = [(1, '张大仙', 11),(2, '王晓晓', 13),(3, '张甜甜', 11),(4, '王大力', 11)]# 1.将本地Python List对象标记为广播变量broadcast = sc.broadcast(stu_info_list)score_info_rdd = sc.parallelize([(1, '语文', 99),(2, '数学', 99),(3, '英语', 99),(4, '编程', 99),(1, '语文', 99),(2, '编程', 99),(3, '语文', 99),(4, '英语', 99),(1, '语文', 99),(3, '英语', 99),(2, '编程', 99)])def map_func(data):name = ''id = data[0]# 匹配本地list和分布式rdd中的学生ID 匹配成功后 即可获得当前学生的姓名# 2.在使用到本地集合对象的地方,从广播变量中取出来用即可for stu_info in broadcast.value:if stu_info[0] == id:name = stu_info[1]breakreturn (name, data[1], data[2])print(score_info_rdd.map(map_func).collect())"""
广播变量使用场景:本地集合对象和 分布式集合对象(RDD) 进行关联的时候
需要将本地集合对象封装为广播变量
可以节省:
1. 网络IO的次数
2. Executor的内存占用
"""
输出结果:
[('张大仙', '语文', 99), ('王晓晓', '数学', 99), ('张甜甜', '英语', 99), ('王大力', '编程', 99), ('张大仙', '语文', 99), ('王晓晓', '编程', 99), ('张甜甜', '语文', 99), ('王大力', '英语', 99), ('张大仙', '语文', 99), ('张甜甜', '英语', 99), ('王晓晓', '编程', 99)]
5.2 累加器
5.2.1 需求
想要对map
算子计算中的数据,进行数据累加,得到全部数据计算完后的累加结果
5.2.2 没有累加器的代码演示
# coding:utf8
from pyspark import SparkConf, SparkContext# 演示spark的accumulator累加器
if __name__ == '__main__':conf = SparkConf().setAppName("create rdd").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)count = 0def map_func(data):global countcount += 1print(count)rdd.map(map_func).collect()print(count)# 代码中count 最后打印结果是0
输出结果:
1
2
3
4
5
1
2
3
4
5
0
代码的问题在于:
- count来自driver对象,当在分布式的map算子中需要count对象的时候,driver会将count对象发送给每一个executor一份(复制发送)
- 每个executor各自收到一个,在最后执行print(count) 的时候,这个被打印的count依旧是driver那个
- 所以,不管executor中累加到多少,都和driver这个count无关
5.2.3 解决方法-累加器
代码演示:
# coding:utf8import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ == '__main__':conf = SparkConf().setAppName("33_broadcast.py").setMaster("local[*]")sc = SparkContext(conf=conf)# 10条数据 2个分区rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)# count = 0# Spark 提供的累加器变量,参数是初始值acmlt = sc.accumulator(0)def map_func(data):global acmltacmlt += 1print(acmlt)rdd.map(map_func).collect()# rdd2 = rdd.map(map_func)# rdd2.cache()# rdd2.collect()# rdd3 = rdd2.map(lambda x:x)# rdd3.collect()print(acmlt) # 结果是10"""累加器使用的注意点:某个rdd使用完后,再被新的rdd重新调用,有可能会产生和想象中不一样的结果避免方法:给需要重用的rdd加缓存"""
如上代码,将全部的count
对象,都替换成acmlt
对象即可
这个对象就是累加器对象,构建方式:sc.accumulator(初始值)
即可构建。
这个对象唯一和前面提到的count不同的是,这个对象可以从各个Executor中收集它们的执行结果,作用回自己身上。
输出结果:
1
2
3
4
5
1
2
3
4
5
10
5.2.4 累加器的注意事项
如上代码,第一次rdd2被action后,累加器值是10,然后rdd2就没有了(没数据了)
当rdd3构建出来的时候,是依赖rdd2,rdd2没数据,那么rdd2就要重新生成
重新生成就导致累加器累加数据的代码再次被执行,
所以代码的结果是20
也就是说,使用累加器的时候,要注意,因为rdd是过程数据,如果rdd被多次使用
可能重新构建此rdd
如果累加器累加代码,存在重新构建的步骤中
累加器累加代码就可能被多次执行。
如何解决:加缓存或者CheckPoint即可
5.3 综合案例
5.3.1 需求
对上面的数据执行:
- 正常的单词进行单词计数
- 特殊字符统计出现有多少个
特殊字符定义如下:
abnormal_char = [",",".","!","#","$","%"]
代码演示:
# coding:utf8import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport time
import reif __name__ == '__main__':conf = SparkConf().setAppName("35_demo.py").setMaster("local[*]")sc = SparkContext(conf=conf)# 1.读取数据文件file_rdd = sc.textFile("../data/input/accumulator_broadcast_data.txt")# 特殊字符的List定义abnormal_char = [',', '.', '!', '#', '$', '%']# 2. 特殊字符list 包装成广播变量broadcast = sc.broadcast(abnormal_char)# 3.对特殊字符出现次数做累加,累加使用累加器最好acmlt = sc.accumulator(0)# 4.数据处理,先处理数据的空行,在Python中有内容就是True None就是Falselines_rdd = file_rdd.filter(lambda line: line.strip())# 5.去除前后的空格data_rdd = lines_rdd.map(lambda line: line.strip())# 6.对数据进行切分,按照正则表达式切分,因为空格分隔符某些单词之间是两个或多个空格# 正则表达式 \s+ 表示 不确定多少个空格,最少一个空格words_rdd = data_rdd.flatMap(lambda line: re.split("\s+", line))# 7. 当前words_rdd中有正常单词,也有特殊符号# 现在需要过滤数据,保留正常单词用于单词计数,在过滤的过程中 对特殊符号做计数def filter_func(data):global acmlt# 取出广播变量中存储的特殊符号lsitabnormal_chars = broadcast.valueif data in abnormal_chars:# 表示这个是特殊字符acmlt += 1return Falseelse:return Truenormal_words_rdd = words_rdd.filter(filter_func)# 8. 正常单词的单词计数逻辑result_rdd = normal_words_rdd.map(lambda x: (x, 1)). \reduceByKey(lambda a, b: a + b)print("正常单词计数结果:",result_rdd.collect())print("特殊字符数量:",acmlt)
输出结果:
正常单词计数结果: [('hadoop', 3), ('hive', 6), ('hdfs', 2), ('spark', 11), ('mapreduce', 4), ('sql', 2)]
特殊字符数量: 8
5.4 总结
-
广播变量解决了什么问题?
分布式集合RDD和本地集合进行关联使用的时候,降低内存占用以及减少网络IO传输,提高性能。
-
累加器解决了什么问题
分布式代码执行中,进行全局累加
6.Spark内核调度(重点理解)
6.1 DAG
6.1.1 DAG
Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark
的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,
将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的
资源高效地完成任务计算。
以词频统计WordCount程序为例,DAG图:
DAG:有向无环图(拓扑结构)
有向:有方向
无环:没有闭环
DAG:有方向没有形成闭环的一个执行流程图
比如:
此图,就是一个典型的DAG图。
有方向:RDD1——>RDD2——>…——>collect结束
无闭环:以action(collect) 结束了,没有形成闭环循环
作用:标识代码的逻辑
执行流程
6.1.2 Job和Action
Action:返回值不是RDD的算子
它的作用是一个触发开关,会将action算子之前的一串rdd依赖执行起来
如图,我们前面写的搜索引擎日志分析案例中,前两个需求就是2个action,就产生了2个DAG
结论:
1个Action会产生1个DAG,如果在代码中有3个Action就产生3个DAG
一个Action产生的一个DAG,会在程序运行中产生一个
JOB
所以:
1个ACTION = 1个DAG = 1个JOB
如果一个代码中,写了3个Action,那么这个代码运行起来产生3个JOB,每个JOB有自己的DAG
一个代码运行起来,在Spark中称之为:
Application
层级关系:
1个Application中,可以有多个JOB,每一个JOB内含一个DAG,同时每一个JOB都是由一个Action产生的。
6.1.3 DAG和分区
DAG是Spark代码的逻辑执行图,这个DAG的最终作用是;为了构建物理上的Spark详细执行计划而生。
所以,由于Spark是分布式(多分区)的,那么DAG和分区之间也是有关联的。
rdd1 = sc.textFile()
rdd2 = rdd1.flatMap()
rdd3 = rdd2.map()
rdd4 = rdd3.reduceByKey()
rdd4.action()
假设,全部RDD都是3个分区在执行
如图,就得到了带有分区关系的DAG图
6.2 DAG的宽窄依赖和阶段划分
在Spark RDD前后之间的关系为:
- 窄依赖
- 宽依赖
窄依赖:父RDD的一个分区,全部将数据发送给子RDD的一个分区
宽依赖:父RDD的一个分区,将数据发送给子RDD的多个分区
宽依赖还有一个别名:shuffle
6.2.1 窄依赖
6.2.2 宽依赖
6.2.3 阶段划分
对于Spark来说,会根据DAG,按照宽依赖,划分不同的DAG阶段
划分依据:从后向前,遇到宽依赖就划分出一个阶段,称之为stage
如图,可以看到,在DAG中,基于宽依赖,将DAG划分成了2个stage
在stage的内部,一定都是:窄依赖
6.3 内存迭代计算
如图,基于带有分区的DAG以及阶段划分。可以从图中得到 逻辑上最优的task分配,一个task是一个线程来具体执行
那么如上图,task1中rdd1 rdd2 rdd3的迭代计算,都是由一个task(线程完成),这一阶段的这一条线,是纯内存计算。
如上图,task1 task2 task3,就形成了三个并行的 内存计算管道。
Spark默认受到全局并行度的限制,除了个别算子有特殊分区情况,大部分的算子,都会遵循全局并行度的要求,来规划自己的分区数。
如果全局并行度是3,其实大部分算子分区都是3
注意:Spark ,我们一般推荐只设置全局并行度,不要在算子上设置并行度。
除了一些排序算子外,计算算子就让他默认开分区就可以了
6.3.1 面试题
面试题1:Spark是怎么做内存计算的?DAG的作用?Stage阶段划分的作用?
- Spark会产生DAG图
- DAG图会基于分区和宽窄依赖关系划分阶段
- 一个阶段的内部都是窄依赖,窄依赖内,如果形成前后1:1的分区对应关系,就可以产生许多内存迭代计算的管道
- 这些内存迭代计算的管道,就是一个个具体的执行Task
- 一个Task是一个具体的线程,任务跑在一个线程内,就是走内存计算了。
面试题2:Spark为什么比MapReduce快
- Spark的算子丰富,MapReduce算子匮乏(Map和Reduce),MapReduce这个编程模型,很难在一套MR中处理复杂的任务。很多复杂任务,是需要写多个MapReduce进行串联。多个MR串联通过磁盘交互数据。
- Spark可以执行内存迭代,算子之间形成DAG,基于依赖划分阶段后,在阶段内形成内存迭代管道。但是MapReduce的Map和Reduce之间的交互依旧是通过硬盘来交互的。
总结:
- 编程模型上Spark占优(算子够多)
- 算子交互上,和计算行可以尽量多的内存计算而非磁盘迭代
6.4 Spark并行度
Spark的并行度:在同一时间内,有多少个task在同时运行
并行度:并行能力的设置
比如设置并行度6,其实就是要6个task并行在跑
在有了6个task并行的前提下,rdd的分区就被规划成6个分区了。
6.4.1 如何设置并行度
可以在代码中配置文件中以及提交程序的客户端参数中设置
优先级从高到低:
- 代码中
- 客户端提交参数中
- 配置文件中
- 默认(1,但是不会全部以1来跑,多数时候基于读取文件的分片数量来作为默认并行度)
全局并行度配置的参数:spark.default.parallelism
6.4.2 全局并行度-推荐
配置文件中:
conf/spark-defaults.conf 中设置
spark.default.parallelism 100
在客户端提交参数中:
bin/spark-submit --conf "spark.default.parallelism=100"
在代码中设置:
conf = SparkConf()
conf.set("spark.default.parallelism",100)
全局并行度是推荐设置,不要针对RDD改分区,可能会影响内存迭代管道的构建,或者会产生额外的shuffle
6.4.3 针对RDD的并行度设置-不推荐
只能在代码中写,算子:
- repartition算子
- coalesce算子
- partitionBy算子
6.4.4 集群中如何规划并行度
结论:设置为CPU总核心的2~10倍
比如集群可用CPU核心是100个,我们建议并行度是200~1000
确保是CPU核心的整数倍即可,最小是2倍,最大一般10倍或者更高(适量)均可
为什么要设置最少2倍?
CPU的一个核心同一时间只能干一件事。
所以,在100个核心的情况下,设置100个并行,就能让CPU100%出力。
这种设置下,如果task的压力不均衡,某个task先执行完了,就导致某个CPU核心空闲
所以,我们将Task(并行)分配的数量变多,比如100个并行,同一时间只有100个在运行,700个在等待,
但是可以确保,某个task运行完了,后续有task补上,不让cpu闲下来,最大程度利用集群的资源。
规划并行度,只看集群总CPU核数
6.5 Spark任务调度
Spark的任务,由Driver进行调度,这个工作包含:
- 逻辑DAG产生
- 分区DAG产生
- Task划分
- 将Task分配给Executor并监控其工作
如图,Spark程序的调度流程如图:
- Driver被构建出来
- 构建SparkContext(执行环境入口对象)
- 基于DAG Scheduler(DAG调度器)构建逻辑Task分配
- 基于TaskSchedule(Task调度器)将逻辑Task分配到各个Executor上干活,并监控它们。
- Worker(Executor),被TaskScheduler管理监控,听从它们的指令干活,并定期汇报进度。
1,2,3,4都是Driver的工作
5是Worker的工作
6.5.1 Drivcer内的两个组件
DAG调度器:
工作内容:将逻辑的DAG图进行处理,最终得到逻辑上的Task划分
Task调度器:
工作内容:基于DAG Scheduler的产出,来规划这些逻辑的task,应该在哪些物理的executor上运行,以及监控管理它们的运行。
6.6 拓展-Spark概念名词大全
6.6.1 Spark运行中的概念名词大全
层级关系梳理:
- 一个Spark环境可以运行多个Application
- 一个代码运行起来,会成为一个Application
- Application内部可以有多个Job
- 每个Job由一个Action产生,并且每个Job有自己的DAG执行图
- 一个Job的DAG图会基于宽窄依赖划分成不同的阶段
- 不同阶段内基于分区数量,形成多个并行的内存迭代管道
- 每一个内存迭代管道形成一个Task(DAG调度器划分将Job内划分出具体的task任务,一个Job被划分出来的task在逻辑上称之为这个job的taskset)
6.7 SparkShuffle
6.7.1MR Shuffle回顾
首先回顾MapReduce框架中Shuffle过程,整体流程图如下:
6.7.2 简介
Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上
还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及
到序列化反序列化、跨节点网络IO以及磁盘读写IO等。
Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是
Child Stage的第一步。
执行Shuffle的主体是Stage中的并发任务,这些任务分ShuffleMapTask和ResultTask两种,ShuffleMapTask要进行
Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask
。如果要按照map端和reduce端来分析的话,ShuffleMapTask可以即是map端任务,又是reduce端任务,因为
Spark中的Shuffle是可以串行的;ResultTask则只能充当reduce端任务的角色。
Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式,到1.1版本时参考Hadoop MapReduce的实现开始引
入Sort Shuffle,在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用,在1.6中将
Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式,到的2.0版本,Hash Shuffle已被删除,所有
Shuffle方式全部统一到Sort Shuffle一个实现中。
在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着
Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort
Shuffle两种。
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重
的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于
HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但
是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉
取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
6.7.3 Sort Shuffle bypass机制
bypass运行机制的触发条件如下:
- shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
- 不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。
bypass运行机制的触发条件如下:
1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
2)不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。
- 此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,
将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的
。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
- 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,
只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的
HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,
也就节省掉了这部分的性能开销
总结:
- SortShuffle也分为普通机制和bypass机制
- 普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。
- 而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的
shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序
,极大的提高了其性能。
6.7.4 Shuflle的配置选项
Shuffle阶段划分:
shuffle write:mapper阶段,上一个stage得到最后的结果写出
shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并
spark 的shuffle调优:主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等
spark.shuffle.file.buffer
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写
到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性
能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight:
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现
,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait:
spark.shuffle.io.maxRetries :shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试
的最大次数。(默认是3次)
spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
spark.shuffle.memoryFraction:
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作内存比例。
spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认为sort).Spark1.5x以后有三个可选项:
Hash:spark1.x版本的默认值,HashShuffleManager
Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制
spark.shuffle.sort.bypassMergeThreshold
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
6.8 总结
- DAG是什么有什么用?
DAG是有向无环图,用以描述任务执行流程,主要作用就是协助DAG调度器构建Task分配用以做任务管理
- 内存迭代、阶段划分?
基于DAG的宽窄依赖划分阶段,阶段内部都是窄依赖可以构建内存迭代的管道
- DAG调度器是?
构建Task分配以做任务管理
相关文章:
PySpark-核心编程
2. PySpark——RDD编程入门 文章目录 2. PySpark——RDD编程入门2.1 程序执行入口SparkContext对象2.2 RDD的创建2.2.1 并行化创建2.2.2 获取RDD分区数2.2.3 读取文件创建 2.3 RDD算子2.4 常用Transformation算子2.4.1 map算子2.4.2 flatMap算子2.4.3 reduceByKey算子2.4.4 Wor…...
vue 在IOS移动端中 windon.open 等跳转外部链接后,返回不触发vue生命周期、mounted等相关事件-解决方法
做了一个列表的h5页面,通过点击列表跳转到外部链接,然后返回是回到原来页面状态,类似缓存。发现在ios端返回后,vue 的mounted() 、create()、路由监听等方法都不会执行。在安卓和pc 端都能正常调用。 解决方案:监听pa…...
股票预测和使用LSTM(长期-短期-记忆)的预测
一、说明 准确预测股市走势长期以来一直是投资者和交易员难以实现的目标。虽然多年来出现了无数的策略和模型,但有一种方法最近因其能够捕获历史数据中的复杂模式和依赖关系而获得了显着的关注:长短期记忆(LSTM)。利用深度学习的力…...
Docker搭建个人网盘、私有仓库
1、使用mysql:5.6和 owncloud 镜像,构建一个个人网盘 [rootlocalhost ~]# docker pull mysql:5.6 [rootlocalhost ~]# docker pull owncloud [rootlocalhost ~]# docker run -itd --name mysql --env MYSQL_ROOT_PASSWORD123456 mysql:5.6 [rootlocalhost ~]# doc…...
3种获取OpenStreetMap数据的方法【OSM】
OpenStreetMap 是每个人都可以编辑的世界地图。 这意味着你可以纠正错误、添加新地点,甚至自己为地图做出贡献! 这是一个社区驱动的项目,拥有数百万注册用户。 这是一个社区驱动的项目,旨在在开放许可下向每个人提供所有地理数据。…...
数据处理与统计分析——MySQL与SQL
这里写目录标题 1、初识数据库1.1、什么是数据库1.2、数据库分类1.3、相关概念1.4、MySQL及其安装1.5、基本命令 2、基本命令2.1、操作数据库2.2、数据库的列类型2.3、数据库的字段属性2.4 创建和删除数据库表2.5、数据库存储引擎2.6、修改数据库 3、MySQL数据管理3.1、外键 My…...
OpenCV之特征点匹配
特征点选取 特征点探测方法有goodFeaturesToTrack(),cornerHarris()和SURF()。一般使用goodFeaturesToTrack()就能获得很好的特征点。goodFeaturesToTrack()定义: void goodFeaturesToTrack( InputArray image, OutputArray corners,int maxCorners, double qualit…...
浅谈开关柜绝缘状态检测与故障诊断
贾丽丽 安科瑞电气股份有限公司 上海嘉定 201801 摘要:电力开关柜作为电力系统的关键设备广泛应用于输电配电网络,其运行可靠性直接影响着电力系统供电质量及安全性能。开关柜绝缘状态检测与故障诊断是及时维修、更换和预防绝缘故障的重要技术手段。在阐述开关柜绝…...
Mybatis 动态 SQL
动态 SQL 1. if 标签2. trim 标签3. where 标签4. set 标签5. foreach 标签 1. if 标签 if 标签有很多应用场景, 例如: 在用户进行注册是有些是必填项有些是选填项, 这就会导致前端传入的参数不固定如果还是将参数写死就很难处理, 这时就可以使用 if 标签进行判断 <insert …...
Android studio之 build.gradle配置
在使用Android studio创建项目会出现两个build.gradle: 一. Project项目级别的build.gradle (1)、buildscript{}闭包里是gradle脚本执行所需依赖,分别是对应的maven库和插件。 闭包下包含: 1、repositories闭包 2、d…...
【ElasticSearch】一键安装IK分词器无需其他操作
要注意的时下面命令中的es是我容器的名称,要换成你对应的es容器名 docker exec -it es /bin/bash # 进入容器 ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis- ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.1…...
在Ubuntu上启动一个简单的用户登录接口服务
一个简单的用户登录接口 我使用 Python 和 Flask 框架来创建这个接口 首先,确保你已经安装了 Python 和 Flask。如果没有安装,可以通过以下命令在 Ubuntu 上安装: sudo apt update sudo apt install python3 python3-pip pip3 install Fla…...
【PHP】函数-作用域可变函数匿名函数闭包常用系统函数
文章目录 函数定义&使用命名规则参数种类默认值引用传递函数返回值return关键字 作用域global关键字静态变量 可变函数匿名函数闭包常用系统函数输出函数时间函数数学函数与函数相关函数 函数 函数:function,是一种语法结构,将实现某一个…...
Python使用pymysql和sqlalchemy访问MySQL的区别
Python使用pymysql和sqlalchemy访问MySQL的区别 1. 两个数据库连接工具的对比 pymysql和sqlalchemy是两个Python中经常用于与MySQL数据库交互的库。都可以连接MySQL数据库,但它们有明显的区别。 (1)特点 pymysql是一个Python模块…...
ubuntu服务器的mysql,更改root密码,并允许远程连接
我只是一个搬运工 更改密码远程连接...
微信小程序【构建npm】使用记录
:: 问题 使用原生微信小程序开发时,通过官方 typescript 模板构建的小程序无法正确执行 构建npm 成功,从而导致我想通过 npm 安装并使用第三方库出现问题 :: 开发环境(可参照) 设备:macOS Ventura 13.0 微信开发者工…...
mybatis入门的环境搭建及快速完成CRUD(增删改查)
又是爱代码的一天 一、MyBatis的介绍 ( 1 ) 背景 MyBatis 的背景可以追溯到 2002 年,当时 Clinton Begin 开发了一个名为 iBATIS 的持久化框架。iBATIS 的目标是简化 JDBC 编程,提供一种更直观、易用的方式来处理数据库操作。 在传统的 JDBC 编程中&…...
《HeadFirst设计模式(第二版)》第九章代码——组合模式
上一章链接: 《HeadFirst设计模式(第二版)》第九章代码——迭代器模式_轩下小酌的博客-CSDN博客 前面说到,当一个菜单里面出现了子菜单的时候,前面的迭代器模式得换成组合模式。 组合模式: 允许将对象组合成树形结构来表现部分-整…...
iOS17 widget Content margin
iOS17小组件有4个新的地方可以放置分别是:Mac桌面、iPad锁屏界面、 iPhone Standby模式、watch的smart stack Transition to content margins iOS17中苹果为widget新增了Content margin, 使widget的内容能够距离边缘有一定的间隙,确保内容显示完整。这…...
计网第四章(网络层)(一)
前面学习了数据链路层,我们可以实现一个网络的内部通信,可是要把这些网络互连起来形成更大的互连网,就需要用网络层互联设备路由器。而有了路由器的参与,就有不同网络、跨网络的概念诞生。 这时候我想大家也能理解为什么叫网络层…...
【前端】vue3 接入antdv表单校验
1/🍕背景 1、表单校验是非常常见的需求,能够有效的拦截大部分的错误数据,提升效率。 2、快速的给使用者提示和反馈,用户体验感非常好。 3、成熟的表单校验框架,开发效率高,bug少。 最近使用的是vue3antdv的…...
CY3-COOH在蛋白质定位的特点1251915-29-3星戈瑞
欢迎来到星戈瑞荧光stargraydye!小编带您盘点: CY3-COOH是一种橙红色荧光标记试剂,可以用于蛋白质定位研究。**以下是CY3-COOH在蛋白质定位的特点和应用: 细胞定位:**将CY3-COOH标记到特定蛋白质上,可以…...
数据采集:selenium 获取某网站CDN 商家排名信息
写在前面 工作中遇到,简单整理理解不足小伙伴帮忙指正 对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大…...
5.从头跑一个pipeline
1.安装torch pip install torchvision torch PyTorch的torchvision.models模块中自带的很多预定义模型。torchvision 是PyTorch的一个官方库,专门用于处理计算机视觉任务。在这个库中,可以找到许多常用的卷积神经网络模型,包括ResNet、VGG、…...
leetcode原题: 堆箱子(动态规划实现)
题目: 给你一堆n个箱子,箱子宽 wi、深 di、高 hi。箱子不能翻转,将箱子堆起来时,下面箱子的宽度、高度和深度必须大于上面的箱子。实现一种方法,搭出最高的一堆箱子。箱堆的高度为每个箱子高度的总和。 输入使用数组…...
Java中数组和集合的对比,以及什么情况下使用数组更合适,什么情况下使用集合更合适。集合的基本介绍和集合体系图。
在Java中,数组和集合(Java集合框架)都用于存储多个元素。它们各自有不同的特点和适用场景。下面我会对数组和集合进行对比,并解释何时使用集合更好,以及何时使用数组更合适。 数组和集合的对比: 数组&…...
STM32之17.PWM脉冲宽度调制
一LED0脉冲宽度调制在TIM14_CHI,先将LED(PF9)代码配置为AF推挽输出模式,将PF9引脚连接到TIM14, #include <stm32f4xx.h>static GPIO_InitTypeDef GPIO_InitStruct;void Led_init(void) {//打开端口F的硬件时钟&a…...
VS2015打开Qt的pro项目文件 报错
QT报错:Project ERROR: msvc-version.conf loaded but QMAKE_MSC_VER isn‘t set 解决方法: 找到本机安装的QT路径,找到“msvc-version.conf”文件,用记事本打开, 在其中添加版本“QMAKE_MSC_VER 1900”保存即可。 …...
骨传导耳机会头疼吗?骨传导耳机会对身体不好吗
一般情况下,骨传导耳机不会引起头疼。由于骨传导耳机的工作原理是通过将声音传导到颞骨和耳部周围的骨骼来传达音频信号,而不是直接进入耳道,因此不会对耳朵造成压力或产生耳疼的感觉。 然而,每个人的感受和体验可能不同ÿ…...
【面试题系列】(一)
Redis有哪些数据结构?其底层是怎么实现的? Redis 系列(一):深入了解 Redis 数据类型和底层数据结构 字符串(String): 用于存储文本或二进制数据。可以执行字符串的基本操作…...
网站买源代码/seo顾问是什么职业
本文实例讲述了JavaScript类继承及实例化的方法。分享给大家供大家参考。具体如下:(function(){var Class {//扩展类create: function(aBaseClass, aClassDefine){var $class function(){for(var member in aClassDefine){this[member] aClassDefine[member];}if…...
南京建设网站内容/网站软件免费下载
bzip2使用Burrows-Wheeler块排序文本压缩算法,将文件进行压缩,压缩比率比一般算法高一些。bzip2要求命令行标志附带一个文件名列表。每个文件都被自己的压缩版本替换,名称为“original_name.bz2”。每个压缩文件与相应的原始文件具有相同的修…...
网站开发增值税税率6%/南京百度推广开户
1.水平居中1.1行内元素的水平居中 /*行内元素水平居中*/ #body-div{ text-align:center; } 1.2块级元素的水平居中 /*块级元素水平居中*/ #body-div{ margin:0 auto; } 1.3多个块级元素水平居中 /*多个块级元素水平居中*/ #body-div{ text-align:center; } ##body-div-containe…...
日文网站模板/域名收录查询
1.全局安装express框架,cmd打开命令行,输入如下命令: npm install -g express express 4.x版本中将命令工具分出来,安装一个命令工具,执行命令: npm install -g express-generator 输入express --version验证 2.如果在执行js文件仍报Error: Cannot find module express错误。 解…...
微商城官网登陆入口/seo分析与优化实训心得
学习过程参考(后续章节同) 【公开课】数据库系统概论(王珊老师)(完结) 《数据库系统概论》思维导图 【专栏必读】数据库系统概论第五版(王珊)专栏学习笔记目录导航及课后习题答案详…...
建设网站现在免费吗/杭州企业seo
还有不到一周便是春节,在外打拼了一年的人们也早已失去了上班的动力,迫不及待的想赶紧回家过年。但是也有一部分年轻人对过年充满着恐惧和焦虑。今天小编就为大家来盘点一下当代的年轻人过年究竟有哪些烦恼,以及面对这些烦恼的一些相应的“对…...