【Spark分布式内存计算框架——Spark Streaming】4.入门案例(下)Streaming 工作原理
2.3 Streaming 工作原理
SparkStreaming处理流式数据时,按照时间间隔划分数据为微批次(Micro-Batch),每批次数据当做RDD,再进行处理分析。
以上述词频统计WordCount程序为例,讲解Streaming工作原理。
创建 StreamingContext
当SparkStreaming流式应用启动(streamingContext.start)时,首先创建StreamingContext流式上下文实例对象,整个流式应用环境构建,底层还是SparkContext。
当StreamingContext对象构建以后,启动接收器Receiver,专门从数据源端接收数据,此接收器作为Task任务运行在Executor中,一直运行(Long Runing),一直接收数据。
从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器,一直在运行,以Task方式运行,需要1Core CPU。
可以从多个数据源端实时消费数据进行处理,例如从多个TCP Socket接收数据,对每批次数据进行词频统计,使用DStream#union函数合并接收数据流,演示代码如下:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 从TCP Socket 中读取数据,对每批次(时间为5秒)数据进行词频统计,将统计结果输出到控制台。
* TODO: 从多个Socket读取流式数据,进行union合并
*/
object StreamingDStreamUnion {
def main(args: Array[String]): Unit = {
// TODO: 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[4]")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream01: DStream[String] = ssc.socketTextStream("node1.itcast.cn", 9999)
val inputDStream02: DStream[String] = ssc.socketTextStream("node1.itcast.cn", 9988)
// 合并两个DStream流
val inputDStream: DStream[String] = inputDStream01.union(inputDStream02)
// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = inputDStream
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)
// TODO: 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}
接收器接收数据
启动每个接收器Receiver以后,实时从数据源端接收数据(比如TCP Socket),也是按照时间间隔将接收的流式数据划分为很多Block(块)。
接收器 Receiver划分流式数据的时间间隔BlockInterval ,默认值为 200ms,通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后,按照设置的存储级别对Block进行存储,从TCP Socket中接收数据默认的存储级别为:MEMORY_AND_DISK_SER_2,先存储内存,不足再存储磁盘,存储2副本。
从TCP Socket消费数据时可以设置Block存储级别,演示代码如下:
// TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
"node1.itcast.cn", //
9999, //
// TODO: 设置Block存储级别为先内存,不足磁盘,副本为1
storageLevel = StorageLevel.MEMORY_AND_DISK
)
汇报接收Block报告
接收器Receiver将实时汇报接收的数据对应的Block信息,当BatchInterval时间达到以后,StreamingContext将对应时间范围内数据block当做RDD,加载SparkContextt处理数据。
以此循环处理流式的数据,如下图所示:
Streaming 工作原理总述
整个Streaming运行过程中,涉及到两个时间间隔:
- 批次时间间隔:BatchInterval
- 每批次数据的时间间隔,每隔多久加载一个Job;
- Block时间间隔:BlockInterval
- 接收器划分流式数据的时间间隔,可以调整大小哦,官方建议最小值不能小于50ms;
- 默认值为200ms,属性:spark.streaming.blockInterval,调整设置
官方案例:
BatchInterval: 1s = 1000ms = 5 * BlockInterval
每批次RDD数据中,有5个Block,每个Block就是RDD一个分区数据
从代码层面结合实际数据处理层面来看,Streaming处理原理如下,左边为代码逻辑,右边为实际每批次数据处理过程。
具体运行数据时,每批次数据依据代码逻辑执行。
// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = inputDStream
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)
流式数据流图如下:
相关文章:
【Spark分布式内存计算框架——Spark Streaming】4.入门案例(下)Streaming 工作原理
2.3 Streaming 工作原理 SparkStreaming处理流式数据时,按照时间间隔划分数据为微批次(Micro-Batch),每批次数据当做RDD,再进行处理分析。 以上述词频统计WordCount程序为例,讲解Streaming工作原理。 创…...
2、算法先导---思维能力与工具
题目 碎纸片的拼接复原(2013B) 内容 破碎文件的拼接在司法物证复原、历史文献修复以及军事情报获取等领域都有着重要的应用。传统上,拼接复原工作需由人工完成,准确率较高,但效率很低。特别是当碎片数量巨大,人工拼接很难在短时…...
WordPress 函数:add_theme_support() 开启主题自定义功能(全面)
add_theme_support() 用于在我们的当前使用的主题添加一些特殊的功能,函数一般写在主题的functions.php文件中,当然也可以再插件中使用钩子来调用该函数,如果是挂在钩子上,那他必须挂在after_setup_theme钩子上,因为 i…...
Winform控件开发(16)——Timer(史上最全)
前言: Timer控件的作用是按用户定义的时间间隔引发事件的计时器,说的直白点就是,他就像一个定时炸弹一样到了一定时间就爆炸一次,区别在于定时炸弹炸完了就不会再次爆炸了,但是Timer这个计时器到了下一个固定时间还会触发一次,上面那张图片就是一个典型的计时器,该定时器…...
游戏高度可配置化:通用数据引擎(data-e)及其在模块化游戏开发中的应用构想图解
游戏高度可配置化:通数据引擎在模块化游戏开发中的应用构想图解 ygluu 码客 卢益贵 目录 一、前言 二、模块化与插件 1、常规模块化 2、插件式模块化(插件开发) 三、通用数据引擎理论与构成 1、名字系统(数据类型…...
CountDownLatch与CyclicBarrier原理剖析
1.CountDownLatch 1.1 什么是CountDownLatch CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。 CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之…...
NLP中的对话机器人——预训练基准模型
引言 本文是七月在线《NLP中的对话机器人》的视频笔记,主要介绍FAQ问答型聊天机器人的实现。 场景二 上篇文章中我们解决了给定一个问题和一些回答,从中找到最佳回答的任务。 在场景二中,我们来实现: 给定新问题,从…...
C语言学习及复习笔记-【14】C文件读写
14 C文件读写 14.1打开文件 您可以使用 fopen( ) 函数来创建一个新的文件或者打开一个已有的文件,这个调用会初始化类型 FILE 的一个对象,类型 FILE包含了所有用来控制流的必要的信息。下面是这个函数调用的原型: FILE *fopen( const char…...
模拟退火算法优化灰色
clc; clear; close all; warning off; %% tic T01000; % 初始温度 Tend1e-3; % 终止温度 L200; % 各温度下的迭代次数(链长) q0.9; %降温速率 X[16.4700 96.1000 16.4700 94.4400 20.0900 92.5400 22.3900 93.3700 25.…...
Pandas怎么添加数据列删除列
Pandas怎么添加数据列 1、直接赋值 # 1、直接赋值df.loc[:, "最高气温"] df["最高气温"].str.replace("℃", "").astype("int32")df.loc[:, "最低气温"] df["最低气温"].str.replace("℃"…...
C++类和对象:构造函数和析构函数
目录 一. 类的六个默认成员函数 二. 构造函数 2.1 什么是构造函数 2.2 编译器自动生成的默认构造函数 2.3 构造函数的特性总结 三. 析构函数 3.1 什么是析构函数 3.2 编译器自动生成的析构函数 3.3 析构函数的特性总结 一. 类的六个默认成员函数 对于任意一个C类&…...
【Stata】从入门到精通.零基础小白必学的教程,一学就fei
视频教程移步:https://www.bilibili.com/video/BV1hK4y1d714/?p4&spm_id_frompageDriver&vd_sourcecc8074e9c81a225f214226065db53d32P3 第二讲 Stata处理数据全流程(上) P3 - 01:37内置数据 file example datasets使用…...
【RuoYi优化】调整JVM启动内存
📔 笔记介绍 大家好,千寻简笔记是一套全部开源的企业开发问题记录,毫无保留给个人及企业免费使用,我是作者星辰,笔记内容整理并发布,内容有误请指出,笔记源码已开源,前往Gitee搜索《chihiro-notes》,感谢您的阅读和关注。 作者各大平台直链: GitHub | Gitee | CSD…...
[架构模型]MVC模型详细介绍,并应用到unity中
简介: MVC模式是一种软件架构模式,它将应用程序分为三个主要部分:模型(Model)、视图(View)和控制器(Controller)。MVC模式的目标是实现应用程序的松耦合,以便…...
?? JavaScript 双问号(空值合并运算符)
?? JavaScript 双问号(空值合并运算符) 一、简述 在网上浏览 JavaScript 代码时或者学习其他代码时,可能会发现有的表达式用了两个问号(??)如下所示: let username; console.log(username ?? "Guest"…...
作业2.25----通过操作Cortex-A7核,串口输入相应的命令,控制LED灯进行工作
1.通过操作Cortex-A7核,串口输入相应的命令,控制LED灯进行工作 例如在串口输入led1on,开饭led1灯点亮 2.例如在串口输入led1off,开饭led1灯熄灭 3.例如在串口输入led2on,开饭led2灯点亮 4.例如在串口输入led2off,开饭led2灯熄灭 5.例如在串口输入led…...
0101基础概念-图-数据结构和算法(Java)
文章目录1 图1.1 定义1.2 4种图模型2 无向图2.1 定义2.2 术语后记1 图 1.1 定义 图是一种非线性的数据结构,表示多对多的关系。 图(Graph)是由顶点的有穷非空集合和顶点之间边的集合组成,通常表示为:G(V, E)…...
Linux基础命令和工具使用详解
Linux基础命令和工具使用详解一、grep搜索字符二、find查找文件三、ls 显示文件四、wc命令计算字数五、uptime机器启动时间负载六、ulimit用户资源七、curl http八、scp远程拷贝九、dos2unix和unix2dos十、sed 行处理10.1、简单模式10.2、替换模式十一、awk 列处理11.1、打印某…...
一个好的python文件可以有几种用途?
大家好鸭!我是小熊猫~ 这次来带大家浅浅回顾一点python小知识~ 源码资料电子书:点击此处跳转文末名片获取 python文件总共有两种用途: 一种是执行文件另一种是被当做模块导入 编写好的一个python文件可以有两种用途: 1. 脚本,…...
HDFS优化
单节点多块磁盘数据均衡 生成HDFS块均衡计划 hdfs diskbalancer -plan node1 执行均衡计划,node1.plan.json均衡计划文件 hdfs diskbalancer -execute node1.plan.json 查看当前均衡任务的执行情况 hdfs diskbalancer -query node1 取消均衡任务hdfs diskbalancer -cancel nod…...
行测-判断推理-图形推理-样式规律-黑白运算
黑白元素个数不同,优先考虑黑白运算白白白黑黑白黑白黑选A考试时,这种题不要先把规律全部推出来,再去做题,太慢了直接看要推的图,通过排除法选答案黑白元素个数不同,优先考虑黑白运算白白白黑黑白黑白黑选B…...
java+springboot+vue高校学生医疗保险管理系统
医保管理系统是对与职工健康息息相关的档案进行的系统化、自动化的管理,主要是对职工办理的医疗保险的管理,本系统能够很好的适应社会的需求,最大化的为城镇职工提供服务。医疗保险是国家社会保障体系的重要组成部分,也是社会保险…...
[已解决] AHK 映射 ESC 延迟 500 ms 的严重问题
问题描述 今天发现一个重大bug,我竟然用了一年多都不知道! CapsLock::Esc 我的 ahk 脚本将 capslock 映射为 esc,但这在vim环境中,估算响应 500ms。 也就说按下 caps 键,还要等一会,才进入normal模式 如果…...
QML state详解
1.state简介 changes(list<Change>):保存当前State下的多个Change对象,比如PropertyChanges、StateChangeScript、ParentChange等。 extend(string):表示该状态要在哪个State的基础上进行扩展,当一个…...
一起Talk Android吧(第五百零六回:如何调整组件在约束布局中的角度)
文章目录背景介绍相关属性使用方法示例程序各位看官们大家好,上一回中咱们说的例子是"如何调整组件在约束布局中的大小",这一回中咱们说的例子是"如何调整组件在约束布局中的角度"。闲话休提,言归正转, 让我们一起Talk A…...
微信投票-课后程序(JAVA基础案例教程-黑马程序员编著-第七章-课后作业)
【实验7-5】 微信投票 【任务介绍】 1.任务描述 如今微信聊天已经普及到几乎每一个人,在聊天中,经常会有人需要帮忙在某个APP中投票。本案例要求编写一个模拟微信投票的程序,通过在控制台输入指令,实现添加候选人、查看当前投票…...
duboo+zookeeper分布式架构入门
分布式 dubbo Zookeeper 分布式系统就是若干独立计算机的集合(并且这些计算机之间相互有关联,就像是一台计算机中的C盘F盘等),这些计算对于用户来说就是一个独立的系统。 zookeeper安装 下载地址:Index of /dist/z…...
黑盒测试用例设计方法-等价类划分法
目录 一、等价类的作用 二、等价类的分类 三、等价类的方法 四、等价类的原则 五、按照测试用例的完整性划分等价类 六、等价类步骤 七、案例 一、等价类的作用 为穷举测试设计测试点。 穷举:列出所有的可能情况,对其一一判断。 测试点&#x…...
4.OCR文本识别Connectionist Temporal Classification(CTC)算法
文章目录1.基础介绍2.Connectionist Temporal Classification(CTC)算法2.1 什么是Temporal Classification2.2 CTC问题描述2.2关于对齐2.3 前向后向算法2.4 推理时3.pytorch中的CTCLOSS参考资料欢迎访问个人网络日志🌹🌹知行空间🌹dz…...
误删了Ubuntu/Linux的一些默认用户目录怎么办?
用户目录:指位于 $HOME 下的一系列常用目录,例如 Documents,Downloads,Music,还有 Desktop等。本文不是讲如何恢复原有目录及其重要文件,适用于仅恢复目录功能一:仅恢复个别目录如误删了Desktop…...
网站开发建设收费标准/优化营商环境建议
本文共2900余字,预计阅读时间8分钟,本文知乎连接:Python操作Excel文件(0):盘点,本文同步发布于silaoA的博客和微信公众号平台。 关注学习了解更多的Cygwin、Linux、Python技术。目 录0x00 xlrd/…...
网站如何建立快捷方式/海淀区seo搜索引擎优化企业
本文翻译自:Android 8: Cleartext HTTP traffic not permittedI had reports from users with Android 8 that my app (that uses back-end feed) does not show content. 我收到来自Android 8用户的报告,称我的应用程序(使用后端供稿&#x…...
怎么自己做网站模板/seo技巧课程
傅里叶变换是信号分析中最重要的工具没有之一。对于一个复杂输入信号我们除了用单位冲击信号来分解方法以外,还可以将其分解为复指数信号,对于周期函数来讲这个分解成为傅里级数对于非周期信号称为傅里叶变换。分解是现代科学的最主要的方法,…...
flash网站管理系统/推广软件的渠道有哪些
【题目描述】 某个地区有n(n<1000)个犯罪团伙,当地警方按照他们的危险程度由高到低给他们编号为1-n,他们有些团伙之间有直接联系,但是任意两个团伙都可以通过直接或间接的方式联系,这样这里就形成了一个庞大的犯罪集团…...
做一网站/网上推销产品的软件
文章目录八、AR(p){\rm AR}(p)AR(p)序列与其自协方差函数1.AR(p){\rm AR}(p)AR(p)序列2.Yule-Walker方程3.自协方差函数的正定性回顾总结八、AR(p){\rm AR}(p)AR(p)序列与其自协方差函数 1.AR(p){\rm AR}(p)AR(p)序列 之前我们给出过AR(p){\rm AR}(p)AR(p)序列的定义…...
做推广自己找网站/新的营销方式有哪些
为何使用无线网络 WLAN与LAN的比较WLAN通过无线接入点(AP)代替以太网交换机将客户端连接到网络。WLAN连接的是通常由电池供电的移动设备,而不是接到电源插座上的LAN设备。WLAN使用的帧格式与有线以太网LAN不同。由于射频可以覆盖设备的外部,因此 WLAN会…...