当前位置: 首页 > news >正文

Flink时间窗口程序骨架结构

前言

Flink 作业的基本骨架结构包含三部分:创建执行环境、定义数据处理逻辑、提交并执行Flink作业。

日常大部分 Flink 作业是基于时间窗口计算模型的,同样的,开发一个Flink时间窗口作业也有一套基本的骨架结构,了解这套结构有助于我们更快地上手时间窗口作业开发。

窗口程序的基本骨架

一个Flink时间窗口作业的代码基本骨架如下所示:

stream.keyBy(...)               <-  仅 keyed 窗口需要.window(...)              <-  必填项:"assigner"[.trigger(...)]            <-  可选项:"trigger" (省略则使用默认 trigger)[.evictor(...)]            <-  可选项:"evictor" (省略则不使用 evictor)[.allowedLateness(...)]    <-  可选项:"lateness" (省略则为 0)[.sideOutputLateData(...)] <-  可选项:"output tag" (省略则不对迟到数据使用 side output).reduce/aggregate/apply() <-  必填项:"function"[.getSideOutput(...)]      <-  可选项:"output tag"

时间窗口作业对数据逻辑的处理,主要包含以下步骤:

  • 对数据流进行分组,将DataStream装换为KeyedStream
  • 指定窗口分配器 WindowAssigner,将数据划分到对应的窗口
  • 指定窗口触发器 Trigger,决定了窗口何时关闭并计算
  • 指定窗口移除器 Evictor,它可以在窗口计算前后对窗口内的数据进行移除
  • allowedLateness 允许迟到的数据,事件时间语义下,即使事件时钟到达窗口关闭时间,窗口仍会保留一段时间以等待迟到的数据
  • sideOutputLateData 针对窗口关闭后到达的迟到数据,可以将其输出到另外一条数据流,对计算结果做修正
  • ProcessFunction 窗口内数据的处理函数

时间窗口作业实战

了解了时间窗口作业的基本骨架,以及相关组件的作用,接下来就实战一把。

如下示例程序,数据源每秒会生成2个一百以内的随机数,然后数据经过 keyBy 算子分组,这里为了简单,数据全部划分为一组,KeySelector 统一返回 “all”。

分组后,窗口分配器将数据划分到对应的窗口。这里基于处理时间语义,统一分配10秒大小的时间窗口,时间窗口被Flink封装为 TimeWindow 对象,包含两个属性,分别是起始时间戳和结束时间戳。

一旦有数据进入窗口,Trigger#onElement 就会触发,返回值决定了Flink如何处理窗口。显然我们的逻辑是时间到达窗口的结束时间,窗口就会触发计算并关闭,所以我们会注册一个 ProcessingTime 事件,窗口结束时间一到,Trigger#onProcessingTime 就会触发,窗口就会开始计算。

窗口计算前,还需要经过移除器Evictor。它有两个方法,分别在窗口计算前和计算后调用,在这里你可以根据条件移除窗口内无须计算的数据。示例程序中,把小于10的数移除掉了。

最终,窗口内的数据会交给 ProcessWindowFunction 处理,窗口内的数据被Flink封装成迭代器Iterable,通过它可以获得所有窗口内的数据。示例程序 中,我们所有元素打印出来并求和。

public class TimeWindowStructure {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Long>() {@Overridepublic void run(SourceContext<Long> sourceContext) throws Exception {while (true) {Threads.sleep(500);sourceContext.collect(ThreadLocalRandom.current().nextLong(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all")// 窗口分配器.window(new WindowAssigner<Long, TimeWindow>() {static final long WINDOW_SIZE = 10_000L;@Overridepublic Collection<TimeWindow> assignWindows(Long event, long timestamp, WindowAssignerContext windowAssignerContext) {// 把数据分配到对应的窗口,一条数据甚至可以分配到多个窗口// 这里根据处理时间 分配10秒大小的窗口final long processingTime = windowAssignerContext.getCurrentProcessingTime();long start = processingTime / WINDOW_SIZE * WINDOW_SIZE;long end = start + WINDOW_SIZE;return List.of(new TimeWindow(start, end));}@Overridepublic Trigger<Long, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {// 默认触发器,废弃了return null;}@Overridepublic TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {// 窗口序列化器return new TimeWindow.Serializer();}@Overridepublic boolean isEventTime() {// 是否基于事件时间语义return false;}})// 窗口触发器.trigger(new Trigger<Long, TimeWindow>() {private long max_register_processing_time = 0L;@Overridepublic TriggerResult onElement(Long element, long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {// 每个元素进入窗口,都会触发该方法 返回结果决定了窗口是否计算或关闭// 我们是根据处理时间窗口结束时间来判断是否触发的,所以注册一个处理时间事件即可if (timeWindow.maxTimestamp() > max_register_processing_time) {max_register_processing_time = timeWindow.maxTimestamp();triggerContext.registerProcessingTimeTimer(max_register_processing_time);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {// 窗口计算并清除数据return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {triggerContext.deleteProcessingTimeTimer(timeWindow.maxTimestamp());}})// 窗口移除器.evictor(new Evictor<Long, TimeWindow>() {@Overridepublic void evictBefore(Iterable<TimestampedValue<Long>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {// 窗口计算前触发Iterator<TimestampedValue<Long>> iterator = iterable.iterator();while (iterator.hasNext()) {TimestampedValue<Long> next = iterator.next();Long value = next.getValue();if (value < 10) {iterator.remove();System.err.println("Evicted:" + value);}}}@Overridepublic void evictAfter(Iterable<TimestampedValue<Long>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {// 窗口计算后触发}})// 因为是基于事件时间语义,不存在迟到数据,所以无须设置 allowedLateness、sideOutputLateData// 窗口处理函数.process(new ProcessWindowFunction<Long, String, String, TimeWindow>() {@Overridepublic void process(String group, ProcessWindowFunction<Long, String, String, TimeWindow>.Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {TimeWindow window = context.window();StringBuilder builder = new StringBuilder();builder.append("[" + window.getStart() + "-" + window.maxTimestamp() + "] elements:");Iterator<Long> iterator = iterable.iterator();Long sum = 0L;while (iterator.hasNext()) {Long value = iterator.next();sum += value;builder.append(value + " ");}builder.append(", sum:" + sum);System.err.println(builder.toString());}});environment.execute();}
}

运行Flink作业,控制台输出:

Evicted:3
Evicted:6
Evicted:1
[1722665800000-1722665809999] elements:89 17 16 57 94 47 67 98 , sum:485
Evicted:6
Evicted:4
[1722665810000-1722665819999] elements:86 50 71 95 36 10 55 43 96 36 28 87 89 50 53 35 63 95 , sum:1078
Evicted:4
Evicted:8
Evicted:0
[1722665820000-1722665829999] elements:85 20 42 86 46 20 32 45 91 59 57 64 31 67 78 71 28 , sum:922

尾巴

了解Flink时间窗口作业的基本骨架结构,理清Flink时间窗口的数据流转过程,有助于我们更快上手Flink时间窗口作业的开发。

Flink时间窗口作业包含的核心组件有:WindowAssigner、Window、Trigger、Evictor、ProcessWindowFunction。

相关文章:

Flink时间窗口程序骨架结构

前言 Flink 作业的基本骨架结构包含三部分&#xff1a;创建执行环境、定义数据处理逻辑、提交并执行Flink作业。 日常大部分 Flink 作业是基于时间窗口计算模型的&#xff0c;同样的&#xff0c;开发一个Flink时间窗口作业也有一套基本的骨架结构&#xff0c;了解这套结构有助…...

计算机视觉之可做什么

1、计算机视觉的应用 计算机视觉在我们生活中已经有了很广泛的应用&#xff0c;在我们可见、不可见&#xff1b;可感知、不可感知的地方&#xff0c;深深地影响了我们的生活、生产方式。 日常生活&#xff1a;美颜相机、火车站刷脸进站、线上办理业务的身份认证、自动驾驶等等…...

观察者模式的思考

观察者模式由来 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为型设计模式&#xff0c;它的起源可以追溯到20世纪90年代初&#xff0c;由设计模式四人帮&#xff08;Erich Gamma, Richard Helm, Ralph Johnson 和 John Vlissides&#xff09;在其著作《设计模…...

ORACLE SELECT INTO 赋值为空,抛出 NO DATA FOUND 异常

例子&#xff1a; DECLARE ORDER_NUM VARCHAR2(20); BEGIN SELECT S.ORDER_NUM INTO ORDER_NUM FROM SALES_ORDER S WHERE S.ID122344; DBMS_OUTPUT.PUT_LINE(单号: || ORDER_NUM); END; 在查询结果为空的情况下&#xff0c;以上代码会报错&#xff1a;未找到任何数据 解决方…...

GPT提示词

参考 提示词大全&#xff1a; GPT提示词大全&#xff08;中英文双语&#xff09;持续更新 提示词.com...

Redis协议详解及其异步应用

目录 一、Redis Pipeline&#xff08;管道&#xff09;概述优点使用场景工作原理Pipeline 的基本操作步骤C 示例&#xff08;使用 [hiredis](https://github.com/redis/hiredis) 库&#xff09; 二、Redis 事务概述事务的前提事务特征&#xff08;ACID 分析&#xff09;WATCH 命…...

LeetCode213:打家劫舍II

题目链接&#xff1a;213. 打家劫舍 II - 力扣&#xff08;LeetCode&#xff09; 代码如下 class Solution { public:int rob(vector<int>& nums) {if(nums.size() 0) return 0;if(nums.size() 1) return nums[0];if(nums.size() 2) return max(nums[0…...

linux一二三章那些是重点呢

第一章 静态库动态库的区别 什么是库 库文件是计算机上的一类文件&#xff0c;可以简单的把库文件看成一种代码仓库&#xff0c;它提供给使用者一些可以直接 拿来用的变量、函数或类。 如何制作 静态动态库 静态库&#xff1a; GCC 进行链接时&#xff0c;会把静态库中代码打…...

C语言中的程序入口:超越main函数的探索

在C语言中&#xff0c;尽管main函数是标准程序的默认入口点&#xff0c;但借助编译器特性和链接器选项&#xff0c;我们可以指定其他函数作为程序的入口。GCC编译器通过-e选项&#xff0c;允许我们将任何符合签名的函数作为程序的入口。这一特性可以用于特定的实验需求、特定系…...

《面试之MQ篇》

《面试之MQ篇》 1. 为什么要使用MQ 首先,面试官问的第一个问题或者说是逼问的一个问题&#xff1a;“为什么要使用MQ”其实面试官问这个问题就是想考察你MQ的特性&#xff0c;这个时候呢&#xff0c;我们必须要答出三点&#xff1a;解耦、异步、削峰。 1. 解耦 1. 传统系统…...

Git 分支操作-开发规范

一、背景 在实际开发中&#xff0c;一般在主分支的基础上单独创建一个新的分支进行开发&#xff0c;最后合并到master分支&#xff0c;而不是直接在master分支进行开发。 二、新建分支 1、初始状态&#xff0c;local为本地分支&#xff0c;remote为远程分支 2、单击 “Remot…...

JSONArray根据指定字段去重

JSONArray dataList new JSONArray();这儿省略dataList 加数据的过程 dataList new JSONArray(dataList.stream().distinct().collect(Collectors.toList())); Set<String> timestamps new HashSet<>();根据时间字段去重 dataList dataList.stream().map(obj -…...

线程有哪几种状态? 分别说明从一种状态到另一种状态转变有哪些方式?

在 Java 中&#xff0c;线程的生命周期管理通过不同的状态来跟踪。一个线程在其生命周期中可以处于多种状态&#xff0c;不同的状态之间会通过特定的事件发生转变。以下是 Java 线程的几种状态及其之间的转移方式&#xff1a; 1. 线程的状态 1.1 NEW&#xff08;新建状态&…...

自注意力机制self-attention中的KV 缓存

在自注意力机制中&#xff0c;KV 缓存&#xff08;Key-Value Caching&#xff09;主要用于加速模型在推理阶段的计算&#xff0c;尤其是在处理长序列或者生成任务&#xff08;如文本生成&#xff09;时&#xff0c;这种缓存机制可以显著提高效率。 1. KV 缓存的背景 在 Trans…...

前端库--nanoid(轻量级的uuid)

文章目录 定义&#xff1a;生成方式&#xff1a;现实使用:NanoID 只有 108 个字节那么大NanoID更安全NanoID它既快速又紧凑 使用步骤1.安装nanoid包2.引入使用3.使用4.自定义字母 定义&#xff1a; UUID 是 通用唯一识别码&#xff08;Universally Unique Identifier&#xff…...

计算机基础-什么是网络端口?

网络端口可以想象成一个大型公寓楼的邮箱。每个公寓楼&#xff08;这里指的是一个计算机或服务器&#xff09;有很多个邮箱&#xff08;即网络端口&#xff09;&#xff0c;每个邮箱都有一个独一无二的编号&#xff08;端口号&#xff09;。当一封信&#xff08;网络数据包&…...

力扣动态规划基础版(斐波那契类型)

70. 爬楼梯https://leetcode.cn/problems/climbing-stairs/ 70.爬楼梯 方法一 动态规划 考虑转移方程和边界条件&#xff1a; f&#xff08;x&#xff09; f&#xff08;x -1&#xff09; f&#xff08;x - 2&#xff09;;f&#xff08;1&#xff09; 1&#xff1b;f&…...

Java重修笔记 InetAddress 类和 Socket 类

InetAddress 类相关方法 1. 获取本机 InetAddress 对象&#xff1a;getLocalHost public static InetAddress getLocalHost() throws UnknownHostException 返回值&#xff1a;本地主机的名字和地址 异常&#xff1a;UnknownHostException - 如果本地主机名无法解析成地址 2…...

秋招突击——8/6——万得数据面试总结

文章目录 引言正文面经整理一1、讲一下java的多态&#xff0c;重载&#xff0c;重写的概念&#xff0c;区别2、说一下Java的数组&#xff0c;链表的结构&#xff0c;优缺点3、创建java线程的方式有哪些&#xff0c;具体说说4、创建线程池呢、每个参数的意义5、通过那几种方式保…...

STM32定时器

目录 STM32定时器概述 STM32基本定时器 基本定时器的功能 STM32基本定时器的寄存器 STM32通用定时器 STM32定时器HAL库函数 STM32定时器概述 从本质上讲定时器就是“数字电路”课程中学过的计数器&#xff08;Counter&#xff09;&#xff0c;它像“闹钟”一样忠实地为处…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险

C#入门系列【类的基本概念】&#xff1a;开启编程世界的奇妙冒险 嘿&#xff0c;各位编程小白探险家&#xff01;欢迎来到 C# 的奇幻大陆&#xff01;今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类&#xff01;别害怕&#xff0c;跟着我&#xff0c;保准让你轻松搞…...

NPOI操作EXCEL文件 ——CAD C# 二次开发

缺点:dll.版本容易加载错误。CAD加载插件时&#xff0c;没有加载所有类库。插件运行过程中用到某个类库&#xff0c;会从CAD的安装目录找&#xff0c;找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库&#xff0c;就用插件程序加载进…...

Bean 作用域有哪些?如何答出技术深度?

导语&#xff1a; Spring 面试绕不开 Bean 的作用域问题&#xff0c;这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开&#xff0c;结合典型面试题及实战场景&#xff0c;帮你厘清重点&#xff0c;打破模板式回答&#xff0c…...

Web后端基础(基础知识)

BS架构&#xff1a;Browser/Server&#xff0c;浏览器/服务器架构模式。客户端只需要浏览器&#xff0c;应用程序的逻辑和数据都存储在服务端。 优点&#xff1a;维护方便缺点&#xff1a;体验一般 CS架构&#xff1a;Client/Server&#xff0c;客户端/服务器架构模式。需要单独…...

什么是VR全景技术

VR全景技术&#xff0c;全称为虚拟现实全景技术&#xff0c;是通过计算机图像模拟生成三维空间中的虚拟世界&#xff0c;使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验&#xff0c;结合图文、3D、音视频等多媒体元素…...