当前位置: 首页 > news >正文

大数据-玩转数据-Flink定时器

一、说明

基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

二、基于处理时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class ProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});stream.keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println(timestamp);out.collect("wo be chu fa le ");}}).print();env.execute();}
}

三、基于事件时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTime_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element,recordTimestamp) -> element.getTs() * 1000);stream.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {System.out.println(ctx.timestamp());ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println("定时器被触发了");}}).print();env.execute();}
}

相关文章:

大数据-玩转数据-Flink定时器

一、说明 基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registe…...

Linux 操作系统实战视频课 - GPIO 基础介绍

文章目录 一、GPIO 概念说明二、视频讲解沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇我们将讲解 GPIO 。 一、GPIO 概念说明 ARM 平台中的 GPIO(通用输入/输出)是用于与外部设备进行数字输入和输出通信的重要硬件接口。ARM 平台的 GPIO 特性可以根据具体的芯…...

ChatGPT在医疗保健信息管理和电子病历中的应用前景如何?

ChatGPT在医疗保健信息管理和电子病历中有着广阔的应用前景&#xff0c;可以提高医疗保健行业的效率、准确性和可访问性。本文将详细讨论ChatGPT在医疗保健信息管理和电子病历中的应用前景&#xff0c;以及相关的益处和挑战。 ### 1. ChatGPT在医疗保健信息管理中的应用前景 …...

安防监控/视频存储/视频汇聚平台EasyCVR接入海康Ehome车载设备出现收流超时的原因排查

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。视频汇聚平台既具…...

【zookeeper】zookeeper监控指标查看

zookeeper 监控指标 日常工作中&#xff0c;我们有时候需要对zookeeper集群的状态进行检查&#xff0c;下面分享一些常用的方法。 zookeeper获取监控指标已知的有两种方式&#xff1a; 通过zookeeper自带的四字命令 &#xff08;four letter words command &#xff09;获取各…...

Flink 如何处理反压?

分析&回答 什么是反压&#xff08;backpressure&#xff09; 反压通常是从某个节点传导至数据源并降低数据源&#xff08;比如 Kafka consumer&#xff09;的摄入速率。反压意味着数据管道中某个节点成为瓶颈&#xff0c;处理速率跟不上上游发送数据的速率&#xff0c;而…...

JAVA基础-JDBC

本博客记录JAVA基础JDBC部分的学习内容 JDBC基本概念 JDBC : JAVA链接数据库&#xff0c;是JAVA链接数据库的技术的统称&#xff0c;包含如下两部分&#xff1a; 1. JAVA提供的JDBC规范&#xff08;即各种数据库接口&#xff09;存储在java.sql 和 javax.sql中的api 2. 各个数…...

嵌入式学习笔记(1)ARM的编程模式和7种工作模式

ARM提供的指令集 ARM态-ARM指令集&#xff08;32-bit&#xff09; Thumb态-Thumb指令集&#xff08;16-bit&#xff09; Thumb2态-Thumb2指令集&#xff08;16 & 32 bit&#xff09; Thumb指令集是对ARM指令集的一个子集重新编码得到的&#xff0c;指令长度为16位。通常在…...

[NSSCTF Round #15NSSCTF 2nd]——Web、Misc、Crypto方向 详细Writeup

前言 虽然师傅们已经尽力了&#xff0c;但是没拿到前十有点可惜&#xff0c;题很好吃&#xff0c;明年再来&#xff08;&#xff09; 关于wp&#xff1a; 因为我没有学过misc&#xff0c;但是比赛的时候还是运气好出了三道&#xff0c;所以wp就只把做题步骤给出&#xff0c;也…...

Metasploit“MSF”连接postgresql时因排序规则版本不匹配导致无法连接

一、问题 更新Kali之后使用Metasploit时出现一个问题&#xff0c;连接postgresql时因排序规则版本不匹配导致无法连接 警告: database "msf" has a collation version mismatch DETAIL: The database was created using collation version 2.36, but the operati…...

CCF CSP题解:矩阵运算(202305-2)

链接和思路 OJ链接&#xff1a;传送门 本题要求计算1个公式&#xff1a; ( W ⋅ ( Q K T ) ) V \left(\mathbf{W} \cdot (\mathbf{Q} \times \mathbf{K}^{T})\right) \times \mathbf{V} (W⋅(QKT))V 其中&#xff0c; Q \mathbf{Q} Q、 K \mathbf{K} K和 V \mathbf{V} V均…...

划分字母区间【贪心算法】

划分字母区间 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。 注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍然是 s 。返回一个表示每个字符串片段的长度的列表。…...

低代码的探索之路

Gartner发布报告指出&#xff0c;2023年全球低代码开发平台市场规模将达到345亿美元&#xff0c;比2022年增长20%。 目前&#xff0c;国内外已经有许多低代码平台&#xff0c;包括OutSystems、Mendix、Appian、Microsoft Power App等。这些平台提供了丰富的功能和工具&#xff…...

easyUI combobox不可手动输入和禁用

combobox //下拉可用 $("#selectId").combobox(enable); //下拉不可用 $("#selectId").combobox(disable); //该元素可用 $("#selectId").combobox({ disabled: false }); //该元素不可用 $("#selectId").combobox({ disabled: tru…...

RV64和ARM64栈结构差异

RV64和ARM64栈结构差异 1 RV64和ARM64栈结构差异示意图1.1 RV64和ARM64寄存器介绍1.1.1 RV64寄存器1.1.2 ARM64寄存器 1.2 RV64和ARM64栈结构差异示意图 2 RV64和ARM64栈使用示例2.1 测试的程序2.2 RV64反汇编的汇编程序2.3 ARM64反汇编的汇编程序2.4 RV64和ARM64测试程序的栈结…...

将 Python 与 RStudio IDE 配合使用(R与Python系列第一篇)

目录 前言&#xff1a; 1-安装reticulate包 2-安装Python 3-选择Python的默认版本&#xff08;配置Python环境&#xff09; 4-使用Python 4.1 运行一个简单的Python脚本 4.2 在RStudio上安装Python模块 4.3 在 R 中调用 Python 模块 4.4 在RStudio上调用Python脚本写的…...

数据库访问性能优化

目录 IO性能分析数据库性能优化漏斗法则1、减少数据访问&#xff08;减少磁盘访问&#xff09;(1) 正确的创建并使用索引索引生效场景索引失效场景判断索引是否生效--执行计划 2、返回更少数据&#xff08;减少网络传输或磁盘访问&#xff09;(1) 数据分页处理(减少行数)客户端…...

vue 预览 有token验证的 doc、docx、pdf、xlsx、csv、图片 并下载

预览 doc我也不会 //docx <div v-if"previewType docx" ref"iframeDom" style"border: none; width: 100%; height: 100%"></div> import { renderAsync } from "docx-preview"; let iframeDom: any ref(); axios({url…...

WPF数据视图

将集合绑定到ItemsControl控件时&#xff0c;会不加通告的在后台创建数据视图——位于数据源和绑定的控件之间。数据视图是进入数据源的窗口&#xff0c;可以跟踪当前项&#xff0c;并且支持各种功能&#xff0c;如排序、过滤、分组。 这些功能和数据对象本身是相互独立的&…...

C++ new/delete 与 malloc/free 的区别?

new/delete 与 malloc/free 的区别&#xff1f; 分配内存的位置 malloc是从堆上动态分配内存new是从自由存储区为对象动态分配内存。自由存储区的位置取决于operator new的实现。自由存储区不仅可以为堆&#xff0c;还可以是静态存储区&#xff0c;这都看operator new在哪里为…...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

k8s业务程序联调工具-KtConnect

概述 原理 工具作用是建立了一个从本地到集群的单向VPN&#xff0c;根据VPN原理&#xff0c;打通两个内网必然需要借助一个公共中继节点&#xff0c;ktconnect工具巧妙的利用k8s原生的portforward能力&#xff0c;简化了建立连接的过程&#xff0c;apiserver间接起到了中继节…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

人工智能--安全大模型训练计划:基于Fine-tuning + LLM Agent

安全大模型训练计划&#xff1a;基于Fine-tuning LLM Agent 1. 构建高质量安全数据集 目标&#xff1a;为安全大模型创建高质量、去偏、符合伦理的训练数据集&#xff0c;涵盖安全相关任务&#xff08;如有害内容检测、隐私保护、道德推理等&#xff09;。 1.1 数据收集 描…...