Flink Watermark和时间语义
Flink 中的时间语义
时间语义: EventTime
:事件创建时间;Ingestion Time
:数据进入Flink
的时间;Processing Time
:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合,我们往往更关系事件时间Event Time
。数据生成的时候就会自动注入时间戳,Event Time
可以从日志数据的时间戳timestamp)
中提取。
设置 Event Time
我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic
方法,设置流的时间特性。具体的时间,还需要从数据中提取时间戳timestamp
。
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从调用时刻开始给 env 创建的每一个 stream 追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
乱序数据的影响
当Flink
以Event Time
模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。由于网络、分布式等原因,会导致乱序数据的产生。如上图所示,理想情况与实际情况会存在差异,乱序数据会让窗口计算不准确。解决方案是让窗口等几分钟。
水位线 Watermark
怎么避免乱序数据带来计算不正确?
遇到一个时间戳到达了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。Watermark
是一种衡量Event Time
进展的机制,可以设置延迟触发。Watermark
是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark
机制结合window
来实现。数据流中的Watermark
用于表示timestamp
小于Watermark
的数据,都已经达到了,因此,window
的执行也是由Watermark
触发的。Watermark
用来让程序自己延迟和结果正确性。
Watermark 的特点: Watermark
是一条特殊的数据记录,必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。Watermark
与数据的时间戳有关。
watermark 的传递、引入和设定
watermark
的传递: 一个Task
输入可以并行多个,如下有4
个并行度,输出也可能存在多个并行,如下有3个。每个任务Task
内部都有一个事件时钟,且每个分区也维护了对应的WM
,如下的Partition WM
。当事件流流进Partition
时会判断新事件流的WM
是否大于当前的Partition WM
,当大于时就更新Partition
的时间戳WM
为新流入的WM
(取最大值),如下1->2
象限Partition WM
的变化。同时,如下Task
也维护了一个全局的WM
表示事件时钟,该值取分区中最小的WM
作为输出的时间戳,如下第二象限的输出选择最小的WM=3
向下传递。当第二个(横线)分区Partition WM
流进来WM=7
的事件流时,就会出现第三象限的情景,但是最小的WM
还是=3
,因此不更新Task
全局的WM
。当第三个分区Partition WM
流进来WM=6
的事件流时,就会出现第四象限的情景,此时分区Partition WM
的最小值=4
,因此Task
全局WM
就=4
。
watermark
的引入: Event Time
的使用一定要指定数据源中的时间戳。对于排好序的数据,只需要指定时间戳就够了,不需要延迟触发。
import org.apache.flink.streaming.api.windowing.time.Time
//同时分配时间戳和水位线
dataStream.assignTimestampsAndWatermarks(
//无序数据 Time.milliseconds(1000)=延迟时间
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {//提取事件戳 = timestamp * 1000是因为出入的毫秒override def extractTimestamp(t: SensorReading): Long = {t.timestamp * 1000}
})
【1】对于排好序的数据,不需要延迟触发,可以只指定事件戳就行了
dataStream.assignTimestampsAndWatermarks(_.timestamp * 1000)
【2】Flink
暴露了TimestampAssigner
接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成 watermark
。MyAssigner
可以有两种类型,都继承自TimestampAssigner
。
dataStream.assignTimestampsAndWatermarks(new MyAssigner())
TimestampAssigner
:定义了抽取时间戳,以及生成watermark
的方法,有两种类型:
【1】AssignerWithPeriodicWatermarks
: 系统会周期性的将Watermark
插入到流中。默认周期是200
毫秒(如果是processingTime
则Watermark = 0
),可以使用ExecutionConfig.setAutoWatermarkInterval()
方法进行设置。升序和前面乱序的处理BoundedOutOfOrderness
,都是基于周期性watermark
的。举例:如下产生watermark
的逻辑:每隔5
秒,Flink
调用AssignerWithPeriodicWatermarks
的getCurrentWatermark()
方法。如果方法返回一个时间戳大于之前水位的时间戳,新的water
会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于之前水位的时间戳,则不会产生新的watermark
。
//方案一:
//EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//每隔 5秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);
//方案二
//自定义一个周期性的时间戳
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading]{val bound: Long = 60 * 1000 //延时为 1 分钟var maxTs: Long = Long.MinValue //观察到的最大时间戳//生成水位线override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {maxTs = maxTs.max(t.timestamp)t.timestamp}
}
【2】AssignerWithPunctuatedWatermarks
: 没有时间周期规律,可打断的生成watermark
。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading]{val bound: Long = 60 * 1000//获取水位线,根据数据触发override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {if(t.id == "sensor_1"){new Watermark(l - bound)}else{null}}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {t.timestamp}
}
watermark 的设定:
【1】在Flink
中,watermark
由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
【2】如果watermark
设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
【3】而如果watermark
到达得太早,则可能收到错误结果,不过Flink
处理迟到数据的机制可以解决这个问题。
相关文章:

Flink Watermark和时间语义
Flink 中的时间语义 时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合&#x…...
HarmonyOS UI框架简介
HarmonyOS UI框架介绍 HarmonyOSUI框架是一个用于构建跨设备应用的开发框架,它属于HarmonyOS系统架构的上层框架。该框架通过提供一系列的开发模型、声明式UI范式、系统API等,帮助开发者更高效地构建用户界面。 在HarmonyOSUI框架中,开发语…...

编程羔手解决Maven引入多个版本的依赖包,导致包冲突了
最近升级了些依赖发现有个hutool的方法老报错,java.lang.NoSuchMethodError: cn.hutool.core.util.ObjectUtil.defaultIfNull(Ljava/lang/Object;Ljava/util/function/Supplier;) 在 Maven 项目中,当不同的依赖模块引入 Hutool 的不同版本时,…...

C#,入门教程(08)——基本数据类型及使用的基础知识
上一篇: C#,入门教程(07)——软件项目的源文件与目录结构https://blog.csdn.net/beijinghorn/article/details/124139947 数据类型用于指定数据体(DataEntity,包括但不限于类或结构体的属性、变量、常量、函数返回值)…...

分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测
分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测 目录 分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测(完整…...
计算机二级Python选择题考点——公共基础部分
计算机完成一条指令所花费的时间称为一个指令周期。(指令周期越短,指令执行就越快)顺序程序不具有并发性。(具有顺序性、封闭性和可再现性)结构化程序设计强调程序的易读性。系统软件:操作系统、编译程序、数据库管理系统 应用软件:杀毒软件在…...

《微机原理与应用》期末考试题库(附答案解析)
第1章 微型计算机概述 1.微型计算机的硬件系统包括___A _____。 A.控制器、运算器、存储器和输入输出设备 B.控制器、主机、键盘和显示器 C.主机、电源、CPU和输入输出 D.CPU、键盘、显示器和打印机 2.微处…...

如何在Android Glide中结合使用CenterCrop和自定义圆角变换(图片部分圆角矩形)
如何在Android Glide中结合使用CenterCrop和自定义圆角变换(图片部分圆角矩形) 在Android开发中,使用Glide加载图片时,我们经常需要对图片进行特定的处理,比如裁剪和圆角变换,特别是一些设计稿,…...
华为机考-手拍球游戏
【手拍手计算次数和总数】游戏规则:左手和右手拍球初始数为0,首先左手第一次拍球数1下,右手拍球1下,接下来左手在拍球时是上一次左手上一次右手的总和,右手也是上一次左手上一次右手拍球的总和,最后拍球总数…...
【线上问题】两台服务器的时间不一致导致jwt解析错误
目录 一、问题描述二、解决方法 一、问题描述 1.线上生产问题,本地和测试环境均无问题 2.本地和测试由于网关和登录服务均在同一台机器 3.线上的登录服务和网关部署不在一起,登录服务的时间正常,网关服务的服务器时间比实际快5秒 4.登录服务j…...

58.网游逆向分析与插件开发-游戏增加自动化助手接口-游戏菜单文字资源读取的逆向分析
内容来源于:易道云信息技术研究院VIP课 之前的内容:接管游戏的自动药水设定功能-CSDN博客 码云地址(master分支):https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号:34b9c1d43b512d0b4a3c395b…...

Vue-2、初识Vue
1、helloword小案列 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>初始Vue</title><!--引入vue--><script type"text/javascript" src"https://cdn.jsdelivr.n…...

机器学习项目标记图像数据 - 安装LabelImg及功能介绍
什么是LabelImg? LabelImg 是一款流行的图像标注工具,主要用于计算机视觉领域。它允许用户为机器学习项目标记图像数据,特别是用于训练目标检测模型。 如何安装LabelImg pip install PyQt5 pip install pyqt5-tools pip install lxml pip …...
12.15 log 122.买卖股票的最佳时机 II,55. 跳跃游戏
122.买卖股票的最佳时机 II class Solution { public:int maxProfit(vector<int>& prices) {int result0;for(int i0;i<prices.size();i){if(i>0&&prices[i]-prices[i-1]>0){resultprices[i]-prices[i-1];}}return result;} }; 这道题贪心贪的时每…...

Redis - 挖矿病毒 db0 库 backup 反复出现解决方案
问题描述 腾讯云的服务器,使用 Docker 部署了 Redis 之后,发现 DB0 中总是出现 4 条 key,分别是 backup01backup02backup03backup04 而自己每次存入 db0 中的数据过一会就会被无缘无故删除掉。 原因分析 挖矿病毒 解决方案 在启动的时候…...

LiveGBS流媒体平台GB/T28181常见问题-国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道
LiveGBS国标GB28181中国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道 1、什么是国标编号?2、国标设备ID和通道ID3、ID 统一编码规则4、搭建GB28181视频直播平台 1、什么是国标编号? 国标GB28181对接过程中,可能有的小…...

Unity ShaderGraph 技能冷却转圈效果
Unity ShaderGraph 技能冷却转圈效果 前言项目场景布置代码编写ShaderGraph 连线总结 参考 前言 遇到一个需求,要展示技能冷却的圆形遮罩效果。 项目 场景布置 代码编写 Shader核心的就两句 // 将uv坐标系的原点移到纹理中心 float2 uv i.uv - float2(0.5, 0…...

C++上位软件通过Snap7开源库访问西门子S7-1200/S7-1500数据块的方法
前言 本人一直从事C上位软件开发工作较多,在之前的项目中通过C访问西门子PLC S7-200/S7-1200/S7-1500并进行数据交互的应用中一直使用的是ModbusTCP/ModbusRTU协议进行。Modbus上位开源库采用的LibModbus。经过实际应用发现Modbus开源库单次发送和接受的数据不能超过…...

如何正确安装Axure插件?详细步骤分享
产品经理在使用Axure导出html文件时,如果选择“完成后打开浏览器”,浏览器往往无法识别。此时,我们需要使用Axure官方谷歌浏览器插件直接访问浏览器中的本地html项目,否则我们需要上传到AxureCloud或使用软件本身的预览功能。接下…...

[SwiftUI]工程最低适配iOS13
问题: 新建工程,选择最低支持iOS13报错: main() is only available in iOS 14.0 or newer Scene is only available in iOS 14.0 or newer WindowGroup is only available in iOS 14.0 or newer 解决: 注释掉上面代码&#x…...

工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止
<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet: https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...

家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...

C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建
华为云FlexusDeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色,华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型,能助力我们轻松驾驭 DeepSeek-V3/R1,本文中将分享如何…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...