当前位置: 首页 > news >正文

Flink Watermark和时间语义

Flink 中的时间语义

[点击并拖拽以移动] ​

时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合,我们往往更关系事件时间Event Time。数据生成的时候就会自动注入时间戳,Event Time可以从日志数据的时间戳timestamp)中提取。

设置 Event Time

我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性。具体的时间,还需要从数据中提取时间戳timestamp

val env = StreamExecutionEnvironment.getExecutionEnvironment
//从调用时刻开始给 env 创建的每一个 stream 追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

乱序数据的影响

[点击并拖拽以移动] ​

FlinkEvent Time模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。由于网络、分布式等原因,会导致乱序数据的产生。如上图所示,理想情况与实际情况会存在差异,乱序数据会让窗口计算不准确。解决方案是让窗口等几分钟。

水位线 Watermark

怎么避免乱序数据带来计算不正确?
遇到一个时间戳到达了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。Watermark是一种衡量Event Time进展的机制,可以设置延迟触发。Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经达到了,因此,window的执行也是由Watermark触发的。Watermark用来让程序自己延迟和结果正确性。

Watermark 的特点: Watermark是一条特殊的数据记录,必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。Watermark与数据的时间戳有关。
[点击并拖拽以移动] ​

watermark 的传递、引入和设定

watermark的传递: 一个Task输入可以并行多个,如下有4个并行度,输出也可能存在多个并行,如下有3个。每个任务Task内部都有一个事件时钟,且每个分区也维护了对应的WM,如下的Partition WM。当事件流流进Partition时会判断新事件流的WM是否大于当前的Partition WM,当大于时就更新Partition的时间戳WM为新流入的WM(取最大值),如下1->2象限Partition WM的变化。同时,如下Task也维护了一个全局的WM表示事件时钟,该值取分区中最小的WM作为输出的时间戳,如下第二象限的输出选择最小的WM=3向下传递。当第二个(横线)分区Partition WM流进来WM=7的事件流时,就会出现第三象限的情景,但是最小的WM还是=3,因此不更新Task全局的WM。当第三个分区Partition WM流进来WM=6的事件流时,就会出现第四象限的情景,此时分区Partition WM的最小值=4,因此Task全局WM=4
[点击并拖拽以移动] ​

watermark的引入: Event Time的使用一定要指定数据源中的时间戳。对于排好序的数据,只需要指定时间戳就够了,不需要延迟触发。

import org.apache.flink.streaming.api.windowing.time.Time
//同时分配时间戳和水位线
dataStream.assignTimestampsAndWatermarks(
//无序数据       Time.milliseconds(1000)=延迟时间
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {//提取事件戳 = timestamp * 1000是因为出入的毫秒override def extractTimestamp(t: SensorReading): Long = {t.timestamp * 1000}
})

【1】对于排好序的数据,不需要延迟触发,可以只指定事件戳就行了

dataStream.assignTimestampsAndWatermarks(_.timestamp * 1000)

【2】Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成 watermarkMyAssigner可以有两种类型,都继承自TimestampAssigner

dataStream.assignTimestampsAndWatermarks(new MyAssigner())

TimestampAssigner:定义了抽取时间戳,以及生成watermark的方法,有两种类型:
【1】AssignerWithPeriodicWatermarks 系统会周期性的将Watermark插入到流中。默认周期是200毫秒(如果是processingTimeWatermark = 0 ),可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。升序和前面乱序的处理BoundedOutOfOrderness,都是基于周期性watermark的。举例:如下产生watermark的逻辑:每隔5秒,Flink调用AssignerWithPeriodicWatermarksgetCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的water会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于之前水位的时间戳,则不会产生新的watermark

//方案一:
//EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//每隔 5秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);//方案二
//自定义一个周期性的时间戳
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading]{val bound: Long = 60 * 1000 //延时为 1 分钟var maxTs: Long = Long.MinValue //观察到的最大时间戳//生成水位线override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {maxTs = maxTs.max(t.timestamp)t.timestamp}
}

【2】AssignerWithPunctuatedWatermarks 没有时间周期规律,可打断的生成watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading]{val bound: Long = 60 * 1000//获取水位线,根据数据触发override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {if(t.id == "sensor_1"){new Watermark(l - bound)}else{null}}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {t.timestamp}
}

watermark 的设定:
【1】在Flink中,watermark由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
【2】如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
【3】而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题。

相关文章:

Flink Watermark和时间语义

Flink 中的时间语义 时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合&#x…...

HarmonyOS UI框架简介

HarmonyOS UI框架介绍 HarmonyOSUI框架是一个用于构建跨设备应用的开发框架,它属于HarmonyOS系统架构的上层框架。该框架通过提供一系列的开发模型、声明式UI范式、系统API等,帮助开发者更高效地构建用户界面。 在HarmonyOSUI框架中,开发语…...

编程羔手解决Maven引入多个版本的依赖包,导致包冲突了

最近升级了些依赖发现有个hutool的方法老报错,java.lang.NoSuchMethodError: cn.hutool.core.util.ObjectUtil.defaultIfNull(Ljava/lang/Object;Ljava/util/function/Supplier;) 在 Maven 项目中,当不同的依赖模块引入 Hutool 的不同版本时&#xff0c…...

C#,入门教程(08)——基本数据类型及使用的基础知识

上一篇: C#,入门教程(07)——软件项目的源文件与目录结构https://blog.csdn.net/beijinghorn/article/details/124139947 数据类型用于指定数据体(DataEntity,包括但不限于类或结构体的属性、变量、常量、函数返回值)…...

分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测

分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测 目录 分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测(完整…...

计算机二级Python选择题考点——公共基础部分

计算机完成一条指令所花费的时间称为一个指令周期。(指令周期越短,指令执行就越快)顺序程序不具有并发性。(具有顺序性、封闭性和可再现性)结构化程序设计强调程序的易读性。系统软件:操作系统、编译程序、数据库管理系统 应用软件:杀毒软件在…...

《微机原理与应用》期末考试题库(附答案解析)

第1章 微型计算机概述 1.微型计算机的硬件系统包括___A _____。 A.控制器、运算器、存储器和输入输出设备 B.控制器、主机、键盘和显示器 C.主机、电源、CPU和输入输出 D.CPU、键盘、显示器和打印机 2.微处…...

如何在Android Glide中结合使用CenterCrop和自定义圆角变换(图片部分圆角矩形)

如何在Android Glide中结合使用CenterCrop和自定义圆角变换(图片部分圆角矩形) 在Android开发中,使用Glide加载图片时,我们经常需要对图片进行特定的处理,比如裁剪和圆角变换,特别是一些设计稿,…...

华为机考-手拍球游戏

【手拍手计算次数和总数】游戏规则:左手和右手拍球初始数为0,首先左手第一次拍球数1下,右手拍球1下,接下来左手在拍球时是上一次左手上一次右手的总和,右手也是上一次左手上一次右手拍球的总和,最后拍球总数…...

【线上问题】两台服务器的时间不一致导致jwt解析错误

目录 一、问题描述二、解决方法 一、问题描述 1.线上生产问题,本地和测试环境均无问题 2.本地和测试由于网关和登录服务均在同一台机器 3.线上的登录服务和网关部署不在一起,登录服务的时间正常,网关服务的服务器时间比实际快5秒 4.登录服务j…...

58.网游逆向分析与插件开发-游戏增加自动化助手接口-游戏菜单文字资源读取的逆向分析

内容来源于:易道云信息技术研究院VIP课 之前的内容:接管游戏的自动药水设定功能-CSDN博客 码云地址(master分支):https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号:34b9c1d43b512d0b4a3c395b…...

Vue-2、初识Vue

1、helloword小案列 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>初始Vue</title><!--引入vue--><script type"text/javascript" src"https://cdn.jsdelivr.n…...

机器学习项目标记图像数据 - 安装LabelImg及功能介绍

什么是LabelImg&#xff1f; LabelImg 是一款流行的图像标注工具&#xff0c;主要用于计算机视觉领域。它允许用户为机器学习项目标记图像数据&#xff0c;特别是用于训练目标检测模型。 如何安装LabelImg pip install PyQt5 pip install pyqt5-tools pip install lxml pip …...

12.15 log 122.买卖股票的最佳时机 II,55. 跳跃游戏

122.买卖股票的最佳时机 II class Solution { public:int maxProfit(vector<int>& prices) {int result0;for(int i0;i<prices.size();i){if(i>0&&prices[i]-prices[i-1]>0){resultprices[i]-prices[i-1];}}return result;} }; 这道题贪心贪的时每…...

Redis - 挖矿病毒 db0 库 backup 反复出现解决方案

问题描述 腾讯云的服务器&#xff0c;使用 Docker 部署了 Redis 之后&#xff0c;发现 DB0 中总是出现 4 条 key&#xff0c;分别是 backup01backup02backup03backup04 而自己每次存入 db0 中的数据过一会就会被无缘无故删除掉。 原因分析 挖矿病毒 解决方案 在启动的时候…...

LiveGBS流媒体平台GB/T28181常见问题-国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道

LiveGBS国标GB28181中国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道 1、什么是国标编号&#xff1f;2、国标设备ID和通道ID3、ID 统一编码规则4、搭建GB28181视频直播平台 1、什么是国标编号&#xff1f; 国标GB28181对接过程中&#xff0c;可能有的小…...

Unity ShaderGraph 技能冷却转圈效果

Unity ShaderGraph 技能冷却转圈效果 前言项目场景布置代码编写ShaderGraph 连线总结 参考 前言 遇到一个需求&#xff0c;要展示技能冷却的圆形遮罩效果。 项目 场景布置 代码编写 Shader核心的就两句 // 将uv坐标系的原点移到纹理中心 float2 uv i.uv - float2(0.5, 0…...

C++上位软件通过Snap7开源库访问西门子S7-1200/S7-1500数据块的方法

前言 本人一直从事C上位软件开发工作较多&#xff0c;在之前的项目中通过C访问西门子PLC S7-200/S7-1200/S7-1500并进行数据交互的应用中一直使用的是ModbusTCP/ModbusRTU协议进行。Modbus上位开源库采用的LibModbus。经过实际应用发现Modbus开源库单次发送和接受的数据不能超过…...

如何正确安装Axure插件?详细步骤分享

产品经理在使用Axure导出html文件时&#xff0c;如果选择“完成后打开浏览器”&#xff0c;浏览器往往无法识别。此时&#xff0c;我们需要使用Axure官方谷歌浏览器插件直接访问浏览器中的本地html项目&#xff0c;否则我们需要上传到AxureCloud或使用软件本身的预览功能。接下…...

[SwiftUI]工程最低适配iOS13

问题&#xff1a; 新建工程&#xff0c;选择最低支持iOS13报错&#xff1a; main() is only available in iOS 14.0 or newer Scene is only available in iOS 14.0 or newer WindowGroup is only available in iOS 14.0 or newer 解决&#xff1a; 注释掉上面代码&#x…...

MyBatis-Plus框架学习笔记

先赞后看&#xff0c;养成习惯&#xff01;&#xff01;&#xff01;❤️ ❤️ ❤️ 文章码字不易&#xff0c;如果喜欢可以关注我哦&#xff01; ​如果本篇内容对你有所启发&#xff0c;欢迎访问我的个人博客了解更多内容&#xff1a;链接地址 MyBatisPlus &#xff08;简称…...

【Java】——期末复习题题库(六)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…...

【水文】实现四则运算的简易计算器

代码&#xff1a; #include <stdio.h>int main() {double num1, num2;char operator, ch;printf("请输入两个操作数和运算符&#xff08;空格分隔&#xff09;&#xff1a;");scanf("%lf %lf %c", &num1, &num2, &operator);switch (op…...

计算机毕业设计-----ssm+mysql实现的JavaWeb酒店管理系统

项目介绍 本项目为基于ssmmysql实现的JavaWeb酒店管理系统; 主要功能包括&#xff1a; 管理员登录,收入统计,客房管理,商品管理,客房预订,住宿登记,财务统计,旅客管理,接待对象管理等功能。 环境需要 1.运行环境&#xff1a;最好是java jdk 1.8&#xff0c;我们在这个平台上…...

安防监控EasyCVR视频融合/汇聚平台大华热成像摄像机智能告警上报配置步骤

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…...

关于“Python”的核心知识点整理大全64

目录 20.2.15 确保项目的安全 settings.py 20.2.16 提交并推送修改 20.2.17 创建自定义错误页面 1. 创建自定义模板 500.html settings.py settings.py 注意 views.py 20.2.18 继续开发 往期快速传送门&#x1f446;&#xff08;在文章最后&#xff09;&#xff1a…...

Docker overlay2文件busy,容器不能删除问题解决

文章目录 在删除docker容器的时候报错,说设备正忙通过 docker ps -a 查看有两个状态的dead的容器解决方法&#xff1a;1.查看所有挂载的设备2.截取设备的进程id3.清理进程(kill掉即可) 在删除docker容器的时候报错,说设备正忙 Error response from daemon: Driver overlay2 fai…...

栈的数据结构实验报告

一、实验目的&#xff1a; 1、理解栈的定义&#xff1b; 2、利用栈处理实际问题。 二、实验内容&#xff08;实验题目与说明&#xff09; 利用栈实现数据的分类&#xff0c;将输入的整数以奇偶为标准分别存放到两个栈中&#xff0c;并最终从栈1和栈2输出偶数和奇数序列。 …...

ValueError: Could not find a backend to open path with iomode `wI` 解决

运行如下语句时报错 imageio.mimwrite(os.path.join(savedir, video.mp4)报错信息&#xff1a; ValueError: Could not find a backend to open video.mp4 with iomode wI. Based on the extension, the following plugins might add capable backends:FFMPEG: pip install …...

小白入门基础 - spring Boot 入门

1.简介 spring Boot是为了简化java的开发流程而构建的&#xff0c;即使是使用springMVC框架&#xff0c;也依然需要大量配置和依赖导入&#xff0c; 这无疑是繁琐的&#xff0c;spring Boot采用了”习惯由于配置“的原则&#xff0c;进行一键化部署&#xff0c;这样极大…...

济宁祥云网站建设/搜索引擎调词平台哪个好

国际上好用的外贸邮箱&#xff0c;企业开发信、发票、货单&#xff0c;尽享其中。矩阵式服务器搭载&#xff0c;信号组网&#xff0c;全球极致传书。 注册域名&#xff0c;或登录TOM官网&#xff0c;或搜索&#xff08;qy.tom.com&#xff09;&#xff0c;客服热线&#xff0…...

建设摩托车官网中国官网报价大全/太原seo关键词优化

<meta http-equiv"X-UA-Compatible" content"IEEmulateIE7"/>转载于:https://www.cnblogs.com/xjt360/p/3604410.html...

wordpress收费下载模板/seo顾问咨询

编者按&#xff1a;当今&#xff0c;以交换机和路由器为主建立起来的网络连接和拓扑已经构成相当完善的信息基础设施&#xff0c;差异化提供网络个性服务的呼声更加强烈&#xff0c;智能化提升网络综合品质的要求更加迫切&#xff0c;关于“中间设备”&#xff08;middlebox&am…...

网站浏览图片怎么做/关键词查询的分析网站

Lazarus crack,功能丰富的 Delphi 开发环境 Lazarus为开发人员提供了一个功能丰富的 Delphi 开发环境&#xff0c;使他们能够创建功能齐全的跨平台应用程序&#xff0c;专为个人和商业用途而设计。 Lazarus 项目拥有超过 15 年的历史&#xff0c;包括一个用于 Free Pascal 的综…...

中国设计师联盟官网/美国seo薪酬

移植了下HAL&#xff0c;发现编译出现如下错误 error: LOGE was not declared in this scope 比较了一下android4.1的 system/core/include/cutils/log.h和android4.0的对应文件&#xff0c; 发现在4.1当中已经将所有的LOG宏前面加了一个字母A 。所以出现上述编译错误。 修改HA…...

如何解析到凡科建设的网站/国际新闻快报

// -------------------- UISlider (滑块控件 是一个滑杆 存放着一系列的值) // UISlider *slider [[UISlider alloc]initWithFrame:CGRectMake(30, 100, 300, 30)]; // 设置滑块的显示图片 [slider setThumbImage:[UIImage imageNamed:"1.png"] forSta…...