黄页88网/进行优化
《Flink SQL 基础概念》系列,共包含以下 5 篇文章:
- Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API
- Flink SQL 基础概念(二):数据类型
- Flink SQL 基础概念(三):SQL 动态表 & 连续查询
- Flink SQL 基础概念(四):SQL 的时间属性
- Flink SQL 基础概念(五):SQL 时区问题
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
Flink SQL 基础概念(四):SQL 的时间属性
- 1.Flink 三种时间属性简介
- 2.Flink 三种时间属性的应用场景
- 2.1 事件时间案例
- 2.2 处理时间案例
- 2.3 摄入时间案例
- 3.SQL 指定时间属性的两种方式
- 4.SQL 事件时间案例
- 5.SQL 处理时间案例
与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间、事件时间、摄入时间 三种时间语义。
三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。
1.Flink 三种时间属性简介
- 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
- 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取
System.currentTimeMillis()
),在生产环境中用的次多。 - 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。
小伙伴们要注意到:
- 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
- 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。
2.Flink 三种时间属性的应用场景
讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?
先说结论,在 Flink 中时间的作用:
- 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
- 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。
博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。
2.1 事件时间案例
还是以之前的 clicks
表拿来举例。
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime
划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime
设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)
。
上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。
后续 Flink SQL 任务在运行的过程中也会实际按照 cTime
的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。
2.2 处理时间案例
还是以之前的 clicks
表拿来举例。
还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。
那么这种触发机制就是处理时间。
2.3 摄入时间案例
在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。
3.SQL 指定时间属性的两种方式
如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。
那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:
CREATE TABLE DDL
创建表的时候指定- 可以在
DataStream
中指定,在后续的 DataStream 转的 Table 中使用
一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。
4.SQL 事件时间案例
来看看 Flink 中如何指定事件时间。
CREATE TABLE DDL
指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP
或者 TIMESTAMP_LTZ
类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT
类型)嘛,那这种情况怎么办?
解决方案必须要有啊,如下。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 1. 这个 ts 就是常见的毫秒级别时间戳ts BIGINT,-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
- DataStream 中指定事件时间。
之前介绍了 Table
和 DataStream
可以互转,那么 Flink 也提供了一个能力,就是在 Table
转为 DataStream
时,指定时间戳字段。如下案例:
public class DataStreamSourceEventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {@Overridepublic long extractTimestamp(Row element) {return (long) element.getField("f2");}});// 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}
5.SQL 处理时间案例
来看看 Flink SQL 中如何指定处理时间。
CREATE TABLE DDL
指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 使用下面这句来将 user_action_time 声明为处理时间user_action_time AS PROCTIME()
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
- DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource());// 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}
相关文章:

【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性
《Flink SQL 基础概念》系列,共包含以下 5 篇文章: Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 APIFlink SQL 基础概念(二):数据类型Flink SQL 基础概念&am…...

文字弹性跳动CSS3代码
文字弹性跳动CSS3代码,源码由HTMLCSSJS组成,记事本打开源码文件可以进行内容文字之类的修改,双击html文件可以本地运行效果,也可以上传到服务器里面,重定向这个界面 下载地址 文字弹性跳动CSS3代码...

前端小白的学习之路(事件流)
提示:事件捕获,事件冒泡,事件委托 目录 事件模型(DOM事件流) 1.事件是什么 2.事件流 1).事件流的三个阶段 3.参考代码 二、事件委托 1.概念 2.使用案例 3.阻止冒泡行为 事件模型(DOM事件流) 1.事件是什么 1). 事件是HTML和Javascr…...

电脑文件误删除如何恢复?分享三个简单数据恢复方法
在日常使用电脑的过程中,文件误删除的情况时有发生。无论是由于操作失误还是病毒感染,丢失的文件都可能对我们的工作和学习造成极大的影响。因此,掌握文件恢复的方法显得尤为重要。下面围绕“电脑文件误删除如何恢复”这一主题,给…...

MySQL实战:监控
监控指标 性能类指标 名称说明QPS数据库每秒处理的请求数量TPS数据库每秒处理的事务数量并发数数据库实例当前并行处理的会话数量连接数连接到数据库会话的数量缓存命中率Innodb的缓存命中率 功能类指标 名称说明可用性数据库是否正常对外提供服务阻塞当前是否有阻塞的会话…...

MySQL自增主键自动生成的主键重置
需求描述: 从主键1开始,insert操作自增了五个,库里五条数主键是1、2、3、4、5; 然后把主键是3、4、5的三条数据给删了,再继续insert,主键就是6了 因为这里表会把最大的数即5记住,下次自增即为…...

reverse_iterator实现
对于实现reverse_iterator,我们可以学栈和队列的实现过程,利用适配器,实现如下; #pragma oncetemplate<class Iterator,class Ref,class Ptr> class reverse_Iterator { public://构造函数:reverse_Iterator(Iterator it):…...

C++:什么情况下函数应该声明为纯虚函数
在C中,函数应该在以下情况下声明为纯虚函数: 抽象基类:当你希望定义一个基类,该基类不能被实例化,只能作为其他类的基类时,你应该在基类中声明至少一个纯虚函数。这样的基类被称为抽象基类。纯虚函数通过在…...

【全面了解自然语言处理三大特征提取器】RNN(LSTM)、transformer(注意力机制)、CNN
目录 一 、RNN1.RNN单个cell的结构2.RNN工作原理3.RNN优缺点 二、LSTM1.LSTM单个cell的结构2. LSTM工作原理 三、transformer1 Encoder(1)position encoding(2)multi-head-attention(3)add&norm 残差链…...

区块链推广海外市场怎么做,CloudNEO服务商免费为您定制个性化营销方案
随着区块链技术的不断发展和应用场景的扩大,区块链项目希望能够进入海外市场并取得成功已成为越来越多公司的目标之一。然而,要在海外市场推广区块链项目,需要采取有效的营销策略和措施。作为您的区块链项目营销服务商,CloudNEO将…...

【S5PV210】 | ARM的指令集合
【S5PV210】 | ARM的指令集合 时间:2024年3月17日23:32:06 目录 文章目录 【S5PV210】 | ARM的指令集合目录 ARM指令集具有一系列显著的特点。首先,它属于RISC(精简指令集计算机)架构,这意味着译码机制相对简单。在AR…...

2024-3-17Go语言入门
在Go语言中: var a chan int 定义了一个名为 a 的变量,其类型为 chan int。这意味着 a 是一个整型值的通道(channel)。通道是Go语言中用于goroutine之间通信的一种机制,你可以通过通道发送和接收特定类型的值。在这个例…...

AJAX-XMLHttpRequest
XMLHttpRequest 定义: XMLHttpRequest对象用于与服务器交互。通过XMLHttpRequest可以在不断刷新页面的情况下请求特定URL,获取数据。这允许网页在不影响用户操作的情况下,更新页面的局部内容。 关系: axios内部采用XMLHttpReques…...

【GPT-SOVITS-04】SOVITS 模块-鉴别模型解析
说明:该系列文章从本人知乎账号迁入,主要原因是知乎图片附件过于模糊。 知乎专栏地址: 语音生成专栏 系列文章地址: 【GPT-SOVITS-01】源码梳理 【GPT-SOVITS-02】GPT模块解析 【GPT-SOVITS-03】SOVITS 模块-生成模型解析 【G…...

论文阅读_时序模型_iTransformer
1 2 3 4 5 6 7 8英文名称: ITRANSFORMER: INVERTED TRANSFORMERS ARE EFFECTIVE FOR TIME SERIES FORECASTING 中文名称: ITRANSFORMER:倒置Transformers在时间序列预测中的有效性 链接: https://openreview.net/forum?idX6ZmOsTYVs 代码: https://github.com/thum…...

Docker 哲学 - 容器操作 -cp
1、拷贝 容器绑定的 volume的 数据,到指定目录 2、匿名挂载 volume 只定义一个数据咋在容器内的path,docker自动生成一个 sha256 的key作为 volume 名字。这个 sha256 跟 commitID 一致都是唯一的所以 ,docker利用这个机制,可以…...

作品展示ETL
1、ETL 作业定义、作业导入、控件拖拽、执行、监控、稽核、告警、报告导出、定时设定 欧洲某国电信系统数据割接作业定义中文页面(作业顶层,可切英文,按F1弹当前页面帮助) 涉及文件拆分、文件到mysql、库到库、数据清洗、数据转…...

python的集合应用
在Python中,集合是一种无序、可变的数据类型,用于存储不重复的元素。Python提供了内置的集合类型 set,以及 frozenset(不可变的集合)。以下是一些Python集合的常见应用场景: 去重: 集合是存储唯…...

盒子IM开源仿微信聊天程序源码,可以商用
安装教程 1.安装运行环境 安装node:v14.16.0安装jdk:1.8安装maven:3.6.3安装mysql:5.7,密码分别为root/root,运行sql脚本(脚本在im-platfrom的resources/db目录)安装redis:5.0安装minio,命令端口使用9001,并创建一个名为”box-im”的bucket,…...

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:Web)中篇
onBeforeUnload onBeforeUnload(callback: (event?: { url: string; message: string; result: JsResult }) > boolean) 刷新或关闭场景下,在即将离开当前页面时触发此回调。刷新或关闭当前页面应先通过点击等方式获取焦点,才会触发此回调。 参数…...

静默安装OGG21.3微服务版本FOR ORACLE版本
静默安装OGG21.3微服务版本FOR ORACLE版本 silent install ogg21.3 for oracle 某度找来找去都没有找到一份可靠的静默安装OGG21.3微服务版本的案例,特别难受,为此将自己静默安装的步骤一步步贴出来分享给大家,请指点,谢谢。 至…...

[二分查找]LeetCode2040:两个有序数组的第 K 小乘积
本文涉及的基础知识点 二分查找算法合集 题目 给你两个 从小到大排好序 且下标从 0 开始的整数数组 nums1 和 nums2 以及一个整数 k ,请你返回第 k (从 1 开始编号)小的 nums1[i] * nums2[j] 的乘积,其中 0 < i < nums1.…...

【Godot4.2】颜色完全使用手册
概述 本篇简单汇总Godot中的颜色的构造和使用,内容包括了: RGB、RGBA,HSV以及HTML16进制颜色值、颜色常量等形式构造颜色颜色的运算以及取反、插值用类型化数组、紧缩数组或PNG图片形式存储多个颜色 构造颜色 因为颜色是一种视觉元素&…...

Blocks —— 《Objective-C高级编程 iOS与OS X多线程和内存管理》
目录 Blocks概要什么是BlocksOC转C方法关于几种变量的特点 Blocks模式Block语法Block类型 变量截获局部变量值__block说明符截获的局部变量 Blocks的实现Block的实质 Blocks概要 什么是Blocks Blocks是C语言的扩充功能,即带有局部变量的匿名函数。 顾名思义&#x…...

Python零基础---爬虫技术相关
python 爬虫技术,关于数据相关的拆解: 1.对页面结构的拆解 2.数据包的分析(是否加密了参数)(Md5 aes)难易程度,价格 3.对接客户(433,334) # 数据库 CSV 4.结单(发一部分数据&a…...

利用 STM32 TIMER 触发 ADC 实现分组转换
1、问题描述 使用 STM32G4 系列芯片开发产品,用到其中一个 ADC 模块的多个通道,他希望使 用 TIMER 来定时触发这几个通道的转换。不过他有两点疑惑。第一,他期望定时器触发这几个 通道是每触发一次则只转换一个通道,这样依次触发…...

2024 年(第 12 届)“泰迪杯”数据挖掘挑战赛——B 题:基于多模态特征融合的图像文本检索完整思路与源代码分享
一、问题背景 随着近年来智能终端设备和多媒体社交网络平台的飞速发展,多媒体数据呈现海量增长 的趋势,使当今主流的社交网络平台充斥着海量的文本、图像等多模态媒体数据,也使得人 们对不同模态数据之间互相检索的需求不断增加。有效的信…...

Java12~14 switch语法
JDK8以后的语法没学习了,现在时代发展这么快,所以得加紧时间学习了。JDK12只有一个特性就是switch语法,算是比较容易学习的一个版本吧。总体来说就是三部分内容。具体内容可以看JEP-325的内容。 箭头语法 每个case可以放箭头了。以下是一个例…...

小狐狸ChatGPT智能聊天系统源码v2.7.6全开源Vue前后端+后端PHP
测试环境包括Linux系统的CentOS 7.6,宝塔面板,PHP 7.4和MySQL 5.6。网站的根目录是public, 使用thinkPHP进行伪静态处理,并已开启SSL证书。 该系统具有多种功能,包括文章改写、广告营销文案创作、编程助手、办公达人…...

The Rise and Potential of Large Language Model Based Agents: A Survey
OpenAI AI的应用研究主管Lilian Weng发布了关于AI Agents的《大语言模型(LLM)支持的自主代理》,在文章中她定义了基于LLM构建AI Agents的应用框架:AgentLLM(大型语言模型)记忆(Memory࿰…...