当前位置: 首页 > news >正文

flink watermark介绍及watermark的窗口触发机制

Flink的三种时间

在谈watermark之前,首先需要了解flink的三种时间概念。在flink中,有三种时间戳概念:Event Time 、Processing Time 和 Ingestion Time。其中watermark只对Event Time类型的时间戳有用。这三种时间概念分别表示:

Processing time

处理时间,指执行算子操作的机器的当前时间。当基于处理时间运行时,所有关于时间的操作(如时间窗口)都将使用执行算子操作的主机的本地时间。例如,当时间窗口为一小时,如果应用程序在9:15 am开始运行,则第一个窗口将包括在9:15 am到10:00 am之间被处理的事件,下一个窗口将包含在10:00 am到11:00 am之间被处理的事件,依此类推。

处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能提供确定性,因为它容易受到上流系统(例如从消息队列)到达Flink的速度、flink内部operators之间交互的速度,以及中断(调度或其他情况)等因素的影响。

Event Time

事件时间,是每个event在其生产设备上产生的时间,即元素在到达flink之前,本身就自带的时间戳。

所以说,Event Time的时间戳取决于数据,而与其他时间无关。使用Event Time,必须在从执行环境中先引入EventTime的时间属性。如:

 

java

复制代码

val env = StreamExecutionEnvironment.getExecutionEnvironment // 从调用时刻开始给env创建的每一个stream追加时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

然后通过Dstream的assignTimestampsAndWatermarks方法指定event time时间戳,具体操作不做赘述。

在理想情况下,事件时间是有序的。但实际上,由于分布式操作,以及网络延迟等原因,event可能不是按照event time的顺序到达的。所以flink对处理乱序数据的方案是提供一个允许延迟时间,在允许延迟时间内到达的元素将会重新触发一次计算。这个延迟时间时相对event time而不是其他时间的,而event time不是由flink决定的,那么如何判断当前的event time到底时多少呢?flink通过一个watermark来确定与维护当前event time的最大值。这也是本文将会在后面重点解释的。

Ingestion time

Ingestion time是event进入Flink的时间,即执行source操作时的时间。

Ingestion time从概念上讲介于Event Time和Processing time之间。

与Processing time相比 ,它花费的资源会稍微多一些,但结果却更可预测。由于 Ingestion time使用稳定的时间戳(仅在addSource处分配了一次),因此对记录的不同窗口操作将引用相同的时间戳,而在Processing time中,每个窗口操作都会更新事件的Processing time,所以可能一个上游窗口中的记录会分配给不同的下游窗口(基于本地系统时钟和任何可能的延误)。

与Event Time相比,Ingestion time程序无法处理任何乱序事件或迟到的数据,但是程序不必指定如何生成watermarks。

下图为三种时间语义的图解:

watermark

用我自己的语言总结,在flink的窗口计算中,的watermark就是触发窗口计算的一种机制。 那么,watermark到底是以怎样的一种形式存在的呢?实际上,watermark就是一种特殊的event,它被参杂在Dstream中,watermark由flink的某个操作生成后,就在整个程序中随event一同流转,如下图所示:

以下是watermark的代码,可以看出watermark的就是一个流元素,仅包含一个时间戳属性:

 

java

复制代码

public final class Watermark extends StreamElement { /** The watermark that signifies end-of-event-time. */ public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE); // ------------------------------------------------------------------------ /** The timestamp of the watermark in milliseconds. */ private final long timestamp; /** * Creates a new watermark with the given timestamp in milliseconds. */ public Watermark(long timestamp) { this.timestamp = timestamp; }

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:

根据上图分析: MaxOutOfOrderness = 5s,窗口的大小为:10s。 watermark分别为:12:08、12:15、12:30 计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。 注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件

在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

并行流的Watermarks

watermark可以在source处生成(也可以在source后通过其他算子生成,如map、filter等),如果source有多个并行度,那么每个并行度会单独生成一个watermark,这些watermark定义了各分区的event time。 当并行度发生变化(即上游的一个分区可能被下游多个分区使用时),每个分区的watermark是会广播至下游的每个分区的,如一些聚合多个流的操作,如 keyBy(…) 或者partition(…),此类操作的watermark是在所有输入流中取最小的watermark。当带有watermark的流通过此类算子时,会根据每个分区的watermark来更新watermark。

举个例子:当上游并行度数为4,下游的某个分区的窗口中的watermark如下:

  1. 当已到达的watermark分别为2、4、3、6时,窗口中的watermark为2,触发watermark为2的对应窗口计算,并将watermark=2广播至下游。

  2. 当第一个窗口的watermark被更新为4时,所有分区中已到达最小的watermark是3,则将窗口的watermark更新为3,触发对应窗口的计算,并将watermark=3广播至下游。

  3. 当第二个分区的watermark被更新为7,所有分区中已到达最小的watermark还是3,不做处理。

  4. 当第三个分区的watermark被更新为6,所有分区中已到达最小的watermark是4,则将窗口的watermark更新为4,触发对应窗口的计算,并将watermark=4广播至下游。

下图显示了event和watermark在一个并行流的示例,以及算子如何跟踪事件时间的:

watermark分配器

当watermark完全基于event time时,如果没有元素到达,则watermark不会被更新,这就说明,当一段时间没有元素到达,则在这个时间间隙内,watermark不会增加,那么也不会触发窗口计算。显然,如果这段时间很长的话,那么该窗口中已经到达的元素将会等待很久才会被输出计算结果。

为了避免这种情况,可以使用周期性的watermark分配器(AssignerWithPeriodicWatermarks 下面马上提到),这些分配器不仅仅基于event time进行分配。比如,可以使用一个分配器,当一段时间没有接收到新的event时,则将当前时间作为watermark。

watermark的两种分配器,flink生成watermark有两种机制:

  • AssignerWithPeriodicWatermarks :分配时间戳并定期生成watermark(可以取决于event time,或基于处理时间)。
  • AssignerWithPunctuatedWatermarks:分配时间戳并根据每一个元素生成watermark(每来一个元素都进行一次判断,相更消耗性能)

通常情况下会使用第一种机制,原因除了更节省性能外,在上面的分配器中也有提到。

下面分别对两种机制进行介绍。

AssignerWithPeriodicWatermarks

对每个元素都调用extractTimestamp方法获取时间戳,并维护一个最大时间戳。通过ExecutionConfig.setAutoWatermarkInterval(...)定义生成watermark的间隔(每n毫秒) 。根据这个间隔,周期性调用分配器的getCurrentWatermark()方法,为watermark分配值。

在flink自带的BoundedOutOfOrdernessGenerator分配器中, getCurrentWatermark是定期将当前watermark更新为最大时间戳减去允许延迟时间的值。

以下是两个使用AssignerWithPeriodicWatermarks 生成的时间戳分配器的简单示例:

 

java

复制代码

/** * This generator generates watermarks assuming that elements arrive out of order, * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. */ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxOutOfOrderness = 3500L // 3.5 seconds var currentMaxTimestamp: Long = _ override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { val timestamp = element.getCreationTime() currentMaxTimestamp = max(timestamp, currentMaxTimestamp) timestamp } override def getCurrentWatermark(): Watermark = { // return the watermark as current highest timestamp minus the out-of-orderness bound new Watermark(currentMaxTimestamp - maxOutOfOrderness) } } /** * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. */ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxTimeLag = 5000L // 5 seconds override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { element.getCreationTime } override def getCurrentWatermark(): Watermark = { // return the watermark as current time minus the maximum time lag new Watermark(System.currentTimeMillis() - maxTimeLag) } }

AssignerWithPunctuatedWatermarks

根据每个元素的event time生成watermark,通过extractTimestamp(...)方法为元素分配时间戳,通过checkAndGetNextWatermark(...)检查元素的watermark并更新watermark。

checkAndGetNextWatermark(...)方法的第二个参数是extractTimestamp(...) 返回的时间戳,根据这个时间戳决定是否要生成watermark。每当checkAndGetNextWatermark(...) 方法返回一个非空watermark,并且该watermark大于上一个watermark时,就会更新watermark。

 

java

复制代码

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] { override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { element.getCreationTime } override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = { if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null } }

相关文章:

flink watermark介绍及watermark的窗口触发机制

Flink的三种时间 在谈watermark之前&#xff0c;首先需要了解flink的三种时间概念。在flink中&#xff0c;有三种时间戳概念&#xff1a;Event Time 、Processing Time 和 Ingestion Time。其中watermark只对Event Time类型的时间戳有用。这三种时间概念分别表示&#xff1a; …...

Spring Cloud: 云原生微服务实践

文章目录 1. Spring Cloud 简介2. Spring Cloud Eureka&#xff1a;服务注册与发现在Spring Cloud中使用Eureka 3. Spring Cloud Config&#xff1a;分布式配置中心在Spring Cloud中使用Config 4. Spring Cloud Hystrix&#xff1a;熔断器在Spring Cloud中使用Hystrix 5. Sprin…...

存bean和取bean

准备工作存bean获取bean三种方式 准备工作 bean:一个对象在多个地方使用。 spring和spring boot&#xff1a;spring和spring boot项目&#xff1b;spring相当于老版本 spring boot本质还是spring项目&#xff1b;为了方便spring项目的搭建&#xff1b;操作起来更加简单 spring…...

39. 组合总和

给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序 返回这些组合。 candidates 中的 同一个 数字可以 无限制重复被选取 。如…...

100行以内Python能做那些事

Python100 找到一个很好的python教程分享出来---->非本人 B站视频连接 100行以内的Pyhton代码可以做哪些有意思的事 按照难度1-5颗星&#xff0c;分为五个文件夹 希望大家可以补充 关于运行环境的补充 Python3.7 Pycharm社区版2019 关于用到的Python库,有些是自带的&am…...

Android 电源键事件流程分析

Android 电源键事件流程分析 电源按键流程处理逻辑在 PhoneWindowManager.java类中的 dispatchUnhandledKey 方法中 frameworks/base/services/core/java/com/android/server/policy/PhoneWindowManager.java从dispatchUnhandledKey方法开始分析 Overridepublic KeyEvent dis…...

游戏搬砖简述-1

游戏搬砖是一种在游戏中通过重复性的任务来获取游戏内货币或物品的行为。这种行为在游戏中非常普遍&#xff0c;尤其是在一些MMORPG游戏中。虽然游戏搬砖看起来很无聊&#xff0c;但是它确实是一种可以赚钱的方式&#xff0c;而且对于一些玩家来说&#xff0c;游戏搬砖也是一种…...

多线程基础总结

1. 为什么要有多线程&#xff1f; 线程&#xff1a;线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中&#xff0c;是进程中实际运行单位。 进程&#xff1a;进程是程序的基本执行实体。 什么是多线程&#xff1f; 有了多线程&#xff0c;我们就可以让程序同时做…...

视频理解AI模型分类与汇总

人工智能领域视频模型大体也经历了从传统手工特征&#xff0c;到卷积神经网络、双流网络&#xff08;2014年-2017年&#xff09;、3D卷积网络、transformer的发展脉络。为了时序信息&#xff0c;有的模型也结合用LSTM。 视频的技术大多借鉴图像处理技术&#xff0c;只是视频比…...

【Linux】多线程 --- 线程同步与互斥+生产消费模型

人生总是那么痛苦吗&#xff1f;还是只有小时候是这样&#xff1f; —总是如此 文章目录 一、线程互斥1.多线程共享资源访问的不安全问题2.提出解决方案&#xff1a;加锁&#xff08;局部和静态锁的两种初始化/销毁方案&#xff09;2.1 对于锁的初步理解和实现2.2 局部和全局锁…...

17.模型的定义

学习要点&#xff1a; 1.默认设置 2.模型定义 本节课我们来开始学习数据库的模型部分的定义和默认值的设置。 一&#xff0e;默认设置 1. 框架可以使用 Eloquent ORM 进行数据库交互&#xff0c;也就是关系对象模型&#xff1b; 2. 在数据库入门阶段&#xff0c;我们已经创建了…...

golang 记录交叉编译sqlite的报错信息 go build -ldflags

go build -ldflags ‘-s -w --extldflags “-static -fpic”’ -o go-web main.go [gos20230512]# CGO_ENABLED1 CCaarch64-linux-gnu-gcc CXXaarch64-linux-gnu-g GOOSlinux GOARCHarm64 go build -ldflags -s -w --extldflags "-static -fpic" -o go-web m…...

ChatGPT AI使用成本

LLM “经济学”&#xff1a;ChatGPT 与开源模型&#xff0c;二者之间有哪些优劣权衡&#xff1f;谁的部署成本更低&#xff1f; 太长不看版&#xff1a;对于日均请求在 1000 次左右的低频使用场景&#xff0c;ChatGPT 的实现成本低于部署在 AWS 上的开源大模型。但面对每天数以…...

腾讯云与中电金信发布联合核心方案

5月11日&#xff0c;以“聚力革新&#xff0c;行稳致远”为主题的 “腾讯金融云国产化战略峰会”在北京举办&#xff0c;来自金融业、科技侧、研究机构的专家学者同聚一堂&#xff0c;共同探讨银行核心下移方法论以及国产化转型实践等话题。会议期间&#xff0c;中电金信副总经…...

老胡的周刊(第090期)

老胡的信息周刊[1]&#xff0c;记录这周我看到的有价值的信息&#xff0c;主要针对计算机领域&#xff0c;内容主题极大程度被我个人喜好主导。这个项目核心目的在于记录让自己有印象的信息做一个留存以及共享。 &#x1f3af; 项目 privateGPT[2] 为保证数据私密性&#xff0c…...

2023-数仓常见问题以及解决方案

01 数据仓库现状 小 A 公司创建时间比较短&#xff0c;才刚过完两周岁生日没多久&#xff1b;业务增长速度快&#xff0c;数据迅速增加&#xff0c;同时取数需求激增与数据应用场景对数据质量、响应速度、数据时效性与稳定要求越来越高&#xff1b;但技术能力滞后业务增长&…...

没关系,前端还死不了

前言 网络上的任何事情都可以在《乌合之众》书中找到答案。大众言论没有理性&#xff0c;全是极端&#xff0c;要么封神&#xff0c;要么踩死。不少人喷前端&#xff0c;说前端已死&#xff1f;前端内卷&#xff1f;前端一个月800包吃住&#xff1f; 对此我想说&#xff0c;“…...

OpenSSL-基于IP或域名生成自签名证书脚本

个人名片&#xff1a; 对人间的热爱与歌颂&#xff0c;可抵岁月冗长&#x1f31e; Github&#x1f468;&#x1f3fb;‍&#x1f4bb;&#xff1a;念舒_C.ying CSDN主页✏️&#xff1a;念舒_C.ying 个人博客&#x1f30f; &#xff1a;念舒_C.ying 一、安装 需要安装并配置Op…...

如何在C#中创建和使用自定义异常

C#是一种强类型语言&#xff0c;可以捕获和处理各种异常&#xff0c;从而帮助我们发现程序中出现的错误。在程序开发过程中&#xff0c;如果需要找到特定的错误情况并处理&#xff0c;这时就需要创建自定义异常。下面介绍一下如何在C#中创建和使用自定义异常。 1、什么是异常&…...

通过systemctl管理服务

文章目录 通过systemctl管理服务通过systemctl管理单一服务(service unit)使用案例服务启动/关闭/查看的练习关于systemctl命令启动/停止服务后面的后缀名是否加&#xff1f; 通过systemctl查看系统上所有的服务使用案例 通过systemctl管理不同的操作环境(target unit)使用案例…...

面经|小红书经营分析师

感觉面试官还挺严肃的&#xff0c;并且猎头说因为工作经验不够是外包岗位。 但是没想到最后败在了SQL上&#xff0c;很久没刷题了 平时工作中还是需要想下给公司整体带来的收益结果是什么&#xff0c;实际工作中不一定会用到&#xff0c;但是要有这个思路&#xff0c;面试的时候…...

abpvnext后台工作者使用quartz扩展的一些思路和使用细节记录--(未完待续)

需求背景描述&#xff1a; 我有一个温湿度数据采集的物联网系统&#xff0c;每个租户都需要定时执行若干种任务&#xff0c; 不同的租户&#xff0c; 他定时执行的间隔不一样 &#xff0c;比如 A租户&#xff0c;数据保存间隔60秒&#xff0c;数据是否超限的轮询间隔是是600…...

提升应届生职场竞争力:有效策略和关键推动因素

应届生进入职场是一个关键的阶段&#xff0c;他们需要通过有效的方法和策略来提高自己的竞争力&#xff0c;以适应职场的挑战并取得成功。以下是一些可以帮助应届生提升竞争力的方法和策略&#xff0c;以及对其职场发展起到关键推动和支撑作用的方面。 学习和继续教育&#xff…...

PBDB Data Service:List of fossil collections(化石采集记录列表)

List of fossil collections&#xff08;化石采集记录列表&#xff09; 描述用法参数以下参数可用于按各种条件查询集合。以下参数可用于筛选所选内容以下参数还可用于根据分类筛选结果列表以下参数可用于生成数据存档您可以使用以下参数选择要检索的额外信息&#xff0c;以及要…...

centos安装SNB服务

Samba 是一种开源软件&#xff0c;它提供了一种让 Linux 和 Unix 系统与 Windows 操作系统相互通信的标准协议。Samba 允许 Linux 和 Unix 系统作为文件服务器和打印服务器&#xff0c;提供 Windows 客户端所需的服务。 具体来说&#xff0c;Samba 通过实现 SMB/CIFS 协议来实现…...

课程《JavaWeb基础框架程序设计》考试题下篇——数据库与表单操作用题(人事管理平台的添加员工档案信息的操作题)

文章目录 &#x1f4cb;前言&#x1f3af;第三题&#xff08;40分&#xff09;&#x1f3af;报错以及解决方法&#x1f4dd;最后 &#x1f4cb;前言 这篇文章是大学课程《JavaWeb基础框架程序设计》考试题目的内容&#xff0c;包括了原题和答案。题目只包括了三道编程题&#…...

Linux-初学者系列——篇幅4_系统运行相关命令

系统运行相关命令-目录 一、关机重启注销命令1、重启或者关机命令-shutdown语法格式&#xff1a;常用参数&#xff1a;01 指定多久关闭/重启系统02 指定时间关闭/重启系统03 实现立即关闭/重启系统04 取消关闭/重启系统计划 2、重启或者关机命令-halt/poweroff/reboot/systemct…...

无缝集成:利用Requests库轻松实现数据抓取与处理

目录 引言安装基本用法发送HTTP请求处理HTTP响应高级功能总结 引言 Requests是Python中一个常用的第三方库&#xff0c;用于向Web服务器发起HTTP请求并获取响应。该库的使用简单&#xff0c;功能强大&#xff0c;被广泛应用于网络爬虫、API访问、Web应用开发等领域。 本文将介…...

几种内部排序算法的cpp代码实现与分析

零、测试函数 typedef void (*SortFunc) (int*&, int);inline void swap(int &a, int &b) {int tmp a;a b;b tmp; }inline void printArr(int* a, int n) {for (int k 0; k < n; k) {std::cout << a[k] << ;}std::cout << std::endl; }…...

第3天学习Docker-Docker部署常见应用(MySQL、Tomcat、Nginx、Redis、Centos)

前提须知&#xff1a; &#xff08;1&#xff09;搜索镜像命令 格式&#xff1a;docker search 镜像名 &#xff08;2&#xff09;设置Docker镜像加速器 详见文章&#xff1a;Docker设置ustc的镜像源&#xff08;镜像加速器&#xff09; 1、部署MySQL 拉取镜像&#xff08;这…...