当前位置: 首页 > news >正文

208.Flink(三):窗口的使用,处理函数的使用

目录

一、窗口

1.窗口的概念

2.窗口的分类

(1)按照驱动类型分

(2)按照窗口分配数据的规则分类

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

*1)按键分区窗口(Keyed Windows)

*2)非按键分区(Non-Keyed Windows)

(2)代码中窗口API的调用

(3)窗口分配器

(4)窗口函数

*1)增量聚合函数

^1)归约函数(ReduceFunction)

^2)聚合函数(AggregateFunction)

*2)全窗口函数(full window functions)

*3)增量聚合和全窗口函数的结合使用

(5)触发器(Trigger)

(6)移除器(Evictor)

(7)窗口的简单原理

*1)一个数据来了,怎么认为他是哪个窗口内的数据?

*2)窗口特性

*3)窗口的生命周期

4.时间语义

(1)Flink中的时间语义

(2)Flink以事件时间为默认时间语义

5.水位线(Watermark)

(1)水位线的概念

*1)有序流中的水位线

*2)乱序流中的水位线

(2)水位线和窗口的工作原理

(3) 生成水位线

*1)总体原则

*2)有序流中内置水位线设置

*3)乱序流中内置水位线设置

*4)自定义水位线生成器(周期式、断点式)

*5)在数据源中发送水位线

(6)迟到数据的处理

*1)设置乱序容忍度

*2)设置窗口延迟关闭

*3)侧输出流

(7)基于时间的合流——双流联结(Join)

*1)窗口联结(Window Join)

*2)间隔联结(Interval Join)

二、处理函数

1.基本处理函数(ProcessFunction)

(1)处理函数的功能和使用

(2)ProcessFunction解析

(3)处理函数的分类

2.按键分区处理函数(KeyedProcessFunction)

(1)定时器(Timer)和定时服务(TimerService)

(2)KeyedProcessFunction注意点及实现

3.应用案例:Top N

(1)方法一:ProcessAllWindowFunction

(2)方法二:

4.侧输出流


一、窗口

在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

1.窗口的概念

Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。

到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

2.窗口的分类

(1)按照驱动类型分

*1)时间窗口

一定时间作为一个窗口

*2)计数窗口

达到多少数量作为一个窗口

(2)按照窗口分配数据的规则分类

*1)滚动窗口

以一个固定时间为窗口,第一个窗口结束的时间就是下一个窗口开始的时间。

*2)滑动窗口

窗口大小 + 步长。

如果步长 = 窗口大小,其实就是滚动窗口的情况。

步长 > 窗口大小,会有数据被漏掉。

步长 < 窗口大小,窗口会有重叠

*3)会话窗口

基于会话对数据分组

*4)全局窗口

全局有效,没有结束时间

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

定义窗口前,需要确认数据流是基于keyBy还是没有keyBy的。

*1)按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。

stream.keyBy(...).window(...)

*2)非按键分区(Non-Keyed Windows)

窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

stream.windowAll(...)

(2)代码中窗口API的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

.window()方法需要传入一个窗口分配器,它指明了窗口的类型。

.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

(3)窗口分配器

窗口分配器指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。

(4)窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数全窗口函数

package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO 1. 指定 窗口分配器: 指定 用 哪一种窗口 ---  时间 or 计数? 滚动、滑动、会话?// 1.1 没有keyby的窗口: 窗口内的 所有数据 进入同一个 子任务,并行度只能为1
//        sensorDS.windowAll()// 1.2 有keyby的窗口: 每个key上都定义了一组窗口,各自独立地进行统计计算// 基于时间的
//        sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口长度10s
//        sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // 滑动窗口,窗口长度10s,滑动步长2s
//        sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 会话窗口,超时间隔5s
//        sensorKS.window(GlobalWindows.create())  // 全局窗口,计数窗口的底层就是用的这个,需要自定义的时候才会用// 基于计数的
//        sensorKS.countWindow(5)  // 滚动窗口,窗口长度=5个元素
//        sensorKS.countWindow(5,2) // 滑动窗口,窗口长度=5个元素,滑动步长=2个元素// TODO 2. 指定 窗口函数 : 窗口内数据的 计算逻辑WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 增量聚合: 来一条数据,计算一条数据,窗口触发的时候输出计算结果
//        sensorWS
//                .reduce()
//        .aggregate(, )// 全窗口函数:数据来了不计算,存起来,窗口触发的时候,计算并输出结果
//        sensorWS.process()env.execute();}
}

*1)增量聚合函数
^1)归约函数(ReduceFunction)
package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);

相关文章:

208.Flink(三):窗口的使用,处理函数的使用

目录 一、窗口 1.窗口的概念 2.窗口的分类 (1)按照驱动类型分 (2)按照窗口分配数据的规则分类 3.窗口api概览 (1)按键分区(Keyed)和非按键分区(Non-Keyed) *1)按键分区窗口(Keyed Windows) *2)非按键分区(Non-Keyed Windows) (2)代码中窗口API的调…...

时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测

时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测 目录 时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现POA-CNN-BiLSTM鹈鹕算…...

【知识点】增量学习、在线学习、离线学习的区别

参考链接&#xff1a;https://www.6aiq.com/article/1613258706447?p1&m0 离线学习 常见的学习方式&#xff0c;一次性将所有数据参与进训练。 离线学习完成了目标函数的优化将不会在改变了离线学习需要一次提供整个训练集时间和空间成本效率低发生数据变更或模型漂移需…...

c++ 学习 之 运算符重载 之 前置++和后置++

前言 int a1;cout << (a) << endl;cout << a << endl;int b1;cout << (b) << endl; // 这个是错误的cout << b << endl;上面样例中&#xff0c; 前置 返回的是引用&#xff0c;所以a 的值变成了3 后置 返回的不是可以改变的…...

K8s Kubelet 垃圾回收机制

前言 Kubelet 垃圾回收(Garbage Collection)是一个非常有用的功能,它负责自动清理节点上的无用镜像和容器。Kubelet 每隔 1 分钟进行一次容器清理,每隔 5 分钟进行一次镜像清理(截止到 v1.15 版本,垃圾回收间隔时间还都是在源码中固化的,不可自定义配置)。如果节点上已…...

docker安装高斯数据库openGauss数据库

1.创建容器 #创建数据没有挂在的容器 docker run --name opengauss --privilegedtrue -d -e GS_PASSWORDEnmo123 -p 8090:5432 enmotech/opengauss:latest 2. 进入容器&#xff0c;并切换omm用户&#xff0c;使用gsql连接高斯数据库 [rootansible ~]# docker ps -a CONTAIN…...

新手学习:ArcGIS 提取SHP 路网数据、节点

新手学习&#xff1a;ArcGIS 提取SHP 路网数据、节点 参考连接 OSM路网提取道路节点 ArcGIS&#xff1a;如何创建地理数据库、创建要素类数据集、导入要素类、表&#xff1f; 1. 导入开源路网SHP文件 2. 在交点处打断路网数据 未打断路网数据 有一些路径很长&#xff0c;…...

性能测试 —— Tomcat监控与调优:Jconsole监控

JConsole的图形用户界面是一个符合Java管理扩展(JMX)规范的监测工具&#xff0c;JConsole使用Java虚拟机(Java VM)&#xff0c;提供在Java平台上运行的应用程序的性能和资源消耗的信息。在Java平台&#xff0c;标准版(Java SE平台)6&#xff0c;JConsole的已经更新到目前的外观…...

刷题笔记26——图论二分图判定

世界上的事情,最忌讳的就是个十全十美,你看那天上的月亮,一旦圆满了,马上就要亏厌;树上的果子,一旦熟透了,马上就要坠落。凡事总要稍留欠缺,才能持恒。 ——莫言 visited数组是在如果有环的情况下&#xff0c;防止在图中一直绕圈设置的&#xff0c;类似于剪枝操作&#xff0c;走…...

网站整站优化-网站整站优化工具

您是否曾为您的网站在搜索引擎中的排名而感到焦虑&#xff1f;是否苦苦思考如何提高流量、吸引更多用户&#xff1f; 什么是整站优化。简而言之&#xff0c;它是一项用于提升网站在搜索引擎中排名的策略和技巧。通过对网站的内容、结构、速度等方面进行优化&#xff0c;可以使…...

冲刺十五届蓝桥杯P0001阶乘求和

文章目录 题目描述思路分析代码解析 题目描述 思路分析 阶乘是蓝桥杯中常考的知识。 首先我们需要知道 int 和long的最大值是多少。 我们可以知道19的阶乘就已经超过了long的最大值&#xff0c;所以让我们直接计算202320232023&#xff01;的阶乘是不现实的。 所以我们需要…...

c++ 学习 之 运算符重载

前言 运算符重载的概念&#xff1a; 对已有的运算符重新进行定义&#xff0c;赋予其另外一种功能&#xff0c;以适应不同的数据类型 加号运算符重载 作用&#xff1a;定义两个自定义的数据类型相加的运算 正常情况下&#xff0c;如果想要实现类中两个int 类型的相加&#xf…...

各种数据库表名长度限制整理

因为工作原因&#xff0c;需要整理下系统支持的数据库的表名长度限制&#xff0c;现发出来&#xff0c;以节省大家的整理时间&#xff0c;如有不对的敬请斧正&#xff01; 数据库类型长度ORACLE 30GreenPlum40KINGBASEES63PostgreSql63Gbase63瀚高63OSCAR64MYSQL 64HBASE64Mar…...

Go 里的超时控制

前言 日常开发中我们大概率会遇到超时控制的场景&#xff0c;比如一个批量耗时任务、网络请求等&#xff1b;一个良好的超时控制可以有效的避免一些问题&#xff08;比如 goroutine 泄露、资源不释放等&#xff09;。 Timer 在 go 中实现超时控制的方法非常简单&#xff0c;…...

一文彻底搞清楚Spark Schema

前言 Spark Schema定义了DataFrame的结构,可以通过对DataFrame对象调用printSchema()方法来获得该结构。Spark SQL提供了StructType和StructField类以编程方式指定架构。 默认情况下,Spark从数据中推断schema,但有时我们可能需要定义自己的schema(列名和数据类型),尤其…...

Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0

Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0。 配置文件如下: http {...upstream test {server www.test.com;}server {listen 80 default_server;server_name _;location / {proxy_pass http://test;proxy_bind $split_ip...

SpringBoot项目(百度AI整合)——如何在Springboot中使用语音文件识别 ffmpeg的安装和使用

前言 前言&#xff1a;在实际使用中&#xff0c;经常要参考官方的案例&#xff0c;但有时候因为工具的不一样&#xff0c;比如idea 和 eclipse&#xff0c;普通项目和spring项目等的差别&#xff1b;还有时候因为水平有限&#xff0c;难以在散布于官方的各个文档读懂&#xff…...

探索古彝文AI识别技术:助力中国传统文化的传承与发扬

目录 ⭐️ 写在前面 ⭐️ 一、什么是古彝文 1.1 古彝文介绍 1.2 古彝文与其他古文字示例 1.3 古彝文的重要性 ⭐️二、AI识别技术的挑战与前景 2.1 挑战 2.2 前景 ⭐️三、合合信息AI识别技术 3.1 智能文字识别技术&#x1f44d;&#x1f44d; 3.2 古文识别应用 ⭐…...

mysql面试题2:说一说MySQL的架构设计?一条 MySQL 语句执行的步骤?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说MySQL的架构设计? MySQL的架构设计主要包括以下几个组件: 连接器(Connector):负责与客户端建立连接,并进行身份验证和授权。 查询缓存…...

UPnP协议和SSDP协议

1、两种协议 UPnP协议&#xff1a;Universal Plug and Play&#xff0c;广义的即插即用。UPnP协议的目的&#xff1a;当有新设备连接上网络&#xff0c;网络上的其他设备能够马上知道有新设备加入&#xff0c;然后这些设备能互相宣传和发现彼此&#xff0c;以便能使用和控制彼…...

notepad++配置python2环境

&#xff08;1&#xff09;python2版本下载&#xff1a;Index of /ftp/python/2.7.8/https://www.python.org/ftp/python/2.7.8/ &#xff08;2&#xff09; 配置notepad环境 1.打开Notepad&#xff0c;点击“插件”-“插件管理器”&#xff0c;在“可用”选项卡中&#xff0c…...

在ThinkAdmin中弹出层关闭后回调

在thinkadmin里面&#xff0c;窗口的的一些方法全部都集成在admin.js里面&#xff0c;在之前的文章中也有出现过类似的问题&#xff0c;就是对动态加载的数据进行统计&#xff0c;那时候写也是想记录下&#xff0c;现在自己都不记得是哪个站用的了&#xff0c;所以在这里也把这…...

vue3 和vue2 的比较

文章目录 生命周期多根节点Composition API组合式APIOptions API与composition API对比优化逻辑组织优化逻辑复用 异步组件(Suspense)Suspense组件 响应式原理性能体积优化编译优化diff算法优化静态提升数据劫持&#xff08;响应式系统&#xff09;优化 生命周期 vue3在组合AP…...

算法通过村第八关-树(深度优先)黄金笔记|寻找祖先

文章目录 前言最近公共祖先问题总结 前言 提示&#xff1a;生活就是一场有很多规则&#xff0c;却没有裁判的比赛。 --约瑟夫布罗茨基《悲伤与理智》 最近公共祖先问题 参考题目地址&#xff1a;236. 二叉树的最近公共祖先 - 力扣&#xff08;LeetCode&#xff09; 如果将搜索…...

postgresql|数据库|数据库测试工具pgbench之使用

前言&#xff1a; 数据库是项目中的重要组件&#xff0c;也是一个基础的重要组件&#xff0c;其地位说是第一我想应该是没有什么太多问题的。 那么&#xff0c;数据库的设计这些方面是不用多说的&#xff0c;关键的第一步&#xff0c;主要是涉及数据库的部署方式&#xff0c;…...

代码随想录Day51 | 309.最佳买卖股票时机含冷冻期

309. 买卖股票的最佳时机含冷冻期 class Solution { public:int maxProfit(vector<int>& prices) {int n prices.size();if (n 0) return 0;vector<vector<int>> dp(n, vector<int>(4, 0));dp[0][0] - prices[0]; // 持股票for (int i 1; i &l…...

libopenssl 实现私钥加密公钥解密

在需要验证可信来源时&#xff0c;需要用到签名验签。因此&#xff0c;需要使用私钥加密&#xff0c;公钥解密&#xff0c;取得被加密的信息。这就会使用到私钥加密&#xff0c;公钥解密的场景了。 参考&#xff1a; https://github.com/openssl/openssl/issues/20493 https:/…...

代码随想录 Day - 51|#309 最佳买卖股票时机含冷冻期|#714 买卖股票的最佳时机含手续费

清单 ● 309.最佳买卖股票时机含冷冻期 ● 714.买卖股票的最佳时机含手续费 LeetCode #309 最佳买卖股票时机含冷冻期 1. 题目 给定一个整数数组&#xff0c;其中第 i 个元素代表了第 i 天的股票价格 。 设计一个算法计算出最大利润。在满足以下约束条件下&#xff0c;你可…...

.net 使用IL生成代理类实现AOP对比Java Spring Boot的AOP

首先&#xff0c;我们需要定义一个接口&#xff0c;代表我们要代理的目标对象的功能&#xff1a; // 日志记录器接口 public interface ILogger {/// <summary>/// 记录日志/// </summary>/// <param name"message">日志消息</param>void L…...

美容店预约小程序搭建流程

随着科技的不断发展&#xff0c;小程序已经成为了人们生活中不可或缺的一部分。对于美容店来说&#xff0c;搭建一个预约小程序不仅可以提高工作效率&#xff0c;还可以增加客户数量、提高服务质量。那么&#xff0c;如何搭建一个美容店预约小程序呢&#xff1f;本文将为你详细…...

网站开发职责/网站批量查询工具

http://acm.hdu.edu.cn/showproblem.php?pid4849 会有非常多奇怪的Wa的题。当初在西安就不知道为什么wa&#xff0c;昨晚做了&#xff0c;由于一些Sb错误也wa了非常久。这会儿怎么写都会AC。。。。 收获&#xff1a; 1、还是基本都构思好在去写程序&#xff0c;由于当时没过。…...

济南网站推广效果/新疆疫情最新情况

1、外链的对于优化发生了转移。&#xff08;1&#xff09;8月22日百度更新的外链识别算法。&#xff08;2&#xff09;百度开放了一款外链的识别工具。对于垃圾链接的识别 倒闭资源站点的价格提升第一个影响 提升了网站优化的门槛第二个影响 外链对优化转移 营销的成本提升…...

茂名建设企业网站/app接入广告变现

Android系统为设置界面的UI提供了一系列的接口&#xff0c;设置界面的部分和Activity是分离的&#xff0c;会有一个PreferenceScreen的对象是根目录&#xff0c;在其中会包含CheckBoxPreference EditTextPreference ListPreference PreferenceCategory RingtonePreference相关的…...

做互联网需要网站吗/优化人员配置

结合网上的资料&#xff0c;自己亲自的去安装了一次MySQL&#xff0c;安装版本是win7x64 5.7.16。在安装过程中出现并解决了如下问题:1.“MySQL 服务无法启动 服务没报告任何错误”2.启动MySQL服务的时候&#xff0c;提示“发生系统错误 2&#xff0c;系统找不到指定的文件”。…...

网站通栏是什么/网站开发北京公司

主要分为三个步骤 应用程序处理(简而言之就shader编码中可以处理的一切信息都属与应用程序处理阶段)>几何顶点处理(主要工作坐标的顶点变换)>光栅化处理(与像素处理相关操作) 关于渲染管线 原文链接 http://game.ceeger.com/forum/read.php?tid10970&fid2 正文所谓…...

做医疗网站建设/网络营销推广要求

init()方法中返回的this指向init的实例对象&#xff0c;而init.prototype等于jQuery.prototype&#xff0c;所以也是jQuery的实例对象&#xff1b; 返回this是为了实现链式调用...