当前位置: 首页 > news >正文

【Spark】Pyspark RDD

  • 1. RDD算子
    • 1.1 文件 <=> rdd对象
    • 1.2 map、foreach、mapPartitions、foreach Partitions
    • 1.3 flatMap 先map再解除嵌套
    • 1.4 reduceByKey、reduce、fold 分组聚合
    • 1.5 mapValue 二元组value进行map操作
    • 1.6 groupBy、groupByKey
    • 1.7 filter、distinct 过滤筛选
    • 1.8 union 合并
    • 1.9 join、leftOuterJoin、rightOuterJoin 连接
    • 1.10 intersection 交集
    • 1.11 sortBy、sortByKey 排序
    • 1.12 countByKey 统计key出现次数
    • 1.13 first、take、top、count 取元素
    • 1.14 takeOrdered 排序取前n个
    • 1.15 takeSample 随机抽取

from pyspark import SparkConf, SparkContextconf = SparkConf().setAppName('test')\.setMaster('local[*]')
sc = SparkContext(conf=conf)

1. RDD算子

1.1 文件 <=> rdd对象

# 集合对象 -> rdd (集合对象,分区数默认cpu核数)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print(rdd.glom().collect(), rdd.getNumPartitions())
# [[1, 2], [3, 4], [5, 6]] 3# 文件 -> rdd
rdd = sc.textFile("./data.csv")
print(rdd.collect())
# ['1, 2, 3, 4, 5, 6']# rdd -> 文件
rdd = sc.parallelize([1, 2, 3], 3)
rdd.saveAsTextFile('./output')
'''  
生成output文件夹
里面有按分区存储的多个文件
'''

1.2 map、foreach、mapPartitions、foreach Partitions

# map函数
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
rdd2 = rdd.map(lambda x: (x, 1))
print(rdd2.map(lambda x: x[0] + x[1]).collect())
# [2, 3, 4, 5, 6, 7]
# foreach 
# 同map,但无返回值,且不改变原元素
# 另有foreachPartitions
rdd = sc.parallelize([1, 2, 3])
rdd.foreach(lambda x: print(x))
# 1 3 2
rdd.foreach(lambda x: -x)
rdd.collect()
# [1, 2, 3]

# mapPartitions
'''  
map 一次调出一个元素进行计算,io次数多
mapPartitions 一次将一个分区的所有元素调出计算s
'''
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)def func(iter):# 相较于map时间复杂度没优化,空间复杂度优化res = list()for it in iter:res.append(it * 10)return resrdd.mapPartitions(func).collect()
# [10, 20, 30, 40, 50, 60]

1.3 flatMap 先map再解除嵌套

# flatMap 先执行map操作,再解除嵌套(降维 softmax前flatten)
rdd = sc.textFile("./data.csv")
print(rdd.collect())rdd.flatMap(lambda x: x.split(' ')).collect()

1.4 reduceByKey、reduce、fold 分组聚合

# reduceByKey 按照key分组,再对组内value完成聚合逻辑
# key-value型(二元元组)rdd
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
print(rdd.reduceByKey(lambda a, b: a + b).collect())
# [('b', 3), ('a', 3)]
# reduce 只聚合
# 不返回rdd 
rdd = sc.parallelize(range(1, 3))
print(rdd.reduce(lambda a, b: a + b))
# 3
print(sc.parallelize([('a', 1), ('a', 1)]).reduce(lambda a, b: a + b))
# ('a', 1, 'a', 1)
# fold 带初值的reduce
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print(rdd.fold(10, lambda a, b: a + b))
'''   
[[1, 2], [3, 4], [5, 6]]
10 + 1 + 2 = 13
10 + 3 + 4 = 17
10 + 5 + 6 = 21
10 + 13 + 17 + 21 = 61
> 61
'''

1.5 mapValue 二元组value进行map操作

# mapValues 对二元组内的value执行map操作, 没有分组操作
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
rdd.mapValues(lambda x: x * 10).collect()

1.6 groupBy、groupByKey

  • groupBy、groupByKey、reduceByKey区别
# groupBy 多元组皆可进行分组,可选择按哪一个值分组
# reduceByKey 分组后(ByKey)对value进行聚合(reduce),二元组第一个值为key
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])# 分组 按第一个值
rdd2 = rdd.groupBy(lambda x: x[0])
print(rdd2.collect())
'''
返回的是迭代器,需进一步转换
[('a', <pyspark.resultiterable.ResultIterable object at 0x106178370>), 
('b', <pyspark.resultiterable.ResultIterable object at 0x1060abe50>)]
'''
rdd3 = rdd2.map(lambda x: (x[0], list(x[1])))
print(rdd3.collect())
'''  
[('a', [('a', 1), ('a', 2)]), 
('b', [('b', 3), ('b', 4)])]
'''
# groupByKey
# 自动按照key分组,分组后没有聚合操作,只允许二元组
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])rdd2 = rdd.groupByKey()
rdd2.map(lambda x: (x[0], list(x[1]))).collect()
# [('a', [1, 2]), ('b', [3, 4])]

1.7 filter、distinct 过滤筛选

# filter 过滤器
# 过滤条件True则保留
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x > 3).collect()
# [4, 5]
# distinct 去重
rdd = sc.parallelize([1, 1, 1, 1, 2, 3, 'a', 'a'])
rdd.distinct().collect()
# [[1, 'a', 2, 3]

1.8 union 合并

# union 合并两个rdd
# 元素凑在一起,不考虑重复
rdd_a = sc.parallelize([1, 1, 2, 3])
rdd_b = sc.parallelize([2, 3, ('a', 1), ('b', 2)])rdd_a.union(rdd_b).collect()
# [1, 1, 2, 3, 2, 3, ('a', 1), ('b', 2)]

1.9 join、leftOuterJoin、rightOuterJoin 连接

# join JOIN操作
# 只用于二元组,相同key进行关联
rdd_a = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
rdd_b = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])print(rdd_a.join(rdd_b).collect())
'''   
内连接 取交集
[('b', (3, 2)), 
('a', (1, 1)), 
('a', (2, 1))]
'''
print(rdd_a.leftOuterJoin(rdd_b).collect())
'''   
左连接 取交集和左边全部
[('b', (3, 2)), 
('a', (1, 1)), 
('a', (2, 1))]
'''
print(rdd_a.rightOuterJoin(rdd_b).collect())
'''   
右连接 取交集和右边全部
[('b', (3, 2)), 
('c', (None, 3)), 
('a', (1, 1)), 
('a', (2, 1))]
'''

1.10 intersection 交集

# intersection 取交集
# 区别于join,没有按key连接的操作
rdd_a = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
rdd_b = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])rdd_a.intersection(rdd_b).collect()
# [('a', 1)]

1.11 sortBy、sortByKey 排序

# sortBy
# func 指定排序元素的方法
# ascending True生序,False降序
# numPartitions 用多少分区排序
rdd = sc.parallelize([[1, 2, 3], [7, 8, 9],[4, 5, 6]])
rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect()
'''  
[[1, 2, 3], 
[4, 5, 6], 
[7, 8, 9]]
'''
# sortByKey 针对kv型rdd
'''   
ascending True升序,False降序
numPartitions 全局有序要设为1,否则只能保证分区内有序
keyfunc 对key进行处理,再排序
'''
rdd = sc.parallelize([('a', 1), ('c', 2), ('B', 3)])
print(rdd.sortByKey(ascending=True, numPartitions=1).collect())
'''   
[('B', 3), ('a', 1), ('c', 2)]
'''
print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda k: str(k).lower()).collect())
'''  
[('a', 1), ('B', 3), ('c', 2)]
'''

1.12 countByKey 统计key出现次数

# countByKey 统计key出现次数,可多元元组
# 返回dict 不是rdd
rdd = sc.parallelize([('a', 1, 2), ('a'), ('b', 1)])
rdd.countByKey()

1.13 first、take、top、count 取元素

# first 取第一个元素
rdd = sc.parallelize([('a', 1, 2), ('a'), ('b', 1)])
print(rdd.first() )
# ('a', 1, 2)# take 取前n个元素
print(rdd.take(2))
# [('a', 1, 2), 'a']# count 返回元素个数
print(rdd.count())# top 降序排序取前n个
rdd = sc.parallelize([2, 4, 1, 6])
print(rdd.top(2))
# [6, 4]

1.14 takeOrdered 排序取前n个

# takeOrdered 排序取前n个
'''   
param1: n
param2: func取数前更改元素,不更改元素本身,
不传func,默认升序(取前n最小值)
func = lambda x: -x 变为降序,取前n最大值,和top相同
'''
rdd = sc.parallelize([2, 4, 1, 6])
rdd.takeOrdered(2) # [1, 2]
rdd.takeOrdered(2, lambda x: -x) # [6, 4]

1.15 takeSample 随机抽取

# takeSample 随机抽取元素
'''  
param1: True随机有放回抽样,Fasle不放回抽样 
param2: 抽样个数
param3: 随机数种子
'''
rdd = sc.parallelize([1])
rdd.takeSample(True, 2)

相关文章:

【Spark】Pyspark RDD

1. RDD算子1.1 文件 <> rdd对象1.2 map、foreach、mapPartitions、foreach Partitions1.3 flatMap 先map再解除嵌套1.4 reduceByKey、reduce、fold 分组聚合1.5 mapValue 二元组value进行map操作1.6 groupBy、groupByKey1.7 filter、distinct 过滤筛选1.8 union 合并1.9 …...

数学建模:Logistic回归预测

&#x1f506; 文章首发于我的个人博客&#xff1a;欢迎大佬们来逛逛 数学建模&#xff1a;Logistic回归预测 Logistic回归预测 logistic方程的定义&#xff1a; x t 1 c a e b t x_{t}\frac{1}{cae^{bt}}\quad xt​caebt1​ d x d t − a b e b t ( c a e b t ) 2 >…...

一个面向MCU的小型前后台系统

JxOS简介 JxOS面向MCU的小型前后台系统&#xff0c;提供消息、事件等服务&#xff0c;以及软件定时器&#xff0c;低功耗管理&#xff0c;按键&#xff0c;led等常用功能模块。 gitee仓库地址为&#xff08;复制到浏览器打开&#xff09;&#xff1a; https://gitee.com/jer…...

软件外包开发人员分类

在软件开发中&#xff0c;通常会分为前端开发和后端开发&#xff0c;下面和大家分享软件开发中的前端开发和后端开发分类和各自的职责&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1. 前端开发&…...

HTML 元素被定义为块级元素或内联元素

大多数 HTML 元素被定义为块级元素或内联元素。 10. 块级元素 块级元素在浏览器显示时&#xff0c;通常会以新行来开始&#xff08;和结束&#xff09;。 我们已经学习过的块级元素有: <h1>, <p>, <ul>, <table> 等。 值得注意的是: <p> 标签…...

单调递增的数字【贪心算法】

单调递增的数字 当且仅当每个相邻位数上的数字 x 和 y 满足 x < y 时&#xff0c;我们称这个整数是单调递增的。 给定一个整数 n &#xff0c;返回 小于或等于 n 的最大数字&#xff0c;且数字呈 单调递增 。 public class Solution {public int monotoneIncreasingDigits…...

gnuradio-hackrf_info.exe -FM频率使用

97910000...

JVM学习(三)--生产环境的线程问题诊断

1.如何定位哪个进程对cpu占用过高 使用top命令 2.如何定位到某个进程的具体某个线程 使用ps H -eo pid,tid,%cpu | grep 进程id (可以具体定位到某个进程的某个线程的cpu占用情况) 3.如何查看有问题线程的具体信息&#xff0c;定位到代码的行数 使用jstack 进程id 可以找…...

PHP数组处理$arr1转换为$arr2

请编写一段程序将$arr1转换为$arr2 $arr1 array( 0>array (fid>1,tid>1,name>Name1), 1>array (fid>2,tid>2,name>Name2), 2>array (fid>3,tid>5,name>Name3), 3>array (fid>4,tid>7,name>Name4), 4>array (fid>5,tid…...

ATF(TF-A)安全通告 TFV-10 (CVE-2022-47630)

安全之安全(security)博客目录导读 ATF(TF-A)安全通告汇总 目录 一、ATF(TF-A)安全通告 TFV-10 (CVE-2022-47630) 二、CVE-2022-47630 2.1 Bug 1:证书校验不足 2.2 Bug 2:auth_nvctr()中缺少边界检查...

详解 SpringMVC 中获取请求参数

文章目录 1、通过ServletAPI获取2、通过控制器方法的形参获取请求参数3、[RequestParam ](/RequestParam )4、[RequestHeader ](/RequestHeader )5、[CookieValue ](/CookieValue )6、通过POJO获取请求参数7、解决获取请求参数的乱码问题总结 在Spring MVC中&#xff0c;获取请…...

Message: ‘chromedriver‘ executable may have wrong permissions.

今天运行项目遇到如下代码 driverwebdriver.Chrome(chrome_driver, chrome_optionsoptions)上述代码运行报错如下&#xff1a; Message: chromedriver executable may have wrong permissions. Please see https://sites.google.com/a/chromium.org/chromedriver/home出错的原…...

每日一题 1372二叉树中的最长交错路径

题目 给你一棵以 root 为根的二叉树&#xff0c;二叉树中的交错路径定义如下&#xff1a; 选择二叉树中 任意 节点和一个方向&#xff08;左或者右&#xff09;。如果前进方向为右&#xff0c;那么移动到当前节点的的右子节点&#xff0c;否则移动到它的左子节点。改变前进方…...

【力扣每日一题】2023.9.2 最多可以摧毁的敌人城堡数量

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 这道题难在阅读理解&#xff0c;题目看得我匪夷所思&#xff0c;错了好多个测试用例才明白题目说的是什么。 我简单翻译一下就是寻找1和…...

kotlin实现java的单例模式

代码 package com.flannery.interviewdemo.singleinstance//https://blog.csdn.net/Jason_Lee155/article/details/128796742 Java实现 //public class SingletonDemo { // private static SingletonDemo instancenew SingletonDemo(); // private SingletonDemo() // …...

使用 KeyValueDiffers 检测Angular 对象的变化

使用 KeyValueDiffers 检测Angular 对象的变化 ngDoCheck钩子 ngDoCheck 是 Angular 生命周期钩子之一。它允许组件在 Angular 检测到变化时执行自定义的变化检测逻辑。 当任何组件或指令的输入属性发生变化、在组件内部发生了变更检测周期或者当主动触发变更检测策略&#…...

Macos 10.13.2安装eclipse

eclipse for php 安装2021-12最后版本4.22 2021-12 R | Eclipse Packages jdk17 x64 dmg安装包,要安装jdk这个才能运行 Java Downloads | Oracle...

Android逆向学习(一)vscode进行android逆向修改并重新打包

Android逆向学习&#xff08;一&#xff09;vscode进行android逆向修改并重新打包 写在前面 其实我不知道这个文章能不能写下去&#xff0c;其实我已经开了很多坑但是都没填上&#xff0c;现在专利也发出去了&#xff0c;就开始填坑了&#xff0c;本坑的主要内容是关于androi…...

【深入浅出设计模式--状态模式】

深入浅出设计模式--状态模式 一、背景二、问题三、解决方案四、 适用场景总结五、后记 一、背景 状态模式是一种行为设计模式&#xff0c;让你能在一个对象的内部状态变化时改变其行为&#xff0c;使其看上去就像改变了自身所属的类一样。其与有限状态机的概念紧密相关&#x…...

Debezium系列之:Debezium Server在生产环境大规模应用详细的技术方案

Debezium系列之:Debezium Server在生产环境大规模应用详细的技术方案 一、需求背景二、Debezium Server实现技术三、技术方案流程四、生成接入配置五、新增数据库接入和删除数据库接入效果六、监控zookeeper节点程序七、新增数据库接入部署debezium server程序八、删除数据库接…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度​

一、引言&#xff1a;多云环境的技术复杂性本质​​ 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时&#xff0c;​​基础设施的技术债呈现指数级积累​​。网络连接、身份认证、成本管理这三大核心挑战相互嵌套&#xff1a;跨云网络构建数据…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

SpringTask-03.入门案例

一.入门案例 启动类&#xff1a; package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

AI病理诊断七剑下天山,医疗未来触手可及

一、病理诊断困局&#xff1a;刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断"&#xff0c;医生需通过显微镜观察组织切片&#xff0c;在细胞迷宫中捕捉癌变信号。某省病理质控报告显示&#xff0c;基层医院误诊率达12%-15%&#xff0c;专家会诊…...

Python竞赛环境搭建全攻略

Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型&#xff08;算法、数据分析、机器学习等&#xff09;不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...

【堆垛策略】设计方法

堆垛策略的设计是积木堆叠系统的核心&#xff0c;直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法&#xff0c;涵盖基础规则、优化算法和容错机制&#xff1a; 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则&#xff1a; 大尺寸/重量积木在下&#xf…...

数据分析六部曲?

引言 上一章我们说到了数据分析六部曲&#xff0c;何谓六部曲呢&#xff1f; 其实啊&#xff0c;数据分析没那么难&#xff0c;只要掌握了下面这六个步骤&#xff0c;也就是数据分析六部曲&#xff0c;就算你是个啥都不懂的小白&#xff0c;也能慢慢上手做数据分析啦。 第一…...

LangChain【6】之输出解析器:结构化LLM响应的关键工具

文章目录 一 LangChain输出解析器概述1.1 什么是输出解析器&#xff1f;1.2 主要功能与工作原理1.3 常用解析器类型 二 主要输出解析器类型2.1 Pydantic/Json输出解析器2.2 结构化输出解析器2.3 列表解析器2.4 日期解析器2.5 Json输出解析器2.6 xml输出解析器 三 高级使用技巧3…...