Spark 内存运用
RDD Cache
当同一个 RDD 被引用多次时,就可以考虑进行 Cache,从而提升作业的执行效率
// 用 cache 对 wordCounts 加缓存
wordCounts.cache
// cache 后要用 action 才能触发 RDD 内存物化
wordCounts.count// 自定义 Cache 的存储介质、存储形式、副本数量
wordCounts.persist(MEMORY_ONLY)
Spark 的 Cache 机制 :
- 缓存的存储级别:限定了数据缓存的存储介质,如 : 内存、磁盘
- 缓存的计算过程:从 RDD 展开到分片 (Block),存储于内存或磁盘的过程
- 缓存的销毁过程:缓存数据主动或被动删除的内存或磁盘的过程
存储级别
Spark 支持的存储级别:
- RDD 缓存的默认存储级别:MEMORY_ONLY
- DataFrame 缓存的默认存储级别:MEMORY_AND_DISK
存储级别 | 存储介质 | 存储形式 | 副本设置 | |||
---|---|---|---|---|---|---|
内存 | 磁盘 | 对象值 | 序列化 | |||
MEMORY_ONLY | √ | √ | 1 | |||
MEMORY_ONLY_2 | √ | √ | 2 | |||
MEMORY_ONLY_SER | √ | √ | 1 | |||
MEMORY_ONLY_SER_2 | √ | √ | 2 | |||
DISK_ONLY | √ | √ | 1 | |||
DISK_ONLY_2 | √ | √ | 2 | |||
DISK_ONLY_3 | √ | √ | 3 | |||
MEMORY_AND_DISK | √ | √ | √ | √ | 1 | 内存以对象值存储,磁盘以序列化 |
MEMORY_AND_DISK_2 | √ | √ | √ | √ | 2 | |
MEMORY_AND_DISK_SER | √ | √ | √ | 1 | 内存/磁盘都以序列化的字节数组存储 | |
MEMORY_AND_DISK_SER2 | √ | √ | √ | 2 |
计算过程
缓存的计算过程 :
- MEMORY_AND_DISK :先把数据集全部缓存到内存,内存不足时,才把剩余的数据落磁盘
- MEMORY_ONLY :只把数据往内存里塞
内存中的存储过程 :
销毁过程
缓存的销毁过程 :
- 缓存抢占 Execution Memory 空间,会进行缓存释放
Spark 清除缓存的原则:
- LRU:按元素的访问顺序,优先清除那些最近最少访问的 BlockId、MemoryEntry 键值对
- 在清除时,同属一个 RDD 的 MemoryEntry 不会清除
Spark 实现 LRU 的数据结构:LinkedHashMap , 内部有两个数据结构
- HashMap : 用于快速访问,根据指定的 BlockId,O(1) 返回 MemoryEntry
- 双向链表 : 用于维护元素(BlockId 和 MemoryEntry 键值对)的访问顺序
Spark 会释放 LRU 的 MemoryEntry :
Cache 注意点
用 Cache 的基本原则 :
- 当
RDD/DataFrame/Dataset
的引用数为 1,坚决不用 Cache - 当引用数大于 1,且运行成本超过 30%,就考虑用 Cache
运行成本占比 : 计算某个分布式数据集要消耗的总时间与作业执行时间的比值
- 端到端的执行时间为 1 小时
- DataFrame 被引用了 2 次
- 从读取数据源到生成该 DataFrame 花了 12 分钟
- 该 DataFrame 的运行成本占比 =
12*2/60 = 40%
用 noop 计算 DataFrame 运行时间 :
df.write.format("noop").save()
.cache
是惰性操作,在调用 .cache
后,要先用 count 才能触发缓存的完全物化
Cache 要遵循最小公共子集原则 :
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)//Cache方式一
val cachedDF = df.cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)
两个查询的 Analyzed Logical Plan 不一致,无法缓存复用
//Cache方式二
df.select(col1, col2).filter(col2 > 0).cache
//数据分析
df.filter(col2 > 0).select(col1, col2)
df.select(col1, col2).filter(col2 > 100)
Analyzed Logical Plan 完全一致,能缓存复用
//Cache方式三
val cachedDF = df.select(col1, col2).cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)
及时清理 Cache :
- 异步模式 (常用):调用 unpersist() 或 unpersist(False)
- 同步模式:调用 unpersist(True)
OOM
OOM的具体区域 :
- 发生 OOM 的 LOC(Line Of Code),代码位置
- OOM 发生在 Driver 端,还是在 Executor 端
- 在 Executor 端的哪片内存区域
Driver OOM
Driver 端的 OOM 位置 :
- 创建小规模的分布式数据集:使用 parallelize、createDataFrame 创建数据集
- 收集计算结果:通过 take、show、collect 把结果收集到 Driver 端
Driver 端的 OOM 原因:
- 创建的数据集超过内存上限
- 收集的结果集超过内存上限
广播变量的创建与分发 :
广播变量的数据拉取就是用 collect 。当数据总大小超过 Driver 端内存时 , 就报 OOM :
java.lang.OutOfMemoryError: Not enough memory to build and broadcast
对结果集尺寸预估,适当增加 Driver 内存配置
- Driver 内存大小 :
spark.driver.memory
查看执行计划 :
val df: DataFrame = _
df.cache.countval plan = df.queryExecution.logical
val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes
Executor OOM
当 Executors OOM 时,定位 Execution Memory 和 User Memory
User Memory OOM
User Memory 用于存储用户自定义的数据结构,如: 数组、列表、字典
当自定义数据结构的总大小超出 User Memory 上限时,就会报错
java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf
java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance
计算 User Memory 消耗时,要考虑 Executor 的线程池大小
- 当 dict 大小为
#size
, Executor 线程池大小为#threads
- dict 对 User Memory 的总消耗:
#size * #threads
- 当总消耗超出 User Memory 上限,就会 OOM
val dict = List("spark", "tune")
val words = spark.sparkContext.textFile("~/words.csv")val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect
自定义数据分发 :
- 自定义的列表 dict 会随着 Task 分发到所有 Executors
- 多个 Task 的 dict 会对 User Memory 产生重复消耗
解决 User Memory OOM 的思路 :
- 先对数据结构的消耗进行预估
- 相应地扩大 User Memory
UserMemory 总大小 = spark.executor.memory * (1 - spark.memory.fraction)
Execution Memory OOM
Execution Memory OOM 常见实例:数据倾斜和 数据膨胀
配置说明:
- 2 个 CPU core,每个 core 有两个线程,内存大小为 1GB
- spark.executor.cores = 3,spark.executor.memory = 900MB
- Execution Memory = 180MB
- Storage Memory = 180MB
- Execution Memory 上限 = 360MB
数据倾斜
3 个 Reduce Task 对应的数据分片大小分别是 100MB , 100MB , 300MB
- 当 Executor 线程池大小为 3,所以每个 Reduce Task 最多 360MB * 1/3 = 120MB
数据倾斜导致OOM :
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NHRj4w40-1677761995168)(…/…/png/%E5%86%85%E5%AD%98%E8%BF%90%E7%94%A8/image-20230209210919876.png)]
2 种调优思路:
- 消除数据倾斜,让所有的数据分片尺寸都小于 100MB
- 调整 Executor 线程池、内存、并行度等相关配置,提高 1/N 上限到 300MB
在 CPU 利用率高下解决 OOM :
- 维持并发度、并行度不变,增大执行内存设置,提高 1/N 上限到 300MB
- 维持并发度、执行内存不变,提升并行度把数据打散,将所有的数据分片尺寸都缩小到 100MB 内
数据膨胀
3 个 Map Task 对应的数据分片大小都是 100MB
数据膨胀导致OOM :
2 种调优思路:
- 把数据打散,提高数据分片数量、降低数据粒度,让膨胀后的数据量降到 100MB 左右
- 加大内存配置,结合 Executor 线程池调整,提高 1/N 上限到 300MB
相关文章:

Spark 内存运用
RDD Cache 当同一个 RDD 被引用多次时,就可以考虑进行 Cache,从而提升作业的执行效率 // 用 cache 对 wordCounts 加缓存 wordCounts.cache // cache 后要用 action 才能触发 RDD 内存物化 wordCounts.count// 自定义 Cache 的存储介质、存储形式、副本…...

SpringBoot集成Swagger3.0(入门) 02
文章目录Swagger3常用配置注解接口测试API信息配置Swagger3 Docket开关,过滤,分组Swagger3常用配置注解 ApiImplicitParams,ApiImplicitParam:Swagger3对参数的描述。 参数名参数值name参数名value参数的具体意义,作用。required参…...
网络协议丨ICMP协议
ICMP协议,全称 Internet Control Message Protocol,就是互联网控制报文协议。我们其实对它并不陌生,我们平时经常使用的”ping“一下就是基于这个协议工作的。网络包在异常复杂的网络环境中传输时,常常会遇到各种各样的问题。当遇…...

12.1 基于Django的服务器信息查看应用(系统信息、用户信息)
文章目录新建Django项目创建子应用并设置本地化创建数据库表创建超级用户git管理项目(requirements.txt、README.md、.ignore)主机信息监控应用的框架搭建具体功能实现系统信息展示前端界面设计视图函数设计用户信息展示视图函数设计自定义过滤器的实现前…...

ExSwin-Unet 论文研读
ExSwin-Unet摘要1 引言2 方法2.1 基于窗口的注意力块2.2 外部注意力块2.3 不平衡的 Unet 架构2.4 自适应加权调整2.5 双重损失函数3 实验结果3.1 数据集3.2 实现细节3.3 与 SOTA 方法的比较3.4 消融研究4 讨论和限制5 结论数据集来源: https://feta.grand-challenge…...

置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天
置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天 置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天…...

优思学院|解密六西格玛:探索DMAIC和DMADV之间的区别
六西格玛方法中最为广泛使用的两种方法是DMAIC和DMADV。这两种方法都是为了让企业流程更加高效和有效而设计的。虽然这两种方法有一些重要的共同特点,但它们并不可以互相替代,并且被开发用于不同的企业流程。在更详细地比较这两种方法之前,我…...
Pytorch的DataLoader输入输出(以文本为例)
本文不做太多原理介绍,直讲使用流畅。想看更多底层实现-〉传送门。DataLoader简介torch.utils.data.DataLoader是PyTorch中数据读取的一个重要接口,该接口定义在dataloader.py脚本中,只要是用PyTorch来训练模型基本都会用到该接口。本文介绍t…...

代谢组学:Microbiome又一篇!绘制重症先天性心脏病新生儿肠道微生态全景图谱
文章标题:Mapping the early life gut microbiome in neonates with critical congenital heart disease: multiomics insights and implications for host metabolic and immunological health 发表期刊:Microbiome 影响因子:16.837…...
Java基本类型所占字节简述
类型分类所占字节取值范围boolean布尔型1bit0 false、 1 true (1个bit 、1个字节、4个字节)char 字符型(Unicode字符集中的一个元素) 2字节-32768~32767(-2的15次方~2的15次方-1)byte整型1字节-128&a…...
Linux vi常用操作
vi/vim 共分为三种模式,分别是命令模式(Command mode),输入模式(Insert mode)和底线命令模式(Last line mode)。 这三种模式的作用分别是: 命令模式: 用户刚…...
Unicode(宽字节)、ANSI(多字节)
1、什么时候用Unicode(宽字节),什么时候用ANSI(多字节)? 在linux/windows等操作系统中使用的,一般都是Unicode(宽字节)。 下位机PLC/单片机等硬件设备中使用,一般都是ANSI(多字节)。 所以,通讯中(比如VS项目&#x…...
STM32实战之LED循环点亮
接着上一章讲。本章我们来讲一讲LED流水灯,循环点亮LED。 在LED章节有的可能没有讲到,本章会对其进行说明,尽量每个函数说一下作用。也会在最后说一下STM32的寄存器,在编程中寄存器是避免不了的东西,寄存器也是非常好理…...

智慧厕所智能卫生间系统有哪些功能
南宁北站智能厕所主要功能有哪些?1、卫生间环境空气监测男厕、女厕环境空气监测系统包括对厕所内的温度、湿度、氨气、硫化氢、PM2.5、烟雾等气体数据的实时监测。2、卫生间厕位状态监测系统实时监测厕位内目前的使用状态(有人或无人),数据信…...

【网络】套接字 -- TCP
🥁作者: 华丞臧. 📕专栏:【网络】 各位读者老爷如果觉得博主写的不错,请诸位多多支持(点赞收藏关注)。如果有错误的地方,欢迎在评论区指出。 推荐一款刷题网站 👉 LeetCode刷题网站 文章…...

NDK C++ map容器
map容器// TODO map容器 #include <iostream> #include <map>using namespace std;int main() {// TODO map<int, string>按key值排序,同一个key不可以重复插入map<int, string> map1;map1.insert(pair<int, string>(1, "111&qu…...

linux(Centos)安装docker
官网地址:Install Docker Engine on CentOS 首先检查linux系统版本及内核: 安装docker要求系统版本至少为7.x版本,内核至少为3.8以上 cat /etc/redhat-release # 查看系统版本号uname -r #查看linux系统内核 检查系统是否能连上外网&#…...

Delphi 中 FireDAC 数据库连接(处理错误)
参见:Delphi 中 FireDAC 数据库连接(总览)本主题描述了如何用FireDAC处理数据库错误。一、概述EFDDBEngineException类是所有DBMS异常的基类。单个异常对象是一个数据库错误的集合,可以通过EFDDBEngineException.Errors[]属性访问…...

算法小抄3-理解使用Python容器之列表
引言 首先说一个概念哈,程序算法数据结构,算法是条件语句与循环语句组成的逻辑结构,而数据结构也就是容器. 算法决定数据该如何处理,而容器则决定如何数据如何存储. 不同的语言对容器有不同的实现方式, 但他们的功能都是相似的, 打好容器基础,你就可以在各式各样的语言中来回横…...

Vue3中watch的value问题
目录前言一,ref和reactive的简单复习1.ref函数1.2 reactive函数1.3 用ref定义对象类型数据不用reactive二,watch的value问题2.1 ref2.1.1 普通类型数据2.1.2 对象类型数据2.1.3 另一种方式2.2 reactive三,总结后记前言 在Vue3中,…...

JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...

【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...

dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...

如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...
LCTF液晶可调谐滤波器在多光谱相机捕捉无人机目标检测中的作用
中达瑞和自2005年成立以来,一直在光谱成像领域深度钻研和发展,始终致力于研发高性能、高可靠性的光谱成像相机,为科研院校提供更优的产品和服务。在《低空背景下无人机目标的光谱特征研究及目标检测应用》这篇论文中提到中达瑞和 LCTF 作为多…...