详解 Spark Streaming 的 DStream 对象
一、DStream 的创建
1. 通过 RDD 队列
DStream 在内部实现上是一系列连续的 RDD 来表示。每个 RDD 包含有采集周期内的数据
/**
基本语法:StreamingContext.queueStream(queueOfRDDs: Queue, oneAtATime = false)
*/
object DStreamFromRddQueue {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val queueOfRdds = mutable.Queue[RDD[Int]]()val ds = ssc.queueStream(queueOfRdds, oneAtATime = false)ds.print()ssc.start()// 向 RDD 队列中添加元素for(i <- 1 to 5) {queueOfRdds += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}
2. 通过自定义数据源
通过继承 Receiver 抽象类,并实现 onStart、onStop 方法来自定义数据源采集
/**实现步骤:1.继承 Receiver[T]() 抽象类,定义泛型,并传递参数1.1 泛型是采集的数据类型1.2 传递的参数是存储级别,StorageLevel 中的枚举值2.实现 onStart、onStop 方法3.使用 receiverStream(receiver) 创建 DStream
*/
object DStreamFromDiy {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))// 使用自定义数据源采集数据val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())ds.print()ssc.start()ssc.awaitTermination()}
}// 自定义数据源采集
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {private val flag = true// 当 ssc.start() 调用后,启动一个独立的线程去采集数据override def onStart(): Unit = {new Thread(new Runnable(){override def run() {while(flag) {val data = "数据为:" + new Random().nextInt(10)// 将数据存储封装为 DStreamstore(data)Thread.sleep(500)}}}, "receiver").start()}// 停止数据采集override def onStop(): Unit = {flag = false}
}
3. 通过 Kafka 数据源
3.1 版本选型
- ReceiverAPI:需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。所以当接收数据的 Executor 和计算的 Executor 速度不同时,特别在接收数据的 Executor 速度大于计算的 Executor 速度时,会导致计算数据的节点内存溢出。(早期版本中提供此方式,当前版本不适用)
- DirectAPI:是由计算的 Executor 来主动接收消费 Kafka 的数据,速度由自身控制
3.2 实现
-
引入依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version> </dependency> <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.1</version> </dependency> -
编码
/** 基本语法:使用 KafkaUtils 工具类的 createDirectStream[K, V] 方法连接 Kafka 创建 相关参数:1.StreamingContext:环境对象2.LocationStrategies:位置策略,PreferConsistent 表示自动匹配3.ConsumerStrategies:消费策略,Subscribe[K,V](Set(topic)) 订阅主题4.Map[String, Object]:Kafka 连接配置参数 */ object DStreamFromKafka {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))// 封装 Kafka 配置参数val kafkaConf: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")// 创建 Kafka 数据源的 DStreamval ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("topic1")),kafkaConf)// 打印输出val data: DStream[String] = ds.map(_.value())data.print()ssc.start()ssc.awaitTermination()} } -
测试
- 启动 Zookeeper 和 Kafka 集群
- 运行程序 main 方法
- 向 Kafka 的主题中生产数据,并查看程序控制台输出
二、DStream 的转换
1. 无状态转换操作
无状态的操作只作用于一个采集周期的 RDD 中,不同采集周期的 RDD 之间的操作结果不会归约汇总
1.1 常见操作
/**
常见原语:map/flatMap/filter/repartition/reduceByKey/groupByKey
*/
object DStreamNoStateChange {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val word = ssc.socketTextStream("localhost", 9999)val wordAsOne = line.map((_, 1))val wordCount = wordAsOne.reduceByKey(_ + _)wordCount.print()/*测试:在 cmd 窗口执行 nc -lp 999,然后分次输入 10 个 hello结果:由于采集周期为 3 秒,所以输出结果为多个 (hello, num),数量与采集周期个数一致,不同的采集周期结果是独立输出的*/ssc.start()ssc.awaitTermination()}
}
1.2 transform
/**
功能:可以将 DStream 中底层的 RDD 获取进行操作,可以扩展功能和实现周期性代码执行
基本语法:Dstream.transform(func: RDD => RDD): Dstream
*/
object DStreamTransform {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val word = ssc.socketTextStream("localhost", 9999)word.transform(rdd => {// Driver端:此处的代码会周期性的执行,每个采集周期执行一次rdd.map(str => {// Executor 端str })})ssc.start()ssc.awaitTermination()}
}
1.3 join
/**
功能:对当前批次(采集周期)内的两个 DStream 中各自的 RDD 中相同的 key 进行 join,效果与两个 RDD 的 join 相同
基本语法:Dstream1.join(Dstream2)
*/
object DStreamTransform {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val ds9999 = ssc.socketTextStream("localhost", 9999)val ds8888 = ssc.socketTextStream("localhost", 8888)val data: DStream[(String, (Int, Int))] = ds9999.map((_, 1)).join(ds888.map((_, 2)))data.print()ssc.start()ssc.awaitTermination()}
}
2. 有状态转换操作
有状态转换操作会将一个采集周期的结果(状态)保存到检查点,并且不断将下一个采集周期的结果(状态)更新保存到检查点中,最终输出所有采集周期归约汇总的结果
2.1 updateStateByKey
/**基本语法:DStream.updateStateByKey(func: (seq: Seq[T], op: Option[T]) => op)参数:1.seq 表示当前采集周期相同 key 的 Value 集合2.op 表示检查点中相同 key 的总 Value (Some 或 None)说明:1.使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态2.updateStateByKey会根据 key 对数据的状态进行更新
*/
object DStreamStateChange {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))// 必须设置检查点保存路径ssc.checkpoint("cp")val word = ssc.socketTextStream("localhost", 9999)val wordAsOne = line.map((_, 1))// val wordCount = wordAsOne.reduceByKey(_ + _)val wordCount = wordAsOne.updateStateByKey((seq: Seq[Int], op: Option[Int]) => {val sum = seq.sumval newVal = op.getOrElse(0) + sumOption(newVal)})wordCount.print()/*测试:在 cmd 窗口执行 nc -lp 999,然后分次输入 10 个 hello结果:最终的输出结果为 (hello, 10)*/ssc.start()ssc.awaitTermination()}
}
2.2 window 操作
/**基本语法:1.DStream.window(windowSize: Duration, step: Duration)参数:1.windowSize 表示窗口大小2.step 表示窗口滑动步长说明:1.窗口大小和步长必须为采集周期大小的整数倍2.步长默认为一个采集周期大小2.countByWindow(windowSize: Duration, step: Duration):统计滑动窗口计数流中的元素个数3.reduceByWindow(func, windowSize: Duration, step: Duration):通过自定义函数聚合滑动窗口流中的元素4.reduceByKeyAndWindow(func, windowSize: Duration, step: Duration, [numTasks]):通过自定义函数聚合滑动窗口流中相同 key 的 value5.reduceByKeyAndWindow(func, invFunc, windowSize: Duration, step: Duration, [numTasks])参数说明:1.func 表示窗口中相同 key 的聚合计算方式2.invFunc 表示删除在窗口滑动后不再存在的数据值
*/
object DStreamWindow {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))ssc.checkpoint("cp")val word = ssc.socketTextStream("localhost", 9999)val wordAsOne = line.map((_, 1))// val ds = wordAsOne.window(Seconds(6)) // 会有重复数据val ds = wordAsOne.window(Seconds(6), Seconds(6))val wordCount = ds.reduceByKey(_ + _)// 必须设置检查点保存路径val wordCount1 = wordAsOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,(x: Int, y: Int) => x - y,Seconds(6), Seconds(6))wordCount.print()// wordCount1.print()ssc.start()ssc.awaitTermination()}
}
三、DStream 的输出
SparkStreaming 也有惰性机制,执行输出操作才会触发所有 DStream 计算的执行
/**基本语法:1.print():将 DStream 输出到控制台,只有这个输出会带时间戳2.saveAsTextFiles(prefix, [suffix]):将 DStream 保存为 text 格式文件,每一批次的存储文件名基于参数中的 prefix 和 suffix (prefix-Time_IN_MS[.suffix])3.saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 DStream 中的数据保存为SequenceFiles,每一批次的存储文件名为 "prefix-TIME_IN_MS[.suffix]"4.saveAsHadoopFiles(prefix, [suffix]):将 DStream 中的数据保存为 Hadoop files,每一批次的存储文件名为 "prefix-TIME_IN_MS[.suffix]"5.foreachRDD(func):最通用的输出操作,将函数 func 用于 DStream 的每一个 RDD,可以将 RDD 存入文件或者通过网络将其写入数据库说明:使用foreachRDD(func)把数据写到 MySQL 的外部数据库的注意事项:1.创建连接对象不能写在 driver 层面(因为所有的连接对象都不能序列化)2.如果写在 foreachRDD 中则每个 RDD 中的每一条数据都会创建连接,影响性能和资源;3.推荐使用 RDD 的 foreachPartition() 算子,在每个分区迭代中创建连接
*/
object DStreamOutput {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("ds")val ssc = new StreamingContext(conf, Seconds(3))val word = ssc.socketTextStream("localhost", 9999)val wordAsOne = line.map((_, 1))val wordCount = wordAsOne.reduceByKey(_ + _)// wordCount.print() // SparkStreaming 没有输出操作会报错wordCount.foreachRDD(rdd => {rdd.foreach(println)})ssc.start()ssc.awaitTermination()}
}
四、SparkStreaming 优雅的关闭
SparkStreaming 任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,而分布式程序没办法做到一个个进程去停止,所以需要使用第三方系统 (MySQL/Redis/Zookeepr/HDFS) 来控制内部程序关闭
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}class MonitorStop(ssc: StreamingContext) extends Runnable {override def run(): Unit = {val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "hello")while(true) {try{Thread.sleep(5000)} catch {case e: InterruptedException => e.printStackTrace()}val state: StreamingContextState = ssc.getStateval bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))if(bool) {if(state == StreamingContextState.ACTIVE) {// 优雅地关闭,停止接收新数据,并将已有的数据处理完后再关闭ssc.stop(stopSparkContext = true, stopGracefully = true)System.exit(0)}}}}
}import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkTest {def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status:
Option[Int]) => {//当前批次内容的计算val sum: Int = values.sum//取出状态信息中上一次状态 val lastStatu: Int = status.getOrElse(0)Some(sum + lastStatu)}val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")//设置优雅的关闭sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")val ssc = new StreamingContext(sparkConf, Seconds(5))ssc.checkpoint("./ck")val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)val word: DStream[String] = line.flatMap(_.split(" "))val wordAndOne: DStream[(String, Int)] = word.map((_, 1))val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)wordAndCount.print()ssc}def main(args: Array[String]): Unit = {// 从检查点恢复数据val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())new Thread(new MonitorStop(ssc)).start()ssc.start()ssc.awaitTermination()}}
相关文章:
详解 Spark Streaming 的 DStream 对象
一、DStream 的创建 1. 通过 RDD 队列 DStream 在内部实现上是一系列连续的 RDD 来表示。每个 RDD 包含有采集周期内的数据 /** 基本语法:StreamingContext.queueStream(queueOfRDDs: Queue, oneAtATime false) */ object DStreamFromRddQueue {def main(args: Ar…...
QT常用控件
目录 1.控件概述 2. QWidget 核⼼属性 设置组件是否可用 获取组件当前位置和尺⼨ QWidget的图标 组件的透明度设置 QWidget光标的设置 字体的设置 组件提示 设置组件获取到焦点的策略 stylesheet样式表 3.常用组件 QPushButton RadioButton Check Box QLabel …...
如何解决chatgpt出现503 bad gateway的问题
昨日,ChatGPT官网挂了,也就是使用web网页端访问的用户,会出现 bad gateway 情况。我们去ChatGPT官方的监控查看,已经展示相关错误。 影响的范围有: 影响了 ChatGPT 所有计划的所有用户。影响包括所有与 ChatGPT 相关…...
Halcon 双相机标定与拼图(二)
一、概述 这种标定有两种模式,有一个标定板和多个标定板两种 一个标定板 两个相机的重叠区域比较大,那么我们可以把标定板放到那个重叠区域来统一坐标系,如下 这种是只需要一个标定板,这种是推荐的方式 。这种是比较简单的&…...
【加密与解密】【04】Java安全架构
JAVA安全模块划分 JCA,Java Cryptography Architecture,Java加密体系结构JCE,Java Cryptography Extension,Java加密扩展包JSSE,Java Secure Sockets Extension,Java安全套接字扩展包JAAS,Java…...
论文阅读:Neural Scene Flow Prior
目录 概要 Motivation 整体架构流程 技术细节 小结 论文地址:...
如何通过 6 种简单方法将照片从华为转移到 PC?
华为作为全球领先的智能手机供应商之一,最近推出了其自主研发的操作系统——HarmonyOS 2.0,旨在为智能手机、平板电脑和智能手表等设备提供更流畅的用户体验。随着Mate 40/P40等系列手机计划升级到HarmonyOS 2.0,用户可能需要将手机中的文件备…...
QtCharts使用
1.基础配置 1.QGraphicsView提升为QChartView#include <QtCharts> QT_CHARTS_USE_NAMESPACE #include "ui_widget.h"2. QT charts 2.柱状图 2.1QBarSeries //1.创建Qchart对象QChart *chart new QChart();chart->setTitle("直方图演示");//设…...
深入分析 Flink SQL 工作机制
摘要:本文整理自 Flink Forward 2020 全球在线会议中文精华版,由 Apache Flink PMC 伍翀(云邪)分享,社区志愿者陈婧敏(清樾)整理。旨在帮助大家更好地理解 Flink SQL 引擎的工作原理。文章主要分…...
Spring Bean参数校验Validator
Spring Bean参数校验Validator 以下2种方式可以用于所有的 Spring bean 不仅仅是 Controller 控制器。 一、原始类型参数 在控制器(或者其他Bean)上使用Validated注解。 控制器类 RestController RequestMapping("account") Validated pub…...
AOP案例
黑马程序员JavaWeb开发教程 文章目录 一、案例1.1 案例1.2 步骤1.2.1 准备1.2.2 编码 一、案例 1.1 案例 将之前案例中增、删、改相关节后的操作日志记录到数据库表中。 操作日志:日志信息包含:操作人、操作时间、执行方法的全类名、执行方法名、方法…...
Facebook海外户Facebook广告被暂停的原因
有很多伙伴在Facebook广告时,有时会遇到账号被暂停,并通知你违反了哪些规则,那么Facebook广告被暂停的原因有哪些呢?今天小编详细梳理了一些原因,可以往下看哦~ 您的Facebook广告被暂停可能有以下几个原因:…...
网站企业需要适用于什么服务器?
对于网站企业会选择什么样的服务器呢? 为了保证网站能够稳定的运行需要选择高可用性和可靠性的网站服务器,选择具备高可用性架构的云服务器供应商,能够提供多可用区部署、自动故障转移和备份恢复等功能,保障网站在各种故障情况下的…...
winscp无法上传,删除,修改文件并提示权限不够的分析
使用winscp删除文件,报了个错如下 根据这个错就去百度,网上大部分都是通过下面这种方法解决: 在winscp端进行设置 输入主机名(即IP地址)、用户名和密码,然后点击高级 在箭头所指位置输入sudo + sftp应用程序的路径 先查询 sudo find / -name sftp-server -print点击Sh…...
Hadoop3:MapReduce之InputFormat数据输入过程整体概览(0)
一、MapReduce中数据流向 二、MapTask并行度 1、原理概览 数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapRed…...
【Leetcode Python】70.爬楼梯
麻烦大家要自己去leetcode看题目 第一个思路 用递归会超时 return self.climbStairs(n - 1) self.climbStairs(n - 2)第二个思路 滚动数组思想 class Solution(object):def climbStairs(self, n):""":type n: int:rtype: int"""if(n<2)…...
深度学习 - 张量的广播机制和复杂运算
张量的广播机制(Broadcasting)是一种处理不同形状张量进行数学运算的方式。通过广播机制,PyTorch可以自动扩展较小的张量,使其与较大的张量形状兼容,从而进行元素级的运算。广播机制遵循以下规则: 如果张量…...
【CSS】will-change 属性详解
目录 基本语法属性值常见用途will-change 如何用于优化动画效果示例: will-change 是一个 CSS 属性,用于告诉浏览器某个元素在未来可能会发生哪些变化。这可以帮助浏览器优化渲染性能,提前做一些准备工作,从而提高性能。 基本语法…...
linux安装mysql后,配置mysql,并连接navicat软件
Xshell连接登陆服务器 输入全局命令 mysql -u root -p 回车后,输入密码,不显示输入的密码 注意mysql服务状态,是否运行等 修改配置文件my.cnf,这里没找到就找my.ini,指定有一个是对的 find / -name my.cnf 接下…...
【学习笔记】Axios、Promise
TypeScript 1、Axios 1.1、概述 1.2、axios 的基本使用 1.3、axios 的请求方式及对应的 API 1.4、axios 请求的响应结果结构 1.5、axios 常用配置选项 1.6、axios.create() 1.7、拦截器 1.8、取消请求2、Promise 2.1、封装 fs 读…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
