玩转数据-大数据-Flink SQL 中的时间属性
一、说明
时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。
二、处理时间
2.1、准备WaterSensor类,方便使用
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}
2.2、DataStream 到 Table 转换时定义
处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它新增一个字段。
代码段:
package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}
执行结果:
2.3、创建数据文件sensor.txt 数据,方便使用
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
2.4、在创建表的 DDL 中定义
package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}
运行结果:
三、事件时间
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。
3.1、DataStream 到 Table 转换时定义
事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
1、在 schema 的结尾追加一个新的字段
2、替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
代码:
援用上面WaterSensor类
package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}
运行结果:
3.2、使用已有的字段作为时间属性
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
3.3、在创建表的 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.
package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}
运行结果:
说明:
1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
相关文章:
玩转数据-大数据-Flink SQL 中的时间属性
一、说明 时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL …...
【论文笔记】A Review of Motion Planning for Highway Autonomous Driving
文章目录 I. INTRODUCTIONII. CONSIDERATIONS FOR HIGHWAY MOTION PLANNINGA. TerminologyB. Motion Planning SchemeC. Specificities of Highway DrivingD. Constraints on Highway DrivingE. What Is at Stake in this Paper III. STATE OF THE ARTA. Taxonomy DescriptionB…...
YOLOv8改进算法之添加CA注意力机制
1. CA注意力机制 CA(Coordinate Attention)注意力机制是一种用于加强深度学习模型对输入数据的空间结构理解的注意力机制。CA 注意力机制的核心思想是引入坐标信息,以便模型可以更好地理解不同位置之间的关系。如下图: 1. 输入特…...
2023年10月腾讯云优惠活动汇总:腾讯云最新优惠、代金券整理
腾讯云作为国内领先的云服务提供商,致力于为用户提供优质、稳定的云服务。为了更好地满足用户需求,腾讯云推出了各种优惠活动。本文将给大家分享腾讯云最新优惠活动,帮助用户充分利用腾讯云提供的优惠。 一、腾讯云优惠券领取【点此领取】 腾…...
BUUCTF reverse wp 65 - 70
[SWPU2019]ReverseMe 反编译的伪码看不明白, 直接动调 这里显示"Please input your flag", 然后接受输入, 再和32进行比较, 应该是flag长度要求32位, 符合要求则跳转到loc_E528EE分支继续执行 动调之后伪码可以读了 int __cdecl main(int argc, const char **arg…...
xorm数据库操作之Join、Union
golang的数据库操作xorm使用起来非常方便,不用再自己写SQl语句,而且xorm自己给我们做了SQL防注入等操作,用起来既方便又安全。此次文章我不会记录xorm的基本操作,我值记录一些特殊用法问题,包括动态创建表单、基于xorm…...
排序:基数排序算法分析
1.算法思想 假设长度为n的线性表中每个结点aj的关键字由d元组 ( k j d − 1 , k j d − 2 , k j d − 3 , . . . , k j 1 , k j 0 ) (k_{j}^{d-1},k_{j}^{d-2},k_{j}^{d-3},... ,k_{j}^{1} ,k_{j}^{0}) (kjd−1,kjd−2,kjd−3,...,kj1,kj0)组成, 其中&am…...
用go实现http服务端和请求端
一、概述 本文旨在学习记录下如何用go实现建立一个http服务器,同时构造一个专用格式的http客户端。 二、代码实现 2.1 构造http服务端 1、http服务处理流程 基于HTTP构建的服务标准模型包括两个端,客户端(Client)和服务端(Server)。HTTP 请求从客户端…...
幂级数和幂级数的和函数有什么关系?
幂级数和幂级数的和函数有什么关系? 本文例子引用自:80_1幂级数运算,逐项积分、求导【小元老师】高等数学,考研数学 求幂级数 ∑ n 1 ∞ 1 n x n \sum\limits_{n1}^{\infty}\frac{1}{n}x^n n1∑∞n1xn 的和函数 ÿ…...
Git多账号管理通过ssh 公钥的方式,git,gitlab,gitee
按照目前国内访问git,如果不科学上网,我们很大可能访问会超时。基于这个,所以我现在的git 配置已经增加到了3个了 一个公司gitlab,一个git,一个gitee. 以下基于这个环境,我们来说明下如何创建配置ssh公钥。…...
在nodejs常见的不良做法及其优化解决方案
在nodejs常见的不良做法及其优化解决方案 当涉及到在express和nodejs中开发应用程序时。遵循最佳实践对于确保项目的健壮性、可维护性和安全性至关重要。 在本文中,我们将探索开发人员经常遇到的几种常见的错误做法,并通过代码示例研究优化的最佳做法&…...
关于layui upload上传组件上传文件无反应的问题
最近使用layui upload组件时,碰到了上传文件无反应的问题,感到非常困惑。 因为使用layui upload组件不是一次两次了,之前每次都可以,这次使用同样的配方,同样的姿势,为什么就不行了呢? 照例先…...
容器网络之Flannel
第一个问题位置变化,往往是通过一个称为注册中心的地方统一管理的,这个是应用自己做的。当一个应用启动的时候,将自己所在环境的 IP 地址和端口,注册到注册中心指挥部,这样其他的应用请求它的时候,到指挥…...
SVM(下):如何进行乳腺癌检测?
⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ 🐴作者:秋无之地 🐴简介:CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作,主要擅长领域有:爬虫、后端、大数据开发、数据分析等。 🐴欢迎小伙伴们点赞👍🏻、收藏⭐️、…...
嵌入式Linux应用开发-第十五章具体单板的按键驱动程序
嵌入式Linux应用开发-第十五章具体单板的按键驱动程序 第十五章 具体单板的按键驱动程序(查询方式)15.1 GPIO操作回顾15.2 AM335X的按键驱动程序(查询方式)15.2.1 先看原理图确定引脚及操作方法15.2.2 再看芯片手册确定寄存器及操作方法15.2.3 编程15.2.3.1 程序框架15.2.3.2 硬…...
MySQL体系结构和四层架构介绍
MySQL体系结构图如下: 四层介绍 1. 连接层: 它的主要功能是处理客户端与MySQL服务器之间的连接(比如Java应用程序通过JDBC连接MySQL)。当客户端应用程序连接到MySQL服务器时,连接层对用户进行身份验证、建立安全连接并管理会话状态。它还处理…...
【产品运营】如何做好B端产品规划
产品规划是基于当下掌握的多维度信息,为追求特定目的,而制定的产品资源投入计划。 产品规划是基于当下掌握的多维度信息(客户需求、市场趋势、竞争对手、竞争策略等),为追求特定目的(商业增长、客户满意等&…...
ruoyi-启动
1 springboot 版本 git 地址 ruoyi-vue-pro: 🔥 官方推荐 🔥 RuoYi-Vue 全新 Pro 版本,优化重构所有功能。基于 Spring Boot MyBatis Plus Vue & Element 实现的后台管理系统 微信小程序,支持 RBAC 动态权限、数据权限…...
select完成服务器并发
服务器 #include <myhead.h>#define PORT 4399 //端口号 #define IP "192.168.0.191"//IP地址//键盘输入事件 int keybord_events(fd_set readfds); //客户端交互事件 int cliRcvSnd_events(int , struct sockaddr_in*, fd_set *, int *); //客户端连接事件 …...
初级篇—第四章聚合函数
文章目录 聚合函数介绍聚合函数介绍COUNT函数AVG和SUM函数MIN和MAX函数 GROUP BY语法基本使用使用多个列分组WITH ROLLUP HAVING基本使用WHERE和HAVING的对比开发中的选择 SELECT的执行过程查询的结构SQL 的执行原理 练习流程函数 聚合函数介绍 聚合函数作用于一组数据&#x…...
计算机图像处理-中值滤波
非线性滤波 非线性滤波是利用原始图像跟模版之间的一种逻辑关系得到结果,常用的非线性滤波方法有中值滤波和高斯双边滤波,分别对应cv2.medianBlur(src, ksize)方法和cv2.bilateralFilter(src, d, sigmaColor, sigmaSpace[, dst[, borderType]])方法。 …...
Golang中的包和模块设计
Go,也被称为Golang,是一种静态类型、编译型语言,因其简洁性和对并发编程的强大支持而受到开发者们的喜爱。Go编程的一个关键方面是其包和模块系统,它允许创建可重用、可维护和高效的代码。本博客文章将深入探讨在Go中设计包和模块…...
web:[极客大挑战 2019]Upload
题目 页面显示为一个上传,猜测上传一句话木马文件 先查看源代码看一下有没有有用的信息,说明要先上传图片,先尝试上传含有一句话木马的图片 构造payload <?php eval($_POST[123]);?> 上传后页面显示为,不能包含<&…...
ICMP差错包
ICMP报文分类 Type Code 描述 查询/差错 0-Echo响应 0 Echo响应报文 查询 3-目的不可达 0 目标网络不可达报文 差错 1 目标主机不可达报文 差错 2 目标协议不可达报文 差错 3 目标端口不可达报文 差错 4 要求分段并设置DF flag标志报文 差错 5 源路由…...
算法基础课第二部分
算法基础课 第四讲 数学知识AcWing1381. 阶乘(同余,因式分解) 质数AcWing 866. 质数的判定---试除法AcWing 868. 质数的判定---埃氏筛AcWing867. 分解质因数---试除法AcWing 197. 阶乘---分解质因数---埃式筛 约数AcWing 869. 求约数---试除法AcWing 870. 约数个数-…...
【数据结构】外部排序、多路平衡归并与败者树、置换-选择排序(生成初始归并段)、最佳归并树算法
目录 1、外部排序 1.1 基本概念 1.2 方法 2、多路平衡归并与败者树 2.1 K路平衡归并 2.2 败者树 3、置换-选择排序(生成初始归并段)编辑 4、最佳归并树 4.1 理论基础编辑 4.2 构造方法 编辑 5、各种排序算法的性质 1、外部排序 1.1 基本概…...
抽象工厂模式 创建性模式之五
在看这篇文章之前,请先看看“简单工厂模式”和“工厂方法模式”这两篇博文,会更有助于理解。我们现在已经知道,简单工厂模式就是用一个简单工厂去创建多个产品,工厂方法模式是每一个具体的工厂只生产一个具体的产品,然…...
servlet如何获取PUT和DELETE请求的参数
1. servlet为何不能获取PUT和DELETE请求的参数 Servlet的规范是POST的数据需要转给request.getParameter*()方法,没有规定PUT和DELETE请求也这么做 The Servlet spec requires form data to be available for HTTP POST but not for HTTP PUT or PATCH requests. T…...
【Vue.js】使用Element中的Mock.js搭建首页导航左侧菜单---【超高级教学】
一,Mock.js 1.1 认识Mock.js Mock.js是一个用于前端开发中生成随机数据、模拟接口响应的 JavaScript 库。模拟数据的生成器,用来帮助前端调试开发、进行前后端的原型分离以及用来提高自动化测试效率 总结来说,Element中的Mock.js是一个用于…...
从技术创新到应用实践,百度智能云发起大模型平台应用开发挑战赛!
大模型已经成为未来技术发展方向的重大变革,热度之下更需去虚向实,让技术走进产业场景。在这样的背景下,百度智能云于近期发起了“百度智能云千帆大模型平台应用开发挑战赛”。 挖掘大模型落地应用 千帆大模型平台应用开发挑战赛启动 在不久前…...
网站关键词可以做几个/搜索排名提升
datanucleus吨 他的开源项目DataNucleus将发布访问平台2.0日程提前一天。 他们的符合标准的Java持久性产品可使用多种API和多种查询语言,将数据检索到一系列数据存储中。 2.0版引入了对特定于数据存储的查询汇编和新JPA2映射的缓存的支持,并支持一些新的…...
wordpress post 请求/网站制作流程和方法
一、对Kafka的认识 1.Kafka的基本概念 2.安装与配置 3.生产与消费 4.服务端参数配置 二、生产者 1.客户端开发 必要的参数配置消息的发送序列化分区器生产者拦截器 2.原理分析 整体架构元数据的更新 3.重要的生产者参数 三、消费者 1.消费者与消费组 2.客户端开发 必要的…...
线上推广员是干什么的/网站seo系统
‘can not read a block mapping entry; a multiline key may not be an implicit key’ 问题出现的地方: title: 2.8 Vue初体验 date: 2022-02-08 19:27:12 tags: 笔记 在这三个标题的:后面都必须添加一个空格,否则就会出错。...
郑州网站推广公司案例/百度收录的网站多久更新一次
如搜索框中,每改变一个数值就请求一次搜索接口,当快速的改变数值时并不需要多次请求接口,这就需要一个防抖函数: // 防抖函数 export function debounce(func, delay) { // func 函数 delay间隔时间let timerreturn function (...…...
交互网站建设需要做什么/在线客服系统平台有哪些
一、在spring的应用中我们存在两种过滤的用法,一种是拦截器、另外一种当然是过滤器。我们这里介绍过滤器在springboot的用法,在springmvc中的用法基本上一样,只是配置上面有点区别。 二、filter功能,它使用户可以改变一个 request…...
大学城网站开发公司/廊坊seo
学习于: https://www.bilibili.com/video/av29268873/?p23 《鸟哥的linux私房菜》先让我们来看看进程的虚拟地址中的0~3g用户空间的存储:正文即代码段,初始化的数据段,未初始化的数据即bss段。bss段和初始化的数据段加在一起就是…...