当前位置: 首页 > news >正文

flink watermark 实例分析

WATERMARK 定义了表的事件时间属性,其形式为:

 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:
在这里插入图片描述

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:
在这里插入图片描述
根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
    注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

WATERMARK rowtime_column_name 取值两种方式

rowtime_column_name为计算列

CREATE TABLE pageviews (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

rowtime_column_name为事件时间属性

CREATE TABLE dataGen(uuid VARCHAR(20),name INT,age INT,ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)WATERMARK FOR ts AS ts
)with('connector' = 'datagen','rows-per-second' = '10','number-of-rows' = '100','fields.age.kind' = 'random','fields.age.min' = '1','fields.age.max' = '10','fields.name.kind' = 'random','fields.name.min' = '1','fields.name.max' = '10');

watermark使用demo

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

watermark在over聚合中的使用

--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
selectgroup_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (PARTITION BY group_nameORDER BY event_timeRANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)

watermark在windows聚合中的使用

--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

参考:
Window Aggregation
Over Aggregation

相关文章:

flink watermark 实例分析

WATERMARK 定义了表的事件时间属性&#xff0c;其形式为: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3)&#xff0c;且是 sche…...

系列十二(面试)、Java中的GC回收类型有哪些?

一、Java中的GC回收类型 1.1、概述 Java中的GC回收类型主要包含以下几种&#xff0c;即&#xff1a;UseSerialGC、UseParallelGC、UseConcMarkSweepGC、UseParNewGC、UseParallelOldGC、UseG1GC。 1.2、源码...

华为数通方向HCIP-DataCom H12-831题库(多选题:201-220)

第201题 在多集群RR组网中,每个集群中部署了一台RR设备及其客户机,各集群的RR与为非客户机关系,并建立IBGP全连接。以下关于BGP路由反射器发布路由规则的描述,正确的有哪些? A、若某RR从EBGP对等体学到的路由,此RR会传递给其他集群的RR B、若某RR从非客户机IBGP对等体学…...

NLP论文阅读记录 - | 使用GPT对大型文档集合进行抽象总结

文章目录 前言0、论文摘要一、Introduction二.相关工作2.1Summarization2.2 神经网络抽象概括2.2.1训练和测试数据集。2.2.2 评估。 2.3 最先进的抽象摘要器 三.本文方法3.1 查询支持3.2 文档聚类3.3主题句提取3.4 语义分块3.5 GPT 零样本总结 四 实验效果4.1数据集4.2 对比模型…...

华为全屋wifi6蜂鸟套装标准

华为政企42 华为政企 目录 上一篇华为安防监控摄像头下一篇华为企业级无线路由器...

系列二十八、如何在Oracle官网下载JDK的api文档

一、官网下载JDK的api文档 1.1、官网地址 https://www.oracle.com/java/technologies/javase-jdk21-doc-downloads.html 1.2、我分享的api.chm 链接&#xff1a;https://pan.baidu.com/s/1Bf55Fz-eMTErmQDtZZcewQ?pwdyyds 提取码&#xff1a;yyds 1.3、参考 https://ww…...

STM32-ADC模数转换器

目录 一、ADC简介 二、逐次逼近型ADC内部结构 三、STM32内部ADC转换结构 四、ADC基本结构 五、输入通道 六、转换模式 6.1单次转换&#xff0c;非扫描模式 6.2连续转换&#xff0c;非扫描模式 6.3单次转换&#xff0c;扫描模式 6.4连续转换&#xff0c;扫描模式 七、…...

谷歌手机安装证书到根目录

1、前提你已经root&#xff0c;安装好面具 2&#xff0c;下载movecert模块&#xff0c;自动帮你把证书从用户证书移动成系统证书 视频教程&#xff0c;手机为谷歌手机 https://www.bilibili.com/video/BV1pG4y1A7Cj?p11&vd_source9c0a32b00d6d59fecae05b4133f22f06 软件下…...

代码随想录 322. 零钱兑换

题目 给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的 最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff0c;返回 -1 。 你可以认为每种硬币的数量是无限的。…...

【图的应用二:最短路径】- 用 C 语言实现迪杰斯特拉算法和弗洛伊德算法

目录 一、最短路径 二、迪杰斯特拉算法 三、弗洛伊德算法 一、最短路径 假若要在计算机上建立一个交通咨询系统&#xff0c;则可以采用图的结构来表示实际的交通网络。如下图所示&#xff0c;图中顶点表示城市&#xff0c;边表示城市间的交通联系。 这个咨询系统可以回答旅…...

Qt之判断一个点是否在多边形内部(射线法)

算法思想: 以被测点Q为端点,向任意方向作射线(一般水平向右作射线),统计该射线与多边形的交点数。如果为奇数,Q在多边形内;如果为偶数,Q在多边形外。计数的时候会有一些特殊情况。这种方法适用于任意多边形,不需要考虑精度误差和多边形点给出的顺序,时间复杂度为O(n)…...

压力测试过程中内存溢出(堆溢出、栈溢出、持久代溢出)情况如何解决

在压力测试过程中&#xff0c;可能会遇到内存溢出的问题&#xff0c;其中常见的包括堆内存溢出、栈内存溢出和持久代溢出。解决这类问题需要首先理解各种内存溢出的原因和特点。 堆内存溢出&#xff1a;这种情况通常发生在稳定性压测一段时间后&#xff0c;系统报错&#xff0…...

【工业智能】音频信号相关场景

【工业智能】音频信号相关场景 DcaseDcase introduction&#xff1a;dcase2024有10个主题的任务&#xff1a; ASD硬件设备产品商 方法制造业应用场景 zenodo音频事件检测 与计算机视觉CV相对应&#xff0c;计算机听觉computer audition&#xff0c;简称CA。 Dcase 这里推荐一个…...

(PC+WAP)装修设计公司网站模板 家装公司网站源码下载

(PCWAP)装修设计公司网站模板 家装公司网站源码下载 PbootCMS内核开发的网站模板&#xff0c;该模板适用于装修设计、家装公司类等企业&#xff0c;当然其他行业也可以做&#xff0c;只需要把文字图片换成其他行业的即可&#xff1b; PCWAP&#xff0c;同一个后台&#xff0c…...

使用opencv实现图像中几何图形检测

1 几何图形检测介绍 1.1 轮廓(contours) 什么是轮廓&#xff0c;简单说轮廓就是一些列点相连组成形状、它们拥有同样的颜色、轮廓发现在图像的对象分析、对象检测等方面是非常有用的工具&#xff0c;在OpenCV 中使用轮廓发现相关函数时候要求输入图像是二值图像&#xff0c;这…...

补题与周总结:leetcode第 376 场周赛

文章目录 复盘与一周总结2967. 使数组成为等数数组的最小代价&#xff08;中位数贪心 回文数判断&#xff09;2968. 执行操作使频率分数最大&#xff08;中位数贪心 前缀和 滑窗&#xff09; 复盘与一周总结 wa穿了第3题&#xff0c;赛时其实想到了思路&#xff1a;中位数贪心…...

js指纹库,可跟踪用户唯一性

fingerprintjs官网 资料&#xff1a; Browserleaks - Check your browser for privacy leaks...

Shell三剑客:awk(内部变量)

一、$0 &#xff1a;完整的输入记录 [rootlocalhost ~]# awk -F: {print $0} passwd.txt root:x:0:0:root:/root:/bin/bash bin:x:1:1:bin:/bin:/sbin/nologin daemon:x:2:2:daemon:/sbin:/sbin/nologin adm:x:3:4:adm:/var/adm:/sbin/nologin lp:x:4:7:lp:/var/spool/lpd:/s…...

JVM中的虚拟机栈的动态链接部分存放到底是什么

在Java虚拟机&#xff08;JVM&#xff09;中&#xff0c;每个线程在执行一个方法时都会创建一个栈帧&#xff08;Stack Frame&#xff09;&#xff0c;栈帧中包含了方法的运行时数据。栈帧通常包括局部变量表、操作数栈、动态链接、方法返回地址等部分。 动态链接 动态链接&a…...

Leetcode 55 跳跃游戏

题意理解&#xff1a; 非负整数数组 nums, 最初位于数组的 第一个下标 。 数组中的每个元素代表你在该位置可以跳跃的最大长度。 需要跳到nums最后一个元素即为成功。 目标&#xff1a;是否能够跳到最后一个元素。 解题思路&#xff1a; 使用贪心算法来解题&#xff0c;需要理解…...

构建陪诊预约系统:技术实战指南

在医疗科技的飞速发展中&#xff0c;陪诊预约系统的应用为患者和陪诊人员提供了更为便捷和贴心的服务。本文将带领您通过技术实现&#xff0c;构建一个简单而实用的陪诊预约系统&#xff0c;以提升医疗服务的效率和用户体验。 技术栈选择 在开始之前&#xff0c;我们需要选择…...

windows和linux将文件删除至回收站【C++】【Go】语言实现

目录 C Windows平台 Linux平台 开平台&#xff0c;代码合并 Go 实现步骤 Go语言实现示例 go单独的windows版本实现 代码解释 C 在C中&#xff0c;将文件移动到回收站的实现在Linux和Windows平台上是不同的。首先&#xff0c;我会为你提供在Windows平台上实现的代码示例…...

10 Vue3中v-html指令的用法

概述 v-html主要是用来渲染富文本内容&#xff0c;比如评论信息&#xff0c;新闻信息&#xff0c;文章信息等。 v-html是一个特别不安全的指令&#xff0c;因为它会将文本以HTML的显示进行渲染&#xff0c;一旦文本里面包含一些恶意的js代码&#xff0c;可能会导致整个网页发…...

华为数通方向HCIP-DataCom H12-831题库(多选题:181-200)

第181题 如图所示,R1、R2、R3、R4都部署为SPF区域0,链路的cost值如图中标识。R1、R2R3、R4的Loopback0通告入OSPF。R1、R2、R3与R4使用Loopback0作为连接接口,建立BGP对等体关系,其中R4为RR设备,R1、R2、R3是R4的客户端。当R4的直连地址172.20,1,4/32通告入BGP后,以下关R…...

DC-磁盘管理

2023年全国网络系统管理赛项真题 模块B-Windows解析 题目 在DC2上安装及配置软RAID 5。在安装好的DC2虚拟机中添加三块10G虚拟磁盘。组成RAID 5,磁盘分区命名为卷标H盘:Raid5。手动测试破坏一块磁盘,做RAID磁盘修复,确认RAID 5配置完毕。配置步骤 关闭虚拟机,添加3块10G磁…...

使用Docker运行镜像文件与设置端口

1&#xff0c;创建镜像文件前准备 # 使用基础镜像FROM alpine:latest# 设置工作目录WORKDIR /app# 复制应用程序文件到镜像中COPY . .# 暴露容器的端口 不会自动将容器的端口映射到宿主机上 docker run -d -p <宿主机端口>:7080 <镜像名称>EXPOSE 7080# 定义容器启…...

Centos 8.5 Oracle12c安装

由于多次安装踩坑&#xff0c;所以本次写了一份12c安装的完整版。可以直接使用。 一、安装数据库基本信息 名称 值 主机名 database 操作系统 CentOS Linux release 8.5.2111 Oracle用户名/密码 oracle Oracle 版本 12c Enterprise Edition Release 12.2.0.1.0 oracle…...

Apache Tomcat httpoxy 安全漏洞 CVE-2016-5388 已亲自复现

Apache Tomcat httpoxy 安全漏洞 CVE-2016-5388 已亲自复现 漏洞名称漏洞描述影响版本 漏洞复现环境搭建漏洞利用 修复建议总结 漏洞名称 漏洞描述 在Apache Tomcat中发现了一个被归类为关键的漏洞&#xff0c;该漏洞在8.5.4(Application Server Soft ware)以下。受影响的是组…...

ChatGLM3-6B 的调用参数说明,chat 与stream_chat 接口函数的参数说明

ChatGLM3-6B 是一个语言大模型&#xff0c;最近在评估这个模型&#xff0c;但发现它的文档有限&#xff0c;只能从demo代码中猜测调用的参数的含义&#xff0c;准确度是有限的&#xff1b;于是&#xff0c;通过查看源代码来研究&#xff0c;目前整理笔记如下&#xff1a; Chat…...

Vuex的学习-2

Vuex的核心概念 StateMutationAction 1.State State提供唯一的公共数据源&#xff0c;所有共享的数据都统一放在Store的State中进行存储。 const store new Vuex.Store({state : { count: 0 } }) 这是渲染的页面 组件访问数据的第一种方式 组件访问数据的第二种方式 // 1…...

社保个人网站/体验式营销

&#xff08;pdf文件下载&#xff09; http://files.cnblogs.com/Robotke1/VMWare8.0%E5%AE%89%E8%A3%85Ubuntu12.04%E6%95%99%E7%A8%8B.pdf 转载于:https://www.cnblogs.com/Robotke1/archive/2013/05/09/3070183.html...

网页升级访问未成年/绍兴seo排名外包

摘要&#xff1a; 最近做项目中遇到批量添加和修改的问题&#xff0c;在老大的指导下学会了使用表类型参数来做批量操作。为了巩固强化&#xff0c;围绕这个技术又做了个小demo。 开始正题&#xff1a; 首先&#xff0c;我们在SQL Server 2008下创建一个示例数据库名为Table…...

公司建设网站的申请报告/厨师培训

Boss的需要时这样的&#xff0c;Item是可变大小的&#xff0c;同时根据不同的Window size&#xff0c;来确定Item的结构和大小Window 小的时候是 大的时候是这样的&#xff1a; 当然这size变化的过程中也允许其他结构&#xff0c;我这里只是举了最大和最小时候的样子。 当拿到需…...

口碑营销的策略技巧/seo是搜索引擎营销吗

yum -y install screen screen -S lnmp 记录session会话 ssh客户端中断再连接后 screen -r lnmp 恢复会话转载于:https://blog.51cto.com/kongdq/967746...

杭州app定制/关键词排名优化工具有用吗

隐藏index.php 一、codeigniter codeigniter和许多php框架一样&#xff0c;有个单一入口index.php,从url上看&#xff0c;显得很不友好。通过apache的rewirte&#xff0c;是可以隐藏掉的&#xff0c;实现伪url。 打开codeigniter下system\application\config中的config.php …...

厦门市翔安建设局网站/网站统计分析平台

DNS 配置篇二一、子域配置1、基本概念子域的作用是在本地DNS下再划分一个小的&#xff08;子&#xff09;DNS。作用的方便集中管理&#xff0c;不过问题是要配置转发。父DNS可以知道解析子DNS&#xff0c;子DNS 则只可以解析自己本地记录&#xff0c;不能解析父DNS。正向子域授…...