Flink -- window(窗口)
1、窗口主要分成三大种:
1、Time Window (时间窗口):固定时间触发一次窗口
a、SlidingEventTimeWindows: 滑动的事件时间窗口
public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时间窗口:由时间触发的窗口* SlidingEventTimeWindows: 滑动的事件时间窗口* SlidingProcessingTimeWindows:滑动的处理时间窗口* TumblingEventTimeWindows:滚动的事件时间窗口* TumblingProcessingTimeWindows:滚动的处理时间窗口*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> linesDS = env.socketTextStream("master", 8888);//解析数据,取出时间字段DataStream<Tuple2<String, Long>> wordAndTsDS = linesDS.map(line -> {String[] split = line.split(",");String word = split[0];//将时间戳转换成long类型long ts = Long.parseLong(split[1]);return Tuple2.of(word, ts);}, Types.TUPLE(Types.STRING, Types.LONG));//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//水位线前移1秒.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((kv, ts) -> kv.f1));/*** 每隔5秒统计最近10秒单词的数量*/SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));kvDS.keyBy(kv -> kv.f0)//滑动的事件时间窗口.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print();env.execute();}
}
b、SlidingProcessingTimeWindows:滑动的处理时间窗口
public class Demo03ProcessingTime {public static void main(String[] args) throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}
c、TumblingEventTimeWindows:滚动的事件时间窗口
kvDS.keyBy(kv -> kv.f0)
//滚动的事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
d、TumblingProcessingTimeWindows:滚动的处理时间窗口
kvDS.keyBy(kv -> kv.f0)//滚动的处理时间窗口:
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
2、Session Window (会话窗口):如果一段时间没有数据就会生成一个窗口,将前面的数据拉去过来一起计算。
1、 ProcessingTimeSessionWindows: 处理时间的会话窗口,是针对每一个key都会统计他的数量。
2、EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)(使用的比较少)
public class Demo3SessionWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** ProcessingTimeSessionWindows: 处理时间的会话窗口* EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)**/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//某一个key如果5秒没有数据产生,将前面的数据放一起进行计算.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1).print();env.execute();}
}
3、Count Window (统计窗口):固定的数据量计算一次
1、countWindow(10): 滚动的统计窗口
2、countWindow(10,2):滑动的统计窗口
public class Demo2CountWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** 实时统计单词的数量,每10条数据统计一次* .countWindow(10): 滚动的统计窗口* .countWindow(10,2):滑动的统计窗口*/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//每隔10条数据计算一次,同一个key每隔10条计算一次.countWindow(10).sum(1).print();env.execute();}
}
注意:对于事件时间,需要指定时间字段和水位线,处理时间不需要指定。
相关文章:
Flink -- window(窗口)
1、窗口主要分成三大种: 1、Time Window (时间窗口):固定时间触发一次窗口 a、SlidingEventTimeWindows: 滑动的事件时间窗口 public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时…...
原语:串并转换器
串并转换器OSERDESE2 可被Select IO IP核调用。 OSERDESE2允许DDR功能 参考: FPGA原语学习与整理第二弹,OSERDESE2串并转换器 - 知乎 (zhihu.com) 正点原子。 ISERDESE2原语和OSERDESE2原语是串并转换器,他的的功能都是实现串行数据和并行…...
没网络也能安装.Net 3.5!如何脱机安装.NET Framework 3.5
.NET框架是由微软制定的一个软件框架。它有助于在Windows上运行控制台、Web或移动应用程序。此有用的工具适用于Windows设备。 如何脱机安装.NET Framework 3.5 如果你拥有Windows 10、8、8.1或7,有时第三方软件可能会导致问题。你可能会在图片中看到这样的问题。 看这张照片…...
JVM运行时数据区-虚拟机栈
目录 一、内存中的栈 二、基本内容 三、优点 四、栈的存储单位 五、栈运行原理 六、栈的内部结构 (一)局部变量表 (二)操作数栈 (三)动态链接 (四)方法返回地址 …...
Java中介者模式
目录 定义 结构 案例 优点 缺点 使用场景 定义 又叫调停模式,定义一个中介角色来封装一系列对象之间的交互,使原有对象之间的耦合松散,且可以独立地改变它们之间的交互。 结构 中介者模式包含以下主要角色: 抽象中介者角…...
前端框架Vue学习 ——(五)前端工程化Vue-cli脚手架
文章目录 Vue-cliVue项目-创建Vue项目-目录结构Vue项目-启动Vue项目-配置端口Vue项目开发流程 Vue-cli 介绍:Vue-cli 是 Vue 官方提供的一个脚手架,用于快速生成一个 Vue 的项目模版 安装 NodeJS安装 Vue-cli npm install -g vue/cliVue项目-创建 图…...
App备案-iOS云管理式证书 Distribution Managed 公钥及证书SHA-1指纹的获取方法
根据近日工业和信息化部发布的《工业和信息化部关于开展移动互联网应用程序备案工作的通知》,相信不少要进行IOS平台App备案的朋友遇到了一个问题,就是apple不提供云管理式证书的下载,也就无法获取公钥及证书SHA-1指纹。 已经上架的应用不想重…...
Spring -Spring之依赖注入源码解析
依赖注入底层原理流程图:Spring中Bean的依赖注入原理| ProcessOn免费在线作图,在线流程图,在线思维导图 Spring中到底有几种依赖注入的方式? 首先分两种: 手动注入自动注入 手动注入 在XML中定义Bean时,就是手动注入…...
Spire.Office for .NET 8.10.2 同步更新-Crk
Spire.Office for .NET是 E-iceblue 提供的企业级 Office .NET API 的组合。它包括Spire.Doc、Spire.XLS、Spire.Spreadsheet、Spire.Presentation、Spire.PDF、Spire.DataExport、Spire.OfficeViewer、Spire.PDFViewer、Spire.DocViewer、Spire.Barcode和Spire.Email。Spire.O…...
MFC 基础篇(一)
目录 一.SDK编程 二.为什么要学MFC? 三.MFC能做什么? 四.MFC开发环境搭建 五.MFC项目创建 六.消息映射机制 一.SDK编程 Application Programming Interface 应用程序编程接口。 Software Development Kit 软件开发工具包,一般会包括A…...
Android技术-修改SO导出符号
背景 经常在使用第三方SDK的时候会莫名其妙报错,其中最常见的一种就是SO符号冲突,比如libA.so静态链接了libC.a,而libB.so动态链接了libC.so。这样便会导致符号冲突。又或者在使用不同版本的动态库,也会造成符号冲突。 报错案例 案例1 DEB…...
flutter 打包apk
Flutter项目打包生成APK_flutter打包apk_文阿花的博客-CSDN博客 关于iconData可能出现的错误: flutter build apk 打包报错调试过程 - 掘金 (juejin.cn) 使用命令行:flutter build apk --no-tree-shake-icons...
Halcon如何使用SaperaLT库连接dalsa相机
halcon安装好的时候,没有带SaperaLT的采集库,需要额外在Halcon官网下载此库。 以下是halcon官网下载此库的链接。官网需要注册才可以下载。 https://www.mvtec.com/downloads/interfaces?tx_mvtecproduct_extensiondownloadlist%5Bfilter%5D%5B0%5Dma…...
Vue 嵌套路由 多级路由规则
套娃路由 routes:[{path: /login,component: Login},{path: /user,component: User,children:[{ path: test, component: Test },{ path: test2, component: Test2 },]}]子路由不需要加/ 在父组件 子路由不需要加/ 需要带上父亲的路由路径 <router-link to"user/test…...
pandas教程:Introduction to pandas Data Structures pandas的数据结构
文章目录 Chapter 5 Getting Started with pandas5.1 Introduction to pandas Data Structures1 Series2 DataFrame3 Index Objects (索引对象) Chapter 5 Getting Started with pandas 这样导入pandas: import pandas as pde:\python3.7\lib\site-packages\numpy…...
MinIO 分布式文件(对象)存储
简介 MinIO是高性能、可扩展、云原生支持、操作简单、开源的分布式对象存储产品。 在中国:阿里巴巴、腾讯、百度、中国联通、华为、中国移动等等9000多家企业也都在使用MinIO产品 官网地址:http://www.minio.org.cn/ 下载 官网下载(8.4.3版本)&#x…...
HTML表单标签
## HTML标签:表单标签 * 表单: * 概念:用于采集用户输入的数据的。用于和服务器进行交互。 * form:用于定义表单的。可以定义一个范围,范围代表采集用户数据的范围 * 属性࿱…...
【黑马程序员】SpringCloud——Eureka
文章目录 前言一、提供者与消费者1. 服务调用关系 二、远程调用的问题三、eureka 原理分析1. eureka 的作用 四、Eureka 案例1. 搭建 eureka 服务1. 服务注册1.1 注册 user-service1.2 启动 user-service3. order-service 完成服务注册 3. 服务发现1. 在 order-service 完成服务…...
目标跟踪(DeepSORT)
本文首先将介绍在目标跟踪任务中常用的匈牙利算法(Hungarian Algorithm)和卡尔曼滤波(Kalman Filter),然后介绍经典算法DeepSORT的工作流程以及对相关源码进行解析。 目前主流的目标跟踪算法都是基于Tracking-by-Detec…...
2 任务2: 使用趋动云GPU进行猫狗识别实践
使用趋动云GPU进行猫狗识别实践 1 创建项目2 初始化开发环境3 调试代码4 提交离线任务5 结果集存储与下载 使用趋动云提供的免费GPU,进行猫狗识别实践。 虽然例程里面提供的是基于tensorflow的,但是你也可以使用pytorch的代码 使用这个平台的一个优点就是…...
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
嵌入式学习笔记DAY33(网络编程——TCP)
一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)
题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...
Python第七周作业
Python第七周作业 文章目录 Python第七周作业 1.使用open以只读模式打开文件data.txt,并逐行打印内容 2.使用pathlib模块获取当前脚本的绝对路径,并创建logs目录(若不存在) 3.递归遍历目录data,输出所有.csv文件的路径…...
MLP实战二:MLP 实现图像数字多分类
任务 实战(二):MLP 实现图像多分类 基于 mnist 数据集,建立 mlp 模型,实现 0-9 数字的十分类 task: 1、实现 mnist 数据载入,可视化图形数字; 2、完成数据预处理:图像数据维度转换与…...
