Spark 广播/累加
Spark 广播/累加
- 广播变量
- 普通变量广播
- 分布式数据集广播
- 克制 Shuffle
- 强制广播
- 配置项
- Join Hints
- broadcast
- 累加器
Spark 提供了两类共享变量:广播变量(Broadcast variables)/累加器(Accumulators)
广播变量
创建广播变量的方式:
- 从普通变量创建广播变量 : 由 Driver 分发各 Executors
- 从分布式数据集创建广播变量 : Driver 拉取各 Executors 分区数并合并, 再分发各Executors
普通变量广播
普通变量分发 :
- 普通变量在 Driver 端创建 (非分布式数据集),要把普通变量分发给每个 Task
- 以 Task 粒度分发,当有 n 个 Task,变量就要分发 n 次
- 在同个 Executor 内部,多个不同的 Task 多次重复缓存同样的内容 , 对内存资源浪费

广播变量分发:
- 以 Executors 粒度分发,同个 Executor 的 各 Tasks 互相拷贝。即:变量分发数 = Executors 数
普通变量广播:
val list: List[String] = List("Apache", "Spark")val bc = sc.broadcast(list)

分布式数据集广播
创建分布式数据集广播:
val userFile: String = "hdfs://ip:port/rootDir/userData"
val df: DataFrame = spark.read.parquet(userFile)val bc_df: Broadcast[DataFrame] = spark.sparkContext.broadcast(df)
分布式数据集广播过程 :
- Driver 从所有的 Executors 拉取这些数据分区,再在本地构建全量数据
- Driver 把合并的全量数据分发给各个 Executors
- Executors 收到数据后,缓存到存储系统的 BlockManager

克制 Shuffle
无优化时,默认用 Shuffle Join
val transactionsDF: DataFrame = _
val userDF: DataFrame = _transactionsDF.join(userDF, Seq("userID"), "inner")
Shuffle Join 的过程 :
- 对关联俩表分别进行 Shuffle
- Shuffle 的分区规则:先对 Join keys 计算哈希值,再对哈希值进行分区数取模
- Shuffle 后,同 key 的数据会在同个 Executors
- Reduce Task 对 同 key 的数据进行关联

优化代码:
import org.apache.spark.sql.functions.broadcastval transactionsDF: DataFrame = _
val userDF: DataFrame = _val bcUserDF = broadcast(userDF)
transactionsDF.join(bcUserDF, Seq("userID"), "inner")
广播过程:
- Driver 从所有 Executors 收集 userDF 的所有数据分片,再在本地汇总数据
- 给每个 Executors 都发送一份全量数据,各自在本地关联
- 利用广播变量 ,就能避免 Shuffle

强制广播
广播注意点:
- 创建广播变量越大,网络开销和 Driver 内存也就越大。当广播变量大小 > 8GB,就会直接报错
- Broadcast Joins 不支持全连接(Full Outer Joins)
- 左连接(Left Outer Join)时,只能广播右表
- 右连接(Right Outer Join)时,只能广播左表
配置项
两张 Join 表,只要其中一张表的尺寸 < 10MB,就会采用 Broadcast Joins 做数据关联
# 采用 Broadcast Join 实现的最低阈值
spark.sql.autoBroadcastJoinThreshold 10m
数据在存储/内存大小差异的原因:
- 为了存储/访问效率,数据采用 Parquet/ORC 格式进行落盘
- JVM 一般需要比数据原始更大的内存空间来存储对象
准确预估表在内存的大小:
- 把表缓存到内存,如: DataFrame/Dataset.cache
- 读取执行计划的统计数据
val df: DataFrame = _
df.cache.countval plan = df.queryExecution.logical
val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes
Join Hints
Join Hints :在开发中用特殊的语法,告知 Spark SQL 运行时采用这种 Join
val table1: DataFrame = spark.read.parquet(path1)
val table2: DataFrame = spark.read.parquet(path2)table1.createOrReplaceTempView("t1")
table2.createOrReplaceTempView("t2")val query: String = "select /*+ broadcast(t2) */ * from t1 inner join t2 on t1.id = t2.id"val queryResutls: DataFrame = spark.sql(query)
DataFrame 的 DSL 语法中使用 Join Hints :
table1.join(table2.hint("b"roadcast"), Seq("key"), "inner")
broadcast
广播数据表 :
import org.apache.spark.sql.functions.broadcasttable1.join(broadcast(table2), Seq(“key”), “inner”)
广播设置要点:以广播阈值配置为主,以强制广播为辅
累加器
累加器的作用:全局计数(Global counter)
SparkContext 提供了 3 种累加器 :
- longAccumulator:Long 类型的累加器
- doubleAccumulator :对 Double 类型的数值做全局计数
- collectionAccumulator :定义集合类型的累加器
累加器在 Driver 端定义,在 RDD 算子中调用 add 进行累加。最后在 Driver 端调用 value ,就能获取全局计数结果
// 定义 Long 类型的累加器
val ac = sc.longAccumulator("Empty string")def f(x: String): Boolean = {if(x.equals("")) {// 当遇到空字符串时,累加器加 1ac.add(1)return false} else {return true}
} //用 f 对 RDD 进行过滤
val cleanWordRDD: RDD[String] = wordRDD.filter(f)// 作业执行完毕,通过调用 value 获取累加器结果
ac.value
相关文章:
Spark 广播/累加
Spark 广播/累加广播变量普通变量广播分布式数据集广播克制 Shuffle强制广播配置项Join Hintsbroadcast累加器Spark 提供了两类共享变量:广播变量(Broadcast variables)/累加器(Accumulators) 广播变量 创建广播变量…...
飞天云动,站在下一个商业时代的门口
ChatGPT的爆火让AIGC再度成为热词,随之而来的是对其商业化的畅想——不是ChatGPT自身如何盈利,而是它乃至整个AIGC能给现在的商业环境带来多大改变。 这不由得令人想起另一个同样旨在改变世界的概念,元宇宙。不同的是,元宇宙更侧…...
上海分时电价机制调整对储能项目的影响分析
安科瑞 耿敏花 2022年12月16日,上海市发改委发布《关于进一步完善我市分时电价机制有关事项的通知》(沪发改价管〔2022〕50号)。通知明确上海分时电价机制,一般工商业及其他两部制、大工业两部制用电夏季(7、8、9月)和冬季&#x…...
产品新人如何快速上手工作
三百六十行,行行出产品经理:上至封神的乔布斯,下至卖鸡蛋罐饼的阿姨,他们对如何打造自己的产品都会有一套完整的产品思路,这也是为什么说“人人都是产品经理”。这个看似光鲜的“经理”有时也会被戏称产品汪࿰…...
Linux: ARM GIC仅中断CPU 0问题分析
文章目录1. 前言2. 分析背景3. 问题4. 分析4.1 ARM GIC 中断芯片简介4.1.1 中断类型和分布4.1.2 拓扑结构4.2 问题根因4.2.1 设置GIC SPI中断的CPU亲和性4.2.2 GIC初始化:缺省的CPU亲和性4.2.2.1 boot CPU亲和性初始化流程4.2.2.1 其它非 boot CPU亲和性初始化流程5…...
第20篇:Java运算符全面总结(系列二)
目录 4、逻辑运算符 4.1 逻辑运算符 4.2 代码示例 5、赋值运算符 5.1 赋值运算符...
OpenCV4.x图像处理实例-OpenCV两小时快速入门(基于Python)
OpenCV两小时快速入门(基于Python) 文章目录 OpenCV两小时快速入门(基于Python)1、OpenCV环境安装2、图像读取与显示3、图像像素访问、操作与ROI4、图像缩放5、几何变换5.1 平移5.2 旋转6、基本绘图6.1 绘制直线6.2 绘制圆6.3 绘制矩形6.4 绘制文本7、剪裁图像8、图像平滑与…...
【Git】Mac忽略.DS_Store文件
我们在github上经常看到某些仓库里面包含了.DS_Store文件,或者某些sdk的压缩包里面可以看到,这都是由于随着git的提交把这类文件也提交到仓库,压缩也是一样,压缩这个先留着后面处理。 Mac上的.DS_Store文件 .DS_Store 文件&#…...
12.2 基于Django的服务器信息查看应用(CPU信息)
文章目录CPU信息展示图表展示-视图函数设计图表展示-前端界面设计折线图和饼图展示饼图测试折线图celery和Django配合实现定时任务Windows安装redis根据数据库中的数据绘制CPU折线图CPU信息展示 图表展示-视图函数设计 host/views.py def cpu(request):logical_core_num ps…...
【软件测试】接口测试总结
本文主要分为两个部分: 第一部分:主要从问题出发,引入接口测试的相关内容并与前端测试进行简单对比,总结两者之前的区别与联系。但该部分只交代了怎么做和如何做?并没有解释为什么要做? 第二部分࿱…...
代码随想录算法训练营第52天 || 300.最长递增子序列 || 674. 最长连续递增序列 || 718. 最长重复子数组
代码随想录算法训练营第52天 || 300.最长递增子序列 || 674. 最长连续递增序列 || 718. 最长重复子数组 300.最长递增子序列 题目介绍 给你一个整数数组 nums ,找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列,删除(或…...
gitblit 安装使用
1 安装服务端 简而言之:需要安装 java,gitblit, git 三个软件 Windows 10环境使用Gitblit搭建局域网Git服务器 前言 安装Java并配置环境安装gitblit并配置启动gitblit为windows服务使用gitblit创建repository并管理用户 1.1 安装Java并配…...
使用 TensorFlow、Keras-OCR 和 OpenCV 从技术图纸中获取信息
简单介绍输入是技术绘图图像。对象检测模型获取图像后对其进行分类,找到边界框,分配维度,计算属性。示例图像(输入)分类后,找到“IPN”部分。之后,它计算属性,例如惯性矩。它适用于不…...
ESP32设备驱动-GUVA-S12SD紫外线检测传感器驱动
GUVA-S12SD紫外线检测传感器驱动 文章目录 GUVA-S12SD紫外线检测传感器驱动1、GUVA-S12SD介绍2、硬件准备3、软件准备4、驱动实现1、GUVA-S12SD介绍 GUVA-S12SD 紫外线传感器芯片适用于检测太阳光中的紫外线辐射。 它可用于任何需要监控紫外线量的应用,并且可以简单地连接到任…...
WIN7下 program file 权限不足?咋整?!!
在WIN7下对Program Files目录的权限问题 [问题点数:40分,结帖人mysunck] 大部分人说要使用manifest,但是其中一个人说: “安装程序要求管理员很正常,你的程序可以在programfiles,但用户数据不能放那里,因…...
119.(leaflet篇)文字碰撞
听老人家说:多看美女会长寿 地图之家总目录(订阅之前建议先查看该博客) 文章末尾处提供保证可运行完整代码包,运行如有问题,可“私信”博主。 效果如下所示: 下面献上完整代码,代码重要位置会做相应解释 <!DOCTYPE html> <html>...
cuda编程以及GPU基本知识
目录CPU与GPU的基本知识CPU特点GPU特点GPU vs. CPU什么样的问题适合GPU?GPU编程CUDA编程并行计算的整体流程CUDA编程术语:硬件CUDA编程术语:内存模型CUDA编程术语:软件线程块(Thread Block)网格(…...
Python 机器学习/深度学习/算法专栏 - 导读目录
目录 一.简介 二.机器学习 三.深度学习 四.数据结构与算法 五.日常工具 一.简介 Python 机器学习、深度学习、算法主要是博主从研究生到工作期间接触的一些机器学习、深度学习以及一些算法的实现的记录,从早期的 LR、SVM 到后期的 Deep,从学习到工…...
Springboot怎么实现restfult风格Api接口
前言在最近的一次技术评审会议上,听到有同事发言说:“我们的项目采用restful风格的接口设计,开发效率更高,接口扩展性更好...”,当我听到开头第一句,我脑子里就开始冒问号:项目里的接口用到的是…...
Jetpack Compose 深入探索系列六:Compose runtime 高级用例
Compose runtime vs Compose UI 在深入讨论之前,非常重要的一点是要区分 Compose UI 和 Compose runtime。Compose UI 是 Android 的新 UI 工具包,具有 LayoutNodes 的树形结构,它们稍后在画布上绘制其内容。Compose runtime 提供底层机制和…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成
一个面向 Java 开发者的 Sring-Ai 示例工程项目,该项目是一个 Spring AI 快速入门的样例工程项目,旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计,每个模块都专注于特定的功能领域,便于学习和…...
DAY 45 超大力王爱学Python
来自超大力王的友情提示:在用tensordoard的时候一定一定要用绝对位置,例如:tensorboard --logdir"D:\代码\archive (1)\runs\cifar10_mlp_experiment_2" 不然读取不了数据 知识点回顾: tensorboard的发展历史和原理tens…...
【java面试】微服务篇
【java面试】微服务篇 一、总体框架二、Springcloud(一)Springcloud五大组件(二)服务注册和发现1、Eureka2、Nacos (三)负载均衡1、Ribbon负载均衡流程2、Ribbon负载均衡策略3、自定义负载均衡策略4、总结 …...
数据分析六部曲?
引言 上一章我们说到了数据分析六部曲,何谓六部曲呢? 其实啊,数据分析没那么难,只要掌握了下面这六个步骤,也就是数据分析六部曲,就算你是个啥都不懂的小白,也能慢慢上手做数据分析啦。 第一…...
Copilot for Xcode (iOS的 AI辅助编程)
Copilot for Xcode 简介Copilot下载与安装 体验环境要求下载最新的安装包安装登录系统权限设置 AI辅助编程生成注释代码补全简单需求代码生成辅助编程行间代码生成注释联想 代码生成 总结 简介 尝试使用了Copilot,它能根据上下文补全代码,快速生成常用…...
OPENCV图形计算面积、弧长API讲解(1)
一.OPENCV图形面积、弧长计算的API介绍 之前我们已经把图形轮廓的检测、画框等功能讲解了一遍。那今天我们主要结合轮廓检测的API去计算图形的面积,这些面积可以是矩形、圆形等等。图形面积计算和弧长计算常用于车辆识别、桥梁识别等重要功能,常用的API…...
