当前位置: 首页 > news >正文

flink学习(8)——窗口函数

增量聚合函数

——指窗口每进入一条数据就计算一次

例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27

 reduce

aggregate(aggregateFunction)

package com.bigdata.day04;public class _04_agg函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);// 此时我要获取每个班级的平均成绩// 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)// IN——Tuple3<String, String, Long>// ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩// OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {// 初始化一个 累加器@Overridepublic Tuple3<String, Integer, Long> createAccumulator() {return Tuple3.of(null,0,0L);}// 累加器和输入的值进行累加// Tuple3<String, String, Long> value 第一个是传入的值// Tuple3<String, Integer, Long> accumulator 第二个是累加器的值@Overridepublic Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);}// 获取结果——在不同节点的结果进行汇总后实现@Overridepublic Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);}// 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总// 即累加器之间的累加@Overridepublic Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

sum()

min()

max()

 全量聚合函数

指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)

全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间

apply 

package com.bigdata.day04;public class _05_app函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);//2. source-加载数据dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {@Overridepublic void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {Long sum = 0L;int length = 0;String key = null;for (Tuple3<String, String, Long> value : values) {sum += value.f2;length++;key = value.f0;}out.collect(Tuple2.of(key,(double) sum/length));}}).print();env.execute();}
}// 总结// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象  Tuple3<String,String,Long> 传入的值  Tuple2<String,Double> 结果// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象//使用窗口对象我们可以拿到窗口的起始时间long start = window.getStart();long end = window.getEnd();

process

使用方式一:在connect合流之后对两个类型不同的流进行处理

使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签

 使用方式一
new CoProcessFunction<Long, String, String>()  
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型  第三个泛型是合并后流的类型@Overridepublic void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {// Long 是数据类型 结果使用collector中的collect 收集collector.collect(String.valueOf(l));}@Overridepublic void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {//  String 是数据类型 结果使用collector中的collect 收集collector.collect(s);}
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));

相关文章:

flink学习(8)——窗口函数

增量聚合函数 ——指窗口每进入一条数据就计算一次 例如&#xff1a;要计算数字之和&#xff0c;进去一个12 计算结果为20&#xff0c; 再进入一个7 ——结果为27 reduce aggregate(aggregateFunction) package com.bigdata.day04;public class _04_agg函数 {public static …...

「实战应用」如何用图表控件LightningChart .NET实现散点图?(一)

LightningChart .NET完全由GPU加速&#xff0c;并且性能经过优化&#xff0c;可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D&#xff0c;高级3D&#xff0c;Polar&#xff0c;Smith&#xff0c;3D饼/甜甜圈&#xff0c;地理地图和GIS图表以及适用于科…...

鸿蒙Native使用Demo

DevecoStudio使用Native 今天,给大家带来的是关于DevecoStudio中使用Native进行开发 个人拙见:为什么要使用Native?无论是JS还是TS在复杂的情况下运行速度,肯定不如直接操作内存的C/C的运行速度快,所以,会选择使用Native;这里面的过程是什么?通过映射转化,使用napi提供的接口…...

29.UE5蓝图的网络通讯,多人自定义事件,变量同步

3-9 蓝图的网络通讯、多人自定义事件、变量同步_哔哩哔哩_bilibili 目录 1.网络通讯 1.1玩家Pawn之间的同步 1.2事件同步 1.3UI同步 1.4组播 1.5变量同步 1.网络通讯 1.1玩家Pawn之间的同步 创建一个第三人称项目 将网络模式更改为监听服务器&#xff0c;即将房主作为…...

Scala—列表(可变ListBuffer、不可变List)用法详解

Scala集合概述-链接 大家可以点击上方链接&#xff0c;先对Scala的集合有一个整体的概念&#x1f923;&#x1f923;&#x1f923; 在 Scala 中&#xff0c;列表&#xff08;List&#xff09;分为不可变列表&#xff08;List&#xff09;和可变列表&#xff08;ListBuffer&…...

【论文复现】偏标记学习+图像分类

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀ 偏标记学习图像分类 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文复现论文 Progressive Identification of True Labels for Pa…...

C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术

C嘎嘎探索篇&#xff1a;栈与队列的交响&#xff1a;C中的结构艺术 前言&#xff1a; 小编在之前刚完成了C中栈和队列&#xff08;stack和queue&#xff09;的讲解&#xff0c;忘记的小伙伴可以去我上一篇文章看一眼的&#xff0c;今天小编将会带领大家吹奏栈和队列的交响&am…...

AIGC-----AIGC在虚拟现实中的应用前景

AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容&#xff08;AIGC&#xff09;的快速发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性&#xff0c;这种组合不仅极大地降低了VR内容的…...

Django 路由层

1. 路由基础概念 URLconf (URL 配置)&#xff1a;Django 的路由系统是基于 urls.py 文件定义的。路径匹配&#xff1a;通过模式匹配 URL&#xff0c;并将请求传递给对应的视图处理函数。命名路由&#xff1a;每个路由可以定义一个名称&#xff0c;用于反向解析。 2. 基本路由配…...

《硬件架构的艺术》笔记(八):消抖技术

简介 在电子设备中两个金属触点随着触点的断开闭合便产生了多个信号&#xff0c;这就是抖动。 消抖是用来确保每一次断开或闭合触点时只有一个信号起作用的硬件设备或软件。&#xff08;就是每次断开闭合只对应一个操作&#xff09;。 抖动在某些模拟和逻辑电路中可能产生问…...

Spring 与 Spring MVC 与 Spring Boot三者之间的区别与联系

一.什么是Spring&#xff1f;它解决了什么问题&#xff1f; 1.1什么是Spring&#xff1f; Spring&#xff0c;一般指代的是Spring Framework 它是一个开源的应用程序框架&#xff0c;提供了一个简易的开发方式&#xff0c;通过这种开发方式&#xff0c;将避免那些可能致使代码…...

【算法】连通块问题(C/C++)

目录 连通块问题 解决思路 步骤&#xff1a; 初始化&#xff1a; DFS函数&#xff1a; 复杂度分析 代码实现&#xff08;C&#xff09; 题目链接&#xff1a;2060. 奶牛选美 - AcWing题库 解题思路&#xff1a; AC代码&#xff1a; 题目链接&#xff1a;687. 扫雷 -…...

如何选择黑白相机和彩色相机

我们在选择成像解决方案时黑白相机很容易被忽略&#xff0c;因为许多新相机提供鲜艳的颜色&#xff0c;鲜明的对比度和改进的弱光性能。然而&#xff0c;有许多应用&#xff0c;选择黑白相机将是更好的选择&#xff0c;因为他们产生更清晰的图像&#xff0c;更好的分辨率&#…...

Rust 力扣 - 740. 删除并获得点数

文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 首先对于这题我们如果将所有点数装入一个切片f中&#xff0c;该切片f中的i号下标表示所有点数为i的点数之和 那么这题就转换成了打家劫舍这道题&#xff0c;也就是求选择了切片中某个下标的元素后&#xff0c;该…...

OpenCV从入门到精通实战(七)——探索图像处理:自定义滤波与OpenCV卷积核

本文主要介绍如何使用Python和OpenCV库通过卷积操作来应用不同的图像滤波效果。主要分为几个步骤&#xff1a;图像的读取与处理、自定义卷积函数的实现、不同卷积核的应用&#xff0c;以及结果的展示。 卷积 在图像处理中&#xff0c;卷积是一种重要的操作&#xff0c;它通过…...

Docker核心概念总结

本文只是对 Docker 的概念做了较为详细的介绍&#xff0c;并不涉及一些像 Docker 环境的安装以及 Docker 的一些常见操作和命令。 容器介绍 Docker 是世界领先的软件容器平台&#xff0c;所以想要搞懂 Docker 的概念我们必须先从容器开始说起。 什么是容器? 先来看看容器较为…...

环形缓冲区

什么是环形缓冲区 环形缓冲区,也称为循环缓冲区或环形队列,是一种特殊的FIFO(先进先出)数据结构。它使用一块固定大小的内存空间来缓存数据,并通过两个指针(读指针和写指针)来管理数据的读写。当任意一个指针到达缓冲区末尾时,会自动回绕到缓冲区开头,形成一个"环"。…...

jQuery-Word-Export 使用记录及完整修正文件下载 jquery.wordexport.js

参考资料&#xff1a; jQuery-Word-Export导出word_jquery.wordexport.js下载-CSDN博客 近期又需要自己做个 Html2Doc 的解决方案&#xff0c;因为客户又不想要 Html2pdf 的下载了&#xff0c;当初还给我费尽心思解决Html转pdf时中文输出的问题&#xff08;html转pdf文件下载之…...

云服务器部署WebSocket项目

WebSocket是一种在单个TCP连接上进行全双工通信的协议&#xff0c;其设计的目的是在Web浏览器和Web服务器之间进行实时通信&#xff08;实时Web&#xff09; WebSocket协议的优点包括&#xff1a; 1. 更高效的网络利用率&#xff1a;与HTTP相比&#xff0c;WebSocket的握手只…...

C#+数据库 实现动态权限设置

将权限信息存储在数据库中&#xff0c;支持动态调整。根据用户所属的角色、特定的功能模块&#xff0c;动态加载权限” 1. 数据库设计 根据这种需求&#xff0c;可以通过以下表设计&#xff1a; 用户表 (Users)&#xff1a;存储用户信息。角色表 (Roles)&#xff1a;存储角色…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

k8s从入门到放弃之HPA控制器

k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率&#xff08;或其他自定义指标&#xff09;来调整这些对象的规模&#xff0c;从而帮助应用程序在负…...

高考志愿填报管理系统---开发介绍

高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发&#xff0c;采用现代化的Web技术&#xff0c;为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## &#x1f4cb; 系统概述 ### &#x1f3af; 系统定…...

《信号与系统》第 6 章 信号与系统的时域和频域特性

目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...

游戏开发中常见的战斗数值英文缩写对照表

游戏开发中常见的战斗数值英文缩写对照表 基础属性&#xff08;Basic Attributes&#xff09; 缩写英文全称中文释义常见使用场景HPHit Points / Health Points生命值角色生存状态MPMana Points / Magic Points魔法值技能释放资源SPStamina Points体力值动作消耗资源APAction…...

Redis上篇--知识点总结

Redis上篇–解析 本文大部分知识整理自网上&#xff0c;在正文结束后都会附上参考地址。如果想要深入或者详细学习可以通过文末链接跳转学习。 1. 基本介绍 Redis 是一个开源的、高性能的 内存键值数据库&#xff0c;Redis 的键值对中的 key 就是字符串对象&#xff0c;而 val…...

基于谷歌ADK的 智能产品推荐系统(2): 模块功能详解

在我的上一篇博客&#xff1a;基于谷歌ADK的 智能产品推荐系统(1): 功能简介-CSDN博客 中我们介绍了个性化购物 Agent 项目&#xff0c;该项目展示了一个强大的框架&#xff0c;旨在模拟和实现在线购物环境中的智能导购。它不仅仅是一个简单的聊天机器人&#xff0c;更是一个集…...

Gitlab + Jenkins 实现 CICD

CICD 是持续集成&#xff08;Continuous Integration, CI&#xff09;和持续交付/部署&#xff08;Continuous Delivery/Deployment, CD&#xff09;的缩写&#xff0c;是现代软件开发中的一种自动化流程实践。下面介绍 Web 项目如何在代码提交到 Gitlab 后&#xff0c;自动发布…...