【4-5章】Spark编程基础(Python版)
课程资源:(林子雨)Spark编程基础(Python版)_哔哩哔哩_bilibili
第4章 RDD编程(21节)
Spark生态系统:
- Spark Core:底层核心(RDD编程是针对这个)
- Spark SQL:SQL查询
- Spark Streaming:流计算(Structured Streaming:结构化数据流)
- Spark MLlib:机器学习
RDD编程:对RDD进行一次又一次的转换操作
(一)RDD编程基础
1、创建
两种方式:
- 从文件系统中加载数据创建RDD:分布式文件系统hdfs 或 本地文件系统 或 云端文件如Amazon S3(Amazon云端存储服务)
- 通过并行集合(数组)创建RDD:对集合进行并行化
(1)从文件系统中加载数据:Spark的SparkContext通过 sc.textFile() 读取数据,生成内存中的RDD
Driver节点为指挥所;SparkContext对象为指挥官
# 从本地文件系统中加载数据创建RDD
# sc即driver节点里的SparkContext对象
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") # 本地文件是///
lines.foreach(print) # 遍历RDD每个元素并输出
SparkContext在独立应用程序(即代码文件)里需要编写代码生成;但在pyspark交互式执行环境里,系统会默认创建sc,不需再人为创建
(2)从分布式文件系统HDFS中加载数据:
# 三条语句完全等价,可以使用其中任一种
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") # hdfs://,localhost为主机名,9000为端口号(系统默认去当前登录Ubuntu系统的用户在HDFS中所对应的用户主目录去找)
lines = sc.textFile("/user/hadoop/word.txt") # 用户主目录,Linux系统默认/home/用户名,简写为/~
lines = sc.textFile("word.txt") # txt文件放在当前用户主目录下
- Linux系统主目录默认为/home/用户名,简写为~
- Hadoop文件系统默认为当前登录Linux系统的用户,hdfs://localhost:9000/user/用户名/(全称路径),可以只写为 /user/用户名/
(3)通过并行集合(数组/列表)创建RDD:SparkContext 的 sc.parallelize() 方法,可以对array并行化,生成内存中的RDD
array = [1,2,3,4,5]
rdd = sc.parallelize(array)
rdd.foreach(print) # 遍历取出打印
# 结果:
# 1
# 2
# 3
# 4
# 5
2、基本操作
RDD操作:
- 转换操作(Transformation):
- filter(过滤)
- map(一对一映射)
- flatMap(输出0~多个结果)
- groupByKey(Key相同的分组)
- reduceByKey(根据Key分组后对分组的值计算)
- 行动操作(Action):
- count(数据集中元素个数)
- collect(以列表形式返回数据集中所有元素)
- first(数据集中第一个元素)
- take(n)(以列表形式返回数据集中前n个元素)
- reduce(func)(聚合数据集中的元素)
- foreach(func)(将数据集中每个元素传到func运行)
- 惰性机制:转换操作只记录轨迹,行动操作才真正执行计算
RDD转换 <=> 业务逻辑完成一次又一次转换(形成DAG有向无环图,Spark即解析DAG,生成很多个Stage,每个阶段的子任务提交到不同工作节点的线程去运行)。很多简单的转换组合后可实现复杂的业务逻辑
- 对于RDD而言,每一次转换操作都会产生不同的RDD,供一个操作使用(RDD是只读的,一旦生成无法修改,只有在转换的过程中才能修改,生成新的RDD后又无法修改)
- 转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作(动作类型操作Action)时才会发生真正的计算,从血缘关系的源头开始进行从头到尾的计算操作
(1)转换操作
- filter(func)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") # 从底层文本文件加载生成内存中RDD
linesWithSpark = lines.filter(lambda line:"Spark" in line) # 匿名函数/lambda表达式
linesWithSpark.foreach(print) # 输出包含Spark的行
- map(func)
data = [1,2,3,4,5]
rdd1 = sc.parallelize(data) # 得到一个RDD
rdd2 = rdd1.map(lambda x:x+10)
rdd2.foreach(print)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
words = lines.map(lambda line:line.split(" ")) # 一行语句被拆分后得到的是list
words.foreach(print) # 包含3个元素,每个元素都是list
- flatMap(func)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
words = lines.flatMap(lambda line:line.split(" "))
words.foreach(print) # 包含9个元素(英文单词)
词频统计就是用 flatMap 将一行行语句打散成一个个单词
- groupByKey()
应用于key-value键值对数据集,返回(key, iterable),即把key相同的值封装成一个可迭代对象
# 并行化方式生成列表封装的数据集,列表里的元素为键值对
words = sc.parallelize([("Hadoop",1), ("is",1), ("good",1), ("Spark",1), ("is",1)])
words1 = words.groupByKey() # 相同key的值会归并起来,pyspark.resultiterable.ResultIterable object封装
words1.foreach(print)
- reduceByKey(func)
在groupByKey基础上对value list按照func进行计算
words = sc.parallelize([("Hadoop",1), ("is",1), ("good",1), ("Spark",1), ("is",1)])
words1 = words.reduceByKey(lambda a,b:a+b)
words1.foreach(print) # 如('is',2) 即 is出现2次
如若("is",(1,1,1)),第一个1赋给a,第二个1赋给b,求和得到2赋给a,第三个1赋给b,求和得到3,返回最终结果
(2)行动操作
转换类型操作为惰性机制,并不真正发生计算,只记录轨迹;当遇到第一个行动类型操作时执行真正的计算(从底层磁盘加载数据、生成数据、转换、得到结果)
- count():返回数据集中的元素个数
- collect():以数组/列表的形式返回数据集中的所有元素(封装在列表里返回)
- first():返回数据集中的第一个元素
- take(n):以数组/列表的形式返回数据集中的前n个元素
- reduce(func):通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
- foreach(func):将数据集中的每个元素传递到函数func中运行
rdd.reduce(lambda a,b:a+b):
惰性机制:
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") # 转换操作,只是记录轨迹,并未真正加载
lineLengths = lines.map(lambda s:len(s)) # 转换操作,只是记录轨迹,并未真正加载
totalLength = lineLengths.reduce(lambda a,b:a+b) # 行动操作,真正执行从头到尾计算
print(totalLength) # 每行长度相加,得到总长度
3、持久化
多次反复访问同样一组值,不做持久化的话,每次访问都需要重新生成,非常耗时(对于迭代计算而言代价很大,经常需要多次重复使用同一组数据)。持久化将其保存到内存中,下次使用时不需要从头计算
可以通过持久化(缓存)机制避免这种重复计算的开销。持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复利用
可以使用 persist() 方法对一个RDD标记为持久化(之所以说标记为持久化,是因为出现 persist() 语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化)
参数:
- MEMORY_ONLY(只存在内存中):把RDD作为反序列化的对象存在JVM中,若内存不足,就按先进先出原则替换内容 RDD.cache() = RDD.persist(MEMORY_ONLY)
- MEMORY_AND_DISK(同时保存内存和磁盘):优先保存在内存中,内存不足的部分再存放磁盘
.unpersist() 方法手动把持久化的RDD从缓存中移除
4、分区
RDD就是弹性分布式数据集,可以在计算过程中不断动态调整分区个数
(1)好处:增加并行度(可以在多个节点上同时发生计算);减少通信开销
- 增加并行度:
- 减少通信开销:
- userData表:大表,成千上百万用户,包括userID 和 userInfo(数据分块保存在不同机器上,每个块的用户ID都散布在0-1000万之间)
- Events表:小表,包括userID 和 LinkInfo,记录用户在过去五分钟内所访问的网站链接
连接两表,让j1负责连接0-100w的用户id,以此类推
分区后只涉及events表多次的数据分发
(2)分区原则:分区个数 = 集群中CPU核心数目
对于Spark不同部署模式(Local模式、Standalone模式、YARN模式、Mesos模式)而言,通过设置具体参数值(spark.default.parallelism)指定默认的分区数目
- Local模式:默认为本地机器的CPU数目,若设置了Local[N],则默认为N
- Apache Mesos模式:默认分区数为8
- Standalone模式:集群中所有CPU核心数目总和 和 2 中取较大值
- YARN模式:集群中所有CPU核心数目总和 和 2 中取较大值
(3)指定分区个数:在调用 textFile() 和 parallelize() 方法时指定分区个数
sc.textFile(path, partitionNum)
# path指定要加载的文件的地址
# partitionNum用于指定分区个数list = [1,2,3,4,5]
rdd = sc.parallelize(list,2) # 设置两个分区
(4)使用repartition()方法重新设置分区个数:通过转换操作得到新RDD时,直接调用repartition方法即可
(5)自定义分区方法:
- 哈希分区 HashPartitioner
- 区域分区 RangePartitioner
- 自定义分区
from pyspark import SparkConf, SparkContextdef MyPatitioner(key):print("MyPatitioner is running")print("The key is %d" %key)return key%10 # 作为分区编号返回def main():print("The main function is running")conf = SparkConf().setMaster("local").setAppName("MyApp")sc = SparkContext(conf=conf)data = sc.parallelize(range(10),5) # 分成5个分区data.map(lambda x:(x,1)) \ # 键值对.partitionBy(10, Mypartitioner) \ # 根据key分区。.partitionBy只接受键值对类型.map(lambda x:x[0]) \ # 再从键值对转回原格式.saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner") # 写入10个分区,每个分区各1个文件if __name__ == '__main__':main()
.partitionBy只接受键值对类型
使用如下命令运行 TestPartitioner.py:
cd /usr/local/spark/mycode/rdd
python3 TestPartitioner.py
或者,使用如下命令运行 TestPartitioner.py:
cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit TestPartitioner.py
5、基本实例(词频统计)
再次强调:
本地文件是三个/,即file:///
hdfs文件是两个/,即hdfs://
在一个集群中同时部署Hadoop和Spark,把集群中某个节点既作为HDFS的存储节点,也作为Spark的WorkerNode,即Spark的计算组件和HDFS的存储组件放在同一台机器上
分布式词频统计:
(二)键值对RDD
键值对RDD:RDD的每个元素都是一个键值对 (key, value)
1、创建
(1)从文件中加载
(2)通过并行集合(列表)创建
2、常用的键值对RDD转换操作
- reduceByKey(func):使用func函数合并具有相同键的值
- groupByKey():对具有相同键的值进行分组(ResultIterable对象封装)
总结:groupByKey是对每个key进行操作,但只生成一个sequence,本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作;reduceByKey用于对每个key对应的多个value进行merge操作,能在本地先进行merge操作,且merge操作可以通过函数自定义
- keys:把Pair RDD中的key返回形成一个新的RDD
- values:把Pair RDD中的value返回形成一个新的RDD
- sortByKey():返回一个根据键排序的RDD,默认True(升序)
- mapValues(func):对键值对RDD中的每个value都应用一个函数,key不会发生变化
- join:内连接。对于给定的两个输入数据集 (K, V1) 和 (K, V2),只有在两个数据集中都存在的key才会被输出,最终得到一个 (K, (V1,V2)) 类型的数据集
- combineByKey
(1)reduceByKey(func):先把key相同的值归并起来,再对值列表用func进行聚合计算
(2)groupBykey()
对比 reduceByKey 和 groupByKey:
一个 reduceByKey 等价于 一个 groupByKey + map
(3)keys()
(4)values()
(5)sortByKey()
- sortByKey() 和 sortBy() 的区别:sortBy()可以根据值进行排序,而sortByKey() 只能根据键
(6)mapValues(func):key不变,value用func(lambda表达式)进行计算
3、综合实例
给定一组键值对,key为图书名称,value为某天图书销量。计算每种图书的每天平均销量
reduceByKey():
mapValues():
(三)数据读写
1、文件数据读写
(1)本地文件系统数据读写:
- 注意:Spark采用惰性执行机制,即使输入了错误的语句,Spark-Shell也不会马上报错
把RDD写入到文本文件中 .saveAsTextFile(给出的是目录,而不是具体文件,因为存在分区的概念):
再次把数据加载到RDD中,也要写目录,而不是文件:
(2)分布式文件系统HDFS数据读写:
把RDD中的数据保存到HDFS文件中(路径是目录,而不是具体文件):
再次强调,本地文件是file:///开头,而分布式文件是hdfs://开头
2、读写HBase数据
(1)HBase简介
HBase(分布式数据库)是Google BigTable的开源实现,也是Hadoop的成员组件,构建在Hadoop分布式文件系统HDFS基础上。即HBase的数据是保存在底层HDFS中的
特性:
- 每个值是一个未经解释的字符串,没有数据类型
- 用户在表中存储数据,每一行都有一个可排序的行键和任意多的列
- 表在水平方向由一个或者多个列族组成,一个列族中可以包含任意多个列,同一个列族里面的数据存储在一起
- 列族支持动态扩展,可以很轻松地添加一个列族或列,无需预先定义列的数量以及类型,所有列均以字符串形式存储,用户需要自行进行数据类型转换
- HBase中执行更新操作时,并不会删除数据旧的版本,而是生成一个新的版本,旧版本仍然保留(这是和HDFS只允许追加、不允许修改的特性相关的)
概念:
- 表:HBase采用表来组织数据,表由行和列组成,列划分为若干个列族
- 行:每个HBase表都由若干行组成,每个行由行键(row key)来标识
- 列族:一个HBase表被分组成许多列族(Column Family)的集合,它是基本的访问控制单元
- 列限定符:列族里的数据通过列限定符(或列)来定位
- 时间戳:每个单元格都保存着同一份数据的多个版本,这些版本采用时间戳进行索引
- 单元格:在HBase表中,通过行、列族和列限定符确定一个单元格(cell),单元格中存储的数据没有数据类型,总被视为字节数组byte[]
HBase - 稀疏、多维度、排序、映射表
- 相关信息全部汇总在一个表里,不需进行多表连接操作,有利于在大数据时代获得高的数据读写性能
- 一个表分为若干行和若干列族,一个列族又包含很多列/列限定符,每个行由行键(row key)来标识
- 行键和列限定符交叉的位置叫单元格,值是以一个单元格的形式写入的(关系型数据库是一行行存储写入的)
- 每个单元格的值可能会发生变化,但HBase底层存储是基于HDFS(只读,不可修改),通过将指针指向新版本数据间接实现修改
四维坐标定位:行键-列族-列限定符-版本时间戳
- 关系数据库是二维定位,给出行、列即可唯一确定一个单元格值
HBase概念视图:列族contents、列限定符html
HBase物理视图:底层为行键+列族+时间戳
保存机制:水平分区+切分成很多列族(分布式存储)
(2)创建HBase表
启动HBase(底层存储基础为HDFS,故要先启动Hadoop):
# 启动Hadoop
cd /usr/local/hadoop # 进入Hadoop安装目录
./sbin/start-all.sh # 启动Hadoop
# 或start-dfs.sh(启动hdfs)# 启动HBase
cd /usr/local/hbase # 进入HBase安装目录
./bin/start-hbase.sh # 启动HBase
./bin/hbase shell # 启动HBase Shell
创建student表(确保数据库里不存在student表):
disable 'student'
drop 'student'create 'student', 'info' # 表、列族# 录入student表第一个学生记录
put 'student', '1', 'info:name', 'Xueqian'
put 'student', '1', 'info:gender', 'F'
put 'student', '1', 'info:age', '23'# 录入student表第二个学生记录
put 'student', '2', 'info:name', 'Weiliang'
put 'student', '2', 'info:gender', 'M'
put 'student', '2', 'info:age', '24'
- 关系型数据库插入数据的方式为 insert into... values...(一次插入一行数据)
(3)配置Spark
把程序运行过程中所需jar包(lib目录下)拷贝到Spark安装目录下(jars目录),需要拷贝:
- 所有以hbase开头的jar包
- guava-12.0.1.jar
- htrace-core-3.1.0-incubating.jar
- protobuf-java-2.5.0.jar
cd /usr/local/spark/jars
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
此外,在Spark2.0以上版本中,缺少把HBase数据转换成Python可读取数据的jar包,需要另行下载。可以访问下面地址下载spark-examples_2.11-1.6.0-typesafe-001.jar。下载以后保存到 /usr/local/spark/jars/hbase/ 目录中https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001
使用vim编辑器打开spark-env.sh文件,设置Spark的spark-env.sh文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件,命令如下:
cd /usr/local/spark/conf
vim spark-env.sh
打开spark-env.sh文件后,可以在文件最前面增加下面一行内容,这样后面编译和运行过程才不会出错:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
(4)编写程序读取HBase数据
Spark读取HBase:用SparkContext提供的 newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中
SparkOperateHBase.py(从HBase读取数据、生成字符串格式并打印到屏幕上):
#!/usr/bin/env python3from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster('local').setAppName("ReadHBase") # 设置连接方式为本地模式,应用名称为ReadHBase
sc = SparkContext(conf = conf) # 生成SparkContext对象
host = 'localhost' # ZooKeeper服务器地址(分布式协调一致性作用)
table = 'student' # 表名
conf = {"hbase.zookeeper.quorum": host,"hbase.mapreduce.inputtable": table} # 配置ZooKeeper服务器地址、当前读取的输入表
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" # 键转换器,把key从HBase格式转换成字符串格式
valueConv="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd=sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf) # 读取表的格式、从HBase读取的key的类型、从HBase读取的value的类型、指定key的转换类、指定value的转换类、配置信息
count=hbase_rdd.count() # 有多少个行键(键值对)
hbase_rdd.cache() # 缓存
output=hbase_rdd.collect() # 封装在一个列表中返回
for (k,v) in output:print(k,v)
执行该代码文件:
cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit SparkOperateHBase.py
执行结果:
(5)编写程序向HBase写入数据
把表中的两个学生信息插入到HBase的student表中:
SparkWriteHBase.py:
#!/usr/bin/env python3from pyspark import SparkConf,SparkContextconf = SparkConf().setMaster('local').setAppName("WriteHBase")
sc = SparkContext(conf = conf)
host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" # key转换器,String类型转换成内部格式
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" # value转换器,字符串列表转换成Put单元格
conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class":"org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class":"org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class":"org.apache.hadoop.io.Writable"}rawData=['3,info,name,Rongcheng','3,info,gender,M','3,info,age,26','4,info,name,Guanhua','4,info,gender,M','4,info,age,27']
# 首先将6个字符串加载到内存生成RDD,再写入HBase
sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv) # x[0]为行键,map后6个字符串变为6个键值对(key,value),key为行键,value为字符串列表(即单元格值)
- StringList:[行键,列族,列,值]
执行:
cd /usr/local/spark/mycode/rdd
/usr/local/spark/bin/spark-submit SparkWriteHBase.py
去HBase Shell查看写入结果:scan 'student'
(四)综合案例
1、求TOP值
对一个目录下的所有文件的某字段排序,取top5(topN.py)
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")
# line.strip() 去掉字符串后面的空格(去掉空行)
# split后==4,即去掉缺失字段的行
result1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(",")) == 4))
result2 = result1.map(lambda x:x.split(",")[2]) # 取出payment
result3 = result2.map(lambda x:(int(x),"")) # 转换为(key, value),为了后面排序
result4 = result3.repartition(1) # 为了保证全局有序,否则可能分区有序,但全局无序
result5 = result4.sortByKey(False) # sortByKey()必须根据key来排,即输入是(key,value)。False为降序
result6 = result5.map(lambda x:x[0]) # 去掉value的""
result7 = result6.take(5) # 取出前五名
for a in result7:print(a)
过程解析:
参考:Spark求TOP值_11号的乔乔的博客-CSDN博客
2、文件排序
读取文件中所有整数并进行排序(FileSort.py)
#!/usr/bin/env python3
from pyspark import SparkContext, SparkConfindex = 0
def getindex(): # 获取全局排序,依次递增global indexindex += 1return indexdef main():conf = SparkConf().setMaster('local[1]').setAppName('FileSort')sc = SparkContext(conf=conf)lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt") # 把某个目录下所有文件加载进来生成RDDresult1 = lines.filter(lambda line:(len(line.strip())>0)) # 消除空行result2 = result1.map(lambda x:(int(x.strip()),"") # 生成(key,value)result3 = result2.repatition(1) # 1个分区确保全局有序result4 = result3.sortByKey(True)result5 = result4.map(lambda x:x[0])result6 result5.map(lambda x:(getindex(),x))result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")
sortByKey() 的输入必须是 (key, value)
过程解析:
3、二次排序
先根据第一列降序排序,第一列值相等再根据第二列降序排序(SecondarySortKey.py)
若就把字符串构建(key,value) 输入sortBykey(),是根据字母的升序排序。故本题必须生成一个可比较的key,即下图中的 SecondarySortKey(5,3)
实现思路:
- 按照Ordered 和 Serializable 接口实现自定义排序的key(人工定义类SecondarySortKey)
- 将要进行二次排序的文件加载进来生成 <key, value> 类型的RDD,key即人工定义的用于排序的SecondarySortKey,值即文本中的一行
- 使用sortByKey基于自定义的key进行二次排序
- 去除掉排序的key只保留排序的结果
from operator import gt
from pyspark import SparkContext, SparkConfclass SecondarySortKey():def __init__(self, k): # __init__为构造函数,k为传入参数,格式为(key,value)self.column1 = k[0]self.column2 = k[1]def __gt__(self, other): # 重写比较函数if other.column1 == self.column1:return gt(self.column2,other.column2) # 若第1列相等,比较第2列else:return gt(self.column1, other.column1)def main():conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')sc = SparkContext(conf=conf)file="file:///usr/local/spark/mycode/rdd/secondarysort/file4.txt"rdd1 = sc.textFile(file)rdd2=rdd1.filter(lambda x:(len(x.strip()) > 0)) # 去除空行rdd3=rdd2.map(lambda x:((int(x.split(" ")[0]),int(x.split(" ")[1])),x)) # x为字符串rdd4=rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1]))rdd5=rdd4.sortByKey(False)rdd6=rdd5.map(lambda x:x[1]) # 去掉左边的可排序key,只保留右边的字符串rdd6.foreach(print)if __name__ == '__main__':main()
过程解析:
第5章 Spark SQL(9节)
(一)Spark SQL简介
1、Hive:SQL-on-Hadoop
Hive:SQL-on-Hadoop(Hadoop平台上提供了SQL查询的能力,在Hadoop平台上构建数据仓库,把SQL语句转换成底层MapReduce程序,对底层HDFS数据进行查询分析)
- Hive本身不存储数据,借助底层HDFS存储数据
- Hive可以看成是编程接口,把SQL语句转换成MapReduce作业
2、Shark:Hive on Spark
Shark即Hive on Spark。为了实现和Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作
- SQL-on-Spark性能比Hive有了10-100倍的提高
- Shark导致的两个问题:
- 执行计划优化完全依赖Hive,不方便添加新的优化策略
- MapReduce是进程级并行,而Spark是线程级并行,故Spark为了兼容Hive就存在线程安全的问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支
3、Spark SQL
Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据模块,其他模块全部重新开发。也就是说,从HQL被解析成抽象语法树AST开始,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责
Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句。数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据
- Spark SQL目前支持三种语言:Java、Scala、Python
Why Spark SQL?
- 关系型数据库已经很流行,但在大数据时代已经不能满足要求
- 首先,用户需要从不同数据源执行各种操作,包括结构化和非结构化数据;
- 其次,用户需要执行高级分析,比如机器学习和图像处理。在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统
Spark SQL填补了这个鸿沟
- 首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系操作
- 其次,可以支持大量的数据源和数据分析算法,Spark SQL可以融合传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力
(二)DataFrame概述
DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从Mysql到DataFrame的转化,并且支持SQL查询
RDD是Spark Core核心组件的数据抽象,Spark SQL的数据抽象是DataFrame
- RDD是分布式的Java对象的集合,看不到对象内部结构;
- DataFrame是以RDD为基础的分布式数据集,提供了详细的结构信息
1、创建
从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession也提供了HiveQL以及其他依赖于Hive的功能的支持
- SparkSession接口(Spark SQL程序的指挥官)
- SparkContext对象(RDD应用程序的指挥官)
(1)创建SparkSession对象
- 在启动进入Pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)
- 在写独立应用程序时,需要用下面代码生成:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
(2)创建DataFrame
spark.read() 操作或 spark.read.format().load() 操作
spark.read.text("xxx.txt")
spark.read.json("xxx.json")
spark.read.parquet("xxx.parquet")spark.read.format("text").load("xxx.txt")
spark.read.format("json").load("xxx.json")
spark.read.format("parquet").load("xxx.parquet")
调用 .show() 可以查看数据
2、保存
使用 spark.write 操作保存DataFrame
df.write.txt("xxx.txt")
df.write.json("xxx.json")
df.write.parquet("xxx.parquet")df.write.format("text").save("xxx.txt")
df.write.format("json").save("xxx.json")
df.write.format("parquet").save("xxx.parquet")
目录名称读取即可加载(注意不是文件名称)
3、常用操作
printSchema():打印模式信息
select():选取列显示
filter():过滤
groupBy():分组
sort():排序
(三)利用反射机制推断RDD模式
1、利用反射机制去推断RDD模式
/usr/local/spark/examples/src/main/resources/ 目录下:
from pyspark.sql import Row # 生成row对象封装一行数据
# spark为SparkSession对象
people = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line:line.split(",")).map(lambda p:Row(name=p[0],age=int(p[1]))) # people为RDD
schemapeople = spark.createDataFrame(people)# 必须注册为临时表才能供下面的查询使用
schemapeople.createOrReplaceTempView("people") # people为临时表名称
personDF = spark.sql("select name,age from people where age>20")
# 查询得到的结果会被封装在DataFrame中
personDF.show()# DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值
personRDD = personDF.rdd.map(lambda p:"Name:"+p.name+","+"Age:"+str(p.age))
personRDD.foreach(print)
2、用编程方式去定义RDD模式
当无法提前获知数据结构时,采用编程方式定义RDD模式
from pyspark.sql.types import *
from pyspark.sql import Row# 生成表头
schemaString = "name age" # 包含两个字段 name 和 age
fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split(" ")] # 列表里每个元素都是一个StructField对象(用来描述字段)
schema = StructType(fields)# 生成表中的记录
lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda x:x.split(","))
people = parts.map(lambda p:Row(p[0],p[1].strip())) # p为列表# 表头和内容拼接
schemapeoples = spark.createDataFrame(people,schema) # (表中记录,表头)# 注册临时表才能查询
schemapeoples.createOrReplaceTempView("people")
results = spark.sql("select name,age from people")
results.show()
- StructField(字段名称,字段类型,是否可以为空) 用来描述字段
- StructType() 生成的对象用来描述数据库模式,即表头
查询的结果被封装在DataFrame里
(四)Spark SQL读取MySQL数据库
1、MySQL准备工作
在Linux系统中安装MySQL数据库:Ubuntu安装MySQL及常用操作_厦大数据库实验室博客
# 在Linux中启动MySQL数据库
service mysql start
mysql -u root -p # 屏幕会提示你输入密码(MySQL root用户密码)# 完成数据库和表的创建
create database spark
use spark
create table student(id int(4), name char(20), gender char(4), age int(4))
insert into student values (1, "Xueqian", 'F', 23)
insert into student values (2, "Weiliang", 'M', 24)
select * from student
Spark SQL通过jdbc连接关系型数据库MySQL,需要安装MySQL的jdbc驱动程序:Linux 下搭建 Hive 环境_mysql-connector-java-5.1.40.tar 对应的mysql版本_GreyZeng的博客-CSDN博客
下载后放入 /usr/local/spark/jars
# 启动pyspark
cd /usr/local/spark
./bin/pyspark
2、Spark SQL读写MySQL
(1)通过jdbc连接MySQL数据库
>>>jdbcDF = spark.read.format("jdbc") \
.option("url","jdbc:mysql://localhost:3306/spark") \ # 访问数据库地址及数据库(spark数据库)
.option("driver","com.mysql.jdbc.Driver") \ # 指定驱动程序
.option("dbtable", "student") \ # 访问student表
.option("user", "root") \
.option("password", "mysql密码").load()# .option()增加连接参数
(2)向MySQL数据库中写入数据
use spark
select * from student
往 spark.student 中插入两条记录:
from pyspark.sql.types import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession# 生成SparkSession对象(Spark SQL指挥官)
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()# 下面要设置模式信息
>>> schema = StructType([StructField("id", IntegerType(), True), \ # True说明可以为空
StructField("name", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("age",IntegerType(), True)])# 设置两条数据,表示两个学生信息(封装Row对象)
studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda x:x.split(" "))# 创建Row对象,每个Row对象都是rowRDD的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[1].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))# 建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema) # 把DataFrame写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '填写mysql密码'
prop['driver'] = "com.mysql.jdbc.Driver" # 驱动程序名称
# 库名, 表名, 追加, prop为属性集合
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)
结果如下:
相关文章:
【4-5章】Spark编程基础(Python版)
课程资源:(林子雨)Spark编程基础(Python版)_哔哩哔哩_bilibili 第4章 RDD编程(21节) Spark生态系统: Spark Core:底层核心(RDD编程是针对这个)Spark SQL:…...
04 卷积神经网络搭建
一、数据集 MNIST数据集是从NIST的两个手写数字数据集:Special Database 3 和Special Database 1中分别取出部分图像,并经过一些图像处理后得到的[参考]。 MNIST数据集共有70000张图像,其中训练集60000张,测试集10000张。所有图…...
【hadoop运维】running beyond physical memory limits:正确配置yarn中的mapreduce内存
文章目录 一. 问题描述二. 问题分析与解决1. container内存监控1.1. 虚拟内存判断1.2. 物理内存判断 2. 正确配置mapReduce内存2.1. 配置map和reduce进程的物理内存:2.2. Map 和Reduce 进程的JVM 堆大小 3. 小结 一. 问题描述 在hadoop3.0.3集群上执行hive3.1.2的任…...
数据结构--6.5二叉排序树(插入,查找和删除)
目录 一、创建 二、插入 三、删除 二叉排序树(Binary Sort Tree)又称为二叉查找树,它或者是一棵空树,或者是具有下列性质的二叉树: ——若它的左子树不为空,则左子树上所有结点的值均小于它的根结构的值…...
无需公网IP,在家SSH远程连接公司内网服务器「cpolar内网穿透」
文章目录 1. Linux CentOS安装cpolar2. 创建TCP隧道3. 随机地址公网远程连接4. 固定TCP地址5. 使用固定公网TCP地址SSH远程 本次教程我们来实现如何在外公网环境下,SSH远程连接家里/公司的Linux CentOS服务器,无需公网IP,也不需要设置路由器。…...
Java工具类
一、org.apache.commons.io.IOUtils closeQuietly() toString() copy() toByteArray() write() toInputStream() readLines() copyLarge() lineIterator() readFully() 二、org.apache.commons.io.FileUtils deleteDirectory() readFileToString() de…...
makefile之使用函数wildcard和patsubst
Makefile之调用函数 调用makefile机制实现的一些函数 $(function arguments) : function是函数名,arguments是该函数的参数 参数和函数名用空格或Tab分隔,如果有多个参数,之间用逗号隔开. wildcard函数:让通配符在makefile文件中使用有效果 $(wildcard pattern) 输入只有一个参…...
算法通关村第十八关——排列问题
LeetCode46.给定一个没有重复数字的序列,返回其所有可能的全排列。例如: 输入:[1,2,3] 输出:[[1,2,3],[1,3,2],[2,1,3],[2,3,1],[3,1,2],[3,2,1]] 元素1在[1,2]中已经使…...
基于STM32设计的生理监测装置
一、项目功能要求 设计并制作一个生理监测装置,能够实时监测人体的心电图、呼吸和温度,并在LCD液晶显示屏上显示相关数据。 随着现代生活节奏的加快和环境的变化,人们对身体健康的关注程度越来越高。为了及时掌握自身的生理状况,…...
Go-Python-Java-C-LeetCode高分解法-第五周合集
前言 本题解Go语言部分基于 LeetCode-Go 其他部分基于本人实践学习 个人题解GitHub连接:LeetCode-Go-Python-Java-C Go-Python-Java-C-LeetCode高分解法-第一周合集 Go-Python-Java-C-LeetCode高分解法-第二周合集 Go-Python-Java-C-LeetCode高分解法-第三周合集 G…...
【前端知识】前端加密算法(base64、md5、sha1、escape/unescape、AES/DES)
前端加密算法 一、base64加解密算法 简介:Base64算法使用64个字符(A-Z、a-z、0-9、、/)来表示二进制数据的64种可能性,将每3个字节的数据编码为4个可打印字符。如果字节数不是3的倍数,将会进行填充。 优点࿱…...
leetcode 925. 长按键入
2023.9.7 我的基本思路是两数组字符逐一对比,遇到不同的字符,判断一下typed与上一字符是否相同,不相同返回false,相同则继续对比。 最后要分别判断name和typed分别先遍历完时的情况。直接看代码: class Solution { p…...
[CMake教程] 循环
目录 一、foreach()二、while()三、break() 与 continue() 作为一个编程语言,CMake也少不了循环流程控制,他提供两种循环foreach() 和 while()。 一、foreach() 基本语法: foreach(<loop_var> <items>)<commands> endfo…...
Mojo安装使用初体验
一个声称比python块68000倍的语言 蹭个热度,安装试试 系统配置要求: 不支持Windows系统 配置要求: 系统:Ubuntu 20.04/22.04 LTSCPU:x86-64 CPU (with SSE4.2 or newer)内存:8 GiB memoryPython 3.8 - 3.10g or cla…...
艺术与AI:科技与艺术的完美融合
文章目录 艺术创作的新工具生成艺术艺术与数据 AI与互动艺术虚拟现实(VR)与增强现实(AR)机器学习与互动性 艺术与AI的伦理问题结语 🎉欢迎来到AIGC人工智能专栏~艺术与AI:科技与艺术的完美融合 ☆* o(≧▽≦…...
Android常用的工具“小插件”——Widget机制
Widget俗称“小插件”,是Android系统中一个很常用的工具。比如我们可以在Launcher中添加一个音乐播放器的Widget。 在Launcher上可以添加插件,那么是不是说只有Launcher才具备这个功能呢? Android系统并没有具体规定谁才能充当“Widget容器…...
探索在云原生环境中构建的大数据驱动的智能应用程序的成功案例,并分析它们的关键要素。
文章目录 1. Netflix - 个性化推荐引擎2. Uber - 实时数据分析和决策支持3. Airbnb - 价格预测和优化5. Google - 自然语言处理和搜索优化 🎈个人主页:程序员 小侯 🎐CSDN新晋作者 🎉欢迎 👍点赞✍评论⭐收藏 ✨收录专…...
jupyter 添加中文选项
文章目录 jupyter 添加中文选项1. 下载中文包2. 选择中文重新加载一下,页面就变成中文了 jupyter 添加中文选项 1. 下载中文包 pip install jupyterlab-language-pack-zh-CN2. 选择中文 重新加载一下,页面就变成中文了 这才是设置中文的正解ÿ…...
系列十、Java操作RocketMQ之批量消息
一、概述 RocketMQ可以一次性发送一组消息,那么这一组消息会被当做一个消息进行消费。 二、案例代码 2.1、pom 同系列五 2.2、RocketMQConstant 同系列五 2.3、BatchConsumer package org.star.batch.consumer;import cn.hutool.core.util.StrUtil; import lom…...
leetcode1两数之和
题目: 给定一个整数数组 nums 和一个整数目标值 target,请你在该数组中找出 和为目标值 target 的那 两个 整数,并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是,数组中同一个元素在答案里不能重复出现。 你…...
近年GDC服务器分享合集(四): 《火箭联盟》:为免费游玩而进行的扩展
如今,网络游戏采用免费游玩(Free to Play)加内购的比例要远大于买断制,这是因为前者能带来更低的用户门槛。甚至有游戏为了获取更多的用户,选择把原来的买断制改为免费游玩,一个典型的例子就是最近的网易的…...
android反射详解
1,反射的定义 一般情况下,我们使用某个类时必定知道它是什么类,是用来做什么的,并且能够获得此类的引用。于是我们直接对这个类进行实例化,之后使用这个类对象进行操作。 反射则是一开始并不知道我要初始化的类对象是…...
Python 反射和动态执行
反射主要应用于类的对象上,在运行时,将对象中的属性和方法反射出来,通过字符串对对象成员(属性、方法)进行查找、获取、删除、添加成员等动作,是一种基于字符串的事件驱动技术。 python是一门动态语言&…...
计算机网络常见端口号
端口号标识了一个主机上进行通信的不同的应用程序。比如网站服务器80端口一般都是开启的,等你来连接。 端口划分: (1)常用端口,公共端口(保留给公共服务所使用),端口号为0-1023之间…...
SpringBoot / Vue 对SSE的基本使用(简单上手)
一、SSE是什么? SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,…...
Qt串口基本设置与协议收发
前言 1.一直都想要做一个Qt上位机,趁着这个周末有时间,动手写一下 2.comboBox没有点击的信号,所以做了一个触发的功能 3.Qt的数据类型很奇怪,转来转去的我也搞得很迷糊 4.给自己挖个坑,下一期做一个查看波形的上位…...
interview3-微服务与MQ
一、SpringCloud篇 (1)服务注册 常见的注册中心:eureka、nacos、zookeeper eureka做服务注册中心: 服务注册:服务提供者需要把自己的信息注册到eureka,由eureka来保存这些信息,比如服务名称、…...
kafka详解一
kafka详解一 1、消息引擎背景 根据维基百科的定义,消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递. 即:系统 A 发送消息给消息引擎系统,系统 B 从消息引擎系统中读取 A…...
Flutter yuv 转 rgb
1、引用yuv_converter库 yuv_converter: ^0.0.1 2、导入头文件: import package:yuv_converter/yuv_converter.dart;3、yuv转rgb YuvConverter.yuv420NV21ToRgba8888(yuvRawData, 512, 512) 根据yuv格式选择不同的api。 举个例子: void initState() …...
MySQL——子查询
2023.9.8 相关学习笔记: #子查询 /* 含义: 出现在其他语句中的select语句,称为子查询或内查询 外部的查询语句,称为主查询或外查询分类: 按子查询出现的位置:select后面:仅仅支持标量子查询fro…...
中华人民共和国城乡建设部网站/广告引流推广平台
第一部分 了解C# C#是微软公司在2000年7月发布的一种全新且简单、安全、面向对象的程序设计语言,是专门为.NET的应用而开发的。体现了当今最新的程序设计技术的功能和精华。.NET框架为C#提供了一个强大的、易用的、逻辑结构一致的设计环境。其特点: 语言…...
wordpress文字加效果/站长工具搜索
Kendo UI for jQuery R3 2020 SP2试用版下载 Kendo UI是带有jQuery、Angular、React和Vue库的JavaScript UI组件的最终集合,无论选择哪种JavaScript框架,都可以快速构建高性能响应式Web应用程序。通过可自定义的UI组件,Kendo UI可以创建数据…...
青海旅游网站建设方案/策划网络营销方案
本节书摘来自异步社区《代码整洁之道》一书中的第13章13.10节小结,作者【美】Robert C. Martin,更多章节内容可以访问云栖社区“异步社区”公众号查看。 13.10 小结并发代码很难写正确。加入多线程和共享数据后,简单的代码也会变成噩梦。要编…...
做调查挣钱的网站/长沙专业竞价优化公司
1.STP交换机通过交换STP协议帧来建立和维护STP树,并在网络的物理拓扑发生变化时重新建立STP树。 2.STP协议帧由STP交换机产生,发送、接收和处理。STP协议帧是一种组播帧,组播地址是01-80-c2-00-00-00。 3.STP协议帧采用IEEE 802.3封装格式&…...
python做网站有什么优势/关键词排名优化教程
使用宽带路由器的话一般建议手工指定电脑的IP地址和DNS等参数,设置方法参考下文。如果你使用的是Windows 2000或Windows XP系统,请点击 “网上邻居”――> “本地连接”――>“属性”――>“Internet协议 (TCP/IP)属性”,将会看到如…...
如何用c语言做钓鱼网站/360优化大师下载安装
在8086处理器上,如果要用寄存器来提供偏移地址,只能使用BX,SI,DI,BP。段寄存器:BX段寄存器:SI段寄存器:DI段寄存器:BP不能使用其他寄存器,比如SP、IP、AX、CX、DX。这是一种硬性规定,…...