大数据-玩转数据-Flink定时器
一、说明
基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
二、基于处理时间的定时器
package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class ProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});stream.keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println(timestamp);out.collect("wo be chu fa le ");}}).print();env.execute();}
}
三、基于事件时间的定时器
package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTime_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element,recordTimestamp) -> element.getTs() * 1000);stream.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {System.out.println(ctx.timestamp());ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println("定时器被触发了");}}).print();env.execute();}
}
相关文章:
大数据-玩转数据-Flink定时器
一、说明 基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registe…...
Linux 操作系统实战视频课 - GPIO 基础介绍
文章目录 一、GPIO 概念说明二、视频讲解沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇我们将讲解 GPIO 。 一、GPIO 概念说明 ARM 平台中的 GPIO(通用输入/输出)是用于与外部设备进行数字输入和输出通信的重要硬件接口。ARM 平台的 GPIO 特性可以根据具体的芯…...
ChatGPT在医疗保健信息管理和电子病历中的应用前景如何?
ChatGPT在医疗保健信息管理和电子病历中有着广阔的应用前景,可以提高医疗保健行业的效率、准确性和可访问性。本文将详细讨论ChatGPT在医疗保健信息管理和电子病历中的应用前景,以及相关的益处和挑战。 ### 1. ChatGPT在医疗保健信息管理中的应用前景 …...
安防监控/视频存储/视频汇聚平台EasyCVR接入海康Ehome车载设备出现收流超时的原因排查
安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。视频汇聚平台既具…...
【zookeeper】zookeeper监控指标查看
zookeeper 监控指标 日常工作中,我们有时候需要对zookeeper集群的状态进行检查,下面分享一些常用的方法。 zookeeper获取监控指标已知的有两种方式: 通过zookeeper自带的四字命令 (four letter words command )获取各…...
Flink 如何处理反压?
分析&回答 什么是反压(backpressure) 反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而…...
JAVA基础-JDBC
本博客记录JAVA基础JDBC部分的学习内容 JDBC基本概念 JDBC : JAVA链接数据库,是JAVA链接数据库的技术的统称,包含如下两部分: 1. JAVA提供的JDBC规范(即各种数据库接口)存储在java.sql 和 javax.sql中的api 2. 各个数…...
嵌入式学习笔记(1)ARM的编程模式和7种工作模式
ARM提供的指令集 ARM态-ARM指令集(32-bit) Thumb态-Thumb指令集(16-bit) Thumb2态-Thumb2指令集(16 & 32 bit) Thumb指令集是对ARM指令集的一个子集重新编码得到的,指令长度为16位。通常在…...
[NSSCTF Round #15NSSCTF 2nd]——Web、Misc、Crypto方向 详细Writeup
前言 虽然师傅们已经尽力了,但是没拿到前十有点可惜,题很好吃,明年再来() 关于wp: 因为我没有学过misc,但是比赛的时候还是运气好出了三道,所以wp就只把做题步骤给出,也…...
Metasploit“MSF”连接postgresql时因排序规则版本不匹配导致无法连接
一、问题 更新Kali之后使用Metasploit时出现一个问题,连接postgresql时因排序规则版本不匹配导致无法连接 警告: database "msf" has a collation version mismatch DETAIL: The database was created using collation version 2.36, but the operati…...
CCF CSP题解:矩阵运算(202305-2)
链接和思路 OJ链接:传送门 本题要求计算1个公式: ( W ⋅ ( Q K T ) ) V \left(\mathbf{W} \cdot (\mathbf{Q} \times \mathbf{K}^{T})\right) \times \mathbf{V} (W⋅(QKT))V 其中, Q \mathbf{Q} Q、 K \mathbf{K} K和 V \mathbf{V} V均…...
划分字母区间【贪心算法】
划分字母区间 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段,同一字母最多出现在一个片段中。 注意,划分结果需要满足:将所有划分结果按顺序连接,得到的字符串仍然是 s 。返回一个表示每个字符串片段的长度的列表。…...
低代码的探索之路
Gartner发布报告指出,2023年全球低代码开发平台市场规模将达到345亿美元,比2022年增长20%。 目前,国内外已经有许多低代码平台,包括OutSystems、Mendix、Appian、Microsoft Power App等。这些平台提供了丰富的功能和工具ÿ…...
easyUI combobox不可手动输入和禁用
combobox //下拉可用 $("#selectId").combobox(enable); //下拉不可用 $("#selectId").combobox(disable); //该元素可用 $("#selectId").combobox({ disabled: false }); //该元素不可用 $("#selectId").combobox({ disabled: tru…...
RV64和ARM64栈结构差异
RV64和ARM64栈结构差异 1 RV64和ARM64栈结构差异示意图1.1 RV64和ARM64寄存器介绍1.1.1 RV64寄存器1.1.2 ARM64寄存器 1.2 RV64和ARM64栈结构差异示意图 2 RV64和ARM64栈使用示例2.1 测试的程序2.2 RV64反汇编的汇编程序2.3 ARM64反汇编的汇编程序2.4 RV64和ARM64测试程序的栈结…...
将 Python 与 RStudio IDE 配合使用(R与Python系列第一篇)
目录 前言: 1-安装reticulate包 2-安装Python 3-选择Python的默认版本(配置Python环境) 4-使用Python 4.1 运行一个简单的Python脚本 4.2 在RStudio上安装Python模块 4.3 在 R 中调用 Python 模块 4.4 在RStudio上调用Python脚本写的…...
数据库访问性能优化
目录 IO性能分析数据库性能优化漏斗法则1、减少数据访问(减少磁盘访问)(1) 正确的创建并使用索引索引生效场景索引失效场景判断索引是否生效--执行计划 2、返回更少数据(减少网络传输或磁盘访问)(1) 数据分页处理(减少行数)客户端…...
vue 预览 有token验证的 doc、docx、pdf、xlsx、csv、图片 并下载
预览 doc我也不会 //docx <div v-if"previewType docx" ref"iframeDom" style"border: none; width: 100%; height: 100%"></div> import { renderAsync } from "docx-preview"; let iframeDom: any ref(); axios({url…...
WPF数据视图
将集合绑定到ItemsControl控件时,会不加通告的在后台创建数据视图——位于数据源和绑定的控件之间。数据视图是进入数据源的窗口,可以跟踪当前项,并且支持各种功能,如排序、过滤、分组。 这些功能和数据对象本身是相互独立的&…...
C++ new/delete 与 malloc/free 的区别?
new/delete 与 malloc/free 的区别? 分配内存的位置 malloc是从堆上动态分配内存new是从自由存储区为对象动态分配内存。自由存储区的位置取决于operator new的实现。自由存储区不仅可以为堆,还可以是静态存储区,这都看operator new在哪里为…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...
恶补电源:1.电桥
一、元器件的选择 搜索并选择电桥,再multisim中选择FWB,就有各种型号的电桥: 电桥是用来干嘛的呢? 它是一个由四个二极管搭成的“桥梁”形状的电路,用来把交流电(AC)变成直流电(DC)。…...
如何在Windows本机安装Python并确保与Python.NET兼容
✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…...
Java数组Arrays操作全攻略
Arrays类的概述 Java中的Arrays类位于java.util包中,提供了一系列静态方法用于操作数组(如排序、搜索、填充、比较等)。这些方法适用于基本类型数组和对象数组。 常用成员方法及代码示例 排序(sort) 对数组进行升序…...
写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里
写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...
