从ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger代码看如何实现一个自定义的触发器
背景
当我们想要实现提前触发计算的触发器时,我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类,我们本文就从代码角度解析下实现自定义触发器的一些注意事项
ContinuousEventTimeTrigger源码解析
@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ContinuousEventTimeTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 这里只需要fire触发计算,为什么不是FIRE_AND_PURGE?return TriggerResult.FIRE;} else {// 这里注册一个结束窗口的计时器是否必要?ctx.registerEventTimeTimer(window.maxTimestamp());}ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.get() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {//为什么是FIRE而不是FIRE_AND_PURGEif (time == window.maxTimestamp()) {return TriggerResult.FIRE;}ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);Long fireTimestamp = fireTimestampState.get();if (fireTimestamp != null && fireTimestamp == time) {fireTimestampState.clear();fireTimestampState.add(time + interval);ctx.registerEventTimeTimer(time + interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清除计时器ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.get();if (timestamp != null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {ctx.mergePartitionedState(stateDesc);Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp != null) {ctx.registerEventTimeTimer(nextFireTimestamp);}}@Overridepublic String toString() {return "ContinuousEventTimeTrigger(" + interval + ")";}@VisibleForTestingpublic long getInterval() {return interval;}/*** Creates a trigger that continuously fires based on the given interval.** @param interval The time interval at which to fire.* @param <W> The type of {@link Window Windows} on which this trigger can operate.*/public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());}private static class Min implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}
ContinuousProcessingTimeTrigger源码
@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ContinuousProcessingTimeTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);timestamp = ctx.getCurrentProcessingTime();// 注册计时器,为什么这里不需要类似ContinuousEventTimeTrigger一样注册一个窗口结束时间的计时器?if (fireTimestamp.get() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerProcessingTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);return TriggerResult.CONTINUE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.get().equals(time)) {fireTimestamp.clear();fireTimestamp.add(time + interval);ctx.registerProcessingTimeTimer(time + interval);return TriggerResult.FIRE;}//为什么这里没有FIRE_AND_PURGE?状态是何时清理的?return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {//清除计时器// State could be merged into new window.ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.get();if (timestamp != null) {ctx.deleteProcessingTimeTimer(timestamp);fireTimestamp.clear();}}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {// States for old windows will lose after the call.ctx.mergePartitionedState(stateDesc);// Register timer for this new window.Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp != null) {ctx.registerProcessingTimeTimer(nextFireTimestamp);}}@VisibleForTestingpublic long getInterval() {return interval;}@Overridepublic String toString() {return "ContinuousProcessingTimeTrigger(" + interval + ")";}/*** Creates a trigger that continuously fires based on the given interval.** @param interval The time interval at which to fire.* @param <W> The type of {@link Window Windows} on which this trigger can operate.*/public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());}private static class Min implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}
疑问:
1.为什么ContinuousEventTimeTrigger需要注册一个窗口结束时间的计时器而ContinuousProcessingTimeTrigger不注册?
答案: 其实我们需要看下它注册后的目的作用是什么,ContinuousEventTimeTrigger的ontimer在处理窗口结束的触发器时会返回FIRE触发计算,那问题就来了,如果只是触发计算,那么如果没有,那么仅仅只是窗口结束的时候没有触发一次计算而已。所以这里不是必须的
2.为什么ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger返回的结果是FIRE而不是FIRE_AND_PURGE?状态是什么时候清理的?
答案: 首先要明确状态的清理这个逻辑,状态其实包括窗口的状态,触发器的状态等,返回FIRE仅仅是触发计算,而不会清理任何状态,而假设返回FIRE_AND_PURGE的作用是触发计算,并进行窗口状态的清理(注意这里是不包括触发器的状态的清理的),其实状态的清理是由WindowOperator在清理时间到时进行的(对于事件时间是窗口结束时间+迟到容忍间隔时间,对于处理时间是窗口结束时间),所以不必要在窗口结束时间到的时候返回FIRE_AND_PURGE,可以统一由WindowOperator在清理时间到之后统一清理状态
相关文章:
从ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger代码看如何实现一个自定义的触发器
背景 当我们想要实现提前触发计算的触发器时,我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类,我们本文就从代码角度解析下实现自定义触发器的一些注意事项 Continuo…...

Linux 5种网络模型
[参考]:《黑马程序员Redis》https://www.bilibili.com/video/BV1cr4y1671t/?p166&share_sourcecopy_web&vd_source9e65300ccca322aeb367bb1eb677b0fc [参考]:《操作系统》 [参考]:《UNIX网络编程》 为了避免用户应用导致冲突甚至内…...

10.1 调试事件读取寄存器
当读者需要获取到特定进程内的寄存器信息时,则需要在上述代码中进行完善,首先需要编写CREATE_PROCESS_DEBUG_EVENT事件,程序被首次加载进入内存时会被触发此事件,在该事件内首先我们通过lpStartAddress属性获取到当前程序的入口地…...

Linux系统常用指令篇---(一)
Linux系统常用指令篇—(一) 1.cd指令 Linux系统中,磁盘上的文件和目录被组成一棵目录树,每个节点都是目录或文件。 语法:cd 目录名 功能:改变工作目录。将当前工作目录改变到指定的目录下。 (简单理解为进入指定目录下) 举例: cd .. : 返…...

【初识Linux】:常见指令(1)
朋友们、伙计们,我们又见面了,本期来给大家解读一下有关Linux的基础知识点,如果看完之后对你有一定的启发,那么请留下你的三连,祝大家心想事成! C 语 言 专 栏:C语言:从入门到精通 数…...

STM32复习笔记(四):看门狗
目录 (一)简介 (二)IWDG IWDG的CUBEMX工程配置 IWDG相关函数(非常少,所以直接贴上来): (三)WWDG (一)简介 看门狗分为独立看门…...

【C++进阶(七)】仿函数深度剖析模板进阶讲解
💓博主CSDN主页:杭电码农-NEO💓 ⏩专栏分类:C从入门到精通⏪ 🚚代码仓库:NEO的学习日记🚚 🌹关注我🫵带你学习C 🔝🔝 模板进阶 1. 前言2. 仿函数的概念3. 仿函数的实…...

基于SSM的电动车上牌管理系统(有报告)。Javaee项目。
演示视频: 基于SSM的电动车上牌管理系统(有报告)。Javaee项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系结构,通过Spring SpringM…...

mstsc无法保存RDP凭据, 100%生效
问题 即使如下两项都打勾,其还是无法保存凭据,特别是连接Ubuntu (freerdp server): 解决方法 网上多种复杂方法,不生效,其思路是修改后台配置,以使mstsc跟平常一样自动记住凭据。最后,如下的…...

OpenGLES:绘制一个混色旋转的3D球体
效果展示 本篇博文会实现一个混色旋转的3D球体 一.球体解析 前面几篇博文讲解了如何使用OpenGLES实现不同的3D图形 本篇博文讲解怎样实现3D世界的代表图形:一个混色旋转的3D球体 1.1 极限正多面体 如果有学习过我前几篇3D图形绘制的博文,就知道要想…...
Spring AOP 基于注解源码整理
导入配置类 EnableAspectJAutoProxy 注解导入 AspectJAutoProxyRegistrarImportBeanDefinitionRegistrar#registerBeanDefinitions向容器中加入AnnotationAwareAspectJAutoProxyCreatorAnnotationAwareAspectJAutoProxyCreator#initBeanFactory初始化ReflectiveAspectJAdvisor…...

C语言 —— 函数栈帧的创建和销毁
在我们之前学习函数的时候,我们可能有很多困惑? 比如: 局部变量是怎么创建的?为什么局部变量的值是随机值?函数是怎么传参的?传参的顺序是怎样的?形参和实参是什么关系?函数调用是怎么做的?函数调用是结束后怎么返回的? 那么要解决这些问题, 我们就需要知道…...

Appleid苹果账号自动解锁改密(自动解锁二验改密码)
目前该项目能实现以下功能: 多用户使用,权限控制多账号管理账号分享页,支持设置密码、有效期、自定义HTML内容自动解锁与关闭二步验证自动/定时修改密码自动删除Apple ID中的设备代理池与Selenium集群,提高解锁成功率允许手动触发…...
Conflicting peer dependency: eslint@8.50.0
npm install 输出 npm ERR! code ERESOLVE npm ERR! ERESOLVE could not resolve npm ERR! npm ERR! While resolving: vue/eslint-config-standard6.1.0 npm ERR! Found: eslint-plugin-vue8.7.1 npm ERR! node_modules/eslint-plugin-vue npm ERR! dev eslint-plugin-vue…...
Vue3 defineProps使用
MyTag.vue <script setup> import { ref, nextTick, defineProps, defineEmits } from "vue"; const props defineProps({flag: Boolean,title: String, }); // 写成这样也可以 // const props defineProps(["flag", "title"]);const e…...

机器学习7:逻辑回归
一、说明 逻辑回归模型是处理分类问题的最常见机器学习模型之一。二项式逻辑回归只是逻辑回归模型的一种类型。它指的是两个变量的分类,其中概率用于确定二元结果,因此“二项式”中的“bi”。结果为真或假 — 0 或 1。 二项式逻辑回归的一个例子是预测人…...
生活小记-纸张尺寸
A系列纸张: A0:841 x 1189 毫米A1:594 x 841 毫米A2:420 x 594 毫米A3:297 x 420 毫米A4:210 x 297 毫米A5:148 x 210 毫米A6:105 x 148 毫米A7:74 x 105 毫米A8…...

【MATLAB源码-第41期】基于压缩感知算法的OFDM系统信道估计和LS算法对比仿真。
操作环境: MATLAB 2013b 1、算法描述 压缩感知(Compressed Sensing, CS)是一种从稀疏或可压缩信号中重构完整信号的数学理论和技术。下面详细介绍压缩感知和它在OFDM信道估计中的应用。 1. 压缩感知基本概念 在传统采样理论中࿰…...

优思学院|六西格玛将烹饪和美味提升至极致
最近,我们曾提到一个美国男子如何利用六西格玛来控制糖尿病。这表明六西格玛逐渐被认为是一个不仅可以在工作场所之外使用,尤其不仅限于制造业的系统。 六西格玛的核心理念是改进过程的质量,从而改善最终结果。如果你做了晚餐或尝试了一道新…...
git stash
git stash 是 Git 中一个非常有用的命令,用于临时保存当前工作目录中的修改,以便你可以切换到其他分支或处理其他任务而不丢失你的修改。它的主要用途是: 保存未提交的修改:你可以使用 git stash 命令将未提交的修改(包…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
IP如何挑?2025年海外专线IP如何购买?
你花了时间和预算买了IP,结果IP质量不佳,项目效率低下不说,还可能带来莫名的网络问题,是不是太闹心了?尤其是在面对海外专线IP时,到底怎么才能买到适合自己的呢?所以,挑IP绝对是个技…...

脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)
一、OpenBCI_GUI 项目概述 (一)项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台,其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言,首次接触 OpenBCI 设备时,往…...

Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...

rknn toolkit2搭建和推理
安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 ,不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源(最常用) conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...
Pydantic + Function Calling的结合
1、Pydantic Pydantic 是一个 Python 库,用于数据验证和设置管理,通过 Python 类型注解强制执行数据类型。它广泛用于 API 开发(如 FastAPI)、配置管理和数据解析,核心功能包括: 数据验证:通过…...

从零开始了解数据采集(二十八)——制造业数字孪生
近年来,我国的工业领域正经历一场前所未有的数字化变革,从“双碳目标”到工业互联网平台的推广,国家政策和市场需求共同推动了制造业的升级。在这场变革中,数字孪生技术成为备受关注的关键工具,它不仅让企业“看见”设…...