当前位置: 首页 > news >正文

Flink触发器Trigger

前言

在 Flink 窗口计算模型中,数据先经过 WindowAssigner 分配窗口,然后再经过触发器 Trigger,Trigger 决定了一个窗口何时被 ProcessFunction 处理。每个 WindowAssigner 都有一个默认的 Trigger,如果默认的不满足需求,可以通过 WindowedStream.trigger() 指定自定义的 Trigger。

认识Trigger

所有触发器都是org.apache.flink.streaming.api.windowing.triggers.Trigger的子类,父类定义了一个触发器应该具备的能力:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {private static final long serialVersionUID = -4104633972991191369L;public Trigger() {}public abstract TriggerResult onElement(T var1, long var2, W var4, TriggerContext var5) throws Exception;public abstract TriggerResult onProcessingTime(long var1, W var3, TriggerContext var4) throws Exception;public abstract TriggerResult onEventTime(long var1, W var3, TriggerContext var4) throws Exception;public boolean canMerge() {return false;}public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}public abstract void clear(W var1, TriggerContext var2) throws Exception;
}

Trigger 抽象类提供了六个方法:

  • onElement 元素被加入到窗口时触发,返回值决定窗口是否计算
  • onProcessingTime 注册的ProcessingTime任务时间到达时触发
  • onEventTime 注册的EventTime任务时间到达时触发
  • canMerge 是否可以合并
  • onMerge Trigger合并
  • clear 窗口被移除时触发

如果数据本身携带窗口是否触发计算的标记,那么重写 onElement() 方法即可;但是在时间窗口计算模型下,并不是通过元素来判断窗口是否需要计算的,而是窗口的结束时间到达时才出发计算,这个时候就需要用到定时器 Timer。Timer 就像一个闹钟,我们可以在它上面注册一个未来的时间戳,当这个时间到达时,对应的事件就会被触发,就像闹钟喊醒沉睡的你一样。

Trigger 会有自己的 Timer,TriggerContext 提供了注册时间事件的方法,你可以根据自己采用的时间语义,调用对应的注册方法来注册事件。

triggerContext.registerProcessingTimeTimer();
triggerContext.registerEventTimeTimer();

在重写这三个方法时,要重点关注方法的返回值。方法的返回值有两个作用:1、窗口内的数据是否可以计算,2、窗口内的数据是否需要清理。

public enum TriggerResult {CONTINUE(false, false),FIRE_AND_PURGE(true, true),FIRE(true, false),PURGE(false, true);private final boolean fire;private final boolean purge;
}

TriggerResult 枚举有四个值,含义分别是:

  • CONTINUE 不做任何操作
  • FIRE 触发窗口计算,但是数据仍然保留
  • PURGE 不触发计算,只是清理窗口内数据
  • FIRE_AND_PURGE 触发窗口计算,同时清理数据

内置的Trigger

Flink 内置了许多常用的 Trigger,大多数情况下它们足以支撑我们的业务场景,只有当内置的Trigger不符合要求时,才需要开发自定义的 Trigger。

1、EventTimeTrigger

和事件时间窗口搭配使用的 Trigger,直到 Watermark 时间戳等于窗口的结束时间才会触发计算。

onElement 的逻辑是:当有元素加入到窗口时,先判断当前 Watermark 时间戳是否到达窗口的结束时间,如果到了就直接触发计算,否则注册一个时间事件,等待 Timer 触发窗口计算。

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {return TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}。。。。。。
}

2、ProcessingTimeTrigger

和 EventTimeTrigger 差不多,区别是采用的处理时间语义,没有 Watermark 相关的判断,直接注册ProcessingTime 事件等待窗口触发计算。

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private ProcessingTimeTrigger() {}public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {return TriggerResult.FIRE;}
}

3、DeltaTrigger

DeltaTrigger 会计算新加入窗口的元素和上一个元素的差值,当这个差值超过给定的阈值时,窗口就会触发计算,元素之间的差值是通过指定的 DeltaFunction 计算出来的。

public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {private static final long serialVersionUID = 1L;private final DeltaFunction<T> deltaFunction;private final double threshold;private final ValueStateDescriptor<T> stateDesc;private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {this.deltaFunction = deltaFunction;this.threshold = threshold;this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);}public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);if (lastElementState.value() == null) {lastElementState.update(element);return TriggerResult.CONTINUE;} else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {lastElementState.update(element);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}
}

自定义Trigger

通过子类继承 Trigger 重写相应的方法,即可自定义我们自己的触发器。

举个例子,我们自定义一个和时间不相关的 Trigger,我们等窗口积攒到一定数量的元素再出发计算。如下示例程序:

public static class CounterTrigger extends Trigger<Long, GlobalWindow> {private final int count;public CounterTrigger(int count) {this.count = count;}@Overridepublic TriggerResult onElement(Long element, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {// 通过Flink state来保存窗口内积攒的元素数量ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<Integer>("count", Integer.class));int elementCount = Optional.ofNullable(countState.value()).orElse(0) + 1;if (elementCount >= this.count) {countState.update(0);return TriggerResult.FIRE_AND_PURGE;}countState.update(elementCount);return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {}}

接下来验证一下我们的Trigger是否生效。

我们编写一个Flink作业,数据源每秒生成10个随机数,数据会被统一划分到 GlobalWindow 窗口,然后指定我们自定义的 CounterTrigger,等窗口内积攒了十条数据就出发求和计算。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Long>() {@Overridepublic void run(SourceContext<Long> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextLong(100));}}@Overridepublic void cancel() {}}).windowAll(GlobalWindows.create()).trigger(new CounterTrigger(10)).process(new ProcessAllWindowFunction<Long, Object, GlobalWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Long, Object, GlobalWindow>.Context context, Iterable<Long> iterable, Collector<Object> collector) throws Exception {Iterator<Long> iterator = iterable.iterator();long sum = 0L;while (iterator.hasNext()) {sum += iterator.next();}System.err.println(sum);}});environment.execute();
}

运行Flink作业,控制台每隔一段时间就会输出随机数之和。

尾巴

在 Flink 中,Trigger 决定了何时触发窗口的计算和输出结果。通过灵活配置 Trigger 规则,能够精确控制数据处理的时机,适应不同的业务需求和数据特点。

Trigger 能够处理各种复杂的情况,例如在特定条件满足时触发,或者基于时间间隔、数据量等因素进行触发。它为开发者提供了精细的控制手段,确保数据处理的准确性和及时性。 合理运用 Trigger 可以优化 Flink 作业的性能,避免不必要的计算和资源消耗。

相关文章:

Flink触发器Trigger

前言 在 Flink 窗口计算模型中&#xff0c;数据先经过 WindowAssigner 分配窗口&#xff0c;然后再经过触发器 Trigger&#xff0c;Trigger 决定了一个窗口何时被 ProcessFunction 处理。每个 WindowAssigner 都有一个默认的 Trigger&#xff0c;如果默认的不满足需求&#xf…...

【操作系统的使用】Linux 系统环境变量与服务管理:设置与控制的艺术

文章目录 系统环境变量与服务管理&#xff1a;设置与控制的艺术一、系统环境变量的设置1.1 临时设置环境变量1.2 永久设置环境变量 二、服务启动类型的设置2.1 查看服务状态2.2 启动和停止服务2.3 设置服务的启动类型2.3.1 设置服务在启动时运行2.3.2 禁用服务在启动时运行2.3.…...

速盾:高防cdn配置中性能优化是什么?

高防CDN配置中的性能优化是指通过调整CDN配置以提升网站的加载速度、响应时间和用户体验。在进行性能优化时&#xff0c;需要考虑多个因素&#xff0c;包括CDN节点的选择和布置、缓存策略、缓存过期时间、预取和预加载、并发连接数和网络延迟等。 首先&#xff0c;CDN节点的选…...

Qt_软件添加版本信息

文章内容: 给生成的软件添加软件的版权等信息 #include <windows.h> //中文的话增加下面这一行 #pragma code_page(65001)VS_VERSION_INFO VERSIONINFO...

mallocfree和newdelete的区别

malloc\free和new\delete的区别 malloc/free new/delete 身份&#xff1a; 函数 运算符\关键字 返回值&#xff1a; void* 带类型的指针 参数&#xff1a; 字节个数(手动计算) 类型 自动计算字节数 处理数组&#xff1a; 手动计算数组总字节数 new 类型[数量] 扩容&#xff1…...

无锁队列实现(Michael Scott),伪代码与c++实现

一、Michael & Scoot 原版伪代码实现 structure pointer_t {ptr: pointer to node_t, count: unsigned integer}structure node_t {value: data type, next: pointer_t}structure queue_t {Head: pointer_t, Tail: pointer_t}initialize(Q: pointer to queue_t)node new_…...

猜数字小游戏

前言 猜数字游戏是一款经典且简单的互动游戏&#xff0c;常常用于提高逻辑思维能力和锻炼数学技巧。本文将深入探讨一段用 JavaScript 编写的猜数字游戏代码&#xff0c;帮助读者理解游戏的基本逻辑和实现方法。这段代码不仅适合初学者练习编程技巧&#xff0c;也是了解用户交…...

在Windows上搭建ChatTTS:从本地部署到远程AI音频生成全攻略

文章目录 前言1. 下载运行ChatTTS模型2. 安装Cpolar工具3. 实现公网访问4. 配置ChatTTS固定公网地址 前言 本篇文章主要介绍如何快速地在Windows系统电脑中本地部署ChatTTS开源文本转语音项目&#xff0c;并且我们还可以结合Cpolar内网穿透工具创建公网地址&#xff0c;随时随…...

如何用好 CloudFlare 的速率限制防御攻击

最近也不知道咋回事儿,群里好多站长都反映被CC 攻击了。有人说依旧是 PCDN 干的,但明月感觉不像,因为有几个站长被 CC 攻击都是各种动态请求(这里的动态请求指的是.php 文件的请求)。经常被攻击的站长们都知道,WordPress /Typecho 这类动态博客系统最怕的就是这种动态请求…...

Unity3D 立方体纹理与自制天空盒详解

立方体纹理和自制天空盒是游戏开发中常用的技术之一&#xff0c;可以为游戏增添更加丰富的视觉效果。在本文中&#xff0c;我们将详细介绍Unity3D中立方体纹理和自制天空盒的使用方法&#xff0c;并给出相应的代码实现。 对惹&#xff0c;这里有一个游戏开发交流小组&#xff…...

【工具】VSCODE下载,配置初次设置

打开 settings.json 文件&#xff0c;包含了 Visual Studio Code (VSCode) 中的各种用户配置。 {"files.associations": {"*.vue": "vue","*.wpy": "vue","*.wxml": "html","*.wxss": "…...

vue使用jquery的ajax,页面跳转

一、引入jquery依赖 打开终端更新npm npm install -g npm 更新完后引入输入npm install jquery 加载完后 在最外层的package.json文件中加入以下代码 配置好后导入jquery 设置变量用于接收服务器传输的数据 定义ajax申请数据 服务器的Controller层传输数据 &#xff08;…...

基于微信小程序的社区二手交易系统的详细设计和实现(源码+lw+部署文档+讲解等)

项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…...

D34【python 接口自动化学习】- python基础之输入输出与文件操作

day34 文件关闭 学习日期&#xff1a;20241011 学习目标&#xff1a;输入输出与文件操作&#xfe63;-46 常见常新&#xff1a;文件的关闭 学习笔记&#xff1a; 文件关闭的内部工作过程 close&#xff08;&#xff09;函数 with语句 常用的打开关闭文件 # 文件关闭 # 方式…...

【Linux系列】set -euo pipefail 命令详解

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

【Python爬虫实战】正则:中文匹配与贪婪非贪婪模式详解

&#x1f308;个人主页&#xff1a;https://blog.csdn.net/2401_86688088?typeblog &#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/2401_86688088/category_12797772.html 目录 前言 一、匹配中文 &#xff08;一&#xff09;匹配单个中文字符 &#xff08;二…...

保护数据安全:JS前端加密与PHP后端解密实战教程,让敏感信息更安全

保护数据安全&#xff1a;JS前端加密与PHP后端解密实战教程&#xff0c;让敏感信息更安全 在Web开发中&#xff0c;确保用户提交的敏感信息&#xff08;如密码、手机号码等&#xff09;的安全性是非常重要的。一种常见的做法是使用加密技术来保护这些数据&#xff0c;在传输过…...

72 分布式锁

72 分布式锁 什么是分布式锁 分布式锁 分布式 锁。那么分布式是指的什么呢&#xff1f;锁又是锁的谁呢&#xff1f;在业务开发中我们经常会听到分布式分布式的概念&#xff0c;分布式也很简单&#xff0c;通俗的来说就是你具有多个服务器&#xff0c;每个服务器上运行的程序…...

使用Windbg分析dump文件排查C++软件异常的一般步骤与要点分享

目录 1、概述 2、打开dump文件&#xff0c;查看发生异常的异常类型码 3、查看发生异常的那条汇编指令 3.1、汇编代码能最直接、最本真的反映出崩溃的原因 3.2、汇编指令中访问64KB小地址内存区&#xff0c;可能是访问了空指针 3.3、汇编指令中访问了很大的内核态的内存地…...

30 天 Python 3 学习计划

30 天 Python 3 学习计划 https://www.runoob.com/python3/python3-tutorial.html 1. Python3 基础语法 2. Python3 基本数据类型 3. Python3 数据类型转换 4. Python3 解释器 5. Python3 注释 6. Python3 运算符 7. Python3 数字(Number) 8. Python3 字符串 …...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

srs linux

下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935&#xff0c;SRS管理页面端口是8080&#xff0c;可…...

JAVA后端开发——多租户

数据隔离是多租户系统中的核心概念&#xff0c;确保一个租户&#xff08;在这个系统中可能是一个公司或一个独立的客户&#xff09;的数据对其他租户是不可见的。在 RuoYi 框架&#xff08;您当前项目所使用的基础框架&#xff09;中&#xff0c;这通常是通过在数据表中增加一个…...

MySQL 知识小结(一)

一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库&#xff0c;分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷&#xff0c;但是文件存放起来数据比较冗余&#xff0c;用二进制能够更好管理咱们M…...

【JVM】Java虚拟机(二)——垃圾回收

目录 一、如何判断对象可以回收 &#xff08;一&#xff09;引用计数法 &#xff08;二&#xff09;可达性分析算法 二、垃圾回收算法 &#xff08;一&#xff09;标记清除 &#xff08;二&#xff09;标记整理 &#xff08;三&#xff09;复制 &#xff08;四&#xff…...

Web中间件--tomcat学习

Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机&#xff0c;它可以执行Java字节码。Java虚拟机是Java平台的一部分&#xff0c;Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...

Ubuntu Cursor升级成v1.0

0. 当前版本低 使用当前 Cursor v0.50时 GitHub Copilot Chat 打不开&#xff0c;快捷键也不好用&#xff0c;当看到 Cursor 升级后&#xff0c;还是蛮高兴的 1. 下载 Cursor 下载地址&#xff1a;https://www.cursor.com/cn/downloads 点击下载 Linux (x64) &#xff0c;…...