208.Flink(三):窗口的使用,处理函数的使用
目录
一、窗口
1.窗口的概念
2.窗口的分类
(1)按照驱动类型分
(2)按照窗口分配数据的规则分类
3.窗口api概览
(1)按键分区(Keyed)和非按键分区(Non-Keyed)
*1)按键分区窗口(Keyed Windows)
*2)非按键分区(Non-Keyed Windows)
(2)代码中窗口API的调用
(3)窗口分配器
(4)窗口函数
*1)增量聚合函数
^1)归约函数(ReduceFunction)
^2)聚合函数(AggregateFunction)
*2)全窗口函数(full window functions)
*3)增量聚合和全窗口函数的结合使用
(5)触发器(Trigger)
(6)移除器(Evictor)
(7)窗口的简单原理
*1)一个数据来了,怎么认为他是哪个窗口内的数据?
*2)窗口特性
*3)窗口的生命周期
4.时间语义
(1)Flink中的时间语义
(2)Flink以事件时间为默认时间语义
5.水位线(Watermark)
(1)水位线的概念
*1)有序流中的水位线
*2)乱序流中的水位线
(2)水位线和窗口的工作原理
(3) 生成水位线
*1)总体原则
*2)有序流中内置水位线设置
*3)乱序流中内置水位线设置
*4)自定义水位线生成器(周期式、断点式)
*5)在数据源中发送水位线
(6)迟到数据的处理
*1)设置乱序容忍度
*2)设置窗口延迟关闭
*3)侧输出流
(7)基于时间的合流——双流联结(Join)
*1)窗口联结(Window Join)
*2)间隔联结(Interval Join)
二、处理函数
1.基本处理函数(ProcessFunction)
(1)处理函数的功能和使用
(2)ProcessFunction解析
(3)处理函数的分类
2.按键分区处理函数(KeyedProcessFunction)
(1)定时器(Timer)和定时服务(TimerService)
(2)KeyedProcessFunction注意点及实现
3.应用案例:Top N
(1)方法一:ProcessAllWindowFunction
(2)方法二:
4.侧输出流
一、窗口
在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。
1.窗口的概念
Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。
Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。
到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。
2.窗口的分类
(1)按照驱动类型分
*1)时间窗口
一定时间作为一个窗口
*2)计数窗口
达到多少数量作为一个窗口
(2)按照窗口分配数据的规则分类
*1)滚动窗口
以一个固定时间为窗口,第一个窗口结束的时间就是下一个窗口开始的时间。
*2)滑动窗口
窗口大小 + 步长。
如果步长 = 窗口大小,其实就是滚动窗口的情况。
步长 > 窗口大小,会有数据被漏掉。
步长 < 窗口大小,窗口会有重叠
*3)会话窗口
基于会话对数据分组
*4)全局窗口
全局有效,没有结束时间
3.窗口api概览
(1)按键分区(Keyed)和非按键分区(Non-Keyed)
定义窗口前,需要确认数据流是基于keyBy还是没有keyBy的。
*1)按键分区窗口(Keyed Windows)
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。
stream.keyBy(...).window(...)
*2)非按键分区(Non-Keyed Windows)
窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。
对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。
stream.windowAll(...)
(2)代码中窗口API的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)
.window()方法需要传入一个窗口分配器,它指明了窗口的类型。
.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。
(3)窗口分配器
窗口分配器指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。
(4)窗口函数
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。
package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO 1. 指定 窗口分配器: 指定 用 哪一种窗口 --- 时间 or 计数? 滚动、滑动、会话?// 1.1 没有keyby的窗口: 窗口内的 所有数据 进入同一个 子任务,并行度只能为1
// sensorDS.windowAll()// 1.2 有keyby的窗口: 每个key上都定义了一组窗口,各自独立地进行统计计算// 基于时间的
// sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口长度10s
// sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // 滑动窗口,窗口长度10s,滑动步长2s
// sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 会话窗口,超时间隔5s
// sensorKS.window(GlobalWindows.create()) // 全局窗口,计数窗口的底层就是用的这个,需要自定义的时候才会用// 基于计数的
// sensorKS.countWindow(5) // 滚动窗口,窗口长度=5个元素
// sensorKS.countWindow(5,2) // 滑动窗口,窗口长度=5个元素,滑动步长=2个元素// TODO 2. 指定 窗口函数 : 窗口内数据的 计算逻辑WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 增量聚合: 来一条数据,计算一条数据,窗口触发的时候输出计算结果
// sensorWS
// .reduce()
// .aggregate(, )// 全窗口函数:数据来了不计算,存起来,窗口触发的时候,计算并输出结果
// sensorWS.process()env.execute();}
}
*1)增量聚合函数
^1)归约函数(ReduceFunction)
package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);相关文章:
208.Flink(三):窗口的使用,处理函数的使用
目录 一、窗口 1.窗口的概念 2.窗口的分类 (1)按照驱动类型分 (2)按照窗口分配数据的规则分类 3.窗口api概览 (1)按键分区(Keyed)和非按键分区(Non-Keyed) *1)按键分区窗口(Keyed Windows) *2)非按键分区(Non-Keyed Windows) (2)代码中窗口API的调…...
时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测
时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测 目录 时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现POA-CNN-BiLSTM鹈鹕算…...
【知识点】增量学习、在线学习、离线学习的区别
参考链接:https://www.6aiq.com/article/1613258706447?p1&m0 离线学习 常见的学习方式,一次性将所有数据参与进训练。 离线学习完成了目标函数的优化将不会在改变了离线学习需要一次提供整个训练集时间和空间成本效率低发生数据变更或模型漂移需…...
c++ 学习 之 运算符重载 之 前置++和后置++
前言 int a1;cout << (a) << endl;cout << a << endl;int b1;cout << (b) << endl; // 这个是错误的cout << b << endl;上面样例中, 前置 返回的是引用,所以a 的值变成了3 后置 返回的不是可以改变的…...
K8s Kubelet 垃圾回收机制
前言 Kubelet 垃圾回收(Garbage Collection)是一个非常有用的功能,它负责自动清理节点上的无用镜像和容器。Kubelet 每隔 1 分钟进行一次容器清理,每隔 5 分钟进行一次镜像清理(截止到 v1.15 版本,垃圾回收间隔时间还都是在源码中固化的,不可自定义配置)。如果节点上已…...
docker安装高斯数据库openGauss数据库
1.创建容器 #创建数据没有挂在的容器 docker run --name opengauss --privilegedtrue -d -e GS_PASSWORDEnmo123 -p 8090:5432 enmotech/opengauss:latest 2. 进入容器,并切换omm用户,使用gsql连接高斯数据库 [rootansible ~]# docker ps -a CONTAIN…...
新手学习:ArcGIS 提取SHP 路网数据、节点
新手学习:ArcGIS 提取SHP 路网数据、节点 参考连接 OSM路网提取道路节点 ArcGIS:如何创建地理数据库、创建要素类数据集、导入要素类、表? 1. 导入开源路网SHP文件 2. 在交点处打断路网数据 未打断路网数据 有一些路径很长,…...
性能测试 —— Tomcat监控与调优:Jconsole监控
JConsole的图形用户界面是一个符合Java管理扩展(JMX)规范的监测工具,JConsole使用Java虚拟机(Java VM),提供在Java平台上运行的应用程序的性能和资源消耗的信息。在Java平台,标准版(Java SE平台)6,JConsole的已经更新到目前的外观…...
刷题笔记26——图论二分图判定
世界上的事情,最忌讳的就是个十全十美,你看那天上的月亮,一旦圆满了,马上就要亏厌;树上的果子,一旦熟透了,马上就要坠落。凡事总要稍留欠缺,才能持恒。 ——莫言 visited数组是在如果有环的情况下,防止在图中一直绕圈设置的,类似于剪枝操作,走…...
网站整站优化-网站整站优化工具
您是否曾为您的网站在搜索引擎中的排名而感到焦虑?是否苦苦思考如何提高流量、吸引更多用户? 什么是整站优化。简而言之,它是一项用于提升网站在搜索引擎中排名的策略和技巧。通过对网站的内容、结构、速度等方面进行优化,可以使…...
冲刺十五届蓝桥杯P0001阶乘求和
文章目录 题目描述思路分析代码解析 题目描述 思路分析 阶乘是蓝桥杯中常考的知识。 首先我们需要知道 int 和long的最大值是多少。 我们可以知道19的阶乘就已经超过了long的最大值,所以让我们直接计算202320232023!的阶乘是不现实的。 所以我们需要…...
c++ 学习 之 运算符重载
前言 运算符重载的概念: 对已有的运算符重新进行定义,赋予其另外一种功能,以适应不同的数据类型 加号运算符重载 作用:定义两个自定义的数据类型相加的运算 正常情况下,如果想要实现类中两个int 类型的相加…...
各种数据库表名长度限制整理
因为工作原因,需要整理下系统支持的数据库的表名长度限制,现发出来,以节省大家的整理时间,如有不对的敬请斧正! 数据库类型长度ORACLE 30GreenPlum40KINGBASEES63PostgreSql63Gbase63瀚高63OSCAR64MYSQL 64HBASE64Mar…...
Go 里的超时控制
前言 日常开发中我们大概率会遇到超时控制的场景,比如一个批量耗时任务、网络请求等;一个良好的超时控制可以有效的避免一些问题(比如 goroutine 泄露、资源不释放等)。 Timer 在 go 中实现超时控制的方法非常简单,…...
一文彻底搞清楚Spark Schema
前言 Spark Schema定义了DataFrame的结构,可以通过对DataFrame对象调用printSchema()方法来获得该结构。Spark SQL提供了StructType和StructField类以编程方式指定架构。 默认情况下,Spark从数据中推断schema,但有时我们可能需要定义自己的schema(列名和数据类型),尤其…...
Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0
Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0。 配置文件如下: http {...upstream test {server www.test.com;}server {listen 80 default_server;server_name _;location / {proxy_pass http://test;proxy_bind $split_ip...
SpringBoot项目(百度AI整合)——如何在Springboot中使用语音文件识别 ffmpeg的安装和使用
前言 前言:在实际使用中,经常要参考官方的案例,但有时候因为工具的不一样,比如idea 和 eclipse,普通项目和spring项目等的差别;还有时候因为水平有限,难以在散布于官方的各个文档读懂ÿ…...
探索古彝文AI识别技术:助力中国传统文化的传承与发扬
目录 ⭐️ 写在前面 ⭐️ 一、什么是古彝文 1.1 古彝文介绍 1.2 古彝文与其他古文字示例 1.3 古彝文的重要性 ⭐️二、AI识别技术的挑战与前景 2.1 挑战 2.2 前景 ⭐️三、合合信息AI识别技术 3.1 智能文字识别技术👍👍 3.2 古文识别应用 ⭐…...
mysql面试题2:说一说MySQL的架构设计?一条 MySQL 语句执行的步骤?
该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说MySQL的架构设计? MySQL的架构设计主要包括以下几个组件: 连接器(Connector):负责与客户端建立连接,并进行身份验证和授权。 查询缓存…...
UPnP协议和SSDP协议
1、两种协议 UPnP协议:Universal Plug and Play,广义的即插即用。UPnP协议的目的:当有新设备连接上网络,网络上的其他设备能够马上知道有新设备加入,然后这些设备能互相宣传和发现彼此,以便能使用和控制彼…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...
VTK如何让部分单位不可见
最近遇到一个需求,需要让一个vtkDataSet中的部分单元不可见,查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行,是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示,主要是最后一个参数,透明度…...
爬虫基础学习day2
# 爬虫设计领域 工商:企查查、天眼查短视频:抖音、快手、西瓜 ---> 飞瓜电商:京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空:抓取所有航空公司价格 ---> 去哪儿自媒体:采集自媒体数据进…...
【从零学习JVM|第三篇】类的生命周期(高频面试题)
前言: 在Java编程中,类的生命周期是指类从被加载到内存中开始,到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期,让读者对此有深刻印象。 目录 …...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
基于Springboot+Vue的办公管理系统
角色: 管理员、员工 技术: 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能: 该办公管理系统是一个综合性的企业内部管理平台,旨在提升企业运营效率和员工管理水…...
[ACTF2020 新生赛]Include 1(php://filter伪协议)
题目 做法 启动靶机,点进去 点进去 查看URL,有 ?fileflag.php说明存在文件包含,原理是php://filter 协议 当它与包含函数结合时,php://filter流会被当作php文件执行。 用php://filter加编码,能让PHP把文件内容…...
五子棋测试用例
一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏,有着深厚的文化底蕴。通过将五子棋制作成网页游戏,可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家,都可以通过网页五子棋感受到东方棋类…...
