Flink -- window(窗口)
1、窗口主要分成三大种:
1、Time Window (时间窗口):固定时间触发一次窗口
a、SlidingEventTimeWindows: 滑动的事件时间窗口
public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时间窗口:由时间触发的窗口* SlidingEventTimeWindows: 滑动的事件时间窗口* SlidingProcessingTimeWindows:滑动的处理时间窗口* TumblingEventTimeWindows:滚动的事件时间窗口* TumblingProcessingTimeWindows:滚动的处理时间窗口*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> linesDS = env.socketTextStream("master", 8888);//解析数据,取出时间字段DataStream<Tuple2<String, Long>> wordAndTsDS = linesDS.map(line -> {String[] split = line.split(",");String word = split[0];//将时间戳转换成long类型long ts = Long.parseLong(split[1]);return Tuple2.of(word, ts);}, Types.TUPLE(Types.STRING, Types.LONG));//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//水位线前移1秒.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((kv, ts) -> kv.f1));/*** 每隔5秒统计最近10秒单词的数量*/SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));kvDS.keyBy(kv -> kv.f0)//滑动的事件时间窗口.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print();env.execute();}
}
b、SlidingProcessingTimeWindows:滑动的处理时间窗口
public class Demo03ProcessingTime {public static void main(String[] args) throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}
c、TumblingEventTimeWindows:滚动的事件时间窗口
kvDS.keyBy(kv -> kv.f0)
//滚动的事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
d、TumblingProcessingTimeWindows:滚动的处理时间窗口
kvDS.keyBy(kv -> kv.f0)//滚动的处理时间窗口:
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
2、Session Window (会话窗口):如果一段时间没有数据就会生成一个窗口,将前面的数据拉去过来一起计算。
1、 ProcessingTimeSessionWindows: 处理时间的会话窗口,是针对每一个key都会统计他的数量。
2、EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)(使用的比较少)
public class Demo3SessionWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** ProcessingTimeSessionWindows: 处理时间的会话窗口* EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)**/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//某一个key如果5秒没有数据产生,将前面的数据放一起进行计算.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1).print();env.execute();}
}
3、Count Window (统计窗口):固定的数据量计算一次
1、countWindow(10): 滚动的统计窗口
2、countWindow(10,2):滑动的统计窗口
public class Demo2CountWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** 实时统计单词的数量,每10条数据统计一次* .countWindow(10): 滚动的统计窗口* .countWindow(10,2):滑动的统计窗口*/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//每隔10条数据计算一次,同一个key每隔10条计算一次.countWindow(10).sum(1).print();env.execute();}
}
注意:对于事件时间,需要指定时间字段和水位线,处理时间不需要指定。
相关文章:
Flink -- window(窗口)
1、窗口主要分成三大种: 1、Time Window (时间窗口):固定时间触发一次窗口 a、SlidingEventTimeWindows: 滑动的事件时间窗口 public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时…...
原语:串并转换器
串并转换器OSERDESE2 可被Select IO IP核调用。 OSERDESE2允许DDR功能 参考: FPGA原语学习与整理第二弹,OSERDESE2串并转换器 - 知乎 (zhihu.com) 正点原子。 ISERDESE2原语和OSERDESE2原语是串并转换器,他的的功能都是实现串行数据和并行…...
没网络也能安装.Net 3.5!如何脱机安装.NET Framework 3.5
.NET框架是由微软制定的一个软件框架。它有助于在Windows上运行控制台、Web或移动应用程序。此有用的工具适用于Windows设备。 如何脱机安装.NET Framework 3.5 如果你拥有Windows 10、8、8.1或7,有时第三方软件可能会导致问题。你可能会在图片中看到这样的问题。 看这张照片…...
JVM运行时数据区-虚拟机栈
目录 一、内存中的栈 二、基本内容 三、优点 四、栈的存储单位 五、栈运行原理 六、栈的内部结构 (一)局部变量表 (二)操作数栈 (三)动态链接 (四)方法返回地址 …...
Java中介者模式
目录 定义 结构 案例 优点 缺点 使用场景 定义 又叫调停模式,定义一个中介角色来封装一系列对象之间的交互,使原有对象之间的耦合松散,且可以独立地改变它们之间的交互。 结构 中介者模式包含以下主要角色: 抽象中介者角…...
前端框架Vue学习 ——(五)前端工程化Vue-cli脚手架
文章目录 Vue-cliVue项目-创建Vue项目-目录结构Vue项目-启动Vue项目-配置端口Vue项目开发流程 Vue-cli 介绍:Vue-cli 是 Vue 官方提供的一个脚手架,用于快速生成一个 Vue 的项目模版 安装 NodeJS安装 Vue-cli npm install -g vue/cliVue项目-创建 图…...
App备案-iOS云管理式证书 Distribution Managed 公钥及证书SHA-1指纹的获取方法
根据近日工业和信息化部发布的《工业和信息化部关于开展移动互联网应用程序备案工作的通知》,相信不少要进行IOS平台App备案的朋友遇到了一个问题,就是apple不提供云管理式证书的下载,也就无法获取公钥及证书SHA-1指纹。 已经上架的应用不想重…...
Spring -Spring之依赖注入源码解析
依赖注入底层原理流程图:Spring中Bean的依赖注入原理| ProcessOn免费在线作图,在线流程图,在线思维导图 Spring中到底有几种依赖注入的方式? 首先分两种: 手动注入自动注入 手动注入 在XML中定义Bean时,就是手动注入…...
Spire.Office for .NET 8.10.2 同步更新-Crk
Spire.Office for .NET是 E-iceblue 提供的企业级 Office .NET API 的组合。它包括Spire.Doc、Spire.XLS、Spire.Spreadsheet、Spire.Presentation、Spire.PDF、Spire.DataExport、Spire.OfficeViewer、Spire.PDFViewer、Spire.DocViewer、Spire.Barcode和Spire.Email。Spire.O…...
MFC 基础篇(一)
目录 一.SDK编程 二.为什么要学MFC? 三.MFC能做什么? 四.MFC开发环境搭建 五.MFC项目创建 六.消息映射机制 一.SDK编程 Application Programming Interface 应用程序编程接口。 Software Development Kit 软件开发工具包,一般会包括A…...
Android技术-修改SO导出符号
背景 经常在使用第三方SDK的时候会莫名其妙报错,其中最常见的一种就是SO符号冲突,比如libA.so静态链接了libC.a,而libB.so动态链接了libC.so。这样便会导致符号冲突。又或者在使用不同版本的动态库,也会造成符号冲突。 报错案例 案例1 DEB…...
flutter 打包apk
Flutter项目打包生成APK_flutter打包apk_文阿花的博客-CSDN博客 关于iconData可能出现的错误: flutter build apk 打包报错调试过程 - 掘金 (juejin.cn) 使用命令行:flutter build apk --no-tree-shake-icons...
Halcon如何使用SaperaLT库连接dalsa相机
halcon安装好的时候,没有带SaperaLT的采集库,需要额外在Halcon官网下载此库。 以下是halcon官网下载此库的链接。官网需要注册才可以下载。 https://www.mvtec.com/downloads/interfaces?tx_mvtecproduct_extensiondownloadlist%5Bfilter%5D%5B0%5Dma…...
Vue 嵌套路由 多级路由规则
套娃路由 routes:[{path: /login,component: Login},{path: /user,component: User,children:[{ path: test, component: Test },{ path: test2, component: Test2 },]}]子路由不需要加/ 在父组件 子路由不需要加/ 需要带上父亲的路由路径 <router-link to"user/test…...
pandas教程:Introduction to pandas Data Structures pandas的数据结构
文章目录 Chapter 5 Getting Started with pandas5.1 Introduction to pandas Data Structures1 Series2 DataFrame3 Index Objects (索引对象) Chapter 5 Getting Started with pandas 这样导入pandas: import pandas as pde:\python3.7\lib\site-packages\numpy…...
MinIO 分布式文件(对象)存储
简介 MinIO是高性能、可扩展、云原生支持、操作简单、开源的分布式对象存储产品。 在中国:阿里巴巴、腾讯、百度、中国联通、华为、中国移动等等9000多家企业也都在使用MinIO产品 官网地址:http://www.minio.org.cn/ 下载 官网下载(8.4.3版本)&#x…...
HTML表单标签
## HTML标签:表单标签 * 表单: * 概念:用于采集用户输入的数据的。用于和服务器进行交互。 * form:用于定义表单的。可以定义一个范围,范围代表采集用户数据的范围 * 属性࿱…...
【黑马程序员】SpringCloud——Eureka
文章目录 前言一、提供者与消费者1. 服务调用关系 二、远程调用的问题三、eureka 原理分析1. eureka 的作用 四、Eureka 案例1. 搭建 eureka 服务1. 服务注册1.1 注册 user-service1.2 启动 user-service3. order-service 完成服务注册 3. 服务发现1. 在 order-service 完成服务…...
目标跟踪(DeepSORT)
本文首先将介绍在目标跟踪任务中常用的匈牙利算法(Hungarian Algorithm)和卡尔曼滤波(Kalman Filter),然后介绍经典算法DeepSORT的工作流程以及对相关源码进行解析。 目前主流的目标跟踪算法都是基于Tracking-by-Detec…...
2 任务2: 使用趋动云GPU进行猫狗识别实践
使用趋动云GPU进行猫狗识别实践 1 创建项目2 初始化开发环境3 调试代码4 提交离线任务5 结果集存储与下载 使用趋动云提供的免费GPU,进行猫狗识别实践。 虽然例程里面提供的是基于tensorflow的,但是你也可以使用pytorch的代码 使用这个平台的一个优点就是…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
如何在看板中体现优先级变化
在看板中有效体现优先级变化的关键措施包括:采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中,设置任务排序规则尤其重要,因为它让看板视觉上直观地体…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
SQL Server 触发器调用存储过程实现发送 HTTP 请求
文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...
SpringAI实战:ChatModel智能对话全解
一、引言:Spring AI 与 Chat Model 的核心价值 🚀 在 Java 生态中集成大模型能力,Spring AI 提供了高效的解决方案 🤖。其中 Chat Model 作为核心交互组件,通过标准化接口简化了与大语言模型(LLM࿰…...
es6+和css3新增的特性有哪些
一:ECMAScript 新特性(ES6) ES6 (2015) - 革命性更新 1,记住的方法,从一个方法里面用到了哪些技术 1,let /const块级作用域声明2,**默认参数**:函数参数可以设置默认值。3&#x…...
