当前位置: 首页 > news >正文

Spark09: Spark之checkpoint

一、checkpoint概述

checkpoint,是Spark提供的一个比较高级的功能。有时候,我们的Spark任务,比较复杂,从初始化RDD开始,到最后整个任务完成,有比较多的步骤,比如超过10个transformation算子。而且,整个任务运行的时间也特别长,比如通常要运行1~2个小时。在这种情况下,就比较适合使用checkpoint功能了。因为对于特别复杂的Spark任务,有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation算子,又要使用到该RDD时,就会发现数据丢失了,此时如果没有进行容错处理的话,那么就需要再重新计算一次数据了。所以针对这种Spark Job,如果我们担心某些关键的,在后面会反复使用的RDD,因为节点故障导致数据
丢失,那么可以针对该RDD启动checkpoint机制,实现容错和高可用

如何使用checkpoint?

(1)首先要调用SparkContext的setCheckpointDir()方法,设置一个容错的文件系统的目录,比如HDFS;然后,对RDD调用checkpoint()方法。
(2)最后,在RDD所在的job运行结束之后,会启动一个单独的job,将checkpoint设置过的RDD的数据写入之
前设置的文件系统中。

二、RDD的checkpoint流程

1:SparkContext设置checkpoint目录,用于存放checkpoint的数据;对RDD调用checkpoint方法,然后它就会被RDDCheckpointData对象进行管理,此时这个RDD的checkpoint状态会被设置为Initialized。

2:待RDD所在的job运行结束,会调用job中最后一个RDD的doCheckpoint方法,该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD,并将其checkpoint状态从Initialized设置为
CheckpointingInProgress
3:启动一个单独的job,来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作,也就是将其数据写入checkpoint目录
4:将RDD数据写入checkpoint目录之后,会将RDD状态改变为Checkpointed;并且还会改变RDD的血缘关系,即会清除掉RDD所有依赖的RDD;最后还会设置其父RDD为新创建的CheckpointRDD

三、checkpoint与持久化的区别

(1)lineage是否发生改变。

lineage(血缘关系)说的就是RDD之间的依赖关系,持久化只是将数据保存在内存中或者本地磁盘文件中,RDD的lineage(血缘关系)是不变的;Checkpoint执行之后,RDD就没有依赖的RDD了,也就是它的lineage改变了。

(2)丢失数据的可能性。

持久化的数据丢失的可能性较大,如果采用 persist 把数据存在内存中的话,虽然速度最快但是也是最不可靠的,就算放在磁盘上也不是完全可靠的,因为磁盘也会损坏。Checkpoint的数据通常是保存在高可用文件系统中(HDFS),丢失的可能性很低

建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)
为什么呢

因为默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,那么这个时候,本来Spark任务已经执行结束了,但是由于中间的RDD没有持久化,在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话,就需要重新计算这个RDD的数据,再将其checkpoint到外部存储系统中。如果对需要checkpoint的rdd进行了基于磁盘的持久化,那么后面进行checkpoint操作时,就会直接从磁盘上读取rdd的数据了,就不需要重新再计算一次了,这样效率就高了。那在这能不能使用基于内存的持久化呢?当然是可以的,不过没那个必要。

四、checkpoint的使用

1. scala代码

package com.sanqian.scalaimport org.apache.spark.api.java.StorageLevels
import org.apache.spark.{SparkConf, SparkContext}object CheckPointScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("CheckPointScala")val sc = new SparkContext(conf)if (args.length == 0) {System.exit(100)}val outoutPath = args(0)// 1.设置checkpoint目录\sc.setCheckpointDir("hdfs://bigdata01:9000/chk001")val dataRDD = sc.textFile("hdfs://bigdata01:9000/hadoop")dataRDD.persist(StorageLevels.DISK_ONLY)// 2.对RDD执行checkpoint操作dataRDD.checkpoint()dataRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(outoutPath)sc.stop()}
}

2. Java代码

package com.sanqian.java;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;public class CheckPointJava {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setAppName("CheckPointJava");JavaSparkContext sc = new JavaSparkContext(conf);if (args.length == 0){System.exit(100);}String outputPath = args[0];// 1.设置checkpoint目录sc.setCheckpointDir("hdfs://bigdata01:9000/chk001");JavaRDD<String> rdd = sc.textFile("hdfs://bigdata01:9000/hadoop");// 2.对RDD执行checkpoint操作rdd.checkpoint();rdd.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();}}).mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1);}}).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}}).saveAsTextFile(outputPath);sc.stop();}
}

3. 打包代码

(1)将pom.xml中的spark-core的依赖设置为provided,然后编译打包

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.3</version><scope>provided</scope></dependency>

(2)D:\ProgramData\IdeaProjects\db_spark>mvn clean package -DskipTests

 (3)将打包的jar包上传到bigdata04的/data/soft/sparkjars目录,创建一个新的spark-submit脚本

spark-submit \
--class $1 \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
db_spark-1.0-SNAPSHOT.jar \
$2

 (4)提交任务:

 sh lwx_run.sh com.sanqian.scala.CheckPointScala /out-chk003

执行成功之后可以到 setCheckpointDir 指定的目录中查看一下,可以看到目录中会生成对应的文件保存rdd中的数据,只不过生成的文件不是普通文本文件,直接查看文件中的内容显示为乱码。

相关文章:

Spark09: Spark之checkpoint

一、checkpoint概述 checkpoint&#xff0c;是Spark提供的一个比较高级的功能。有时候&#xff0c;我们的Spark任务&#xff0c;比较复杂&#xff0c;从初始化RDD开始&#xff0c;到最后整个任务完成&#xff0c;有比较多的步骤&#xff0c;比如超过10个transformation算子。而…...

《剑指offer》:数组部分

一、数组中重复的数字题目描述&#xff1a;在一个长度为n的数组里的所有数字都在0到n-1的范围内。 数组中某些数字是重复的&#xff0c;但不知道有几个数字是重复的。也不知道每个数字重复几次。请找出数组中任意一个重复的数字。 例如&#xff0c;如果输入长度为7的数组{2,3,1…...

基于微信小程序图书馆座位预约管理系统

开发工具&#xff1a;IDEA、微信小程序服务器&#xff1a;Tomcat9.0&#xff0c; jdk1.8项目构建&#xff1a;maven数据库&#xff1a;mysql5.7前端技术&#xff1a;vue、uniapp服务端技术&#xff1a;springbootmybatis本系统分微信小程序和管理后台两部分&#xff0c;项目采用…...

剑指 Offer Day1——栈与队列(简单)

本专栏将记录《剑指 Offer》的刷题&#xff0c;传送门&#xff1a;https://leetcode.cn/study-plan/lcof/。 目录剑指 Offer 09. 用两个栈实现队列剑指 Offer 30. 包含min函数的栈剑指 Offer 09. 用两个栈实现队列 原题链接&#xff1a;09. 用两个栈实现队列 class CQueue { pu…...

详解Python正则表达式中group与groups的用法

在Python中&#xff0c;正则表达式的group和groups方法是非常有用的函数&#xff0c;用于处理匹配结果的分组信息。 group方法是re.MatchObject类中的一个函数&#xff0c;用于返回匹配对象的整个匹配结果或特定的分组匹配结果。而groups方法同样是re.MatchObject类中的函数&am…...

Spring面试重点(三)——AOP循环依赖

Spring面试重点 AOP 前置通知&#xff08;Before&#xff09;&#xff1a;在⽬标⽅法运行之前运行&#xff1b;后置通知&#xff08;After&#xff09;&#xff1a;在⽬标⽅法运行结束之后运行&#xff1b;返回通知&#xff08;AfterReturning&#xff09;&#xff1a;在⽬标…...

计算机网络之HTTP04ECDHE握手解析

DH算法 离散读对数问题是DH算法的数学基础 &#xff08;1&#xff09;计算公钥 &#xff08;2&#xff09;交换公钥&#xff0c;并计算 对方公钥^我的私钥 mod p 离散对数的交换幂运算交换律使二者算出来的值一样&#xff0c;都为K k就是对称加密的秘钥 2. DHE算法 E&#…...

【MySQL数据库】主从复制原理和应用

主从复制和读写分离1. 主从复制的原理2. 主从复制的环境配置2.1 准备好数据库服务器2.2 配置master2.3 配置slave2.4 测试3. 主从复制的应用——读写分离3.1 读写分离的背景3.2 Sharding-JDBC介绍3.3 Sharding-JDBC使用步骤1. 主从复制的原理 MySQL主从复制是一个异步的过程&a…...

复现随记~

note(美团2022) 比较简单的越界漏洞&#xff0c;堆本身并没有什么漏洞&#xff0c;而且保护并没全开&#xff0c;所以逆向思维。必然是ROP类而非指针类&#xff0c;故我们着重注意unsigned int等无符号数前后是否不一致 int __fastcall edit(__int64 a1) {int idx; // [rsp14…...

【计组】设计大型DMP系统--《深入浅出计算机组成原理》(十四)

目录 一、DMP&#xff1a;数据管理平台 二、MongoDB 真的万能吗 三、关系型数据库&#xff1a;不得不做的随机读写 &#xff08;一&#xff09;Cassandra&#xff1a;顺序写和随机读 1、Cassandra 的数据模型 2、Cassandra 的写操作 3、Cassandra 的读操作 &#xff08…...

66 使用注意力机制的seq2seq【动手学深度学习v2】

66 使用注意力机制的seq2seq【动手学深度学习v2】 深度学习学习笔记 学习视频&#xff1a;https://www.bilibili.com/video/BV1v44y1C7Tg/?spm_id_from…top_right_bar_window_history.content.click&vd_source75dce036dc8244310435eaf03de4e330 在机器翻译时&#xff0c;…...

NextJS(ReactSSR)

pre-render&#xff1a; 预渲染 1. 静态化 发生的时间&#xff1a;next build 1). 纯静态化 2). SSG: server static generator getStaticProps: 当渲染组件之前会运行 生成html json //该函数只可能在服务端运行 //该函数运行在组件渲染之前 //该函数只能在build期间运…...

JointBERT代码复现详解【上】

BERT for Joint Intent Classification and Slot Filling代码复现【上】 源码链接&#xff1a;JointBERT源码复现&#xff08;含注释&#xff09; 一、准备工作 源码架构 data&#xff1a;存放两个基准数据集&#xff1b;model&#xff1a;JointBert模型的实现&#xff1b…...

进程间通信(上)

进程间通信&#xff08;上&#xff09;背景进程间通信目的进程间通信发展进程间通信分类管道什么是管道匿名管道实例代码简单的匿名管道实现一个父进程控制单个子进程完成指定任务父进程控制一批子进程完成任务&#xff08;进程池&#xff09;用fork来共享管道站在文件描述符角…...

【Unity3D】Unity 3D 连接 MySQL 数据库

1.Navicat准备 test 数据库&#xff0c;并在test数据库下创建 user 数据表&#xff0c;预先插入测试数据。 2.启动 Unity Hub 新建一个项目&#xff0c;然后在Unity编辑器的 Project视图 中&#xff0c;右击新建一个 Plugins 文件夹将连接 MySQL的驱动包 导入&#xff08;附加驱…...

vue通用后台管理系统

用到的js库 遇到的问题 vuex和 localStorage区别 vuex在内存中&#xff0c;localStorage存在本地localStorage只能存储字符串类型数据&#xff0c;存储对象需要JSON.stringify() 和 parse()…读取内存比读取硬盘速度要快刷新页面vuex数据丢失&#xff0c;localStorage不会vuex…...

IDEA设置只格式化本次迭代变更的代码

趁着上海梅雨季节&#xff0c;周末狠狠更新一下。平常工作在CR的时候&#xff0c;经常发现会有新同事出现大量代码变更行..一看原因竟是在格式化代码时把历史代码也格式化掉了这样不仅坑了自己&#xff08;覆盖率问题等&#xff09;&#xff0c;也可能会影响原始代码责任到人&a…...

算法训练——剑指offer(Hash集合问题)

摘要 数据结构中有一个用于存储重要的数据结构&#xff0c;它们就是HashMap,HasSet&#xff0c;它典型特征就是存储key:value键值对。在查询制定的key的时候查询效率最高O(1)。Hashmap&#xff0c;HasSet的底层结构是如图所示。它们的区别就是是否存在重复的元素。 二、HashMa…...

Element UI框架学习篇(七)

Element UI框架学习篇(七) 1 新增员工 1.1 前台部分 1.1.1 在vue实例的data里面准备好需要的对象以及属性 addStatus:false,//判断是否弹出新增用户弹窗dailog,为true就显示depts:[],//部门信息mgrs:[],//上级领导信息jobs:[],//工作岗位信息//新增用户所需要的对象newEmp:…...

【项目实战】32G的电脑启动IDEA一个后端服务要2min!谁忍的了?

一、背景 本人电脑性能一般&#xff0c;但是拥有着一台高性能的VDI&#xff08;虚拟桌面基础架构&#xff09;&#xff0c;以下是具体的配置 二、问题描述 但是&#xff0c;即便是拥有这么高的性能&#xff0c;每次运行基于Dubbo微服务架构下的微服务都贼久&#xff0c;以下…...

2022年山东省中职组“网络安全”赛项比赛任务书正式赛题

2022年山东省中职组“网络安全”赛项 比赛任务书 一、竞赛时间 总计&#xff1a;360分钟 竞赛阶段竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A模块 A-1 登录安全加固 180分钟 200分 A-2 Nginx安全策略 A-3 日志监控 A-4 中间件服务加固 A-5 本地安全策略…...

RibbitMQ 入门到应用 ( 二 ) 安装

3.安装基本操作 3.1.下载安装 3.1.1.官网 下载地址 https://rabbitmq.com/download.html 与Erlang语言对应版本 https://rabbitmq.com/which-erlang.html 3.1.2.安装 Erlang 在确定了RabbitMQ版本号后&#xff0c;先下载安装Erlang环境 Erlang下载链接 https://packa…...

提取DataFrame中每一行的DataFrame.itertuples()方法

【小白从小学Python、C、Java】【计算机等级考试500强双证书】【Python-数据分析】提取DataFrame中的每一行DataFrame.itertuples()选择题关于以下python代码说法错误的一项是?import pandas as pddf pd.DataFrame({A:[a1,a2],B:[b1,b2]},index[i1,i2])print("【显示】d…...

基于卷积神经网络的立体视频编码质量增强方法_余伟杰

基于卷积神经网络的立体视频编码质量增强方法_余伟杰提出的基于TSAN的合成视点质量增强方法全局信息提取流像素重组局部信息提取流多尺度空间注意力机制提出的基于RDEN的轻量级合成视点质量增强方法特征蒸馏注意力块轻量级多尺度空间注意力机制概念扭曲失真孔洞问题失真和伪影提…...

【2023unity游戏制作-mango的冒险】-3.基础动作和动画API实现

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 秩沅 原创 收录于专栏&#xff1a;unity游戏制作 ⭐mango的基础动作动画的添加⭐ 文章目录⭐mango的基础动作动画的添加⭐&#x1f…...

跨域的几种解决方案?

1-jsonp 【前端后端实现】jsonp: 利用 <script> 标签没有跨域限制的漏洞&#xff0c;网页可以得到从其他来源动态产生的 JSON 数据。JSONP请求一定需要对方的服务器做支持才可以。JSONP优点是简单兼容性好&#xff0c;可用于解决主流浏览器的跨域数据访问的问题。缺点是仅…...

2022年山东省职业院校技能大赛网络搭建与应用赛项正式赛题

2022年山东省职业院校技能大赛 网络搭建与应用赛项 第二部分 网络搭建与安全部署&服务器配置及应用 竞赛说明&#xff1a; 一、竞赛内容分布 竞赛共分二个模块&#xff0c;其中&#xff1a; 第一模块&#xff1a;网络搭建及安全部署项目 第二模块&#xff1a;服务…...

【JUC并发编程】ArrayBlockingQueue和LinkedBlockingQueue源码2分钟看完

文章目录1、BlockingQueue1&#xff09;接口方法2&#xff09;阻塞队列分类2、ArrayBlockingQueue1&#xff09;构造函数2&#xff09;put()入队3&#xff09;take()出队3、LinkedBlockingQueue1&#xff09;构造函数2&#xff09;put()入队3&#xff09;take()出队1、Blocking…...

GitHub个人资料自述与管理主题设置

目录 关于您的个人资料自述文件 先决条件 添加个人资料自述文件 删除个人资料自述文件 管理主题设置 补充&#xff1a;建立一个空白文件夹 关于您的个人资料自述文件 可以通过创建个人资料 README&#xff0c;在 GitHub.com 上与社区分享有关你自己的信息。 GitHub 在个…...

Express篇-连接mysql

创建数据库配置文件config/sqlconfig.jsconst sqlconfig {host: localhost, // 连接地址user: root, //用户名password: ****, //密码port: 3306 , //端口号database: mysql01_dbbooks //数据库名 } module.exports sqlconfig封装数据库管理工具 utils/mysqlUtils.…...

大地在线影视免费观看/整站优化方案

1. JSON 概述 JSON&#xff08;JavaScript Object Notation, JS 对象标记&#xff09;是一种轻量级的数据交换格式&#xff0c;简洁和清晰的层次结构使得JSON成为理想的数据交换语言。 JSON 支持的数据格式&#xff1a; 对象&#xff08;字典&#xff09;&#xff1a;花括号…...

刷赞网站空间免费/搜索引擎优化seo怎么做

EnableCaching如何一键开启缓存手动挡CacheManagerCache使用演示小结自动挡CachingConfigurationSelectorAutoProxyRegistrarProxyCachingConfigurationCacheOperationSourceCacheOperationBeanFactoryCacheOperationSourceAdvisorCacheInterceptor小结推荐阅读手动挡 我们首先…...

兰州彩票网站制作/聊城网站seo

微信小程序 MD5的方法详解生成的文件可以放在 utils文件中哦&#xff01;&#xff01;&#xff01;/** A JavaScript implementation of the RSA Data Security, Inc. MD5 Message* Digest Algorithm, as defined in RFC 1321.* Version 1.1 Copyright (C) Paul Johnston 1999…...

做医疗网站建设/网络营销推广要求

init()方法中返回的this指向init的实例对象&#xff0c;而init.prototype等于jQuery.prototype&#xff0c;所以也是jQuery的实例对象&#xff1b; 返回this是为了实现链式调用...

网站栅格化怎么做/沈阳seo关键词

第5章 使用高级查询技术 一&#xff1a;用已映射语句关联对象 问题&#xff1a;如果你用过Hibernate或JPA&#xff0c;会想到entity&#xff08;实体对象 -- 数据库对应JavaBean&#xff09;之间可能存在关联关系。如一对一、多对多等。伴随就出现了关联获取技术&#xff0c;我…...

公司网站域名注册流程/网站建设 全网营销

alter table tablename drop column columnname;alter table tabelname add columnname varchar2(8) NULL;一 . 常用mysql命令行命令1 .启动MYSQL服务 net start mysql停止MYSQL服务 net stop mysql2 . netstat –na | findstr 3306 查看被监听的端口 , findstr用于查找后面的在…...