大数据分析与应用实验任务十一
大数据分析与应用实验任务十一
实验目的
-
通过实验掌握spark Streaming相关对象的创建方法;
-
熟悉spark Streaming对文件流、套接字流和RDD队列流的数据接收处理方法;
-
熟悉spark Streaming的转换操作,包括无状态和有状态转换。
-
熟悉spark Streaming输出编程操作。
实验任务
一、DStream 操作概述
-
创建 StreamingContext 对象
登录 Linux 系统后,启动 pyspark。进入 pyspark 以后,就已经获得了一个默认的 SparkConext 对象,也就是 sc。因此,可以采用如下方式来创建 StreamingContext 对象:
from pyspark.streaming import StreamingContext sscluozhongye = StreamingContext(sc, 1)
如果是编写一个独立的 Spark Streaming 程序,而不是在 pyspark 中运行,则需要在代码文件中通过类似如下的方式创建 StreamingContext 对象:
from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext conf = SparkConf() conf.setAppName('TestDStream') conf.setMaster('local[2]') sc = SparkContext(conf = conf) ssc = StreamingContext(sc, 1) print("创建成功,lzy防伪")
二、基本输入源
- 文件流
-
在 pyspark 中创建文件流
首先,在 Linux 系统中打开第 1 个终端(为了便于区分多个终端,这里记作“数据源终端”),创建一个 logfile 目录,命令如下:
cd /root/Desktop/luozhongye/ mkdir streaming cd streaming mkdir logfile
其次,在 Linux 系统中打开第二个终端(记作“流计算终端”),启动进入 pyspark,然后,依次输入如下语句:
from pyspark import SparkContext from pyspark.streaming import StreamingContext ssc = StreamingContext(sc, 10) lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') words = lines.flatMap(lambda line: line.split(' ')) wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) wordCounts.pprint() ssc.start() ssc.awaitTermination()
-
采用独立应用程序方式创建文件流
#!/usr/bin/env python3 from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext conf = SparkConf() conf.setAppName('TestDStream') conf.setMaster('local[2]') sc = SparkContext(conf = conf) ssc = StreamingContext(sc, 10) lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') words = lines.flatMap(lambda line: line.split(' ')) wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) wordCounts.pprint() ssc.start() ssc.awaitTermination() print("2023年12月7日lzy")
保存该文件,并执行以下命令:
cd /root/Desktop/luozhongye/streaming/logfile/ spark-submit FileStreaming.py
- 套接字流
-
使用套接字流作为数据源
新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/NetworkWordCount.py”,在NetworkWordCount.py 中输入如下内容:
#!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()ssc.awaitTermination()
使用如下 nc 命令生成一个 Socket 服务器端:
nc -lk 9999
新建一个终端(记作“流计算终端”),执行如下代码启动流计算:
cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
-
使用 Socket 编程实现自定义数据源
新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/DataSourceSocket.py”,在 DataSourceSocket.py 中输入如下代码:
#!/usr/bin/env python3 import socket# 生成 socket 对象 server = socket.socket() # 绑定 ip 和端口 server.bind(('localhost', 9999)) # 监听绑定的端口 server.listen(1) while 1:# 为了方便识别,打印一个“I’m waiting the connect...”print("I'm waiting the connect...")# 这里用两个值接收,因为连接上之后使用的是客户端发来请求的这个实例# 所以下面的传输要使用 conn 实例操作conn, addr = server.accept()# 打印连接成功print("Connect success! Connection is from %s " % addr[0])# 打印正在发送数据print('Sending data...')conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())conn.close()print('Connection is broken.') print("2023年12月7日lzy")
执行如下命令启动 Socket 服务器端:
cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit DataSourceSocket.py
新建一个终端(记作“流计算终端”),输入以下命令启动 NetworkWordCount 程序:
cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
-
RDD 队列流
Linux 系统中打开一个终端,新建一个代码文件“/root/Desktop/luozhongye/ streaming/rddqueue/ RDDQueueStream.py”,输入以下代码:
#!/usr/bin/env python3 import time from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ == "__main__":print("")sc = SparkContext(appName="PythonStreamingQueueStream")ssc = StreamingContext(sc, 2)# 创建一个队列,通过该队列可以把 RDD 推给一个 RDD 队列流rddQueue = []for i in range(5):rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]time.sleep(1)# 创建一个 RDD 队列流inputStream = ssc.queueStream(rddQueue)mappedStream = inputStream.map(lambda x: (x % 10, 1))reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)reducedStream.pprint()ssc.start()ssc.stop(stopSparkContext=True, stopGraceFully=True)
下面执行如下命令运行该程序:
cd /root/Desktop/luozhongye/streaming/rddqueue /usr/local/spark/bin/spark-submit RDDQueueStream.py
三、转换操作
-
滑动窗口转换操作
对“套接字流”中的代码 NetworkWordCount.py 进行一个小的修改,得到新的代码文件“/root/Desktop/luozhongye/streaming/socket/WindowedNetworkWordCount.py”,其内容如下:
#!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")ssc = StreamingContext(sc, 10)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/socket/checkpoint")lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)counts.pprint()ssc.start()ssc.awaitTermination()
为了测试程序的运行效果,首先新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:
cd /root/Desktop/luozhongye/streaming/socket/ nc -lk 9999
然后,再新建一个终端(记作“流计算终端”),运行客户端程序 WindowedNetworkWordCount.py,命令如下:
cd /root/Desktop/luozhongye/streaming/socket/ /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999
在数据源终端内,连续输入 10 个“hadoop”,每个 hadoop 单独占一行(即每输入一个 hadoop就按回车键),再连续输入 10 个“spark”,每个 spark 单独占一行。这时,可以查看流计算终端内显示的词频动态统计结果,可以看到,随着时间的流逝,词频统计结果会发生动态变化。
-
updateStateByKey 操作
在“/root/Desktop/luozhongye/streaming/stateful/”目录下新建一个代码文件 NetworkWordCountStateful.py,输入以下代码:
#!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairsinitialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.pprint()ssc.start()ssc.awaitTermination()
新建一个终端(记作“数据源终端”),执行如下命令启动 nc 程序:
nc -lk 9999
新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:
cd /root/Desktop/luozhongye/streaming/stateful /usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999
四、把 DStream 输出到文本文件中
下面对之前已经得到的“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStateful.py”代码进行简单的修改,把生成的词频统计结果写入文本文件中。
修改后得到的新代码文件“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStatefulText.py”的内容如下:
#!/usr/bin/env python3
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairs initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.saveAsTextFiles("file:///root/Desktop/luozhongye/streaming/stateful/output")running_counts.pprint()ssc.start()ssc.awaitTermination()
新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:
cd /root/Desktop/luozhongye/streaming/socket/
nc -lk 9999
新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:
cd /root/Desktop/luozhongye/streaming/stateful
/usr/local/spark/bin/spark-submit NetworkWordCountStatefulText.py localhost 9999
实验心得
通过本次实验,我深入理解了Spark Streaming,包括创建StreamingContext、DStream等对象。同时,我了解了Spark Streaming对不同类型数据流的处理方式,如文件流、套接字流和RDD队列流。此外,我还熟悉了Spark Streaming的转换操作和输出编程操作,并掌握了map、flatMap、filter等方法。最后,我能够自定义输出方式和格式。总之,这次实验让我全面了解了Spark Streaming,对未来的学习和工作有很大的帮助。
相关文章:

大数据分析与应用实验任务十一
大数据分析与应用实验任务十一 实验目的 通过实验掌握spark Streaming相关对象的创建方法; 熟悉spark Streaming对文件流、套接字流和RDD队列流的数据接收处理方法; 熟悉spark Streaming的转换操作,包括无状态和有状态转换。 熟悉spark S…...

“78Win-Vận mệnh tốt”Trang web hỗ trợ kỹ thuật
Chng ti l một phần mềm cung cấp dịch vụ mua hộ xổ số cho người Việt Nam gốc Hoa. Bạn c thể gửi số v số lượng v số cần mua hộ, chng ti sẽ gửi đến tay bạn trước khi mở giải thưởng. Bạn chỉ cần trả tiền offline. Nếu bạ…...

React中使用react-json-view展示JSON数据
文章目录 一、前言1.1、在线demo1.2、Github仓库 二、实践2.1、安装react-json-view2.2、组件封装2.3、效果2.4、参数详解2.4.1、src(必须) :JSON Object2.4.2、name:string或false2.4.3、theme:string2.4.4、style:object2.4.5、…...

一文简述“低代码开发平台”到底是什么?
低代码开发平台到底是什么? 低代码开发平台(英文全称Low-Code Development Platform)是一种基于图形界面、可视化编程技术的开发平台,旨在提高软件开发的效率和质量。它可以帮助开发者快速构建应用程序,减少手动编写代…...

HNU计算机体系结构-实验3:多cache一致性算法
文章目录 实验3 多cache一致性算法一、实验目的二、实验说明三 实验内容1、cache一致性算法-监听法模拟2、cache一致性算法-目录法模拟 四、思考题五、实验总结 实验3 多cache一致性算法 一、实验目的 熟悉cache一致性模拟器(监听法和目录法)的使用&am…...

Go语言学习路线规划
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…...

微软NativeApi-NtQuerySystemInformation
微软有一个比较实用的Native接口:NtQuerySystemInformation,具体可以参考微软msdn官方文档:NtQuerySystemInformation, 是一个系统函数,用于收集特定于所提供的指定种类的系统信息。ProcessHacker等工具使用NtQuerySys…...

灵活与高效的结合,CodeMeter Cloud Lite轻云锁解决方案
众多软件开发商日渐认识到,威步推出的一系列尖端软件产品在维护知识产权、精确控制及有效管理软件授权方面发挥着不可或缺的作用。这些产品的核心功能包括将许可证及其他关键敏感数据安全地存储于高端复杂的硬件设备、经过专业加密的文件,或者置于受严格…...

Flink 系列文章汇总索引
Flink 系列文章 一、Flink 专栏 本专栏系统介绍某一知识点,并辅以具体的示例进行说明。 本专栏的文章编号可能不是顺序的,主要是因为写的时候顺序没统一,但相关的文章又引入了,所以后面就没有调整了,按照写文章的顺…...

计算机网络——期末考试复习资料
什么是计算机网络 将地理位置不同的具有独立功能的多台计算机及其外部设备通过通信线路和通信设备连接起来;实现资源共享和数据传递的计算机的系统。 三种交换方式 报文交换:路由器转发报文; 电路交换:建立一对一电路 分组交换&a…...

【数据结构】面试OJ题——链表
目录 1.移除链表元素 思路: 2.反转链表 思路: 3.链表的中间结点 思路: 4.链表中的倒数第K个结点 思路: 5.合并两个有序链表 思路: 6.链表分割 思路: 7.链表的回文结构 思路: 8.随机链表…...

flask web开发学习之初识flask(三)
文章目录 一、flask扩展二、项目配置1. 直接配置2. 使用配置文件3. 使用环境变量4. 实例文件夹 三、flask命令四、模版和静态文件五、flask和mvc架构 一、flask扩展 flask扩展是指那些为Flask框架提供额外功能和特性的库。这些扩展通常遵循Flask的设计原则,易于集成…...

【设计模式-3.1】结构型——外观模式
说明:本文介绍设计模式中结构型设计模式中的,外观模式; 亲手下厨还是点外卖? 外观模式属于结构型的设计模式,关注类或对象的组合,所呈现出来的结构。以吃饭为例,在介绍外观模式之前࿰…...

flutter学习-day2-认识flutter
📚 目录 简介特点架构 框架层引擎层嵌入层 本文学习和引用自《Flutter实战第二版》:作者:杜文 1. 简介 Flutter 是 Google 推出并开源的移动应用开发框架,主打跨平台、高保真、高性能。开发者可以通过 Dart 语言开发 App&#…...

解决selenium使用.get()报错:unknown error: unsupported protocol
解决方法 将原来的: url "https://www.baidu.com" browser.get(url)替换为: url "https://www.baidu.com" browser.execute_script(f"window.location.replace({url});") # 直接平替 .get()问题解析 之前运行都是正…...

关于加密解密,加签验签那些事
面对MD5、SHA、DES、AES、RSA等等这些名词你是否有很多问号?这些名词都是什么?还有什么公钥加密、私钥解密、私钥加签、公钥验签。这些都什么鬼?或许在你日常工作没有听说过这些名词,但是一旦你要设计一个对外访问的接口ÿ…...

容器重启后,Conda文件完整保存(虚拟环境、库包),如何重新安装conda并迁移之前的虚拟环境
Vim安装 容器重启后默认是vi,升级vim,执行命令 apt install -y vim安装 Anaconda 1. 下载Anaconda 其他版本请查看Anaconda官方库 wget https://mirrors.bfsu.edu.cn/anaconda/archive/Anaconda3-2023.03-1-Linux-x86_64.sh --no-check-certificate…...

gitee对接使用
1.创建一个文件夹 2.进入Gitee接受对方项目编辑 3.打开终端初始化一开始创建的文件夹 git init 3.1打开终端 3.2输入git.init 4.克隆对方的项目 4.1进入Gitee复制对方项目的路径 4.2在编辑器终端内克隆对方项目 git clone 网址 如此你的编辑器就会出现对方的项目 …...

C语言中的一维数组与二维数组
目录 一维数组数组的创建初始化使用在内存中的存储 二维数组创建初始化使用在内存中的存储 数组越界 一维数组 数组的创建 数组是一组相同类型元素的集合。 int arr1[10]; char arr3[10]; float arr4[10]; double arr5[10];下面这个数组能否成功创建? int count…...

【Linux】地址空间
本片博客将重点回答三个问题 什么是地址空间? 地址空间是如何设计的? 为什么要有地址空间? 程序地址空间排布图 在32位下,一个进程的地址空间,取值范围是0x0000 0000~ 0xFFFF FFFF 回答三个问题之前我们先来证明地址空…...

作为一个产品经理带你了解Axure的安装和基本使用
1.Axure的简介 Axure是一种强大的原型设计工具,它允许用户创建交互式的、高保真度的原型,以及进行用户体验设计和界面设计。Axure可以帮助设计师和产品经理快速创建和共享原型,以便团队成员之间进行沟通和反馈。Axure提供了丰富的交互组件和功…...

接口测试总结及其用例设计方法
接口测试的总结文档 第一部分:主要从问题出发,引入接口测试的相关内容并与前端测试进行简单对比,总结两者之前的区别与联系。但该部分只交代了怎么做和如何做?并没有解释为什么要做? 第二部分:主要介绍…...

2023团体程序设计天梯赛——模拟赛和总决赛题
M-L1-1 嫑废话上代码 Linux 之父 Linus Torvalds 的名言是:“Talk is cheap. Show me the code.”(嫑废话,上代码)。本题就请你直接在屏幕上输出这句话。 输入格式: 本题没有输入。 输出格式: 在一行中输出…...

智能优化算法应用:基于人工蜂鸟算法无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于人工蜂鸟算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于人工蜂鸟算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.人工蜂鸟算法4.实验参数设定5.算法结果6.参考…...

视频中自监督学习:「我的世界」下指令理解与跟随
本文介绍了北京大学人工智能研究院梁一韬助理教授所带领的 CraftJarvis 团队在「我的世界」环境下探索通用智能体设计的新进展,题为“GROOT: Learning to Follow Instructions by Watching Gameplay Videos”。 GROOT 该研究的核心目标是探索能否摆脱文本数据的标…...

Spring基于xml半注解开发
目录 Component的使用 依赖注解的使用 非自定义Bean的注解开发 Component的使用 基本Bean注解,主要是使用注解的方式替代原有的xml的<bean>标签及其标签属性的配置,使用Component注解替代<bean>标签中的id以及class属性,而对…...

功能测试,接口测试,自动化测试,压力测试,性能测试,渗透测试,安全测试,具体是干嘛的?
软件测试是一个广义的概念,他包括了多领域的测试内容,比如,很多新手可能都听说:功能测试,接口测试,自动化测试,压力测试,性能测试,渗透测试,安全测试等&#…...

oracle 下载java之前版本
登录oracle官网:Oracle | Cloud Applications and Cloud Platform 点击resource 进入该页面 点击这个 出现之前版本...

LLM之Agent(四)| AgentGPT:一个在浏览器运行的Agent
AgentGPT是一个自主人工智能Agent平台,用户只需要为Agent指定一个名称和目标,就可以在浏览器中链接大型语言模型(如GPT-4)来创建和部署Agent平台。 PS:目前agentGPT仅支持chatgpt模型,暂时不支持本地llm模…...

AGM离线下载器使用说明
AGM专用离线下载器示意图: 供电方式: 通过 USB 接口给下载器供电,跳线 JP 断开。如果客户 PCB 的 JTAG 口不能提供 3.3V 电源,或仅需烧写下载器,尚未连接用户 PCB 时,采用此种方式供电。 或者:…...