当前位置: 首页 > news >正文

Spark RDD持久化

RDD Cache缓存

        RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

 1)代码实现

object cache01 {def main(args: Array[String]): Unit = {//1.创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//2.创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)//3. 创建一个RDD,读取指定位置文件:hello atguigu atguiguval lineRdd: RDD[String] = sc.textFile("input1")//3.1.业务逻辑val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))val wordToOneRdd: RDD[(String, Int)] = wordRdd.map {word => {println("************")(word, 1)}}//3.5 cache操作会增加血缘关系,不改变原有的血缘关系println(wordToOneRdd.toDebugString)//3.4 数据缓存。wordToOneRdd.cache()//3.6 可以更改存储级别// wordToOneRdd.persist(StorageLevel.MEMORY_AND_DISK_2)//3.2 触发执行逻辑wordToOneRdd.collect()println("-----------------")println(wordToOneRdd.toDebugString)//3.3 再次触发执行逻辑wordToOneRdd.collect()//4.关闭连接sc.stop()}
}

2)源码解析

mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)object StorageLevel {val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

        注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。

        缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

3)自带缓存算子

        Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

object cache02 {def main(args: Array[String]): Unit = {//1.创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//2.创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)//3. 创建一个RDD,读取指定位置文件:hello atguigu atguiguval lineRdd: RDD[String] = sc.textFile("input1")//3.1.业务逻辑val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))val wordToOneRdd: RDD[(String, Int)] = wordRdd.map {word => {println("************")(word, 1)}}// 采用reduceByKey,自带缓存val wordByKeyRDD: RDD[(String, Int)] = wordToOneRdd.reduceByKey(_+_)//3.5 cache操作会增加血缘关系,不改变原有的血缘关系println(wordByKeyRDD.toDebugString)//3.4 数据缓存。//wordByKeyRDD.cache()//3.2 触发执行逻辑wordByKeyRDD.collect()println("-----------------")println(wordByKeyRDD.toDebugString)//3.3 再次触发执行逻辑wordByKeyRDD.collect()//4.关闭连接sc.stop()}
}

        访问http://localhost:4040/jobs/页面,查看第一个和第二个job的DAG图。说明:增加缓存后血缘依赖关系仍然有,但是,第二个job取的数据是从缓存中取的。

RDD CheckPoint检查点 

1)检查点:是通过将RDD中间结果写入磁盘。

2)为什么要做检查点?

        由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统

4)检查点数据存储格式为:二进制的文件

5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。

6)检查点触发时间:对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍

7)设置检查点步骤

(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")

(2)调用检查点方法:wordToOneRdd.checkpoint()

8)代码实现

 

object checkpoint01 {def main(args: Array[String]): Unit = {//1.创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//2.创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)// 需要设置路径,否则抛异常:Checkpoint directory has not been set in the SparkContextsc.setCheckpointDir("./checkpoint1")//3. 创建一个RDD,读取指定位置文件:hello atguigu atguiguval lineRdd: RDD[String] = sc.textFile("input1")//3.1.业务逻辑val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {word => {(word, System.currentTimeMillis())}}//3.5 增加缓存,避免再重新跑一个job做checkpoint
//        wordToOneRdd.cache()//3.4 数据检查点:针对wordToOneRdd做检查点计算wordToOneRdd.checkpoint()//3.2 触发执行逻辑wordToOneRdd.collect().foreach(println)// 会立即启动一个新的job来专门的做checkpoint运算//3.3 再次触发执行逻辑wordToOneRdd.collect().foreach(println)wordToOneRdd.collect().foreach(println)Thread.sleep(10000000)//4.关闭连接sc.stop()}
}

9)执行结果

        访问http://localhost:4040/jobs/页面,查看4个job的DAG图。其中第2个图是checkpoint的job运行DAG图。第3、4张图说明,检查点切断了血缘依赖关系。

(1)只增加checkpoint,没有增加Cache缓存打印

第1个job执行完,触发了checkpoint,第2个job运行checkpoint,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。

(hadoop,1577960215526)

。。。。。。

(hello,1577960215526)

(hadoop,1577960215609)

。。。。。。

(hello,1577960215609)

(hadoop,1577960215609)

。。。。。。

(hello,1577960215609)

(2)增加checkpoint,也增加Cache缓存打印

第1个job执行完,数据就保存到Cache里面了,第2个job运行checkpoint,直接读取Cache里面的数据,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。

(hadoop,1577960642223)

。。。。。。

(hello,1577960642225)

(hadoop,1577960642223)

。。。。。。

(hello,1577960642225)

(hadoop,1577960642223)

。。。。。。

(hello,1577960642225)

 

 缓存和检查点区别

1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

4)如果使用完了缓存,可以通过unpersist()方法释放缓存

检查点存储到HDFS集群

如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。

object checkpoint02 {def main(args: Array[String]): Unit = {// 设置访问HDFS集群的用户名System.setProperty("HADOOP_USER_NAME","atguigu")//1.创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//2.创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)// 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")//3. 创建一个RDD,读取指定位置文件:hello atguigu atguiguval lineRdd: RDD[String] = sc.textFile("input1")//3.1.业务逻辑val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {word => {(word, System.currentTimeMillis())}}//3.4 增加缓存,避免再重新跑一个job做checkpointwordToOneRdd.cache()//3.3 数据检查点:针对wordToOneRdd做检查点计算wordToOneRdd.checkpoint()//3.2 触发执行逻辑wordToOneRdd.collect().foreach(println)//4.关闭连接sc.stop()}
}

相关文章:

Spark RDD持久化

RDD Cache缓存 RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供…...

【Linux】Linux系统安装Python3和pip3

1.说明 一般来说Linux会自带Python环境,可能是Python3或者Python2,可能有pip也可能没有pip,所以有时候需要自己安装指定的Python版本。Linux系统下的安装方式都大同小异,基本上都是下载安装包然后编译一下,再创建好软…...

用java进行base64加密

首先定义一组密钥,加密和解密使用同一组密钥private final String key "hahahahahaha";也可以随机生成密钥/*** 生成随机密钥* param keySize 密钥大小推荐128 256* return* throws NoSuchAlgorithmException*/public static String generateSecret(int keySize) th…...

torch函数合集

torch.tensor() 原型:torch.tensor(data, dtypeNone, deviceNone, requires_gradFalse) 功能:其中data可以是:list,tuple,NumPy,ndarray等其他类型,torch.tensor会从data中的数据部分做拷贝(而不是直接引用),根据原始数据类型生成相应类型的torch.Tenso…...

AcWing算法提高课-3.1.2信使

宣传一下算法提高课整理 <— CSDN个人主页&#xff1a;更好的阅读体验 <— 题目传送门点这里 题目描述 战争时期&#xff0c;前线有 nnn 个哨所&#xff0c;每个哨所可能会与其他若干个哨所之间有通信联系。 信使负责在哨所之间传递信息&#xff0c;当然&#xff0c;…...

Paddle OCR Win 11下的安装和简单使用教程

Paddle OCR Win 11下的安装和简单使用教程 对于中文的识别&#xff0c;可以考虑直接使用Paddle OCR&#xff0c;识别准确率和部署都相对比较方便。 环境搭建 目前PaddlePaddle 发布到v2.4&#xff0c;先下载paddlepaddle&#xff0c;再下载paddleocr。根据自己设备操作系统进…...

杂谈:数组index问题和对象key问题

面试题一&#xff1a; var arr [1, 2, 3, 4] 问&#xff1a;arr[1] ?; arr[1] ?答&#xff1a;arr[1] 2; arr[1] 2 这里可以再分为两个问题&#xff1a; 1、数组赋值 var arr [1, 2, 3, 4]arr[1] 10; // 数字场景 arr[10] 1; // 字符串场景 arr[a] 1; // 字符串…...

三天Golang快速入门—Slice切片

三天Golang快速入门—Slice切片Slice切片切片原理切片遍历append函数操作切片append添加append追加多个切片中删除元素切片合并string和slice的联系Slice切片 切片原理 由三个部分构成&#xff0c;指针、长度、容量指针&#xff1a;指向slice第一个元素对应的数组元素的地址长…...

腾讯会议演示者视图/演讲者视图

前言 使用腾讯会议共享PPT时&#xff0c;腾讯会议支持共享用户使用演示者视图/演讲者视图&#xff0c;而会议其他成员可以看到正常的放映视图。下面以Win10系统和Office为例&#xff0c;介绍使用步骤。值得一提的是&#xff0c;该方法同时适用于单显示屏和多显示屏。 腾讯会议…...

【C++】类与对象(一)

文章目录1、面向过程和面向对象初步认识2、类的引入3、类的定义4、类的访问限定符5、类的作用域6、类的实例化7、计算类对象的大小8、this指针9、 C语言和C实现Stack的对比1、面向过程和面向对象初步认识 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题…...

JavaScript基本语法

本文提到的绝大多数语法都是与Java不同的语法,相同的就不会赘述了.JavaScript的三种引入方式内部js<body><script>alert(hello);</script> </body>行内js<body><div onclick"alert(hello)">这是一个div 点击一下试试</div>…...

OpenCV4.x图像处理实例-道路车辆检测(基于背景消减法)

通过背景消减进行道路车辆检测 文章目录 通过背景消减进行道路车辆检测1、车辆检测思路介绍2、BackgroundSubtractorMOG23、车辆检测实现在本文中,将介绍如何使用简单但有效的背景-前景减法方法执行车辆检测等任务。本文将使用 OpenCV 中使用背景-前景减法和轮廓检测,以及如何…...

pwnlab通关流程

pwnlab通关 关于文件包含&#xff0c;环境变量劫持的一个靶场 信息收集 靶机ip&#xff1a;192.168.112.133 开放端口 根据开放的端口信息决定从80web端口入手 目录信息 在images和upload路径存在目录遍历&#xff0c;config.php被渲染无法查看&#xff0c;upload.php需…...

面向过程与面向对象的区别与联系

目录 什么是面向过程 什么是面向对象 区别 各自的优缺点 什么是面向过程 面向过程是一种以事件为中心的编程思想&#xff0c;编程的时候把解决问题的步骤分析出来&#xff0c;然后用函数把这些步骤实现&#xff0c;在一步一步的具体步骤中再按顺序调用函数。 什么是面向对…...

主机状态(查看资源占用情况、查看网络占用情况)

1. 查看资源占用情况 【1】可以通过top命令查看cpu、内存的使用情况&#xff0c;类似windows的任务管理器 默认5s刷新一次 语法&#xff1a;top 可 Ctrl c 退出 2.磁盘信息监控 【1】使用df命令&#xff0c;查看磁盘信息占用情况 语法&#xff1a;df [ -h ] 以更加人性化…...

代码随想录算法训练营第四十一天 | 01背包问题-二维数组滚动数组,416. 分割等和子集

一、参考资料01背包问题 二维 https://programmercarl.com/%E8%83%8C%E5%8C%85%E7%90%86%E8%AE%BA%E5%9F%BA%E7%A1%8001%E8%83%8C%E5%8C%85-1.html 视频讲解&#xff1a;https://www.bilibili.com/video/BV1cg411g7Y6 01背包问题 一维 https://programmercarl.com/%E8%83%8C%E5…...

VMware NSX 4.1 发布 - 网络安全虚拟化平台

请访问原文链接&#xff1a;VMware NSX 4 - 网络安全虚拟化平台&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;www.sysin.org VMware NSX 提供了一个敏捷式软件定义基础架构&#xff0c;用来构建云原生应用程序环境。NSX 专注于为具有异…...

计算理论 复杂度预备知识

文章目录计算理论 复杂度预备知识符号递归表达式求解通项公式主方法Akra-Bazzi 定理计算理论 复杂度预备知识 符号 f(n)o(g(n))f(n)o(g(n))f(n)o(g(n)) &#xff1a;∃c\exists c∃c &#xff0c;当 nnn 足够大时&#xff0c; f(n)<cg(n)f(n)\lt cg(n)f(n)<cg(n) &#…...

二叉树——二叉搜索树中的插入操作

二叉搜索树中的插入操作 链接 给定二叉搜索树&#xff08;BST&#xff09;的根节点 root 和要插入树中的值 value &#xff0c;将值插入二叉搜索树。 返回插入后二叉搜索树的根节点。 输入数据 保证 &#xff0c;新值和原始二叉搜索树中的任意节点值都不同。 注意&#xff0c…...

C# if break,if continue,if return的区别和使用

故事部分&#xff1a; 现在你肚子饿了&#xff0c;想要去&#xff1a; 1.吃个三菜一汤。 2.吃个蛋糕。 3.喝个奶茶。 结果&#xff0c;你吃饭的时候&#xff0c;吃到一个虫子。 你会有几种做法&#xff1f; 1.把有虫子这道菜拿走&#xff0c;继续吃下一道菜 。 2.算了&#xff…...

力扣-第二高的薪水

大家好&#xff0c;我是空空star&#xff0c;本篇带大家了解一道中等的力扣sql练习题。 文章目录前言一、题目&#xff1a;176. 第二高的薪水二、解题1.正确示范①提交SQL运行结果2.正确示范②提交SQL运行结果3.正确示范③提交SQL运行结果4.正确示范④提交SQL运行结果5.其他总结…...

I - 太阳轰炸(组合数学Cnk n固定)

2023河南省赛组队训练赛&#xff08;二&#xff09; - Virtual Judge (vjudge.net) 背景&#xff1a;阿塔尼斯&#xff0c;达拉姆的大主教&#xff0c;在艾尔又一次沦陷之后指挥着星灵的最后一艘方舟舰&#xff1a;亚顿之矛。作为艾尔星灵数千年来的智慧结晶&#xff0c;亚顿之…...

centos安装gitlab

更新系统 sudo yum -y update安装所需要的包 sudo yum -y install epel-release curl vim policycoreutils-python如果要安装并使用本地Postfix服务器发送通知&#xff0c;请安装Postfix&#xff0c;这里就不安装了&#xff1a; sudo yum -y install postfix安装后启动并启用…...

【洛谷 P1093】[NOIP2007 普及组] 奖学金 题解(结构体排序)

[NOIP2007 普及组] 奖学金 题目描述 某小学最近得到了一笔赞助&#xff0c;打算拿出其中一部分为学习成绩优秀的前 555 名学生发奖学金。期末&#xff0c;每个学生都有 333 门课的成绩:语文、数学、英语。先按总分从高到低排序&#xff0c;如果两个同学总分相同&#xff0c;再…...

【Hello Linux】进程优先级和环境变量

作者&#xff1a;小萌新 专栏&#xff1a;Linux 作者简介&#xff1a;大二学生 希望能和大家一起进步&#xff01; 本篇博客简介&#xff1a;简单介绍下进程的优先级 环境变量 进程优先级环境变量进程的优先级基本概念如何查看优先级PRI与NINI值的设置范围NI值如何修改修改方式…...

日期:Date,SimpleDateFormat常见API以及包装类

一.Date类 package com.gch.d1_date;import java.util.Date;/**目标:学会使用Date类处理时间,获取时间的信息*/ public class DateDemo1 {public static void main(String[] args) {// 1.创建一个Date类的对象:代表系统此刻日期时间对象Date d new Date();System.out.println(…...

嵌入式之ubuntu终端操作与shell常用命令详解

目录 文件和目录列表 基本列表功能 显示列表长度 过滤输出列表 浏览文件系统 Linux 文件系统 遍历目录 处理文件 创建文件 复制文件 制表键自动补全 重命名文件 删除文件 处理目录 创建目录 删除目录 ​编辑其他常用命令与操作 Uname命令 clear命令 返回上一级命令 显…...

【Shell学习笔记】6.Shell 流程控制

前言 本章介绍Shell的流程控制。 Shell 流程控制 和 Java、PHP 等语言不一样&#xff0c;sh 的流程控制不可为空&#xff0c;如(以下为 PHP 流程控制写法)&#xff1a; 实例 <?php if (isset($_GET["q"])) {search(q); } else {// 不做任何事情 }在 sh/bash…...

27k入职阿里测开岗那天,我哭了,这5个月付出的一切总算没有白费~

先说一下自己的个人情况&#xff0c;计算机专业&#xff0c;16年普通二本学校毕业&#xff0c;经历过一些失败的工作经历后&#xff0c;经推荐就进入了华为的测试岗&#xff0c;进去才知道是接了个外包项目&#xff0c;不太稳定的样子&#xff0c;可是刚毕业谁知道什么外包不外…...

服务端开发之Java备战秋招面试篇5

努力了那么多年,回头一望,几乎全是漫长的挫折和煎熬。对于大多数人的一生来说,顺风顺水只是偶尔,挫折、不堪、焦虑和迷茫才是主旋律。我们登上并非我们所选择的舞台,演出并非我们所选择的剧本。继续加油吧&#xff01; 目录 1.ArrayList与LinkedList区别&#xff0c; 应用场景…...

用什么做网站开发/win7优化软件

今天主要是寻找板卡问题然后维修&#xff0c;这次生产了600PCS板卡&#xff0c;第一次小批量生产。记得第一次打样是4PCS&#xff0c;很多问题都无法暴露出来&#xff0c;这次600PCS就暴露不少问题了。首先功耗问题就有两个&#xff0c;然后其他的小问题有几个&#xff0c;所以…...

做定制商品的网站/新闻类软文

1.(1)建立一个名为JEWRY的文件夹&#xff0c;并在其中建立一个新的子文件夹JAK&#xff1b;(2)将C&#xff1a;\\TABLE文件夹删除&#xff1b;(3)将C&#xff1a;\\UNION\\TEAM文件夹中的文件MARK.FOX删除&#xff1b;(4)将C&#xff1a;\\TAM\\UPIN文件夹中文件MAIN.PRG拷贝到…...

php是做网站的吗/免费建站模板

第1关:python数据库编程之创建数据库 本关任务:使用 pymysql 创建一个名为 mydb 的数据库。 import pymysql"""需求:创建一个名为 mydb 的数据库 """ if __name__ == __main__:# **********begin********** ## 获取连接conn = pymysql.conn…...

网站上的广告怎么做/seo是什么东西

最近在做一个微信分享的功能&#xff0c;分享到朋友圈还有微信好友的时候&#xff0c;图片成回形针状&#xff0c;自定义的图片显示不出来&#xff0c;为此折腾了好久&#xff0c;如下图所示 经过重重排查&#xff0c;发现是图片路劲的问题&#xff0c;之前是放到/assets/img/…...

做网站客户会问什么问题/如何做网站赚钱

原文地址&#xff1a;http://www.aiuxian.com/article/p-337463.html Centos6.4安装R和Rstudio 安装前所需的各种包(有的话可省略)&#xff1a; yum install gcc gcc-c yum install gcc-gfortran yum install readline-devel yum install libXt-devel yum install fonts-chine…...

昆明公司建设网站制作/重庆百度推广关键词优化

场景一&#xff1a;分流原因&#xff1a;服务器有2块或多块场景二&#xff1a;网维大师与游戏虚拟盘分别使用独立的服务器。分流原因&#xff1a;因为客户机较多&#xff0c;所以虚拟盘服务器用了2块网卡&#xff0c;希望将同步节点的流量与虚拟盘的流量分流。来避免同步节点工…...