大数据——Spark Streaming
是什么
Spark Streaming是一个可扩展、高吞吐、具有容错性的流式计算框架。
之前我们接触的spark-core和spark-sql都是离线批处理任务,每天定时处理数据,对于数据的实时性要求不高,一般都是T+1的。但在企业任务中存在很多的实时性的任务需求,列如双十一的京东阿里都会要求做一个实时的数据大屏,显示实时订单。

实时计算框架对比
| 框架类别 | 框架类型 | 数据单位 | 其他 | 吞吐量 | 延迟 |
|---|---|---|---|---|---|
| Storm | 流式计算框架 | record的处理数据单位 | 支持micro-batch方式 | 一般 | 更低 |
| Spark | 批处理计算框架 | RDD处理数据单位 | 支持micro-batch流式处理数据 | 更强 | 一般 |
Spark Streaming组件
- Streaming Context
- 一个Context启动,则不能有新的DStream建立或者添加;
- 一个Context停止,不能重新启动;
- 在JVM中,只能有一个Streaming Context活跃;一个Spark Context会创建一个Streaming Context;
- Streaming Context上调用stop方法,SparkContext也会关闭,如果只想关闭Streaming Context,可以设置stop()方法里的false参数;
- 一个SparkContext对象可以重复创建多个Streaming Context对象,但每次只能运行一个,即需要关闭一个再开下一个。
- DStream
- 表示一个连续的数据流;
- DStream内部是由一系列的RDD组成;
- DStream中的每个RDD都有确定时间间隔内的数据;
- 对DStream的操作都转换成对DStream隐含的RDD操作;
- 数据源:
| 数据源 | 类型 |
|---|---|
| 基本源 | TCP/IP or FileSystem |
| 高级源 | Kafka or Flume |
Spark Streaming编码步骤
import os
# 配置spark driver和pyspark运⾏时,所使⽤的python解释器路径
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
JAVA_HOME='/root/bigdata/jdk'
SPARK_HOME = "/root/bigdata/spark"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ == "__main__":sc = SparkContext("local[2]",appName="NetworkWordCount")#参数2:指定执⾏计算的时间间隔ssc = StreamingContext(sc, 1)#监听ip,端⼝上的上的数据lines = ssc.socketTextStream('localhost',9999)#将数据按空格进⾏拆分为多个单词words = lines.flatMap(lambda line: line.split(" "))#将单词转换为(单词,1)的形式pairs = words.map(lambda word:(word,1))#统计单词个数wordCounts = pairs.reduceByKey(lambda x,y:x+y)#打印结果信息,会使得前⾯的transformation操作执⾏wordCounts.pprint()#启动StreamingContextssc.start()#等待计算结束ssc.awaitTermination()
Spark Streaming状态操作
Spark Streaming存在两种状态操作:UpdateStateByKey和Window操作。
- updateStateByKey
如果没有updateStateByKey,我们需要将每一秒的数据计算好放入mysql中,再用mysql进行计算,而updateStateByKey将每隔一段数据进行打包,封装成RDD,这样每个时间片段的数据之间是没有关联的。一般为以下步骤:
- ⾸先,要定义⼀个state,可以是任意的数据类型
- 其次,要定义state更新函数–指定⼀个函数如何使⽤之前的state和新值来更新state
- 对于每个batch,Spark都会为每个之前已经存在的key去应⽤⼀次state更新函数,⽆论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
- 对于每个新出现的key,也会执⾏state更新函数
- Window

Window操作是基于窗⼝⻓度和滑动间隔来⼯作的;窗⼝的⻓度控制考虑前⼏批次数据量;默认为批处理的滑动间隔来确定计算结果的频率。
窗口长度L是运算的数据量;
滑动间隔G是控制每隔多长时间做一次运算。
相关文章:
大数据——Spark Streaming
是什么 Spark Streaming是一个可扩展、高吞吐、具有容错性的流式计算框架。 之前我们接触的spark-core和spark-sql都是离线批处理任务,每天定时处理数据,对于数据的实时性要求不高,一般都是T1的。但在企业任务中存在很多的实时性的任务需求&…...
graphviz 绘制二叉树
代码 digraph BalancedBinaryTree {node [fontname"Arial", shapecircle, stylefilled, color"#ffffff", fillcolor"#0077be", fontsize12, width0.7, height0.7];edge [fontname"Arial", fontsize10, color"#333333", arr…...
STM32 PA15/JTDI 用作普通IO,烧录口不能使用问题解决
我们一般用SW调试接口 所以DEBUG选择Serial Wire 这样PA15可以用作普通IO使用。 工程中默认加上: PA13(JTMS/SWDIO).ModeSerial_Wire PA13(JTMS/SWDIO).SignalDEBUG_JTMS-SWDIO PA14(JTCK/SWCLK).ModeSerial_Wire PA14(JTCK/SWCLK).SignalDEBUG_JTCK-SWCLK...
【ARM Coresight 系列文章 9 -- ETM 介绍 1】
文章目录 ARM Coresight ETM 介绍1.1.1 ARM Coresight ETM 版本介绍1.1.2 ARM Coresight 常见术语1.2 ARM Coresight ETM 常用寄存器介绍1.2.1 TRCVIIECTLR(ViewInst Include-Exclude Control Register)1.2.2 TRCVISSCTLR(ViewInst Start/Stop Processing Element Comparator C…...
设计模式 - 中介者模式
目录 一. 前言 二. 实现 三. 优缺点 一. 前言 中介者模式又叫调停模式,定义一个中介角色来封装一系列对象之间的交互,使原有对象之间的耦合松散,且可以独立地改变它们之间的交互。 中介者模式可以使对象之间的关系数量急剧减少࿰…...
HttpServletRequest对象与RequestDispatcher对象
一、HttpServletRequest对象 1.介绍 在Servlet API中,定义了一个HttpServletRequest接口,它继承自ServletRequest接口,专门用来封装HTTP请求消息。由于HTTP请求消息分为请求行、请求消息头和请求消息体三部分,因此,在…...
Spring Boot启动流程
加载启动类:加了SpringBootApplication的启动类的main 方法中,通过运行SpringApplication.run()方法启动 【SpringBootApplication是由EnableAutoConfiguration(导入自动配置AutoConfigurationSelector类从而加载加了Configuration的配置&am…...
ARM day5
三盏灯流水 .text .global _start _start: 1.LDR R0,0X50000A28LDR R1,[R0]ORR R1,R1,#(0X1<<4)STR R1,[R0] 1.LDR R0,0X50000A28LDR R1,[R0]ORR R1,R1,#(0X1<<5)STR R1,[R0] 2.LDR R0,0X50006000LDR R1,[R0]BIC R1,R1,#(0X3<<20)ORR R1,R1,#(0X1<<…...
流程引擎概述及组成
流程引擎概述及组成 一、流程引擎概述 流程,可以理解为步骤,一个有序的活动或动作; 引擎,可以理解为驱动,是一个程序或者一套系统。 所以,字面意思可以理解为,流程引擎是一套(或…...
定时任务Apscheduler实践案例
定时任务Apscheduler实践案例 参考文章 https://blog.csdn.net/weixin_44799217/article/details/127353134 实现案例 本案例是使用定时任务apscheduler实现的每个三分钟发送一次邮件的任务 实现代码 import time from apscheduler.schedulers.blocking import BlockingSched…...
C#学习系列相关之多线程(五)----线程池ThreadPool用法
一、线程池的作用 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管…...
京东数据接口|电商运营中数据分析的重要性
在电商运营中,数据分析是非常重要的一环,它可以帮助电商企业更好地了解市场、了解消费者、了解产品、了解销售渠道等各种信息,从而制定更为科学有效的运营策略,提高销售效益。 数据方面用户可以直接选择使用数据接口来获取&#…...
C++入门(1)
目录 1.C关键字2.命名空间(namespace)2.1是什么2.2为什么2.3怎么用 3.C输入&输出4.缺省函数概念分类 5.函数重载6.引用6.1概念6.2特性6.3使用场景6.4引用和指针的不同点 1.C关键字 C总共有63个关键字 这里入门不多说,有需要的自行去了解 2.命名空间(namespac…...
redis-6.2.7 集群安装3主3从
因为资源有限准备了3 台 服务器,先查看防火墙的端口是否开放,如果没有开放先开放端口我使用的 6379 和 6380 这两个端口 所以将这两个端口放开。去redis 官网下载redis 安装包。下载地址 : redis 安装包下载 3. 安装redis 上传上去之后 3 台…...
【动态库】Ubuntu 添加动态库的搜索路径
在运行程序时,经常遇到下面这种动态库加载失败的情况,这时往往是系统在动态库的搜索路径下没有找到对应的库文件导致的。 目录 一、使用 LD_LIBRARY_PATH 二、修改 /etc/ld.so.conf 一、使用 LD_LIBRARY_PATH 环境变量 LD_LIBRARY_PATH是动态库的搜索…...
95740-26-4|用于体内DNA合成的探针F-ara-EdU
产品简介:(2S)-2-Deoxy-2-fluoro-5-ethynyluridine,一种用于体内DNA合成的探针,其毒性比EdU和BrdU都小。当需要延长细胞存活时间和不受干扰的细胞周期进展时,非常适合进行代谢DNA标记。 CAS号:95740-26-4 分子式&…...
Ajax使用流程
Ajax在不刷新页面的情况下,进行页面局部更新。 Ajax使用流程: 创建XmlHttpReqeust对象发送Ajax请求处理服务器响应 1. 创建XmlHttpReqeust对象 XmlHttpReqeust对象是Ajax的核心,使用该对象发起请求,接收响应 不同的浏览器创建…...
1808_ChibiOS基本的架构介绍
全部学习汇总: GreyZhang/g_ChibiOS: I found a new RTOS called ChibiOS and it seems interesting! (github.com) 简单看了一下ChibiOS的架构介绍,感觉这种OS以及组件非常适合快速构建一个应用。这里做一个简单的资料整理。。 1. 不同于其他的OS&#…...
曦力音视频转换工具Xilisoft Video Converter Ultimate mac中文版
Xilisoft Video Converter Ultimate mac是一款功能强大的视频转换软件,它可以将几乎所有流行的视频格式转换为其他格式,包括AVI、MPEG、WMV、DivX、MP4、H.264/AVC、AVCHD、MKV、RM、MOV、XviD、3GP等。此外,它还支持将视频转换为音频格式&am…...
Spring MVC 五:DispatcherServlet初始化之 mvc:annotation-driven
通过xml方式初始化DispatcherServlet时,xml文件中可以配置: <mvc:annotation-driven />或: <mvc:annotation-driven ><!--设置响应输出字符集--><mvc:message-converters><bean class"org.springframework.…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...
