当前位置: 首页 > news >正文

SparkStreaming_window_sparksql_reids

1.5 window

滚动窗口+滑动窗口

window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

  1. 红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

  2. 这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)

slide interval - The interval at which the window-based operation is performed (2 in the figure).

  1. 窗口大小,个人感觉是一段时间内数据的容器。

  2. 滑动间隔,就是我们可以理解的cron表达式吧。

案例实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/*** 统计,截止到目前为止出现的每一个key的次数* window窗口操作,每个多长M时间,通过过往N长时间内产生的数据* M就是滑动长度sliding interval* N就是窗口长度window length*/
object Demo05_WCWithWindow {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("WordCountUpdateStateByKey").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val ssc = new StreamingContext(conf, duration)val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1))
​val ret:DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_+_,windowDuration = Seconds(batchInterval * 3),slideDuration = Seconds(batchInterval * 2))
​ret.print()
​ssc.start()ssc.awaitTermination()}
}

1.6 SparkSQL和SparkStreaming的整合案例

Spark最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:top3的商品排序: 最新的top3

这里就是基于updatestateByKey,统计截止到目前为止的不同品类下的商品销量top3

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/*** SparkStreaming整合SparkSQL的案例之,热门品类top3排行* 输入数据格式:* id brand category* 1 huwei watch* 2 huawei phone**/
object Demo06_SQLWithStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("StreamingIntegerationSQL").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val spark = SparkSession.builder().config(conf).getOrCreate()val ssc = new StreamingContext(spark.sparkContext, duration)ssc.checkpoint("/Users/liyadong/data/sparkdata/streamingdata/chk-1")val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)//001 mi moblieval pairs:DStream[(String, Int)] = lines.map(line => {val fields = line.split("\\s+")if(fields == null || fields.length != 3) {("", -1)} else {val brand = fields(1)val category = fields(2)(s"${category}_${brand}", 1)}}).filter(t => t._2 != -1)
​val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)
​usb.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {//category_brand countimport spark.implicits._val df = rdd.map{case (cb, count) => {val category = cb.substring(0, cb.indexOf("_"))val brand = cb.substring(cb.indexOf("_") + 1)(category, brand, count)}}.toDF("category", "brand", "sales")
​df.createOrReplaceTempView("tmp_category_brand_sales")val sql ="""|select|  t.category,|  t.brand,|  t.sales,|  t.rank|from (|  select|    category,|    brand,|    sales,|    row_number() over(partition by category order by sales desc) rank|  from tmp_category_brand_sales|) t|where t.rank < 4|;""".stripMarginspark.sql(sql).show()}})
​ssc.start()ssc.awaitTermination()}
​def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {Option(seq.sum + option.getOrElse(0))}
}

1.7 SparkStreaming整合Reids

//将实时结果写入Redis中
dStream.foreachRDD((w,c)=>{val jedis = new Jedis("192.168.10.101", 6379)   //抽到公共地方即可jedis.auth("root")jedis.set(w.toString(),c.toString())  //一个key对应多个值,可以考虑hset
})

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

相关文章:

SparkStreaming_window_sparksql_reids

1.5 window 滚动窗口滑动窗口 window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持&#xff0c;从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据&#xff0c;会被聚合起来执行计算操作&#xff0c;然后生成的RDD&#xff0c;会…...

爬虫工作量由小到大的思维转变---<第二十四章 Scrapy的`统计数据`收集stats collection ---12月26日补>

前言: 前两篇是讲的数据诊断分析,还有一篇深挖解决内存泄漏的文章,目前我还没整理汇编出来;但是,想到分析问题的时候,忽然觉得爬虫的数据统计好像也挺重要;于是,心血来潮准备来插一篇这个------让大家对日常scrapy爬的数据,做到心里有数!不必自己去搅破脑汁捣腾日志,敲计算器了…...

Kafka:本地设置

这是设置 Kafka 将数据从 Elasticsearch 发布到 Kafka 主题的三部分系列的第一部分;该主题将被 Neo4j 使用。第一部分帮助您在本地设置 Kafka。第二部分将讨论如何设置Elasticsearch将数据发布到Kafka主题。最后 将详细介绍如何使用连接器订阅主题并使用数据。 Kafka Kafka 是…...

.NetCore NPOI 读取excel内容及单元格内图片

由于数据方提供的数据在excel文件中不止有文字内容还包含图片信息&#xff0c;于是编写相关测试代码&#xff0c;读取excel文件内容及图片信息. 本文使用的是 NPOI-2.6.2 版本&#xff0c;此版本持.Net4.7.2;.NetStandard2.0;.NetStandard2.1;.Net6.0。 测试文档内容&#xf…...

TCP/UDP协议

1. 请解释TCP和UDP的主要区别。 TCP和UDP都是位于传输层的协议&#xff0c;具有不同的特点和应用场景。以下是它们的主要区别&#xff1a; 连接方式&#xff1a;TCP是面向连接的协议&#xff0c;这意味着在数据传输之前需要先建立连接。这通常通过三次握手来建立连接&#xff…...

3D 渲染如何帮助电商促进销售?

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 3D 渲染图像因其高转化率而成为亚马逊卖家的最新趋势。它是电子商务平…...

使用栈求表达式的值【数据结构】

中缀表达式转后缀表达式 转换流程&#xff1a; 初始化一个运算符栈。自左向右扫描中缀表达式&#xff0c;当扫描到操作数时直接连接到后缀表达式上。当扫描到操作符时&#xff0c;和运算符栈栈顶的操作符进行比较。如果比栈顶运算符高&#xff0c;则入栈。如果比栈顶运算符低…...

{MySQL}索引事务和JDBC

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、索引1.1索引是什么1.2作用1.3代码 二、事务2.1什么是事务2.2使用 三.JDBC总结 前言 接着上次&#xff0c;继续讲下MySQL 提示&#xff1a;以下是本篇文章正…...

Qt designer界面和所有组件功能的详细介绍(全!!!)

PyQt5和Qt designer的详细安装教程&#xff1a;https://blog.csdn.net/qq_43811536/article/details/135185233?spm1001.2014.3001.5501 目录 1. 界面介绍2. Widget Box 常用组件2.1 Layouts&#xff08;布局&#xff09;2.2 Spacers&#xff08;间隔器&#xff09;2.3 Item V…...

mysql_存储过程

举例子 createdefiner root% procedure insert_batch_test(IN START int(10), IN max_num int(10)) BEGINDECLAREi INT DEFAULT 0;SET autocommit 0;REPEATSET i i 1;INSERT INTO test (std, score)VALUES (CEILING(RAND() * 10 100), CEILING(RAND() * 50 50));UNTIL i …...

uboot学习及内核更换_incomplete

官方文档 在前面 文章目录 uboot常见命令学习环境变量网络控制台uboot标准启动其他 升级uboot或内核bin和uimg以及booti和bootm的区别制作uImage更换内核更换uboot后续计划 uboot常见命令学习 环境变量 Environment Variables环境变量 autostart 如果值为yes&#xff0c;则会…...

KVM 自动化脚本的使用及热/冷迁移

一、介绍 目录结构介绍 [rootkvm-server kvm]# tree -L 2 . ├── control # 控制脚本目录 │ ├── KVMInstall.sh # kvm服务安装脚本 │ ├── VMHost.sh # kvm虚拟机克隆脚本 │ └── VMTemplate.sh # kvm模板机安装脚本 ├── mount # 此目录保持为空&…...

Unity中Shader裁剪空间推导(在Shader中使用)

文章目录 前言一、在Shader中使用转化矩阵1、在顶点着色器中定义转化矩阵2、用 UNITY_NEAR_CLIP_VALUE 区分平台矩阵3、定义一个枚举用于区分当前是处于什么相机 二、我们在DirectX平台下&#xff0c;看看效果1、正交相机下2、透视相机下3、最终代码 前言 在上一篇文章中&…...

ES的使用(Elasticsearch)

ES的使用&#xff08;Elasticsearch&#xff09; es是什么&#xff1f; es是非关系型数据库&#xff0c;是分布式文档数据库&#xff0c;本质上是一个JSON 文本 为什么要用es? 搜索速度快&#xff0c;近乎是实时的存储、检索数据 怎么使用es? 1.下载es的包&#xff08;环境要…...

车牌识别技术,如何用python识别车牌号

目录 一.前言 二.运行环境 三.代码 四.识别效果 五.参考 一.前言 车牌识别技术&#xff08;License Plate Recognition, LPR&#xff09;在交通计算机视觉&#xff08;Computer Vision, CV&#xff09;领域具有非常重要的研究意义。以下是该技术的一些扩展说明&#xff1…...

爬虫工作量由小到大的思维转变---<第二十五章 Scrapy开始很快,越来越慢(追溯篇)>

爬虫工作量由小到大的思维转变---&#xff1c;第二十二章 Scrapy开始很快,越来越慢(诊断篇)&#xff1e;-CSDN博客 爬虫工作量由小到大的思维转变---&#xff1c;第二十三章 Scrapy开始很快,越来越慢(医病篇)&#xff1e;-CSDN博客 前言: 之前提到过,很多scrapy写出来之后,不…...

Servlet入门

目录 1.Servlet介绍 1.1什么是Servlet 1.2Servlet的使用方法 1.3Servlet接口的继承结构 2.Servlet快速入门 2.1创建javaweb项目 2.1.1创建maven工程 2.1.2添加webapp目录 2.2添加依赖 2.3创建servlet实例 2.4配置servlet 2.5设置打包方式 2.6部署web项目 3.servl…...

【C#与Redis】--高级主题--Redis 哨兵

一、简介 1.1 哨兵的概述 哨兵&#xff08;Sentinel&#xff09;是 Redis 分布式系统中用于监控和管理多个 Redis 服务器的组件。它的主要目标是确保 Redis 系统的高可用性&#xff0c;通过实时监测主节点和从节点的状态&#xff0c;及时发现并自动处理故障&#xff0c;保证系…...

linux安装python

文章目录 前言一、下载安装包二、安装1.安装依赖2.解压3.安装4.软链接5.验证 总结 前言 本篇文章介绍linux环境下安装python。 一、下载安装包 下载地址&#xff1a;官方网站 我们以最新的标准版为例 二、安装 1.安装依赖 yum -y install openssl-devel ncurses-devel li…...

【如何破坏单例模式(详解)】

✅如何破坏单例模式 &#x1f4a1;典型解析✅拓展知识仓✅反射破坏单例✅反序列化破坏单例✅ObjectlnputStream ✅总结✅如何避免单例被破坏✅ 避免反射破坏单例✅ 避免反序列化破坏单例 &#x1f4a1;典型解析 单例模式主要是通过把一个类的构造方法私有化&#xff0c;来避免重…...

什么是 SPI,它有什么用?

文章目录 什么是 SPI&#xff0c;它有什么用&#xff1f; 什么是 SPI&#xff0c;它有什么用&#xff1f; SPI 全称是 Service Provider Interface &#xff0c;它是 JDK 内置的一种动态扩展点的实现。 简单来说&#xff0c;就是我们可以定义一个标准的接口&#xff0c;然后第三…...

FolkMQ 新的消息中间件,v1.0.25

简介 采用 “多路复用” “内存运行” “快照持久化” “Broker 集群模式”&#xff08;可选&#xff09;基于 Socket.D 网络应用协议 开发。全新设计&#xff0c;自主架构&#xff01; 角色功能生产端发布消息&#xff08;Qos0、Qos1&#xff09;、发布定时消息&#xff…...

小程序入门-登录+首页

正常新建一个登录页面 创建首页和TatBar&#xff0c;实现登录后底部出现两个按钮 代码 "pages": ["pages/login/index","pages/index/index","pages/logs/logs" ],"tabBar": {"list": [{"pagePath"…...

React快速入门之组件

目录 组件JSX在标签使用{}嵌入JS表达式使用组件组件嵌套以&#x1f332;树的方式管理组件间的关系组件纯粹原则 组件 文件&#xff1a;Profile.js export default function Profile({isPacked true&#xff0c;head,stlyeTmp,src,size 80}) {if (isPacked) {head head &q…...

.NET Conf 2023 回顾 – 庆祝社区、创新和 .NET 8 的发布

作者&#xff1a; Jon Galloway - Principal Program Manager, .NET Community Team Mehul Harry - Product Marketing Manager, .NET, Azure Marketing 排版&#xff1a;Alan Wang .NET Conf 2023 是有史以来规模最大的 .NET 会议&#xff0c;来自全球各地的演讲者进行了 100 …...

Hadoop入门学习笔记——六、连接到Hive

视频课程地址&#xff1a;https://www.bilibili.com/video/BV1WY4y197g7 课程资料链接&#xff1a;https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd5ay8 Hadoop入门学习笔记&#xff08;汇总&#xff09; 目录 六、连接到Hive6.1. 使用Hive的Shell客户端6.2. 使用Beel…...

【K8S 基本概念】Kurbernetes的架构和核心概念

目录 一、Kurbernetes 1.1 简介 1.2、K8S的特性&#xff1a; 1.3、docker和K8S&#xff1a; 1.4、K8S的作用&#xff1a; 1.5、K8S的特性&#xff1a; 二、K8S集群架构与组件&#xff1a; 三、K8S的核心组件&#xff1a; 一、master组件&#xff1a; 1、kube-apiserve…...

WPS复选框里打对号,显示小太阳或粗黑圆圈的问题解决方法

问题描述 WPS是时下最流行的字处理软件之一&#xff0c;是目前唯一可以和微软office办公套件相抗衡的国产软件。然而&#xff0c;在使用WPS的过程中也会出现一些莫名其妙的错误&#xff0c;如利用WPS打开docx文件时&#xff0c;如果文件包含复选框&#xff0c;经常会出…...

对“企业数据资源相关会计处理暂行规定“的个人理解

附&#xff1a;2023年数据资源入表白皮书下载&#xff1a; 关注WX公众号&#xff1a; commindtech77&#xff0c; 获得数据资产相关白皮书下载地址 1. 回复关键字&#xff1a;数据资源入表白皮书 下载 《2023数据资源入表白皮书》 2. 回复关键字&#xff1a;光大银行 下载 光…...

JavaScript:函数隐含对象arguments/剩余参数. . .c/解构赋值

除了this&#xff0c;在函数内部还存在着一个隐含的参数arguments arguments 是一个类数组对象&#xff08;伪数组&#xff09; 调用函数时传递的所有实参&#xff0c;都被存储在arguments中 arguments[0] 表示的是第一个实参 arguments[1] 表示的是第二个实参 以此类推..…...

装饰工程公司排名/河北seo人员

圈椅&#xff0c;作为最著名的明式家具&#xff0c;一直走在世界家具潮流的前端。这是一把充满传奇和经典故事的椅子&#xff0c;它起源于唐代&#xff0c;在宋朝初具邹形&#xff0c;于明代发展到顶峰。其实早在唐朝就出现了圈椅&#xff0c;元代画家任仁发的《张果见明皇图》…...

免费网页制作成app/seo网站推广如何做

JVM内存区域中&#xff0c;除了程序计数器外&#xff0c;其他几个运行时区域都有可能发生OutOfMemoryError&#xff08;OOM&#xff09;异常。本文对OOM异常进行总结&#xff0c;通过代码验证JVM规范中描述的运行时区域存储的内容&#xff1b;了解可能导致这些区域OOM异常的代码…...

路桥建设局网站/seo网站推广怎么做

TypeScript 和 JavaScript 都是开发人员广泛使用的编程语言。尽管它们有一些相似之处,但您也应该注意一些关键差异。在本文中,我们将仔细研究 TypeScript 和 JavaScript,以便您更好地了解何时使用每种语言。 什么是TypeScript? TypeScript 是 Microsoft 创建的一种语言。…...

wordpress调用文章标题/比百度好用的搜索软件手机版

HTML4.01 参考手册(共89个)排序按字母顺序&#xff0c;h1-h6算一个&#xff0c;除去不赞成使用的11个还有78个。 1 <!--...--> 定义注释。 2 <!DOCTYPE> 定义文档类型。 3 <a> 定义锚。 4 <abbr> 定义缩写。 5 <acronym> 定义只…...

企业英文网站建设的重要性/开网店3个月来亏了10万

前文链接&#xff1a;分数的加减法——C语言初学者代码中的常见错误与瑕疵(11) 重构 题目的修正 我抛弃了原题中“其中a, b, c, d是一个0-9的整数”这样的前提条件&#xff0c;因为这种限制毫无必要。只假设a, b, c, d是十进制整数形式的字符序列。 我也不清楚这种题目应该如何…...

给个网站2022年手机上能用的/搜索排名影响因素

昨天听一个前同事说他们公司老大让他去研究下关于Nginx 方面的知识&#xff0c;我想了下Nginx 在如今的开发技术栈中应该会很大可能会用到&#xff0c;所以写篇博文记录总结下官网学习教程吧。 花了点时间写了篇Nginx入门学习教程 阅读后你将Get以下技能 什么是代理&#xff…...