Pyspark基础入门5_RDD的持久化方法
Pyspark
注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark基础入门5
#博学谷IT学习技术支持
`
文章目录
- Pyspark
- 前言
- 一、RDD的缓存
- 二、使用步骤
- 1.演示缓存的使用操作
- 三、RDD的checkpoint检查点
- 四、缓存和检查点区别
- 总结
前言
今天和大家分享的是Spark RDD的持久化方法。
一、RDD的缓存
缓存:
一般当一个RDD的计算非常的耗时|昂贵(计算规则比较复杂),或者说这个RDD需要被重复(多方)使用,此时可以将这个RDD计算完的结果缓存起来, 便于后续的使用, 从而提升效率
通过缓存也可以提升RDD的容错能力, 当后续计算失败后, 尽量不让RDD进行回溯所有的依赖链条, 从而减少重新计算时间
注意:
缓存仅仅是一种临时的存储, 缓存数据可以保存到内存(executor内存空间),也可以保存到磁盘中, 甚至支持将缓存数据保存到堆外内存中(executor以外的系统内容)
由于临时存储, 可能会存在数据丢失, 所以缓存操作, 并不会将RDD之间的依赖关系给截断掉(丢失掉),因为当缓存失效后, 可以基于原有依赖关系重新计算
缓存的API都是LAZY的, 如果需要触发缓存操作, 必须后续跟上一个action算子, 一般建议使用count如果不添加action算子, 只有当后续遇到第一个action算子后, 才会触发缓存
二、使用步骤
设置缓存的API:
rdd.cache(): 执行缓存操作 仅能将数据缓存到内存中
rdd.persist(缓存的级别(位置)): 执行缓存操作, 默认将数据缓存到内存中, 当然也可以自定义缓存位置
手动清理缓存的API:
rdd.unpersist()
默认情况下, 当整个Spark应用程序执行完成后, 缓存也会自动失效的, 自动删除
常用的缓存级别:
MEMORY_ONLY : 仅缓存到内存中
DISK_ONLY: 仅缓存到磁盘
MEMORY_AND_DISK: 内存 + 磁盘 优先缓存到内存中, 当内存不足的时候, 剩余数据缓存到磁盘中
OFF_HEAP: 缓存到堆外内存
最为常用的: MEMORY_AND_DISK
1.演示缓存的使用操作
import timeimport jieba
from pyspark import SparkContext, SparkConf, StorageLevel
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
"""清洗需求: 需要先对数据进行清洗转换处理操作, 清洗掉为空的数据, 以及数据字段个数不足6个的数据, 并且将每一行的数据放置到一个元组中, 元组中每一个元素就是一个字段的数据
"""def xuqiu1():# 需求一: 统计每个关键词出现了多少次, 获取前10个res = rdd_map \.flatMap(lambda field_tuple: jieba.cut(field_tuple[2])) \.map(lambda keyWord: (keyWord, 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.sortBy(lambda res_tup: res_tup[1], ascending=False).take(10)print(res)def xuqiu2():res = rdd_map \.map(lambda field_tuple: ((field_tuple[1], field_tuple[2]), 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.top(10, lambda res_tup: res_tup[1])print(res)if __name__ == '__main__':print("Spark的Python模板")# 1. 创建SparkContext核心对象conf = SparkConf().setAppName('sougou').setMaster('local[*]')sc = SparkContext(conf=conf)# 2. 读取外部文件数据rdd = sc.textFile(name='file:///export/data/workspace/ky06_pyspark/_02_SparkCore/data/SogouQ.sample')# 3. 执行相关的操作:# 3.1 执行清洗操作rdd_filter = rdd.filter(lambda line: line.strip() != '' and len(line.split()) == 6)rdd_map = rdd_filter.map(lambda line: (line.split()[0],line.split()[1],line.split()[2][1:-1],line.split()[3],line.split()[4],line.split()[5]))# 由于 rdd_map 被多方使用了, 此时可以将其设置为缓存rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()# 3.2 : 实现需求# 需求一: 统计每个关键词出现了多少次, 获取前10个# 快速抽取函数: ctrl + alt + Mxuqiu1()# 当需求1执行完成, 让缓存失效rdd_map.unpersist().count()# 需求二:统计每个用户每个搜索词点击的次数xuqiu2()time.sleep(100)
三、RDD的checkpoint检查点
checkpoint比较类似于缓存操作, 只不过缓存是将数据保存到内存 或者 磁盘上, 而checkpoint是将数据保存到磁盘或者HDFS(主要)上
checkpoint提供了更加安全可靠的持久化的方案, 确保RDD的数据不会发生丢失, 一旦构建checkpoint操作后, 会将RDD之间的依赖关系(血缘关系)进行截断,后续计算出来了问题, 可以直接从检查点的位置恢复数据主要作用: 容错 也可以在一定程度上提升效率(性能) (不如缓存)在后续计算失败后, 从检查点直接恢复数据, 不需要重新计算
相关的API:
第一步: 设置检查点保存数据位置
sc.setCheckpointDir(‘路径地址’)
第二步: 在对应RDD开启检查点rdd.checkpoint()rdd.count()注意: 如果运行在集群模式中, checkpoint的保存的路径地址必须是HDFS, 如果是local模式 可以支持在本地路径checkpoint数据不会自动删除, 必须同时手动方式将其删除掉
import timefrom pyspark import SparkContext, SparkConf
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("演示checkpoint相关的操作")# 1- 创建SparkContext对象conf = SparkConf().setAppName('sougou').setMaster('local[*]')sc = SparkContext(conf=conf)# 开启检查点, 设置检查点的路径sc.setCheckpointDir('/spark/chk') # 默认的地址为HDFS# 2- 获取数据集rdd = sc.parallelize(['张三 李四 王五 赵六', '田七 周八 李九 老张 老王 老李'])# 3- 执行相关的操作: 以下操作仅仅是为了让依赖链条更长, 并没有太多的实际意义rdd1 = rdd.flatMap(lambda line: line.split())rdd2 = rdd1.map(lambda name: (name, 1))rdd3 = rdd2.map(lambda name_tuple: (f'{name_tuple[0]}_itcast', name_tuple[1]))rdd3 = rdd3.repartition(3)rdd4 = rdd3.map(lambda name_tuple: name_tuple[0])# RDD4设置检查点:rdd4.checkpoint()rdd4.count()rdd5 = rdd4.flatMap(lambda name: name.split('_'))rdd5 = rdd5.repartition(4)rdd6 = rdd5.map(lambda name: (name, 1))rdd_res = rdd6.reduceByKey(lambda agg, curr: agg + curr)print(rdd_res.collect())time.sleep(1000)
四、缓存和检查点区别
1- 存储位置不同:
缓存: 存储在内存或者磁盘 或者 堆外内存中
检查点: 可以将数据存储在磁盘 或者 HDFS上, 在集群模式下, 仅能保存到HDFS上
2- 血缘关系:
缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能会失效, 当失效后, 需要重新回溯计算操作
检查点: 会截断RDD的之间的血缘关系, 因为检查点将数据保存到更加安全可靠的位置, 认为数据不会发生丢失问题, 当执行失败的时候, 也不需要重新回溯计算
3- 生命周期:
缓存: 当程序执行完成后, 或者手动调度unpersist 缓存都会被删除
检查点: 即使程序退出后, 检查点的数据依然是存在的, 不会删除, 需要手动删除的
一般建议将两种持久化的方案一同作用于项目环境中, 先设置缓存 然后再设置检查点, 最后统一触发执行(底层: 会将数据先缓存好, 然后将缓存好的数据, 保存到checkpoint对应的路径中, 后续在使用的时候, 优先从缓存中读取, 如果缓存中没有, 会从checkpoint中获取, 同时再把读取数据放置到缓存中)
总结
今天和大家分享了RDD的两种持久化方法。
相关文章:
Pyspark基础入门5_RDD的持久化方法
Pyspark 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hi…...
汽车娱乐系统解决方案
Danlaw在汽车和航空航天行业里是全球知名的技术和服务供应商,致力于提供更加安全与智能的系统。Danlaw以突破性技术和高效开发、动态环境的自适应解决方案而闻名。Danlaw优秀的联网汽车解决方案使之成为全球大型互联设备供应商之一。 一 信息娱乐系统测试 | 风丘科…...
Go语言结构体,这一篇就够了
Go语言结构体,这一篇就够了1.结构体的概念2.结构体的定义3.结构体的实例化4.结构体初始化5.构造函数6.方法和接收者方法接收者7.嵌套结构体8.结构体的“继承”9.结构体与JSON序列化10.结构体标签(Tag)Go语言中没有“类”的概念,也…...
【python】各种排序算法代码大集合
超级好用的口诀: 时间复杂度:快些以nlogn的速度归队。 稳定性:心情不稳定,快些选一堆好友来聊天吧。 直接插容易插变O(N),起泡起得好变O(N).(初始序列已经有序) 插入排序法在近乎有序的情况下,效率特别高,通过插入排序,可以引申出希尔排序 归并排序:左半部分排好序…...
K8S Pod健康检查
因为 k8s 中采用大量的异步机制、以及多种对象关系设计上的解耦,当应用实例数 增加/删除、或者应用版本发生变化触发滚动升级时,系统并不能保证应用相关的 service、ingress 配置总是及时能完成刷新。在一些情况下,往往只是新的 Pod 完成自身…...
NFS服务器与CGI程序详解
目录 NFS 服务器 一,NFS 服务器简介 二,NFS的使用 三,客户端使用 autofs 自动挂载 1,autofs产生的原因 四,autofs的安装与配置文件 五,autofs的使用 www服务器---cgi程序 CGI程序的应用 NFS 服务器 一&a…...
可视化项目管理,控制项目进度,项目经理需要做好以下工作
对于项目的管理者来说,项目信息透明,能够更容易让管理者发现项目中的问题,及时找到问题的原因和相关任务的责任人。 当项目信息能相对精准地呈现给管理者时,也能促进项目成员也能更加认真负责的完成任务,不会找借口推…...
海康工业相机使用教程
工业相机使用一、硬件连接1、准备材料2、相机供电(1)区分电源适配器正负极(2)连接相机电源线缆(3)连接完成后,相机蓝色灯常亮则成功3、软件连接(1)MVS客户端下载地址&…...
java开发手册之安全规约
安全规约隶属于用户个人的页面或者功能必须进行权限控制校验。 说明:防止没有做水平权限校验就可随意访问、修改、删除别人的数据,比如查看他人的私信内容、修改他人的订单。 用户敏感数据禁止直接展示,必须对展示数据进行脱敏。 说明&#x…...
python模块引入问题和解决方案_真方案不骗人
1.pycharm运行python脚本的过程 使用pycharm等编辑器run/debug运行python脚本时,编辑器会通过本地python命令全路径执行脚本,例如 D:\DevelopTools\Python\python.exe D:/Codes/一长串路径/bbss_nature_python/demo/test_no_param_in.py 并且会在pyth…...
Read book Netty in action(Chapter X)--Unit Testing
序言 ChannelHandler 是Netty 应用程序的关键元素,所以彻底地测试它们应该是你的开发过程的一个标准部分。最佳实践要求你的测试不仅要能够证明你的实现是正确的,而且还要能够很容易地隔离那些因修改代码而突然出现的问题。这种类型的测试叫作单元测试。…...
Appium+Python连接真机、跳过登录页、Unexpected error while obtaining UI hierarchy问题
Appium连接真机 使用数据线连接电脑,然后选择文件传输方式 打开手机设置拉至底部,点击关于手机,连续点击7次版本号打开开发者模式 点击设置中的系统与更新,找到开发者选项----> 打开USB调试即可 在终端中输入adb devices确定…...
ES6模块化
目录 一、什么是 ES6 模块化规范 二、ES6 模块化的基本语法 2.1默认导出 2.1默认导入 2.1 注意事项 2.2按需导出 2.2按需导入 2.2按需导出与按需导入的注意事项 2.3直接导入并执行模块中的代码 一、什么是 ES6 模块化规范 ES6 模块化规范是浏览器端与服务器端通用的…...
201809-3 CCF 元素选择器 满分题解(超详细注释代码) + 解题思路(超详细)
问题描述 解题思路 根据题意可以知道在查询中可以分为两种情况 第一种是查询一个标签选择器或者id选择器(可以称为一级查询) 第二种就是存在大于两级的查询(可以称为多级查询) 显然第一种查询需要存储每一种元素在内容中所有出现…...
证书拓展域(1)
证书拓展定义了数字证书的标准拓展,每个拓展域GB/T 16264.8-200X中定义的一个OID相关。 这些OID都是id-ce的成员,其定义如下: id-ce OBJECT IDENTIFIER :: { joint-iso-ccitt(2) ds(5) 29 }1.证书策略 certificatePolicies 1.1 定义 本…...
浅谈ChatGPT 和 对AI 的思考
新世纪以来,人工智能作为一个非常热门话题,一直收到大众的广泛的关注。从一开始的图像的分类,检测,到人脸的识别,到视频分析分类,到事件的监测,到基于图片的文本生成,到AI自动写小说…...
NCRE计算机等级考试Python真题(十二)
第十二套试题1、以下关于程序设计语言的描述,错误的选项是:A.Python语言是一种脚本编程语言B.汇编语言是直接操作计算机硬件的编程语言C.程序设计语言经历了机器语言、汇编语言、脚本语言三个阶段D.编译和解释的区别是一次性翻译程序还是每次执行时都要翻…...
Java并发类库提供的线程池有哪几种? 分别有什么特点?
第21讲 | Java并发类库提供的线程池有哪几种? 分别有什么特点? 我在专栏第 17 讲中介绍过线程是不能够重复启动的,创建或销毁线程存在一定的开销,所以利用线程池技术来提高系统资源利用效率,并简化线程管理,…...
企业微信如何群发消息到客户群?
为提升工作效率,工作中,企业常常会借助企业微信的群发功能一键发送多个客户。那么企业微信如何群发消息呢? 其中成员个人支持群发消息到客户群,企业也可以创建内容提醒成员进行执行群发。 管理员支持在管理端或在手机端创建企业…...
【信号与系统笔记】第一章 绪论
1.1信号传输系统 信息传输的任务 将带有信息的信号,通过某种系统由发送者传送给接收者。 通信系统的组成 转换器:把消息转换为电信号或者把电信号还原成消息信道:信号传输的通道,广义上来说。发射机和接收机也可以是信道的一部分…...
[神经网络]DETR目标检测网络
一、概述 相较于传统目标检测,DETR是一种纯端到端的网络。它不再需要NMS(非极大值抑制,用于去除多余的预测框)和生成anchor。 DETR提出了一个新的目标函数(二分图匹配),这个函数可以强制网络输出一个独一无二的预测值&…...
【服务器管理】connection refused问题解决
简述 在配置服务器的时候,遇到了这个问题。我当时明明已经搭建好了服务,但是我在客户端比如手机上,却怎么都连不上服务器。看日志的话显示的是connection refuesed timeout 这种情况,大概率是服务器的端口没有被打开。 我们只需…...
2023_华为OD机试真题_Python_047_整理扑克牌
整理扑克牌 题目描述 给定一组数字,表示扑克牌的牌面数字,忽略扑克牌的花色,请按如下规则对这一组扑克牌进行整理: 步骤1. 对扑克牌进行分组,形成组合牌,规则如下: 当牌面数字相同张数大于等于4时,组合牌为“炸弹”;3张相同牌面数字 + 2张相同牌面数字,且3张牌与2…...
吐血整理,自动化测试pytest测试框架,资深测试带你少走弯路......
目录:导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜)前言 Pytest框架详解 py…...
SAP BASE64加密及解密
简介:BASE64是一种编码方法,它是一种基于用64个可打印字符来表示二进制数据的表示方法,主要应用于数据存储,传输,打印它是用64个可打印字符表示二进制所有数据方法。由于2的6次方等于64,所以可以用每6个位元…...
【页面无响应】Web页面经常无响应前端如何定位与优化(已解決)
【写在前面】客户现场应用我们的系统时候,发现用着用着就出现1个页面无响应现象,给客户带来极其不好的体验,尤其是当重要工作汇报演示时,就给我看无响应,浏览器崩溃?这样对产品的发展无疑是致命的伤&#x…...
隐私计算 FATE - 多分类神经网络算法测试
一、说明 本文分享基于 Fate 使用 横向联邦 神经网络算法 对 多分类 的数据进行 模型训练,并使用该模型对数据进行 多分类预测。 二分类算法:是指待预测的 label 标签的取值只有两种;直白来讲就是每个实例的可能类别只有两种 (0 或者 1)…...
Codeforces Round 853 (Div. 2)
Codeforces Round 853 (Div. 2) C. Serval and Toxels Arrays 思路: 求任意两个组合的元素个数。 注意到,其实每个元素都是独立的。他在任意组合的出现情况组成的贡献是可以分开讨论的。我们讨论元素x。假设x在m1个数组中出现了cnt次(一个…...
Ka频段需要更多带宽?
随着全球连接需求的增长,许多卫星通信(satcom)系统日益采用Ka频段,对数据速率的要求也水涨船高。目前,高性能信号链已经能支持数千兆瞬时带宽,一个系统中可能有成百上千个收发器,超高吞吐量数据速率已经成为现实。 另…...
初学pyinstaller打包过程中的一些问题
记录一下使用pyinstaller打包过程中的一些问题: 不安装虚拟环境打包,直接打包,一般不会出现什么问题,但是打包的exe很大,把所有模块和依赖库也一起打包了。 建议使用虚拟环境打包,安装必要的包࿰…...
做网站可以申请专利吗/各大网站推广平台
目录 模块 模块的使用 if __name__ "__main__": 包 模块 当代码量很大的时候,我们不可能把所有的代码写在一个文件中,所以这时候就需要分多个.py文件进行写代码。一个.py文件就是一个模块。如文件名为main.py,那么模块名就为…...
网站 电信已备案 联通/永久免费个人网站申请注册
问题描述 给定一个字符串,你需要从第start位开始每隔step位输出字符串对应位置上的字符。 输入格式 第一行一个只包含小写字母的字符串。 第二行两个非负整数start和step,意义见上。 输出格式 一行,表示对应输出。 样例输入 abcdefg 2 2 样例…...
巴音郭楞网站建设/怎么建个人网站
在分类中,和自己的父类关联public class AssessQualityIndex extends IdEntity<AssessQualityIndex> {private static final long serialVersionUID 1L; private String scoreStandard; // 评分标准private AssessQualityIndex parent; // 父级Js…...
网站首页布局分析/app推广引流渠道
算是读书笔记吧二进制编码二进制和我们平时用的十进制,其实并没有什么本质区别,只是平时我们是“逢十进一”,这里变成了“逢二进一”而已。每一位,相比于十进制下的 0~9 这十个数字,我们只能用 0 和 1 这两…...
模板网站建设公司/免费奖励自己的网站
8.4.6 用编程方式添加DataTable行 在为DataTable定义了架构之后,也就是设置好了需要的列名以后,就可以可通过将DataRow对象添加到表的Rows集合中来将数据行添加到表中。与添加DataColumn类似,同样可以通过使用DataRow构造函数,或…...