【API篇】十、生成Flink水位线
文章目录
- 1、水位线的生成原则
- 2、有序流内置水位线
- 3、乱序流内置水位线
- 4、自定义周期性水位线生成器
- 5、自定义断点式水位线生成器
- 6、从数据源中发送水位线
1、水位线的生成原则
水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数据了。参考前面的乱序流,可以得出:
- 想要保证数据绝对正确,就得加足够大的延迟,但实时性就没保障了
- 想要实时性强,就得把延迟设置小,但此时迟到数据可能遗漏,准确性降低
水位线的定义,是对低延迟和结果准确性的一个权衡。Flink生成水位线的方法是.assignTimestampsAndWatermarks()
,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间
DataStream<Event> stream = env.addSource(xxx);DataStream withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(WatermarkStrategy对象);
WatermarkStrategy是一个接口,包含了一个时间戳分配器TimestampAssigner和一个水位线生成WatermarkGenerator:
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
2、有序流内置水位线
有序流的时间戳全部单调递增,没有迟到数据,直接WatermarkStrategy.forMonotonousTimestamps()
就可以拿到WatermarkStrategy对象
public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);// 返回的时间戳,要毫秒,这里拿自定义对象的ts属性做为时间戳return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用事件时间语义的窗口,别再用处理时间TumblingProcessTime.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
执行下,输入10时,逻辑时钟被推到了10s,到达区间,触发窗口,执行全窗口函数的process,输出当前窗口的数据:
3、乱序流内置水位线
调用WatermarkStrategy. forBoundedOutOfOrderness()
,传入延迟时间:
public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱序的,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
执行:
简单分析下结果:
- 第一条数据s1,1,1进来,创建窗口,水位线为1s-3s(延迟3s)
- s1,10,10进来,水位线为10-3 =7s,还未到达10,窗口不触发(若是有序流,无等待下,此时窗口已被触发了)
- 此时进来一条乱序数据,比如s1,6,6,6-3=3s,水位线保持上面的7不变,watermark不会推进,且6这条数据也会被统计在
[0,10)
的区间内 - s1,11,11进来,11-3=8,也不会触发,但这条数据是属于
[10,20)
区间的那个桶的 - s1,13,13进来,达到10,窗口触发
4、自定义周期性水位线生成器
上面只是定义了时间戳的提取逻辑,水位线的生成采用的默认内置策略。接下来自定义水位线生成器:周期性水位生成器。
周期性生成器是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发射生成的水位线
// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// 定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成器.<WaterSensor>forGenerator(context -> MyPeriodWatermarkGenerator<>(3000L))// 1.2 指定时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}}
模仿前面的内置生成器,定义自己的水位线生成器:
public class MyPeroidWatermarkGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳//构造方法,传入延迟时间,构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime = delayTime;this.maxTs = Long.MIN_VALUE + this.delayTime + 1;}/*** 每条数据进来都调用一次,用来提取最大的事件事件*/@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp);}/*** 周期性调用,默认20ms*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System,out,println("调用了onPeriodicEmit方法,生成watermark==" + (maxTimestamp - delayTs - 1) );}}
核心部分,指定水位线生成器的Lamdba表达式展开就是:
运行:
- 数据没进来前,每200ms调用一次发射水位线的方法,此时的水位线是构造方法里Long.MIN_VALUE那个
- 进来一条数据,调用onEvent,最大时间戳被更新,到周期后再发射水位线
maxTs-delayTs-1
- 继续周期性调用onPeriodicEmit方法
onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了,这个方法由系统框架周期性地调用,默认200ms一次
修改默认的周期,比如改为400ms:
env.getConfig().setAutoWatermarkInterval(400L);
5、自定义断点式水位线生成器
断点式生成器会不停地检测onEvent()中的事件,发现带有水位线信息的当事件时,就立即发出水位线。改下代码,定义水位线生成器:
public class PointWatermarkGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳//构造方法,传入延迟时间,构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime = delayTime;this.maxTs = Long.MIN_VALUE + this.delayTime + 1;}/*** 每条数据进来都调用一次,用来提取最大的事件事件*/@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳// 发射水位线output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp + ",生成watermark==" + (maxTimestamp - delayTs - 1));}/*** 周期性调用,默认20ms*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}
周期性代码改为:
//...// 定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成器.<WaterSensor>forGenerator(context -> PointWatermarkGenerator<>(3000L))// 1.2 指定时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒return element.getTs() * 1000L;});
运行:此时不再周期性的发射水位线
6、从数据源中发送水位线
在自定义的数据源中抽取事件时间,然后发送水位线:
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)//注意fromSorce方法的第二个传参,之前用的WatermarkStrategy.noWatermark()
注意此时不用再assignTimestampsAndWatermarks了,在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一
相关文章:

【API篇】十、生成Flink水位线
文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则 水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数…...

【Javascript】弹出框
目录 警告框 确认框 提示框 警告框 alert(你好); 确认框 var isConfirm confirm(请确认) console.log( isConfirm); 提示框...

NSS [鹤城杯 2021]EasyP
NSS [鹤城杯 2021]EasyP 直接给了源码 <?php include utils.php;if (isset($_POST[guess])) {$guess (string) $_POST[guess];if ($guess $secret) {$message Congratulations! The flag is: . $flag;} else {$message Wrong. Try Again;} }if (preg_match(/utils\.p…...

mysql用户及权限管理(InsCode AI 创作助手)
MySQL是一个广泛使用的开源关系型数据库管理系统,用于存储和管理大量数据。对于那些需要使用MySQL的管理员和开发人员来说,用户权限管理是确保数据库安全性的至关重要的一环。在本篇技术博客中,我们将深入探讨MySQL的用户权限管理,…...

命令模式——让程序舒畅执行
● 命令模式介绍 命令模式(Command Pattern),是行为型设计模式之一。命令模式相对于其他的设计模式来说并没有那么多条条框框,其实并不是一个很“规矩”的模式,不过,就是基于一点,命令模式相对于…...

GZ035 5G组网与运维赛题第3套
2023年全国职业院校技能大赛 GZ035 5G组网与运维赛项(高职组) 赛题第3套 一、竞赛须知 1.竞赛内容分布 竞赛模块1--5G公共网络规划部署与开通(35分) 子任务1:5G公共网络部署与调试(15分) 子…...

071:mapboxGL上传含shp的zip文件,在map上解析显示图形
第071个 点击查看专栏目录 本示例是介绍演示如何在vue+mapbox中上传含有shp文件的zip,在地图上显示图形。这里先通过上传解压解析,转换生成geojson文件,然后在地图上渲染图形。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果所用的zip文…...

python下拉框选择测试
把下拉选择的值得打印出来: import tkinter as tk def on_select(event): # 当选择下拉框中的一项时,此函数将被调用 selected event.widget.cget("text") # 获取选中的文本 print(f"You selected: {selected}") # 打印选中…...

即时编译器JIT
类编译加载执行过程 如下图所示,一个Java代码从编译到运行大抵会经历以下几个过程。具体每个过程笔者会在下文站展开讨论。 类编译 首先是类编译阶段,这个阶段会将Java文件变为class文件,这个class文件包含一个常量池和方法表集合…...

npm更新包时This operation requires a one-time password.
[访问我的npm包](mhfwork/yt-ui - npm) 更新npm包时出现 This operation requires a one-time password.是因为需要认证 解决办法 1. 点击红线处的链接 2. 进入npm官网获取指定秘钥 3. 再次填入 one-time password 即可...

C++类模板再学习
之前已经学习了C类模板;类模板的写法和一般类的写法有很大的差别;不容易熟悉;下面再做一遍; 做一个椭圆类,成员有长轴长度和短轴长度; // ellipse.h: interface for the ellipse class. // //#if !define…...

华为终端智能家居应用方案
PLC-IoT概述 华为智能PLC-IoT工业物联网系列通信模块是基于电力线宽带载波技术的产品,实现数据在电力线上双向、高速、稳定的传输,广泛适用于电力、交通、工业制造、智能家居等领域,PLC-IoT通信模块包含头端和尾端两种类型,头端配…...

PHP下载文件
/***文件下载*param $filepath源文件路径 */function dwon_file($filepath){if(file_exists($filepath)){header(content-type:text/html;charsetutf8);header(Content-Description: File Transfer);header(Content-Type: application/octet-stream);header(Content-Dispositio…...

38基于matlab的期货预测,利用PSO优化SVM和未优化的SVM进行对比,得到实际输出和期望输出结果。
基于matlab的期货预测,利用PSO优化SVM和未优化的SVM进行对比,得到实际输出和期望输出结果。线性核函数、多项式、RBF核函数三种核函数任意可选,并给出均方根误差,相对误差等结果,程序已调通,可直接运行。 3…...

【Codeforces】 CF582D Number of Binominal Coefficients
题目链接 CF方向 Luogu方向 题目解法 看到 p α ∣ ( n k ) p^{\alpha} | \binom{n}{k} pα∣(kn) ,首先想到 k u m m e r kummer kummer 定理,那么限制即为 n − k n-k n−k 和 k k k 做加法在 p p p 进制下的进位数 ≥ α \ge \alpha ≥α …...

sql第二次上机作业
1查找借阅了ISBN为“4-6045-1023-4”的借书证号,读者姓名,专业名和借书时间 use tsgl go select Reader.Lno,Rname,Spec,Lend.Bordate FROM Reader,Lend WHERE Reader.LnoLend.Lno AND ISBN 4-6045-1023-42查找借阅了《数据库原理》一书的借阅信息&…...

辅助驾驶功能开发-功能规范篇(22)-3-L2级辅助驾驶方案功能规范
1.3.3 TLA系统功能定义 1.3.3.1 状态机 1.3.3.2 状态迁移图 1.3.3.3 功能定义 1.3.3.3.1 信号需求列表 1.3.3.3.2 系统开启关闭 1)初始化 车辆上电后,交通灯辅助系统(TLA)进行初始化,控制器需在 220ms 内发出第一帧报文,并在 3s 内完成内部自检,同时上电 3s 内不进行…...

Python基础入门例程16-NP16 发送offer(列表)
目录 描述 输入描述: 输出描述: 解答 : 说明: 描述 某公司在面试结束后,创建了一个依次包含字符串 Allen 和 Tom 的列表offer_list,作为通过面试的名单。 请你依次对列表中的名字发送类似 Allen, you…...

Web前端面试之Vue—对Vue的理解
目录 一、web发展历程 二、vue是什么 三、Vue核心特性 组件化 数据驱动 指令 四、Vue与Angular以及React的区别 一、web发展历程 Web是World Wide Web的简称,中文译为万维网 我们可以将它规划成如下的几个时代来进行理解 静态网页:最早的网页是没…...

C/C++晶晶赴约会 2020年12月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析
目录 C/C晶晶赴约会 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C晶晶赴约会 2020年12月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 晶晶的朋友贝贝约晶晶下周一起去看展览࿰…...

js 解决 H 指数
给你一个整数数组 citations ,其中 citations[i] 表示研究者的第 i 篇论文被引用的次数。计算并返回该研究者的 h 指数。 根据维基百科上 h 指数的定义:h 代表“高引用次数” ,一名科研人员的 h 指数 是指他(她)至少发…...

在JS中,var 、let 、const 总结
let是英文单词"let"的缩写。在JavaScript中,let 关键字用来声明一个块级作用域 的变量,这意味着变量仅在声明它的代码块内有效,超出该代码块作用域时就无法访问该变量。与var不同的是,let不会被提升到函数作用域或全局作…...

关于网络安全运营工作与安全建设工作的一些思考
以下内容是个人成长过程中对于网络安全运营工作的理解和思考,希望通过这篇文章帮助大家更好的去做安全运营体系化建设,开始吧! 文章目录 一、网络安全运营是什么?二、网络安全运营建设阶段第一阶段:设备限制阶段第二阶…...

【机器学习可解释性】4.SHAP 值
机器学习可解释性 1.模型洞察的价值2.特征重要性排列3.部分依赖图4.SHAP 值5.SHAP 值 高级使用 正文 理解各自特征的预测结果? 介绍 您已经看到(并使用)了从机器学习模型中提取一般解释技术。但是,如果你想要打破模型对单个预测的工作原理? SHAP 值…...

OpenCV官方教程中文版 —— 直方图均衡化
OpenCV官方教程中文版 —— 直方图均衡化 前言一、原理二、 OpenCV 中的直方图均衡化三、 CLAHE 有限对比适应性直方图均衡化 前言 本小节我们要学习直方图均衡化的概念,以及如何使用它来改善图片的对比。 一、原理 想象一下如果一副图像中的大多是像素点的像素值…...

如何使用navicat图形化工具远程连接MariaDB数据库【cpolar内网穿透】
公网远程连接MariaDB数据库【cpolar内网穿透】 文章目录 公网远程连接MariaDB数据库【cpolar内网穿透】1. 配置MariaDB数据库1.1 安装MariaDB数据库1.2 测试局域网内远程连接 2. 内网穿透2.1 创建隧道映射2.2 测试随机地址公网远程访问3. 配置固定TCP端口地址3.1 保留一个固定的…...

【uniapp】uview1.x使用upload上传图片
和2.x不同的是,要用 action 来配置后端上传图片的接口地址; 再来一些配置项的命名有所不同,一般1.x的命名用 -,2.x的命名使用小驼峰; 1.x 的上传会自带删除时的提示框,2.x 没有; 重要的几个配置…...

基于nodejs+vue食力派网上订餐系统
目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性:…...

软件测试常用的8种功能测试类型有哪些?
软件测试常用的8种功能测试类型有哪些? 单元测试 单元测试确保在一个段中编写的每一段代码都能产生最佳结果。开发人员在单元测试期间只看接口和确定部件。它提供了代码进展的文档,因为每个代码单元在继续下一个之前都经过了彻底的测试。 集成测试 至少对…...

动态规划之01背包问题
01背包问题 1. 【模板】01背包2. 分割等和子集3. 目标和4. 最后一块石头的重量 II 01背包问题是一种动态规划问题,用于求解在有限容量的背包中装入最大价值的物品组合。具体步骤如下: 定义一个二维数组dp[i][j],表示从前i个物品中选择若干个…...