当前位置: 首页 > news >正文

Spark2.x 入门:DStream 输出操作

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。

这里以《Spark2.1.0入门:DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础进行修改。

把DStream输出到文本文件中

NetworkWordCountStateful.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制val lines = sc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句,把DStream保存到文本文件中stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt")sc.start()sc.awaitTermination()}
}

把DStream写入到MySQL数据库中

mysql> use spark
mysql> create table wordcount (word char(20), count int(4));
mysql> select * from wordcount
//这个时候wordcount表是空的,没有任何记录

NetworkWordCountStateful.scala

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制val lines = sc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句,把DStream保存到MySQL数据库中     stateDstream.foreachRDD(rdd => {//内部函数def func(records: Iterator[(String,Int)]) {var conn: Connection = nullvar stmt: PreparedStatement = nulltry {val url = "jdbc:mysql://localhost:3306/spark"val user = "root"val password = "hadoop"  //笔者设置的数据库密码是hadoop,请改成你自己的mysql数据库密码conn = DriverManager.getConnection(url, user, password)records.foreach(p => {val sql = "insert into wordcount(word,count) values (?,?)"stmt = conn.prepareStatement(sql);stmt.setString(1, p._1.trim)stmt.setInt(2,p._2.toInt)stmt.executeUpdate()})} catch {case e: Exception => e.printStackTrace()} finally {if (stmt != null) {stmt.close()}if (conn != null) {conn.close()}}}val repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(func)})sc.start()sc.awaitTermination()}
}

对于stateDstream,为了把它保存到MySQL数据库中,我们采用了如下的形式:

stateDstream.foreachRDD(function)

其中,function就是一个RDD[T]=>Unit类型的函数,对于本程序而言,就是RDD[(String,Int)]=>Unit类型的函数,也就是说,stateDstream中的每个RDD都是RDD[(String,Int)]类型(想象一下,统计结果的形式是(“hadoop”,3))。这样,对stateDstream中的每个RDD都会执行function中的操作(即把该RDD保存到MySQL的操作)。

下面看function的处理逻辑,在function部分,函数体要执行的处理逻辑实际上是下面的形式:

 def func(records: Iterator[(String,Int)]){……}val repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(func) 

也就是说,这里定义了一个内部函数func,它的功能是,接收records,然后把records保存到MySQL中。到这里,你可能会有疑问?为什么不是把stateDstream中的每个RDD直接拿去保存到MySQL中,还要调用rdd.repartition(3)对这些RDD重新设置分区数为3呢?这是因为,每次保存RDD到MySQL中,都需要启动数据库连接,如果RDD分区数量太大,那么就会带来多次数据库连接开销,为了减少开销,就有必要把RDD的分区数量控制在较小的范围内,所以,这里就把RDD的分区数量重新设置为3。然后,对于每个RDD分区,就调用repartitionedRDD.foreachPartition(func),把每个分区的数据通过func保存到MySQL中,这时,传递给func的输入参数就是Iterator[(String,Int)]类型的records。如果你不好理解下面这种调用形式:

repartitionedRDD.foreachPartition(func) //这种形式func没有带任何参数,可能不太好理解,不是那么直观

实际上,这句语句和下面的语句是等价的,下面的语句形式你可能会更好理解:

repartitionedRDD.foreachPartition(records => func(records)) 

上面这种等价的形式比较直观,为func()函数传入了一个records参数,这就正好和 def func(records: Iterator[(String,Int)])定义对应起来了,方便理解。

相关文章:

Spark2.x 入门:DStream 输出操作

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。 这里以《Spark2.1.0入门:DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础…...

Python爬虫——简单网页抓取(实战案例)小白篇

Python 爬虫是一种强大的工具,用于从网页中提取数据。这里,我将通过一个简单的实战案例来展示如何使用 Python 和一些流行的库(如 requests 和 BeautifulSoup)来抓取网页数据。 实战案例:抓取一个新闻网站的头条新闻标…...

linux,ubuntu,使用ollama本地部署大模型llama3,模型通用,简易快速安装

文章目录 前言安装ollama启动ollama运行llama3模型查看ollama列表删除模型通过代码进行调用REST API 前言 在拥有了一条4090显卡后,那冗余的性能让你不得不去想着办法整花活,于是就想着部署个llama3,于是发现了ollama这个新大陆,…...

JS中的encodeURIComponent函数示例

JavaScript中的encodeURIComponent函数用于对字符串进行URL编码。它将字符串中的特殊字符转换为相应的编码形式,以确保字符串可以安全地嵌入到URL中。 使用encodeURIComponent函数时,它会将除了字母、数字、-、_、.、~以外的所有字符都进行编码。编码后…...

8.20 pre day bug

pre-bug1 分号省略 这些语句的分隔规则会导致一些意想不到的情形,如以下的一个示例; let m n f(bc).toString()但该语句最终会被解析为: let m n f(ab).toString();returntrue一定会被解析成 return;true;pre-bug2 Math.random()与Mat…...

位运算专题

分享丨【题单】位运算(基础/性质/拆位/试填/恒等式/思维) - 力扣(LeetCode) Leetcode 3133. 数组最后一个元素的最小值 我的答案与思路: class Solution { public: // 4 --> (100)2 7 --> (0111)2 // 5 --&g…...

HaProxy学习 —300K的TCP Socket并发连接实现(翻译)

HaProxy学习 —300K的TCP Socket并发连接实现(翻译) 1 原文链接2 原文翻译2.1 调整Linux系统参数2.2 调整HAProxy 1 原文链接 Use HAProxy to load balance 300k concurrent tcp socket connections: Port Exhaustion, Keep-alive and others&#xff0…...

92.WEB渗透测试-信息收集-Google语法(6)

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 内容参考于: 易锦网校会员专享课 上一个内容:91.WEB渗透测试-信息收集-Google语法(5) 监控的漏洞也有很多 打…...

[数据集][目标检测]木材缺陷检测数据集VOC+YOLO格式2383张10类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):2383 标注数量(xml文件个数):2383 标注数量(txt文件个数):2383 标注…...

【启明智显分享】智能音箱AI大模型一站式解决方案重塑人机交互体验,2个月高效落地

2010年左右,智能系统接入音箱市场,智能音箱行业在中国市场兴起。但大潮激荡,阿里、小米、百度三大巨头凭借自身强大的资本、技术、粉丝群强势入局,形成三足鼎立态势。经过几年快速普及,智能音箱整体渗透率极高&#xf…...

逻辑与集合论基础及其在编程中的应用

目录 第一篇文章:逻辑与集合论基础及其在编程中的深度应用 引言 命题逻辑与谓词逻辑在编程中的深入应用 集合论及其在编程中的深度运用 函数的概念及其与集合的结合 总结与应用 第一篇文章:逻辑与集合论基础及其在编程中的深度应用 引言 逻辑与集…...

【无标题】为什么 pg_rewind 在 PostgreSQL 中很重要?

文章目录 pg_rewind 的工作原理使用 pg_rewind 的要求Basic Usage of pg_rewind重要注意事项:为什么 pg_rewind 需要干净关闭?无法进行干净关闭的情况处理不正常关机结论 pg_rewind 是 PostgreSQL 中的一个实用程序,用于将一个数据库集群与另一个数据库集…...

hostapd生成beacon_ie

配置文件 /data/vendor/wifi/hostapd/hostapd_wlan0.conf 配置参数 AP启动过程:1.上层配置一些参数并根据参数生成配置文件 2.init的时候设置默认参数并加载配置文件上的参数(如果重复,以配置文件上的设置优先) 相关函数及结构…...

leetcode349:两个数组的交集

两个数组的交集 给定两个数组 nums1 和 nums2 &#xff0c;返回 它们的 交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 public int[] intersection(int[] nums1, int[] nums2) {ArrayList<Integer> list new ArrayList<>();Has…...

Metasploit漏洞利用系列(八):MSF渗透测试 - PHPCGI漏洞利用实战

在本系列的第八篇文章中&#xff0c;我们将深入探索如何利用Metasploit Framework (MSF) 来针对PHPCGI (PHP Common Gateway Interface) 的漏洞进行渗透测试。PHPCGI作为一种将Web服务器与PHP脚本交互的方式&#xff0c;其不恰当的配置或老旧版本中可能存在的漏洞常被攻击者利用…...

基于python的主观题自动阅卷系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;本人精通Java、Python、C#、C、C编程语言&#xff0c;同时也熟练掌握微信小程序、Php和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我有丰富的成品Java、Python、C#毕设项目经验&#xff0c;能够为学生提供各类…...

计算机毕业设计仪器设备管理系统-折旧-报废-转移-借出-归还

本文主要阐述如何采用利用网络数据库技术&#xff0c;在信息管理系统中合理的进行管理。在全面解析系统的设计理念以及设计手段&#xff0c;将系统进程中的所需的工具以及技术进行综合的设计&#xff0c;重点解析信息管理情况以及自动化管理进程&#xff0c;主要包括&#xff1…...

DAY37

零钱兑换 II public int change(int amount, int[] coins) {int []dpnew int[amount1];dp[0]1;for(int i0;i<coins.length;i){for(int jcoins[i];j<amount;j){dp[j]dp[j-coins[i]];}}return dp[amount];}组合总和 Ⅳ public int combinationSum4(int[] nums, int target)…...

将iso格式的镜像文件转化成云平台能安装的镜像格式(raw/vhd/QCOW2/VMDK )亲测--图文详解

1.首先,你将你的iso的文件按照正常的流程和需求安装到你的虚拟机中,我这里使用的是vmware,安装完成之后,关机。再次点开你安装好的那台虚拟机的窗口,如下图 选中要导出的镜像,镜像需要关机 2.点击工具栏的文件------选择 导出 整个工程到 ovf 格式—这里你可以选择你要导…...

Numba加速计算(CPU + GPU + prange)

文章目录 加速方法&#xff1a;Numba、CuPy、PyTorch、PyCUDA、Dask、Rapids一、Numba简介二、Numba类型&#xff1a;CPU GPU三、项目实战 —— 数组的每个元素加23.1、使用 python - range 循环计算 —— &#xff08;时耗&#xff1a;137.37 秒&#xff09;3.2、使用 python…...

TDengine 快速体验(Docker 镜像方式)

简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能&#xff0c;本节首先介绍如何通过 Docker 快速体验 TDengine&#xff0c;然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker&#xff0c;请使用 安装包的方式快…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案&#xff0c;如果正确地操作&#xff0c;重启Eureka集群中的节点&#xff0c;对已经注册的服务影响非常小&#xff0c;甚至可以做到无感知。 但如果操作不当&#xff0c;可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

MFC 抛体运动模拟:常见问题解决与界面美化

在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

搭建DNS域名解析服务器(正向解析资源文件)

正向解析资源文件 1&#xff09;准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2&#xff09;服务端安装软件&#xff1a;bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...

LRU 缓存机制详解与实现(Java版) + 力扣解决

&#x1f4cc; LRU 缓存机制详解与实现&#xff08;Java版&#xff09; 一、&#x1f4d6; 问题背景 在日常开发中&#xff0c;我们经常会使用 缓存&#xff08;Cache&#xff09; 来提升性能。但由于内存有限&#xff0c;缓存不可能无限增长&#xff0c;于是需要策略决定&am…...