Spark 内存运用
RDD Cache
当同一个 RDD 被引用多次时,就可以考虑进行 Cache,从而提升作业的执行效率
// 用 cache 对 wordCounts 加缓存
wordCounts.cache
// cache 后要用 action 才能触发 RDD 内存物化
wordCounts.count// 自定义 Cache 的存储介质、存储形式、副本数量
wordCounts.persist(MEMORY_ONLY)
Spark 的 Cache 机制 :
- 缓存的存储级别:限定了数据缓存的存储介质,如 : 内存、磁盘
- 缓存的计算过程:从 RDD 展开到分片 (Block),存储于内存或磁盘的过程
- 缓存的销毁过程:缓存数据主动或被动删除的内存或磁盘的过程
存储级别
Spark 支持的存储级别:
- RDD 缓存的默认存储级别:MEMORY_ONLY
- DataFrame 缓存的默认存储级别:MEMORY_AND_DISK
存储级别 | 存储介质 | 存储形式 | 副本设置 | |||
---|---|---|---|---|---|---|
内存 | 磁盘 | 对象值 | 序列化 | |||
MEMORY_ONLY | √ | √ | 1 | |||
MEMORY_ONLY_2 | √ | √ | 2 | |||
MEMORY_ONLY_SER | √ | √ | 1 | |||
MEMORY_ONLY_SER_2 | √ | √ | 2 | |||
DISK_ONLY | √ | √ | 1 | |||
DISK_ONLY_2 | √ | √ | 2 | |||
DISK_ONLY_3 | √ | √ | 3 | |||
MEMORY_AND_DISK | √ | √ | √ | √ | 1 | 内存以对象值存储,磁盘以序列化 |
MEMORY_AND_DISK_2 | √ | √ | √ | √ | 2 | |
MEMORY_AND_DISK_SER | √ | √ | √ | 1 | 内存/磁盘都以序列化的字节数组存储 | |
MEMORY_AND_DISK_SER2 | √ | √ | √ | 2 |
计算过程
缓存的计算过程 :
- MEMORY_AND_DISK :先把数据集全部缓存到内存,内存不足时,才把剩余的数据落磁盘
- MEMORY_ONLY :只把数据往内存里塞
内存中的存储过程 :
销毁过程
缓存的销毁过程 :
- 缓存抢占 Execution Memory 空间,会进行缓存释放
Spark 清除缓存的原则:
- LRU:按元素的访问顺序,优先清除那些最近最少访问的 BlockId、MemoryEntry 键值对
- 在清除时,同属一个 RDD 的 MemoryEntry 不会清除
Spark 实现 LRU 的数据结构:LinkedHashMap , 内部有两个数据结构
- HashMap : 用于快速访问,根据指定的 BlockId,O(1) 返回 MemoryEntry
- 双向链表 : 用于维护元素(BlockId 和 MemoryEntry 键值对)的访问顺序
Spark 会释放 LRU 的 MemoryEntry :
Cache 注意点
用 Cache 的基本原则 :
- 当
RDD/DataFrame/Dataset
的引用数为 1,坚决不用 Cache - 当引用数大于 1,且运行成本超过 30%,就考虑用 Cache
运行成本占比 : 计算某个分布式数据集要消耗的总时间与作业执行时间的比值
- 端到端的执行时间为 1 小时
- DataFrame 被引用了 2 次
- 从读取数据源到生成该 DataFrame 花了 12 分钟
- 该 DataFrame 的运行成本占比 =
12*2/60 = 40%
用 noop 计算 DataFrame 运行时间 :
df.write.format("noop").save()
.cache
是惰性操作,在调用 .cache
后,要先用 count 才能触发缓存的完全物化
Cache 要遵循最小公共子集原则 :
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)//Cache方式一
val cachedDF = df.cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)
两个查询的 Analyzed Logical Plan 不一致,无法缓存复用
//Cache方式二
df.select(col1, col2).filter(col2 > 0).cache
//数据分析
df.filter(col2 > 0).select(col1, col2)
df.select(col1, col2).filter(col2 > 100)
Analyzed Logical Plan 完全一致,能缓存复用
//Cache方式三
val cachedDF = df.select(col1, col2).cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)
及时清理 Cache :
- 异步模式 (常用):调用 unpersist() 或 unpersist(False)
- 同步模式:调用 unpersist(True)
OOM
OOM的具体区域 :
- 发生 OOM 的 LOC(Line Of Code),代码位置
- OOM 发生在 Driver 端,还是在 Executor 端
- 在 Executor 端的哪片内存区域
Driver OOM
Driver 端的 OOM 位置 :
- 创建小规模的分布式数据集:使用 parallelize、createDataFrame 创建数据集
- 收集计算结果:通过 take、show、collect 把结果收集到 Driver 端
Driver 端的 OOM 原因:
- 创建的数据集超过内存上限
- 收集的结果集超过内存上限
广播变量的创建与分发 :
广播变量的数据拉取就是用 collect 。当数据总大小超过 Driver 端内存时 , 就报 OOM :
java.lang.OutOfMemoryError: Not enough memory to build and broadcast
对结果集尺寸预估,适当增加 Driver 内存配置
- Driver 内存大小 :
spark.driver.memory
查看执行计划 :
val df: DataFrame = _
df.cache.countval plan = df.queryExecution.logical
val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes
Executor OOM
当 Executors OOM 时,定位 Execution Memory 和 User Memory
User Memory OOM
User Memory 用于存储用户自定义的数据结构,如: 数组、列表、字典
当自定义数据结构的总大小超出 User Memory 上限时,就会报错
java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf
java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance
计算 User Memory 消耗时,要考虑 Executor 的线程池大小
- 当 dict 大小为
#size
, Executor 线程池大小为#threads
- dict 对 User Memory 的总消耗:
#size * #threads
- 当总消耗超出 User Memory 上限,就会 OOM
val dict = List("spark", "tune")
val words = spark.sparkContext.textFile("~/words.csv")val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect
自定义数据分发 :
- 自定义的列表 dict 会随着 Task 分发到所有 Executors
- 多个 Task 的 dict 会对 User Memory 产生重复消耗
解决 User Memory OOM 的思路 :
- 先对数据结构的消耗进行预估
- 相应地扩大 User Memory
UserMemory 总大小 = spark.executor.memory * (1 - spark.memory.fraction)
Execution Memory OOM
Execution Memory OOM 常见实例:数据倾斜和 数据膨胀
配置说明:
- 2 个 CPU core,每个 core 有两个线程,内存大小为 1GB
- spark.executor.cores = 3,spark.executor.memory = 900MB
- Execution Memory = 180MB
- Storage Memory = 180MB
- Execution Memory 上限 = 360MB
数据倾斜
3 个 Reduce Task 对应的数据分片大小分别是 100MB , 100MB , 300MB
- 当 Executor 线程池大小为 3,所以每个 Reduce Task 最多 360MB * 1/3 = 120MB
数据倾斜导致OOM :
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NHRj4w40-1677761995168)(…/…/png/%E5%86%85%E5%AD%98%E8%BF%90%E7%94%A8/image-20230209210919876.png)]
2 种调优思路:
- 消除数据倾斜,让所有的数据分片尺寸都小于 100MB
- 调整 Executor 线程池、内存、并行度等相关配置,提高 1/N 上限到 300MB
在 CPU 利用率高下解决 OOM :
- 维持并发度、并行度不变,增大执行内存设置,提高 1/N 上限到 300MB
- 维持并发度、执行内存不变,提升并行度把数据打散,将所有的数据分片尺寸都缩小到 100MB 内
数据膨胀
3 个 Map Task 对应的数据分片大小都是 100MB
数据膨胀导致OOM :
2 种调优思路:
- 把数据打散,提高数据分片数量、降低数据粒度,让膨胀后的数据量降到 100MB 左右
- 加大内存配置,结合 Executor 线程池调整,提高 1/N 上限到 300MB
相关文章:
Spark 内存运用
RDD Cache 当同一个 RDD 被引用多次时,就可以考虑进行 Cache,从而提升作业的执行效率 // 用 cache 对 wordCounts 加缓存 wordCounts.cache // cache 后要用 action 才能触发 RDD 内存物化 wordCounts.count// 自定义 Cache 的存储介质、存储形式、副本…...
SpringBoot集成Swagger3.0(入门) 02
文章目录Swagger3常用配置注解接口测试API信息配置Swagger3 Docket开关,过滤,分组Swagger3常用配置注解 ApiImplicitParams,ApiImplicitParam:Swagger3对参数的描述。 参数名参数值name参数名value参数的具体意义,作用。required参…...
网络协议丨ICMP协议
ICMP协议,全称 Internet Control Message Protocol,就是互联网控制报文协议。我们其实对它并不陌生,我们平时经常使用的”ping“一下就是基于这个协议工作的。网络包在异常复杂的网络环境中传输时,常常会遇到各种各样的问题。当遇…...
12.1 基于Django的服务器信息查看应用(系统信息、用户信息)
文章目录新建Django项目创建子应用并设置本地化创建数据库表创建超级用户git管理项目(requirements.txt、README.md、.ignore)主机信息监控应用的框架搭建具体功能实现系统信息展示前端界面设计视图函数设计用户信息展示视图函数设计自定义过滤器的实现前…...
ExSwin-Unet 论文研读
ExSwin-Unet摘要1 引言2 方法2.1 基于窗口的注意力块2.2 外部注意力块2.3 不平衡的 Unet 架构2.4 自适应加权调整2.5 双重损失函数3 实验结果3.1 数据集3.2 实现细节3.3 与 SOTA 方法的比较3.4 消融研究4 讨论和限制5 结论数据集来源: https://feta.grand-challenge…...
置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天
置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天 置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天…...
优思学院|解密六西格玛:探索DMAIC和DMADV之间的区别
六西格玛方法中最为广泛使用的两种方法是DMAIC和DMADV。这两种方法都是为了让企业流程更加高效和有效而设计的。虽然这两种方法有一些重要的共同特点,但它们并不可以互相替代,并且被开发用于不同的企业流程。在更详细地比较这两种方法之前,我…...
Pytorch的DataLoader输入输出(以文本为例)
本文不做太多原理介绍,直讲使用流畅。想看更多底层实现-〉传送门。DataLoader简介torch.utils.data.DataLoader是PyTorch中数据读取的一个重要接口,该接口定义在dataloader.py脚本中,只要是用PyTorch来训练模型基本都会用到该接口。本文介绍t…...
代谢组学:Microbiome又一篇!绘制重症先天性心脏病新生儿肠道微生态全景图谱
文章标题:Mapping the early life gut microbiome in neonates with critical congenital heart disease: multiomics insights and implications for host metabolic and immunological health 发表期刊:Microbiome 影响因子:16.837…...
Java基本类型所占字节简述
类型分类所占字节取值范围boolean布尔型1bit0 false、 1 true (1个bit 、1个字节、4个字节)char 字符型(Unicode字符集中的一个元素) 2字节-32768~32767(-2的15次方~2的15次方-1)byte整型1字节-128&a…...
Linux vi常用操作
vi/vim 共分为三种模式,分别是命令模式(Command mode),输入模式(Insert mode)和底线命令模式(Last line mode)。 这三种模式的作用分别是: 命令模式: 用户刚…...
Unicode(宽字节)、ANSI(多字节)
1、什么时候用Unicode(宽字节),什么时候用ANSI(多字节)? 在linux/windows等操作系统中使用的,一般都是Unicode(宽字节)。 下位机PLC/单片机等硬件设备中使用,一般都是ANSI(多字节)。 所以,通讯中(比如VS项目&#x…...
STM32实战之LED循环点亮
接着上一章讲。本章我们来讲一讲LED流水灯,循环点亮LED。 在LED章节有的可能没有讲到,本章会对其进行说明,尽量每个函数说一下作用。也会在最后说一下STM32的寄存器,在编程中寄存器是避免不了的东西,寄存器也是非常好理…...
智慧厕所智能卫生间系统有哪些功能
南宁北站智能厕所主要功能有哪些?1、卫生间环境空气监测男厕、女厕环境空气监测系统包括对厕所内的温度、湿度、氨气、硫化氢、PM2.5、烟雾等气体数据的实时监测。2、卫生间厕位状态监测系统实时监测厕位内目前的使用状态(有人或无人),数据信…...
【网络】套接字 -- TCP
🥁作者: 华丞臧. 📕专栏:【网络】 各位读者老爷如果觉得博主写的不错,请诸位多多支持(点赞收藏关注)。如果有错误的地方,欢迎在评论区指出。 推荐一款刷题网站 👉 LeetCode刷题网站 文章…...
NDK C++ map容器
map容器// TODO map容器 #include <iostream> #include <map>using namespace std;int main() {// TODO map<int, string>按key值排序,同一个key不可以重复插入map<int, string> map1;map1.insert(pair<int, string>(1, "111&qu…...
linux(Centos)安装docker
官网地址:Install Docker Engine on CentOS 首先检查linux系统版本及内核: 安装docker要求系统版本至少为7.x版本,内核至少为3.8以上 cat /etc/redhat-release # 查看系统版本号uname -r #查看linux系统内核 检查系统是否能连上外网&#…...
Delphi 中 FireDAC 数据库连接(处理错误)
参见:Delphi 中 FireDAC 数据库连接(总览)本主题描述了如何用FireDAC处理数据库错误。一、概述EFDDBEngineException类是所有DBMS异常的基类。单个异常对象是一个数据库错误的集合,可以通过EFDDBEngineException.Errors[]属性访问…...
算法小抄3-理解使用Python容器之列表
引言 首先说一个概念哈,程序算法数据结构,算法是条件语句与循环语句组成的逻辑结构,而数据结构也就是容器. 算法决定数据该如何处理,而容器则决定如何数据如何存储. 不同的语言对容器有不同的实现方式, 但他们的功能都是相似的, 打好容器基础,你就可以在各式各样的语言中来回横…...
Vue3中watch的value问题
目录前言一,ref和reactive的简单复习1.ref函数1.2 reactive函数1.3 用ref定义对象类型数据不用reactive二,watch的value问题2.1 ref2.1.1 普通类型数据2.1.2 对象类型数据2.1.3 另一种方式2.2 reactive三,总结后记前言 在Vue3中,…...
【线性筛+DP】最大和
看错题了,呃呃,其实就是个简单DP最大和 - 蓝桥云课 (lanqiao.cn)题意:思路:设dp[i]为以1为终点的最大和,然后枚举状态和决策就行了主要是线性筛的应用,它可以预处理出一个数的最小质因子是多少Code…...
openpnp - configure - 丢弃(Discard)位置的设置
文章目录openpnp - configure - 丢弃(Discard)位置的设置概述笔记设置丢弃位置吸取元件失败后, 吸嘴一直吸气的处理ENDopenpnp - configure - 丢弃(Discard)位置的设置 概述 测试时, 吸取了一个元件, 吸取成功了, 现在想将这个料丢掉. 点击控制面板-Special页中的Discard不好…...
java Object 万字详解 (通俗易懂)
基本介绍构造方法成员方法hashCode()getClass()toString()equals()finalize()JavaBean重写Object类的方法重写toString重写equals一、基本介绍Object类是java类层次最顶层的基类(父类),所有类都是直接或间接继承自Object类,因此&a…...
Java并发简介(什么是并发)
文章目录并发概念并发和并行同步和异步阻塞和非阻塞进程和线程竞态条件和临界区管程并发的特点提升资源利用率程序响应更快并发的问题安全性问题缓存导致的可见性问题线程切换带来的原子性问题编译优化带来的有序性问题保证并发安全的思路互斥同步(阻塞同步…...
团队API管理工具-YAPI
团队API管理工具-YAPI 推荐一款接口管理平台,操作简单、界面友好、功能丰富、支持markdown语法、可使用Postman导入、Swagger同步数据展示、LDAP、权限管理等功能。 YApi是高效、易用、功能强大的api管理平台,旨在为开发、产品、测试人员提供更优雅的接…...
学习记录 --- Pytorch优化器
文章目录参考文献什么是优化器optimizer的定义optimizer的属性defaultsstateparam_groupsoptimizer的方法zero_grad()step()add_param_group()state_dict()、load_state_dict()优化一个网络同时优化多个网络当成一个网络优化当成多个网络优化只优化网络的某些指定的层调整学习率…...
Flink State 状态后端分析
flink状态实现分析 state * State* |* -------------------InternalKvState* | |* MergingState |* | |* …...
和年薪30W的阿里测开工程师聊过后,才知道我的工作就是打杂的...
前几天和一个朋友聊面试,他说上个月同时拿到了腾讯和阿里的offer,最后选择了阿里。 阿里内部将员工一共分为了14个等级,P6是资深工程师,P7是技术专家。 其中P6和P7就是一个分水岭了,P6是最接近P7的不持股员工&#x…...
C#开发的OpenRA的界面布局数据加载
C#开发的OpenRA的界面布局数据加载 当显示完成加载界面之后,就是进行其它内容处理。 因为后面内容的加载会比较长时间,所以首先显示加载界面是一种非常友好的方法。 因此在软件设计里,尽可能先显示界面,让用户先看到程序正在运行, 然后再处理时间长的加载。如果不这样做,…...
并查集结构
文章目录并查集特点构建过程查找两个元素是否是同一集合优化查找领头元素设置两个元素为同一集合构建结构应用场景并行计算集合问题并查集特点 对于使用并查集构建的结构,可以使得查询两个元素是否在同一集合,以及合并集合的操作无限接近O(1) 构建过程…...
电子商务专业网站建设/竞价托管一般要多少钱
关于自适应LMS的理论基础已经非常的成熟,随便找一本关于自适应滤波器的书就会有介绍相关的内容,有的还可出了它的具体算法,但是还没有一本书有讲过怎样编写能够时实(Real Time)处理的基于C的自适应LMS算法(…...
wordpress企业网站制作/百度知道官网登录入口
一、用法 二、参考资料 1.官方 1.Java下利用Jackson进行JSON解析和序列化 2. 转载于:https://www.cnblogs.com/shirui/p/8875563.html...
旅游景点网站建设设计说明/蚂蚁bt
暖气来了,嗓子眼儿冒火、口腔溃疡、大便干燥,该怎么办呢?解放军309医院营养科主任医师张晔开出四字饮食处方:降、清、润、补。 降火汤——冬瓜配紫菜 很多家庭最爱做西红柿黄瓜片汤,其实冬季最好的汤是冬瓜汤ÿ…...
网页美工设计主要从哪些方面设计/昆明seo关键字推广
题目描述: 1. A和B是好朋友,则B与A也是好朋友; 2.如果A和C是好朋友,B和C也是好朋友,那么 A和B也是好朋友。 问:现在给出所有好朋友的对数,可以把这些朋友分成多少组,满足每组中的任意…...
武汉网址模板建站/沈阳网站seo公司
CMM对软件质量保证是这样描述的:–软件质量保证(Quality Assurance)的目的是为管理者提供有关软件过程和产品的适当的可视性。它包括评审和审核软件产品及其活动,以验证其是否遵守既定的规程和标准,并向有关负责人汇报…...
做数据ppt模板下载网站/seo搜索引擎优化简历
原文地址:http://www.uml.org.cn/zjjs/201108111.asp 1.集群 1.1定义:是一组独立的计算机系统构成一个松耦合的多处理器系统,它们之间通过网络实现进程间的通信。应用程序可以通过网络共享内存进行消息传送,实现分布式计算机。 是…...