大数据技术架构(组件)35——Spark:Spark Streaming(1)
2.3、Spark Streaming
2.3.0、Overview
Spark Streaming 是核心 Spark API 的扩展,它支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从许多来源(如 Kafka、Kinesis 或 TCP 套接字)获取,并且可以使用复杂的算法进行处理,这些算法由 map、reduce、join 和 window 等高级函数表示。最后,可以将处理后的数据推送到文件系统、数据库和实时仪表板。当然也可以在数据流上应用机器学习和图处理。
工作原理如下:Spark Streaming 接收实时输入的数据流,并将数据分成批处理,然后由 Spark 引擎处理以批处理生成最终的结果流。其中SparkStreaming提供了一种离散流或DStream的高级抽象来代表一个连续的数据流,底层就是由一系列RDD来表示。
DStream 中的每个 RDD 都包含来自某个区间的数据,如下图:
2.3.0.1、Example
import org.apache.spark._
import org.apache .spark.streaming._
import org.apache.spark.streaming.StreamingContext_ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new streamingContext(conf, Seconds(1))
// Create a Dstream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextstream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(”"))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this Dstream to the console
wordCounts.print()
ssc.start() // start the computation
ssc.awaitTermination() // Wait for the computation to terminate
如上面的demo所示,每个输入流都会和一个Receiver对象相关联,该对象用来接收数据并将其存储在Spark内存中进行下一步的处理。因此如果你想要在流应用程序中并行接收多个数据流的话,那么就得需要创建多个Receiver对象用来接收数据。同时也需要记住的是SparkStreaming应用程序是属于常驻的,而且也是Spark程序,那么Worker/Executor也会占用一部分资源,所以为了能够保障运行Receiver以及正常处理数据,那么就需要申请到足够的资源,所以其分配的核数一定要大于receivers的个数。
2.3.0.2、Points To Remember
1、一旦Context启动之后,就不能增加或者设置新的流计算
2、一旦Context停止后,就无法重新启动。这里说的是容错方面。
3、同一时间一个JVM内只能有一个StreamingContext。
4、在StreamingContext上调用stop()方法,同时也会把SparkContext给停止;如果只是想停止StreamingContext,那么可以在调用stop()方法的时候指定stopSparkContext=false。
5、一个SparkContext可以被复用创建多个StreamingContext(即在下一个StreamingContext被创建之前停止上一个StreamingContext,且不停止SparkContext)
2.3.1、Receiver
SparkStreaming可以从任意的数据源来接收数据并处理,目前内置的数据源包括Kafka、File、Socket等等。当然目前Spark内置支持的数据源可以满足日常大部分的场景,但有些时候仍然需要自定义Receiver来定制接收数据源。这小节将来讲述如何实现一个自定义的Receiver。首先要继承Receiver,然后重写onStart和onStop方法。onStart()方法会在启动的时候负责接收数据;onStop()方法将停止这些接收数据的线程,当然还可以使用isStopped()方法来检查它们是否停止接收数据。
在 Spark Streaming 中,当一个 Receiver 启动时,每隔 spark.streaming.blockInterval 毫秒就会产生一个新的块,每个块都会变成 RDD 的一个分区,最终由 DStream 创建。例如,由 KafkaInputDStream 创建的 RDD 中的分区数由 batchInterval / spark.streaming.blockInterval 确定,其中 batchInterval 是将流数据分成批次的时间间隔(通过 StreamingContext 的构造函数参数设置)。例如,如果批处理间隔为 2 秒(默认),块间隔为 200 毫秒(默认),则RDD 将包含 10 个分区,还有一个流程路径涉及从迭代器接收数据,由 ReceivedBlockHandler 表示。创建 RDD 后,驱动程序的 JobScheduler 可以将其处理安排为作业。在 Spark Streaming 的当前实现和默认配置下,任何时间点只有一个作业处于活动状态(即正在执行)。因此,如果一个批次的处理时间比批次间隔长,那么下一个批次的作业将保持排队,将其设置为 1 的原因是并发作业可能会导致奇怪的资源共享,并且可能难以调试系统中是否有足够的资源来足够快地处理摄取的数据,当然可以通过实验性 Spark 属性 spark.streaming.concurrentJobs 进行更改,默认情况下设置为 1。一次只运行一个作业,不难看出,如果批处理时间小于批处理间隔,那么系统将是稳定的。
Receiver一旦接收到数据后,那么就会调用store(data)方法进行存储,这里有两种处理方式来保障Receiver是否可靠:
1、来一条存储一条,这种可靠性较差
2、存储整个对象/序列化集合。(阻塞的方式存储)
其自定义实现store()方法会影响到整体的容错和可靠。当应用程序发生了异常时应该要有捕获机制,并要有重试机制。
如果应用程序发生重启的时候,那么会调用Receiver类中的restart()方法,其内部会异步调用onStop方法并隔一定延迟后调用onStart()方法完成重启动作。
public class JavaCustomReceiver extends Receiver<String> {String host = null;int port = -1;public JavaCustomReceiver(String host_ , int port_) {super(storageLevel.MEMORY_AND_DISK_2());host = host_;port = port_;}@Overridepublic void onstart() {// Start the thread that receives data over a connectionnew Thread(this::receive).start();}@overridepublic void onstop() {// There is nothing much to do as the thread calling receive()// is designed to stop by itself if isStopped() returns false}/** Create a socket connection and receive data until receiver is stopped */private void receive() {Socket socket = nul1;String userInput = null;try {// connect to the serversocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputstreamReader(socket.getInputstream(), StandardCharsets.UTF 8))// Until stopped or connection broken continue readingwhile (!isStopped() && (userInput = reader.readLine()) != null) {System.out.println("Received data "" + userInput + "");store(userInput);}reader.close();socket.close();// Restart in an attempt to connect again when server is active againrestart("Trying to connect again");} catch(ConnectException ce) {// restart if could not connect to serverrestart("Could not connect", ce);} catch(Throwable t) f// restart if there is any other errorrestart("Error receiving data", t);}}
}// 调用自定义Receiver:
// Assuming ssc is the JavastreamingContext
JavaDStream<String> customReceiverstream = ssc.receiverstream(new JavaCustomReceiver(host, port));
JavaDstream<String> words = customReceiverstream.flatMap(s -> ...);
...
相关文章:
大数据技术架构(组件)35——Spark:Spark Streaming(1)
2.3、Spark Streaming2.3.0、OverviewSpark Streaming 是核心 Spark API 的扩展,它支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从许多来源(如 Kafka、Kinesis 或 TCP 套接字)获取,并且可以使用复杂的算法进行处理&am…...
实现超大文件上传逻辑
引言 文件上传功能是我们开发中经常会遇到的功能点,当日常开发中遇到小文件(比如:头像),可以直接将文件转为字节流直接上传到服务器上即可。但是当遇到大文件这种(比如:一部电影至少1个G)该怎么…...
JavaScript HTML DOM EventListener
JavaScript HTML DOM EventListener 是一个非常重要的概念,在前端开发中被广泛使用。它是用来监听 HTML DOM 上的事件,并执行特定的代码块。 EventListener 的语法非常简单,下面是一个示例代码: element.addEventListener("…...
构建RFID系统的重要组成部分
RFID读写设备,通常被用来扫描读取安装了RFID电子标签的目标物品,能实现快速批量无接触读写,是构建RFID系统的重要组成部分。RFID读写设备,通常有固定式读写设备和可移动读写设备两种。下面来了解一下RFID的特点,RFID系…...
PID控制算法简介
目录 1 简介 2 比例Proportional 3 积分Integral 4 微分Differential 5 公式 6 积分限幅 7 积分限行 8 相关代码 1 简介 PID控制中有P、I、D三个参数,PID即:Proportional(比例)、Integral(积分&#…...
【王道数据结构】第八章 | 排序
目录 8.1. 排序的基本概念 8.2. 插入排序 8.2.1. 直接插入排序 8.2.2. 折半插入排序 8.2.3. 希尔排序 8.3. 交换排序 8.3.1. 冒泡排序 8.3.2. 快速排序 8.4. 选择排序 8.4.1. 简单选择排序 8.4.2. 堆排序 8.5. 归并排序和基数排序 8.5.2. 基数排序 8.1. 排序的基本概念 排…...
95后外贸SOHO,年入7位数,他究竟是怎么做的?
外贸SOHO,一年到底能挣多少钱?有人说:“勤勤恳恳,年薪也就十来万吧”;也有人说:“100万而已我早就已经挣到了”;还有人说:“谁说新手难出头?我做跨境半年赚200万…...
2023年全国最新消防设施操作员精选真题及答案
百分百题库提供消防设施操作员考试试题、消防设施操作员考试预测题、消防设施操作员考试真题、消防设施操作员证考试题库等,提供在线做题刷题,在线模拟考试,助你考试轻松过关。 一、多选题 15、以下符合电气火灾监控系统监控设备的安装要求的有:( ) A、…...
mysql 无需修改配置文件,即可改变表数据存储位置
由于Linux系统的mysql 默认数据存储在/var/lib/mysql路径下,而该路径装系统时默认大小仅50G,当我们的数据稍微大一点时就会把该空间占满,无法再插入数据。 针对该问题有两种解决办法: 1、修改/etc/my.cnf配置文件,重启…...
轻松解决Session-Cookie 鉴权(含坑)附代码
Session-Cookie 鉴权 cookie介绍 Cookie 存储在客户端,可随意篡改,不安全有大小限制,最大为 4kb有数量限制,一般一个浏览器对于一个网站只能存不超过 20 个 Cookie,浏览器一般只允许存放 300个 CookieCookie 是不可跨…...
pyinstaller使用详细
目录常用命令spec文件配置报错常用命令 pyinstaller -D xxx.py //打包生成目录(director)pyinstaller -F xxx.py//打包生成单个exe文件pyinstaller xxx.spec //根据现有的spec文件进行打包运行以上命令之一后会生成build、dist文件夹以及xxx.spec文件&a…...
java -数据结构,List相关基础知识,ArrayList的基本使用,泛型的简单、包装类介绍
一、 预备知识-泛型(Generic) 1.1、泛型的引入 比如:我们实现一个简单的顺序表 class MyArrayList{public int[] elem;public int usedSize;public MyArrayList(){this.elem new int[10];}public void add(int key){this.elem[usedSize] key;usedSize;}public …...
RabbitMQ学习总结(10)—— RabbitMQ如何保证消息的可靠性
一、丢失场景 RabbitMQ丢失的以下3种情况: (1)生产者:生产者发送消息至MQ的数据丢失...
购物车案例【版本为vue3】
前言: 首先我们要明白整个购物车的组成。它是由一个主页面加两个组件组合成的。本章主要运用父子之间的通讯: 父传子 子传父 首先新建一个vue3项目,这里有俩种创建方式: vue-cli : ● 输入安装指令 npm init vuelates…...
Multisim14 安装包及安装教程
Multisim14 安装教程 Multisim14下载地址:Kevin的学习站–安装包下载地址 Multisim14 简介: Multisim 14 是美国国家仪器有限公司(National Instrument,NI)推出的以 Windows 为基础、符合工业标准的、具有 SPICE 最佳仿…...
Java实现简单的图书管理系统源码+论文
简单图书管理系统设计(文末附带源码论文) 为图书管理人员编写一个图书管理系统,图书管理系统的设计主要是实现对图书的管理和相关操作,包括3个表: 图书信息表——存储图书的基本信息,包括书号、书名、作者…...
前端调试2
一、用chrome调试(node.js)例:const fs require(fs/promises);(async function() {const fileContent await fs.readFile(./package.json, {encoding: utf-8});await fs.writeFile(./package2.json, fileContent); })();1.先 node index.js 跑一下:2.然…...
AlphaFold 2 处理蛋白质折叠问题
蛋白质是一个较长的氨基酸序列,比如100个氨基酸的规模,如此长的氨基酸序列连在一起是不稳定的,它们会卷在一起,形成一个独特的3D结构,这个3D结构的形状决定了蛋白质的功能。 蛋白质结构预测(蛋白质折叠问题…...
问卷调查会遇到哪些问题?怎么解决?
提到问卷调查我们并不陌生,它经常被用作调查市场、观察某类群体的行为特征等多种调查中。通过问卷调查得出的数据能够非常真实反映出是市场的现状和变化趋势,所以大家经常使用这个方法进行调查研究。不过,很多人在进行问卷调查的时候也会遇到…...
量化选股——基于动量因子的行业风格轮动策略(第1部分—因子测算)
文章目录动量因子与行业轮动概述动量因子的理解投资视角下的行业轮动现象投资者视角与奈特不确定性动量因子在行业风格上的效果测算动量因子效果测算流程概述1. 行业选择:申万一级行业2. 动量因子选择:阿隆指标(Aroon)3. 测算方法…...
工作常用git命令
修改hard:git reset --hard md5git push -f合并多次commitsgit rebase -i HEAD~4git push -f冲突文件被覆盖冲突文件被覆盖了,可以用git checkout commitId /path来快速把一个或一些文件还原会之前的提交,重新commit ,merge一次删除分支git b…...
test3
数据链路层故障分析 一、网桥故障 a.主要用途简述 网桥作为一种桥接器,可以连接两个局域网。工作在数据链路层,是早期的两端口二层网络设备。可将一个大的VLAN分割为多个网段,或者将两个以上的LAN互联为一个逻辑LAN,使得LAN上的…...
领证啦,立抵3600,软考证书到手后还有很多作用
2022年下半年软考合格证书发放在2023年2月-3月进行,目前已有多个省市开始发证了,比如上海、江苏、辽宁、浙江、山东等地。还没收到领证通知的考生也不要着急,可以关注当地软考办通知。 拿到证书的朋友可以去申请入户,职称评聘&am…...
响应式布局之viewport-超级简单
之前文章CSS布局之详解_故里2130的博客-CSDN博客 上面的文章可以实现响应式布局,根据浏览器的大小变化而变化,但是相对于viewport来说,之前的还是有点复杂,而使用viewport更加的简单。 当我们使用amfe-flexible的时候࿰…...
分布式计算考试资料
第一章 分布式系统的定义 分布式系统是一个其硬件或软件组件分布在连网的计算机上,组件之间通过传递信息进行通信和动作协调的系统。分布式系统的目标 资源共享(resource sharing) 一些计算机通过网络连接起来,并在这个范围内有效地共享资源。 硬件的共…...
Java修饰符和运算符,超详细整理,适合新手入门
目录 一、访问控制修饰符 1、访问权限 二、运算符 1、算术运算符 2、关系运算符 3、逻辑运算符 4、赋值运算符 5、三元运算符 一、访问控制修饰符 Java 支持 4 种不同的访问权限: private 私有的 protected 受保护的 public 公共的 default 默认 1、…...
软件功能测试包含了哪些测试项目?功能测试报告收费标准
一、软件功能测试是什么? 软件功能测试是测试人员通过执行功能测试用例逐步验证软件产品各项功能是否达到预期需求的测试过程。也是俗称的“点点点测试”,这是基础性的测试类型,软件产品的功能直接影响到用户体验,所以软件功能测试意义重大…...
Netty 学习笔记——概念篇
Netty Home Netty GitHub Netty简介 Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 也就是说,Netty 是一个…...
元宇宙开始告别以资本为主导的野蛮生长,新的竞争格局和态势将形成
欲要成为这样一场洗牌的胜利者,元宇宙的玩家需要真正站在商业的角度,而非资本市场的角度来看待元宇宙,来寻找元宇宙的正确的发展模式和方法。原因在于,在这样一场洗牌过程当中,仅仅只是对于以往以资本为主导的发展模式…...
MySQL 5:MySQL视图
View(视图)是一个不存在的虚拟表。 其实质是根据SQL语句获取动态数据集并命名。 用户只需要使用视图名就可以获取结果集,并作为表来使用。数据库中只存储了视图的定义,不存储视图中的数据。 这些数据存储在原始表中。当使用视图查…...
涪陵做网站/百度助手官网
织梦升级到5.7版本后,使用时常遇到登录后台空白的情况,下面给大家介绍一个屡试不爽的好方法。 问题 织梦dede后台登录之后全部空白 解决方法 找到include/userlogin.class.php文件 大约288行左右 代码修改如下 把开头的六行代码注释掉即可 functio…...
深圳市做网站/广州市新闻发布
引言 一个小任务,拆成1000份。如果以前1个人需要干1000小时,现在1000个人1小时就干完了。 对,并行计算很简单,这个就是云计算的基础。 1. 如何通俗地理解云计算? 1. 什么是云计算? - 拖雷的回答 - 知乎(用公…...
虾皮跨境电商靠谱吗/天津seo技术教程
最近几个月,我一直在使用带有Sandbox凭据的Paypal Merchant SDK,并且一切正常.但是今天,我面临着这个奇怪的问题,每当我对SetExpressCheckout进行API调用时,由于内部错误,错误代码为10001,我都会导致事务失败.我没有更改代码中的任何内容,它刚刚开始显示.我使用github api进行交…...
兰州一氧化碳/优化大师的优化项目有哪7个
InfoQ记者独家获悉:阿里巴巴宣布重磅开源OpenJDK长期支持版本Alibaba Dragonwell。众所周知,Oracle对Java 的策略已经发生系列转变,由于Java 用户群体庞大,导致此事在很长一段时间内都是开发者的讨论焦点。作为世界上最大的Java用…...
西安疫情最新消息今天封城/苏州排名搜索优化
Linux下,如何将一个乱码的文件进行重命名方法一:命令格式:mv $(ls |egrep "[^a-zA-Z0-9.-]") tandao.tx[rootnb o]# ls |egrep "[^a-zA-Z0-9.-]"?-?ˉ? ###从找到文件中找出乱码文件[rootnb o]# mv $(ls |egrep "…...
简约wordpress/可以免费网络推广网站
参照官方推荐的编译:http://www.roman10.net/how-to-build-ffmpeg-with-ndk-r9/ build_config.sh总是不过, 问题原因:./configure 后面的命令多余了 空行 解决办法:删除多余的空行转载于:https://www.cnblogs.com/poe-blog/p/3489…...