Spark【RDD编程(三)键值对RDD】
简介
键值对 RDD 就是每个RDD的元素都是 (key,value)类型的键值对,是一种常见的 RDD,可以应用于很多场景。
因为毕竟通过我们之前Hadoop的学习中,我们就可以看到对数据的处理,基本都是以键值对的形式进行统一批处理的,因为MapReduce模型中,Mapper和Reducer之间的联系就是通过键和值进行连接产生关系的。
键值对RDD的创建
其实就是个RDD 的创建,无非就是通过并行集合创建和通过文件系统创建,然后文件系统又分为本地文件系统和HDFS。
常用的键值对RDD转换操作
1、reduceByKey(func)
和上一篇文章中的用法一致。
2、groupByKey(func)
和上一篇文章中的用法一致。
3、keys
返回键值对 RDD 中所有的key,构成一个新的 RDD。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object KV_RDD {def main(args: Array[String]): Unit = {//创建SparkContext对象val conf = new SparkConf()conf.setAppName("kv_rdd").setMaster("local")val sc:SparkContext = new SparkContext(conf)//通过并行集合创建RDDval arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))val rdd: RDD[(String, Int)] = sc.parallelize(arr)val res: RDD[String] = rdd.keysres.foreach(println)//关闭SparkContextsc.stop()}
}
输出结果:
Spark
Hadoop
Spark
Flink
4、values
返回键值对 RDD 中所有的key,构成一个新的 RDD。
//通过并行集合创建RDDval arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))val rdd: RDD[(String, Int)] = sc.parallelize(arr)val res: RDD[Int] = rdd.valuesres.foreach(println)
运行结果:
1
1
1
1
5、sortByKey(Boolean asce)
返回一个根据 key 排序(字典序)的RDD。
//通过并行集合创建RDDval arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))val rdd: RDD[(String, Int)] = sc.parallelize(arr)val res: RDD[(String,Int)] = rdd.sortByKey()res.foreach(println)
运行结果:
(Flink,1)
(Hadoop,1)
(Spark,1)
(Spark,1)
设置升序/降序
默认我们sortByKey()方法是升序排序的,如果要降序可以传入一个false的值。
//通过并行集合创建RDDval arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))val rdd: RDD[(String, Int)] = sc.parallelize(arr)//降序val res: RDD[(String,Int)] = rdd.sortByKey(false)res.foreach(println)
运行结果:
(Spark,1)
(Spark,1)
(Hadoop,1)
(Flink,1)
6、sortBy()
可以根据其他字段进行排序。
//通过并行集合创建RDDval arr = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))val rdd: RDD[(String, Int)] = sc.parallelize(arr)//按照value升序排序val res: RDD[(String,Int)] = rdd.sortBy(kv=>kv._2,true)res.foreach(println)
运行结果:
(Spark,1)
(Hive,2)
(Flink,3)
(Hadoop,5)
7、mapValues(func)
之前我们处理的RDD 都是文本或数字类型的,之前我们的map(func)中的func函数是对整个RDD的元素进行处理。但是这里换成了mapValues(func),这里func函数处理的是我们(key,value)中的所有value,而key 不会发生变化。
//通过并行集合创建RDDval arr = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))val rdd: RDD[(String, Int)] = sc.parallelize(arr)//所有的value+1val res: RDD[(String,Int)] = rdd.mapValues(value=>value+1)res.foreach(println)
运行结果:
(Spark,2)
(Hadoop,6)
(Hive,3)
(Flink,4)
8、join()
内连接,(K,V1)和(K,V2)进行内连接生成(K,(V1,V2))。
//通过并行集合创建RDDval arr1 = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))val arr2 = Array(("Spark","fast"),("Hadoop","good"))val rdd1: RDD[(String,Int)] = sc.parallelize(arr1)val rdd2: RDD[(String,String)] = sc.parallelize(arr2)//所有的value+1
// val res: RDD[(String,(Int,Int))] = rdd1.join(rdd2)val res: RDD[(String, (Int, String))] = rdd1.join(rdd2)res.foreach(println)
运行结果:
(Spark,(1,fast))
(Hadoop,(5,good))
我们可以看到,返回的RDD 的元素都是满足连接表rdd2的K的。
9、combineByKey()
这个函数的参数比较多,下面做个介绍:
- createCombiner:用于将RDD中的每个元素转换为一个类型为C(V=>C)的值。这个函数在第一次遇到某个key的时候会被调用,用于创建一个累加器。
- mergeValue:用于将RDD中的每个value值合并到已经存在的累加器中。这个函数在遇到相同key的value时会被调用。
- mergeCombiners:用于将不同分区中的累加器值进行合并。这个函数在每个分区处理完后,将各个分区的累加器值进行合并。
案例-统计公司三个季度的总收入和平均收入
//通过并行集合创建RDDval arr = Array(("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92))val rdd: RDD[(String, Int)] = sc.parallelize(arr,3)val res: RDD[(String,Int,Float)] = rdd.combineByKey(income=>(income,1),(acc:(Int,Int),income)=>(acc._1+income,+acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).map({case (key,value) => (key,value._1,value._1/value._2.toFloat)})//重新分配分区 将3个分区合并为1个res.repartition(1).saveAsTextFile("data/kv_rdd/")
运行结果中-part-00000文件内容:
(company-3,266,88.666664)
(company-1,269,89.666664)
(company-2,254,84.666664)
其中,第一列为季度名称。第二列为总收入,第三列为平均收入。
参数解析
第一个参数的作用是:当我们取出的RDD元素是第一次遇到的key,那么就创建一个组合器函数createCombiner(),负责将我们的键值对(K:季度名称,V:收入额)中的 V:收入额转为 C格式(总收入额,1)的格式,其中的1代表当前已经累加了一个月的收入。
第二个参数是合并值函数 mergeValue(),它的作用是:如果遇到相同的key,比如都是"company-1",那么就对相同key的的value进行mergeValue()中定义的操作。
第三个参数的作用是 :由于我们开启了多个分区,所以最后要对不同分区的数据进行一个对总,这个函数中定义的就是对两个 C格式 的键值对进行的操作。
最后我们进行了一个模式匹配,对于结果返回的(k,v)形式的数据,其中 k 就是指季度名称, v 是一个键值对(总收入额,月份数),我们将它转为 (季度名称,总收入额,平均收入额)。
分区1:
1-调用createCombiner()函数
(company-1,88) => (company-1,(88,1))
2-调用mergeValue()函数
(company-1,96) => (company-1,(184,2))
分区2:
1-调用createCombiner()函数
(company-1,85) => (company-1,(85,1))3-调用mergeCombiners()函数
(company-1,(184,2)) + (company-1,(85,1)) => (company-1,(269,3))
10、flatMapValues(fubc)
flatMapValues(func)的操作和mapValues(func)相似。它们都是对键值对类型的RDD进行操作,mapValues(func)是对(ke要,value)的value通过函数 func 进行一个处理,而key不变。而flatMapValues(func)则是对value先通过函数 func 进行处理,然后再处理后的值和key组成一系列新的键值对。
输入数据:
("k1","hadoop,spark,flink")
("k2","hadoop,hive,hbase")
处理
//通过并行集合创建RDDval arr = Array(("k1","hadoop,spark,flink"),("k2","hadoop,hive,hbase"))val rdd: RDD[(String, String)] = sc.parallelize(arr)//flatMapValues(func)//val res: Array[(String, String)] = rdd.flatMapValues(value => value.split(",")).collect() //mapValues(func)val res: Array[(String, Array[String])] =rdd.mapValues(value => value.split(",")).collect()value.split(",")).collect()res.foreach(println)
运行结果:
(k1,hadoop)
(k1,spark)
(k1,flink)
(k2,hadoop)
(k2,hive)
(k2,hbase)
而我们的mapValues(func)执行后的RDD集合内为:
(k1,Array("hadoop","spark","flink"))
(k2,Array("hadoop","hive","hbase"))
显然我们的flatMapValues(func)是多进行了一部扁平化的操作,将集合内的元素与key一一组成一系列心得键值对。
相关文章:
Spark【RDD编程(三)键值对RDD】
简介 键值对 RDD 就是每个RDD的元素都是 (key,value)类型的键值对,是一种常见的 RDD,可以应用于很多场景。 因为毕竟通过我们之前Hadoop的学习中,我们就可以看到对数据的处理,基本都是以…...
从板凳围观到玩转行家:Moonbeam投票委托如何让普通用户一同参与
今年5月,Moonbeam发起了一项社区链上治理中投票委托反馈的调查。187位社区成员参与了这项调查,调查发现受访者对治理感兴趣,增加参与度只需要进行一些调整,即更简化的投票流程。 治理和去中心化是Web3的核心,随着Moon…...
SpringMVC的文件上传文件下载多文件上传---详细介绍
目录 前言: 一,文件上传 1.1 添加依赖 1.2 配置文件上传解析器 1.3 表单设置 1.4 文件上传的实现 二,文件下载 controller层 前端jsp 三,多文件上传 Controller层 运行 前言: Spring MVC 是一个基于 Java …...
Spark【RDD编程(四)综合案例】
案例1-TOP N个数据的值 输入数据: 1,1768,50,155 2,1218,600,211 3,2239,788,242 4,3101,28,599 5,4899,290,129 6,3110,54,1201 7,4436,259,877 8,2369,7890,27 处理代码: def main(args: Array[String]): Unit {//创建SparkContext对象val conf…...
Golang报错mixture of field:value and value initializers
Golang报错mixture of field:value and value initializers 这个错误跟编程习惯(模式)有关,都知道golang 语言的编程与java /python 以及其他的编程语言相似 ,一通百通,易学万卷书。 编程中同一个结构中要保持唯一模…...
【网络教程】记一次使用Docker手动搭建BT宝塔面板的全过程(包含问题解决如:宝塔面板无法开启防火墙,ssh,nginx等)
文章目录 准备安装安装宝塔面板开启ssh和修改ssh的密码导出镜像问题解决宝塔面板无法开启防火墙无法启动ssh设置密码nginx安装失败设置开机启动相关服务准备 演示的系统环境:Ubuntu 22.04.3 LTS更新安装/升级docker到最新版本升级docker相关命令如下# 更新软件包列表并自动升级…...
【大虾送书第九期】速学Linux:系统应用从入门到精通
目录 🍭写在前面 🍭为什么学习Linux系统 🍭Linux系统的应用领域 🍬1.Linux在服务器的应用 🍬2.嵌入式Linux的应用 🍬3.桌面Linux的应用 🍭Linux的版本选择 &a…...
docker相关命令
####### 帮助启动类命令 ########## 启动docker systemctl start docker 停止docker systemctl stop docker 重启docker systemctl restart docker 查看docker状态 systemctl status docker 开机启动 systemctl enable docker 查看docker概要信息 docker info 查看…...
【Redis】4、rsync远程同步
与inodify结合使用,实现实时同步 rsync简介 rsync(Remote Sync,远程同步)是一个开源的快速备份工具,可以在不同主机之间镜像同步整个目录树,;支持增量备份,并保持链接和权限&#…...
无服务架构--Serverless
无服务架构 无服务架构(Serverless Architecture)即无服务器架构,也被称为函数即服务(Function as a Service,FaaS),是一种云计算模型,用于构建和部署应用程序,无需关心…...
2023-09-07 LeetCode每日一题(修车的最少时间)
2023-09-07每日一题 一、题目编号 2594. 修车的最少时间二、题目链接 点击跳转到题目位置 三、题目描述 给你一个整数数组 ranks ,表示一些机械工的 能力值 。ranksi 是第 i 位机械工的能力值。能力值为 r 的机械工可以在 r * n2 分钟内修好 n 辆车。 同时给你…...
数据挖掘实验-主成分分析与类特征化
数据集&代码https://www.aliyundrive.com/s/ibeJivEcqhm 一.主成分分析 1.实验目的 了解主成分分析的目的,内容以及流程。 掌握主成分分析,能够进行编程实现。 2.实验原理 主成分分析的目的 主成分分析就是把原有的多个指标转化成少数几个代表…...
70. 爬楼梯 (进阶),322. 零钱兑换,279.完全平方数
代码随想录训练营第45天|70. 爬楼梯 (进阶,322. 零钱兑换,279.完全平方数 70.爬楼梯文章思路代码 322.零钱兑换文章思路代码 279.完全平方数文章思路代码 总结 70.爬楼梯 文章 代码随想录|0070.爬楼梯完全背包版本 思路 将楼梯长度视为背…...
Apache Doris 2.0 如何实现导入性能提升 2-8 倍
数据导入吞吐是 OLAP 系统性能的重要衡量标准之一,高效的数据导入能力能够加速数据实时处理和分析的效率。随着 Apache Doris 用户规模的不断扩大, 越来越多用户对数据导入提出更高的要求,这也为 Apache Doris 的数据导入能力带来了更大的挑战…...
RabbitMQ: topic 结构
生产者 package com.qf.mq2302.topic;import com.qf.mq2302.utils.MQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;public class Pubisher {public static final String EXCHANGE_NAME"mypubilisher";public static void ma…...
信息系统项目管理教程(第4版):第二章 信息技术及其发展
请点击↑关注、收藏,本博客免费为你获取精彩知识分享!有惊喜哟!! 第二章 信息技术及其发展 2.1信息技术及其发展 信息技术是以微电子学为基础的计算机技术和电信技术的结合而形成的,对声音的、图像的、文字的、数字…...
有哪些适合初学者的编程语言?
C语言 那为什么我还要教你C语言呢?因为我想要让你成为一个更好、更强大的程序员。如果你要变得更好,C语言是一个极佳的选择,其原因有二。首先,C语言缺乏任何现代的安全功能,这意味着你必须更为警惕,时刻了…...
uni-app动态tabBar,根据不同用户展示不同的tabBar
1.uni框架的api实现 因为我们用的是uni-app框架开发,所以在创建项目的时候直接创建uni-ui的项目即可,这个项目模板中自带了uni的一些好用的组件和api。 起初我想着这个效果不难实现,因为官方也有api可以直接使用,所以我最开始尝试…...
手写Spring:第6章-资源加载器解析文件注册对象
文章目录 一、目标:资源加载器解析文件注册对象二、设计:资源加载器解析文件注册对象三、实现:资源加载器解析文件注册对象3.1 工程结构3.2 资源加载器解析文件注册对象类图3.3 类工具类3.4 资源加载接口定义和实现3.4.1 定义资源加载接口3.4…...
Redis 7 第八讲 集群模式(cluster)架构篇
集群架构 Redis 集群架构图 集群定义 Redis 集群是一个提供在多个Redis节点间共享数据的程序集;Redis集群可以支持多个master 应用场景 Redis集群支持多个master,每个master又可以挂载多个slave读写分离支持数据的高可用支持海量数据的读写存储操作集群自带Sentinel的故障…...
【PowerQuery】导入与加载XML
在标准数据格式类型里面,有一类比较特殊的数据类型,就是层次结构数据。层次结构数据和标准的结构型数据方式完全不同,在实际应用过程中使用最为频繁的几种数据类型如下。 XML数据格式Json 数据格式Yaml 数据格式我们将在本节和大家一起分享下XML格式数据集成,下一节和大家分…...
vue 预览视频
1.预览本地文件 1.1 直接给video或者embed的src赋值本地路径 <video :src"videoUrl"></video> // 或者 使用embed标签<embed :src"videoUrl" /> 1.2 读取文件流形式 <input type"file" ref"file" /> <vi…...
4个维度讲透ChatGPT技术原理,揭开ChatGPT神秘技术黑盒!(文末送书)
🤵♂️ 个人主页:艾派森的个人主页 ✍🏻作者简介:Python学习者 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话, 欢迎评论 💬点赞Ǵ…...
【无标题】@Scheduled 的cron
, :指定多个值。 -:表示一个区间。 / :指定一个值的增加幅度。n/m表示从n开始,每次增加m。 L:是last的缩写,表示最后一天,用在日表示一个月中的最后一天,用在周表示每周最后一天&…...
IP和MAC的作用区别
在 IP 地址的上一行是 link/ether fa:16:3e:c7:79:75 brd ff:ff:ff:ff:ff:ff,这个被称为 MAC 地址,是一个网卡的物理地址,用十六进制,6 个 byte 表示。 一个网络包要从一个地方传到另一个地方,除了要有确定的地址&…...
python趣味编程-数独游戏
数独游戏是一个用Python编程语言编写的应用程序。该项目包含可以显示实际应用程序的基本功能。该项目可以让修读 IT 相关课程并希望开发简单应用程序的学生受益。这个Python 数独游戏是一个简单的项目,可用于学习tkinter库的实践。这个数独游戏可以提供Python编程的基本编码技…...
MySQL/MariaDB 查询某个 / 多个字段重复数据
创建测试表和数据 # 创建表 create table if not exists t_duplicate (name varchar(255) not null,age int not null );# 插入测试数据 insert into t_duplicate(name, age) values(a, 1); insert into t_duplicate(name, age) values(a, 2);查询单个字段重复 使用 count() …...
【力扣每日一题】2023.9.10 课程表Ⅱ
目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 今天的题目和昨天类似,不过今天要我们求出学习所有课程的先后顺序。 昨天只需要我们求出能否学习完所有课程,因此…...
VSCODE CMAKE C++ 工程调试, C++不以科学计数法输出并控制小数位数
1. VSCODE调试CMAKE工程配置1.1 修改CMakeLists.txt文件1.2. 程序中1.3. launch.json配置1.4 开始调试1.5 注意 2. C设置输出浮点数且保留位数固定 1. VSCODE调试CMAKE工程配置 1.1 修改CMakeLists.txt文件 加这一句 set(CMAKE_BUILD_TYPE "Debug")1.2. 程序中 在…...
Drools规则引擎入门学习记录
业务开发过程中,对于某些判断性的通用规则是基于if-else封装,还是基于策略模式封装?无论以上那种封装出来的方法,只能在单体软件包中共用,且不能无感部署,然而对于业务而言,可能规则改变的比较频…...
遵义网站建设优化公司/互联网行业都有哪些工作
获得微秒级的时间 收藏 Win 下建议如下方式: http://www.vckbase.com/document/viewdoc/?id1301 VC中基于 Windows 的精确定时 中国科学院光电技术研究所 游志宇 在工业生产控制系统中,有许多需要定时完成的操作,如定时显示当前时间&#x…...
网站怎么做sitemap/百度竞价专员
一、控制结构 sk_buff 和网络报文的存储空间 二、分片的网络报文与scatter/gather IO 网络报文在内存中不一定是连续存储的,同一个网络报文有可能被分成几片存放在内存的不同位置(不要和IP分片混淆,IP分片是将一个网络报文分成多个网络报文&a…...
网站漏洞原理/seo如何提高排名
自我介绍 面试官,您好,我叫xxx,目前就读于xxx计算机科学与技术学院,现在已经研二阶段。我比较熟悉java虚拟机反射机制以及线程死锁等问题。此外,我也了解过解决IPv4枯竭问题的NAT技术,自己也曾扩展过家里使…...
网站的发布方案有哪些/郑州网站推广报价
首先需要在控制台找到EBS,那个地方可以修改EBS存储的容量,6个小时只能改一次。修改完成后,在ec2上使用lsblk命令可以看到 xdva容量变成刚才设置的容量了,但是 /dev/xvda1的容量还是原来的值,需要手动操作一下。 使用 …...
广州铁路投资建设集团网站/服务推广软文范例
i2c-tools i2cdetect 检测在系统上的i2c总线,例如: i2cdetect -l 检测挂载在i2c总线上器件,例如: i2cdetect -r -y 0 (检测i2c-0上的挂载情况) i2cdump 查看器件所有寄存器的值,例如:…...
个人网站免费模板/怎么搭建属于自己的网站
2019独角兽企业重金招聘Python工程师标准>>> 一、在编码过程发现报错:Override is not allowed when implementing interface method 。 1、在project/module上,右键选择菜单“Open Module Settings” ; 2、选择Modules,修改“Lan…...