【Spark分布式内存计算框架——Spark Core】8. 共享变量
第七章 共享变量
在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark提供了两种类型的变量:
1)、广播变量Broadcast Variables
- 广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本;
2)、累加器Accumulators - 累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);
官方文档:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#shared-variables
7.1 广播变量
广播变量允许开发人员在每个节点(Worker or Executor)缓存只读变量,而不是在Task之间传递这些变量。使用广播变量能够高效地在集群每个节点创建大数据集的副本。同时Spark还使用高效的广播算法分发这些变量,从而减少通信的开销。

可以通过调用sc.broadcast(v)创建一个广播变量,该广播变量的值封装在v变量中,可使用获取
该变量value的方法进行访问。

7.2 累加器
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,即确提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取Accumulator的值,只有Driver程序可以读取Accumulator的值。创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名。

Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。
当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。实现自定义累加器的步骤:
- 第一步、继承AccumulatorV2,实现相关方法;
- 第二步、创建自定义Accumulator的实例,然后在SparkContext上注册它;
官方提供实例如下:

7.3 案例演示
以词频统计WordCount程序为例,假设处理的数据如下所示,包括非单词符合,统计数据词频时过滤非单词的符合并且统计总的格式。

实现功能:
第一、过滤非单词符合
- 非单词符合存储列表List中

- 使用广播变量广播列表

第二、累计统计非单词符号出现次数
- 定义一个LongAccumulator累加器,进行计数

范例演示完整代码如下
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
/**
* 基于Spark框架使用Scala语言编程实现词频统计WordCount程序,将符号数据过滤,并统计出现的次数
* -a. 过滤标点符号数据
* 使用广播变量
* -b. 统计出标点符号数据出现次数
* 使用累加器
*/
object SparkSharedVariableTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// a. 读取文件数据
val datasRDD: RDD[String] = sc.textFile("datas/filter/datas.input", minPartitions = 2)
// TODO: 字典数据,只要有这些单词就过滤: 特殊字符存储列表List中
val list: List[String] = List(",", ".","!","#","$","%")
// TODO: 通过广播变量 将列表list广播到各个Executor内存中,便于多个Task使用
val listBroadcast: Broadcast[List[String]] = sc.broadcast(list)
// TODO: 定义累加器,记录单词为符号数据的个数
val accumulator: LongAccumulator = sc.longAccumulator("number_accum")
// b. 分割单词,过滤数据
val wordsRDD = datasRDD
// 1)、过滤数据,去除空行数据
.filter(line => null != line && line.trim.length > 0)
// 2)、分割单词
.flatMap(line => line.trim.split("\\s+"))
// 3)、过滤字典数据:符号数据
.filter{word =>
// 获取符合列表 TODO: 从广播变量中获取列表list的值
val listValue = listBroadcast.value
// 判断单词是否为符号数据,如果是就过滤掉
val isFlag = listValue.contains(word)
if(isFlag){
// TODO: 如果单词为符号数据,累加器加1
accumulator.add(1L)
}
!isFlag
}
val resultRDD: RDD[(String, Int)] = wordsRDD
// 转换为二元组
.mapPartitions{iter => iter.map(word => (word, 1))}
// 按照单词聚合统计
.reduceByKey((tmp, item) => tmp + item)
resultRDD.foreach(println)
println(s"过滤符合数据的个数:${accumulator.value}")
// 应用程序运行结束,关闭资源
sc.stop()
}
}
运行应用,查看WEB UI监控,定义累加器的值:

相关文章:
【Spark分布式内存计算框架——Spark Core】8. 共享变量
第七章 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和…...
C++多态常见面试题
1.什么是多态 简单点说,就是多种形态,具体就是完成某个行为,当不同的对象去完成时产生的不同形态。多态分为静态多态和动态多态,静态多态一般指的是函数重载,在编译阶段通过函数名修饰规则,不同类型调用不同…...
字母板上的路径 题解,力扣官方出来挨打(小声)
字母板上的路径 我们从一块字母板上的位置 (0, 0) 出发,该坐标对应的字符为 board[0][0]。 在本题里,字母板为board [“abcde”, “fghij”, “klmno”, “pqrst”, “uvwxy”, “z”],如下所示。 我们可以按下面的指令规则行动:…...
代码随想录算法训练营第二十六天 | 39. 组合总和,40.组合总和II,131.分割回文串
一、参考资料组合总和题目链接/文章讲解:https://programmercarl.com/0039.%E7%BB%84%E5%90%88%E6%80%BB%E5%92%8C.html 视频讲解:https://www.bilibili.com/video/BV1KT4y1M7HJ 组合总和II题目链接/文章讲解:https://programmercarl.com/004…...
vueday01-脚手架安装详细
一、vue脚手架安装命令npm i -g vue/cli 或 yarn global add vue/cli安装上面的工具,安装后运行 vue --version ,如果看到版本号,说明安装成功或 vue -V工具安装好之后,就可以安装带有webpack配置的vue项目了。创建项目之前&#…...
初识cesium3d(一)
使用ViteVue3.2Cesium。Vite需要Node.js版本14.18及以上版本。Vite命令创建的工程会自动生成vite.config.js文件,来配置一些相关的参数。 1、使用Vite创建vue3项目 # npm npm init vitelatest cesium-app -- --template vue # yarn yarn create vite cesium-app…...
点云转3D网格【Python】
推荐:使用 NSDT场景设计器 快速搭建 3D场景。 在本文中,我将介绍我的 3D 表面重建过程,以便使用 Python 从点云快速创建网格。 你将能够导出、可视化结果并将结果集成到您最喜欢的 3D 软件中,而无需任何编码经验。 此外࿰…...
【OpenCV图像处理系列一】OpenCV开发环境的安装与搭建(Ubuntu + Window都适用)
🔗 运行环境:OpenCV,Ubuntu,Windows 🚩 撰写作者:左手の明天 🥇 精选专栏:《python》 🔥 推荐专栏:《算法研究》 #### 防伪水印——左手の明天 #### &#x…...
【代码随想录】-动态规划专题
文章目录理论基础斐波拉契数列爬楼梯使用最小花费爬楼梯不同路径不同路径 II整数拆分不同的二叉搜索树背包问题——理论基础01背包二维dp数组01背包一维数组(滚动数组)装满背包分割等和子集最后一块石头的重量 II目标和一和零完全背包零钱兑换 II组合总和…...
c++数据类型 输入输出
C++语法 //常用包: iostream:cin cout endl cstdio:scanf printf algorithm:max min reverse swap cstring:memset memcpymemset(a,-1,sizeof a) 填充数组memcpy(b,a,sizeof a) 将a数组复制到b数组,长度是a数组字节长度 cmath:sin sqrt pow abs fabs编程是一种控制计…...
【设计模式-11】责任链模式
认识设计模式(十一)---责任链模式【一】责任链模式【二】介绍(1)意图(2)主要解决(3)何时使用(4)如何解决(5)关键代码(6&am…...
SpringBoot+Vue实现智能物流管理系统
文末获取源码 开发语言:Java 框架:springboot JDK版本:JDK1.8 服务器:tomcat7 数据库:mysql 5.7/8.0 数据库工具:Navicat11 开发软件:eclipse/myeclipse/idea Maven包:Maven3.3.9 浏…...
【MT7628】MT7628如何修改串口波特率、调试串口物理口、使用UART3口
环境说明 sdk版本:Mediatek_ApSoC_SDK_4320_20150414.tar.bz2 芯片方案:MT7628A Uboot修改串口波特率方法 修改rt2880.h文件 修改include/configs/rt2880.h文件CONFIG_BAUDRATE宏的值 - #define CONFIG_BAUDRATE 57600 +#define CONFIG_BAUDRATE 115200 Kernel中修改串口波特…...
css盒模型介绍
在使用CSS进行网页布局时,我们一定离不开的一个东西————盒子模型。盒子模型,顾名思义,盒子就是用来装东西的,它装的东西就是HTML元素的内容。或者说,每一个可见的 HTML 元素都是一个盒子,下面所说的盒子…...
onetab 谷歌插件历史数据清除
文章目录方法1:测试也可以步骤1:批量执行点击步骤2:python 脚本模拟点击确定操作方法2:成功【推荐】步骤1:修改confirm,类似于hook操作步骤2:批量点击删除操作:onetab 谷歌插件历史数…...
GRBL源码简单分析
结构体说明 GRBL里面的速度规划是带运动段前瞻的,所以有规划运动段数据和微小运动段的区分 这里的“规划运动段”对应的数据结构是plan_block_t,前瞻和加减速会使用到,也就是通过解析G代码后出来的直接直线数据或是圆弧插补出来的拟合直线数据…...
第一部分:简单句——第一章:简单句的核心——二、简单句的核心变化(谓语动词的情态)
二、简单句的核心变化 简单句的核心变化其实就是 一主一谓(n. v.) 表达一件事情,谓语动词是其中最重要的部分,谓语动词的变化主要有四种:三态加一否(时态、语态、情态、否定),其中…...
软考高级考试中有五大证书,其中哪个更值得考?
计算机软考属于专业技术人员职业资格水平评价类,是职业资格、专业技术资格(职称)和专业技术水平"三合一"的考试,是目前IT行业仅有的国家级考试。考试不受学历、专业、资历等条件限制。软考高级考试中有五大证书…...
FlexRay™ 协议控制器 (E-Ray)-04
网络管理 累积的网络管理 (NM) 向量位于网络管理寄存器 1 到网络管理寄存器 3 (NMVx (x = 1-3)) 中。【The accrued Network Management (NM) vector is located in the Network Management Register 1 to Network Management Register 3 (NMVx (x = 1-3)).】 网络管理向量 x…...
container_of 根据成员变量获得包含其的对象的地址!
写在前面 本系列文章的灵感出处均是各个技术书籍的读后感,详细书籍信息见文章最后的参考文献 CONTAINER_OF 在书中发现一个很有意思的宏,以此可以衍生出来其很多的用法,这个宏可以根据某个成员变量的地址得到包含这个成员变量地址的对象的…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...
