flink学习(8)——窗口函数
增量聚合函数
——指窗口每进入一条数据就计算一次
例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27
reduce
aggregate(aggregateFunction)
package com.bigdata.day04;public class _04_agg函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);// 此时我要获取每个班级的平均成绩// 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)// IN——Tuple3<String, String, Long>// ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩// OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {// 初始化一个 累加器@Overridepublic Tuple3<String, Integer, Long> createAccumulator() {return Tuple3.of(null,0,0L);}// 累加器和输入的值进行累加// Tuple3<String, String, Long> value 第一个是传入的值// Tuple3<String, Integer, Long> accumulator 第二个是累加器的值@Overridepublic Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);}// 获取结果——在不同节点的结果进行汇总后实现@Overridepublic Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);}// 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总// 即累加器之间的累加@Overridepublic Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
sum()
min()
max()
全量聚合函数
指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)
全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间
apply
package com.bigdata.day04;public class _05_app函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);//2. source-加载数据dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {@Overridepublic void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {Long sum = 0L;int length = 0;String key = null;for (Tuple3<String, String, Long> value : values) {sum += value.f2;length++;key = value.f0;}out.collect(Tuple2.of(key,(double) sum/length));}}).print();env.execute();}
}// 总结// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象 Tuple3<String,String,Long> 传入的值 Tuple2<String,Double> 结果// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象//使用窗口对象我们可以拿到窗口的起始时间long start = window.getStart();long end = window.getEnd();
process
使用方式一:在connect合流之后对两个类型不同的流进行处理
使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签
使用方式一
new CoProcessFunction<Long, String, String>()
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型 第三个泛型是合并后流的类型@Overridepublic void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {// Long 是数据类型 结果使用collector中的collect 收集collector.collect(String.valueOf(l));}@Overridepublic void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {// String 是数据类型 结果使用collector中的collect 收集collector.collect(s);}
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));
相关文章:
flink学习(8)——窗口函数
增量聚合函数 ——指窗口每进入一条数据就计算一次 例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27 reduce aggregate(aggregateFunction) package com.bigdata.day04;public class _04_agg函数 {public static …...
「实战应用」如何用图表控件LightningChart .NET实现散点图?(一)
LightningChart .NET完全由GPU加速,并且性能经过优化,可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D,高级3D,Polar,Smith,3D饼/甜甜圈,地理地图和GIS图表以及适用于科…...
鸿蒙Native使用Demo
DevecoStudio使用Native 今天,给大家带来的是关于DevecoStudio中使用Native进行开发 个人拙见:为什么要使用Native?无论是JS还是TS在复杂的情况下运行速度,肯定不如直接操作内存的C/C的运行速度快,所以,会选择使用Native;这里面的过程是什么?通过映射转化,使用napi提供的接口…...
29.UE5蓝图的网络通讯,多人自定义事件,变量同步
3-9 蓝图的网络通讯、多人自定义事件、变量同步_哔哩哔哩_bilibili 目录 1.网络通讯 1.1玩家Pawn之间的同步 1.2事件同步 1.3UI同步 1.4组播 1.5变量同步 1.网络通讯 1.1玩家Pawn之间的同步 创建一个第三人称项目 将网络模式更改为监听服务器,即将房主作为…...
Scala—列表(可变ListBuffer、不可变List)用法详解
Scala集合概述-链接 大家可以点击上方链接,先对Scala的集合有一个整体的概念🤣🤣🤣 在 Scala 中,列表(List)分为不可变列表(List)和可变列表(ListBuffer&…...
【论文复现】偏标记学习+图像分类
📝个人主页🌹:Eternity._ 🌹🌹期待您的关注 🌹🌹 ❀ 偏标记学习图像分类 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文复现论文 Progressive Identification of True Labels for Pa…...
C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术
C嘎嘎探索篇:栈与队列的交响:C中的结构艺术 前言: 小编在之前刚完成了C中栈和队列(stack和queue)的讲解,忘记的小伙伴可以去我上一篇文章看一眼的,今天小编将会带领大家吹奏栈和队列的交响&am…...
AIGC-----AIGC在虚拟现实中的应用前景
AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容(AIGC)的快速发展,虚拟现实(VR)技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性,这种组合不仅极大地降低了VR内容的…...
Django 路由层
1. 路由基础概念 URLconf (URL 配置):Django 的路由系统是基于 urls.py 文件定义的。路径匹配:通过模式匹配 URL,并将请求传递给对应的视图处理函数。命名路由:每个路由可以定义一个名称,用于反向解析。 2. 基本路由配…...
《硬件架构的艺术》笔记(八):消抖技术
简介 在电子设备中两个金属触点随着触点的断开闭合便产生了多个信号,这就是抖动。 消抖是用来确保每一次断开或闭合触点时只有一个信号起作用的硬件设备或软件。(就是每次断开闭合只对应一个操作)。 抖动在某些模拟和逻辑电路中可能产生问…...
Spring 与 Spring MVC 与 Spring Boot三者之间的区别与联系
一.什么是Spring?它解决了什么问题? 1.1什么是Spring? Spring,一般指代的是Spring Framework 它是一个开源的应用程序框架,提供了一个简易的开发方式,通过这种开发方式,将避免那些可能致使代码…...
【算法】连通块问题(C/C++)
目录 连通块问题 解决思路 步骤: 初始化: DFS函数: 复杂度分析 代码实现(C) 题目链接:2060. 奶牛选美 - AcWing题库 解题思路: AC代码: 题目链接:687. 扫雷 -…...
如何选择黑白相机和彩色相机
我们在选择成像解决方案时黑白相机很容易被忽略,因为许多新相机提供鲜艳的颜色,鲜明的对比度和改进的弱光性能。然而,有许多应用,选择黑白相机将是更好的选择,因为他们产生更清晰的图像,更好的分辨率&#…...
Rust 力扣 - 740. 删除并获得点数
文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 首先对于这题我们如果将所有点数装入一个切片f中,该切片f中的i号下标表示所有点数为i的点数之和 那么这题就转换成了打家劫舍这道题,也就是求选择了切片中某个下标的元素后,该…...
OpenCV从入门到精通实战(七)——探索图像处理:自定义滤波与OpenCV卷积核
本文主要介绍如何使用Python和OpenCV库通过卷积操作来应用不同的图像滤波效果。主要分为几个步骤:图像的读取与处理、自定义卷积函数的实现、不同卷积核的应用,以及结果的展示。 卷积 在图像处理中,卷积是一种重要的操作,它通过…...
Docker核心概念总结
本文只是对 Docker 的概念做了较为详细的介绍,并不涉及一些像 Docker 环境的安装以及 Docker 的一些常见操作和命令。 容器介绍 Docker 是世界领先的软件容器平台,所以想要搞懂 Docker 的概念我们必须先从容器开始说起。 什么是容器? 先来看看容器较为…...
环形缓冲区
什么是环形缓冲区 环形缓冲区,也称为循环缓冲区或环形队列,是一种特殊的FIFO(先进先出)数据结构。它使用一块固定大小的内存空间来缓存数据,并通过两个指针(读指针和写指针)来管理数据的读写。当任意一个指针到达缓冲区末尾时,会自动回绕到缓冲区开头,形成一个"环"。…...
jQuery-Word-Export 使用记录及完整修正文件下载 jquery.wordexport.js
参考资料: jQuery-Word-Export导出word_jquery.wordexport.js下载-CSDN博客 近期又需要自己做个 Html2Doc 的解决方案,因为客户又不想要 Html2pdf 的下载了,当初还给我费尽心思解决Html转pdf时中文输出的问题(html转pdf文件下载之…...
云服务器部署WebSocket项目
WebSocket是一种在单个TCP连接上进行全双工通信的协议,其设计的目的是在Web浏览器和Web服务器之间进行实时通信(实时Web) WebSocket协议的优点包括: 1. 更高效的网络利用率:与HTTP相比,WebSocket的握手只…...
C#+数据库 实现动态权限设置
将权限信息存储在数据库中,支持动态调整。根据用户所属的角色、特定的功能模块,动态加载权限” 1. 数据库设计 根据这种需求,可以通过以下表设计: 用户表 (Users):存储用户信息。角色表 (Roles):存储角色…...
(原创)Android Studio新老界面UI切换及老版本下载地址
前言 这两天下载了一个新版的Android Studio,发现整个界面都发生了很大改动: 新的界面的一些设置可参考一些博客: Android Studio新版UI常用设置 但是对于一些急着开发的小伙伴来说,没有时间去适应,那么怎么办呢&am…...
Ubuntu24虚拟机-gnome-boxes
推荐使用gnome-boxes, virtualbox构建失败,multipass需要开启防火墙 sudo apt install gnome-boxes创建完毕~...
k8s rainbond centos7/win10 -20241124
参考 https://www.rainbond.com/ 国内一站式云原生平台 对centos7环境支持不太行 [lighthouseVM-16-5-centos ~]$ curl -o install.sh https://get.rainbond.com && bash ./install.sh 2024-11-24 09:56:57 ERROR: Ops! Docker daemon is not running. Start docke…...
SpringBoot+Vue滑雪社区网站设计与实现
【1】系统介绍 研究背景 随着互联网技术的快速发展和冰雪运动的普及,滑雪作为一种受欢迎的冬季运动项目,吸引了越来越多的爱好者。与此同时,社交媒体和在线社区平台的兴起为滑雪爱好者提供了一个交流经验、分享心得、获取信息的重要渠道。滑…...
MySql.2
sql查询语句执行过程 SQL 查询语句的执行过程是一个复杂的过程,涉及多个步骤。以下是典型的关系数据库管理系统 (RDBMS) 中 SQL 查询语句的执行过程概述: 1. 客户端发送查询 用户通过 SQL 客户端或应用程序发送 SQL 查询语句给数据库服务器。 2. …...
算法之区间和题目讲解
题干 难度:简单 题目分析 题目要求算出每个指定区间内元素的总和。 然而,区间在输入的最下面,所以按照暴力破解的思路,我们首先要遍历数组,把它的值都存进去。 然后,遍历下面的区间,从索引a…...
价格分类(神经网络)
# 1.导入依赖包 import timeimport torch import torch.nn as nn import torch.optim as optimfrom torch.utils.data import TensorDataset, DataLoader from sklearn.model_selection import train_test_splitimport numpy as np import pandas as pd import matplotlib.pypl…...
对智能电视直播App的恶意监控
首先我们要指出中国广电总局推出的一个政策性文件是恶意监控的始作俑者,这个广电总局的政策性文件禁止智能电视和电视盒子安装直播软件。应该说这个政策性文件是为了保护特殊利益集团,阻挠技术进步和发展的。 有那么一些电视机和电视盒子的厂商和电信运…...
【JavaEE初阶】多线程初阶下部
文章目录 前言一、volatile关键字volatile 能保证内存可见性 二、wait 和 notify2.1 wait()方法2.2 notify()方法2.3 notifyAll()方法2.4 wait 和 sleep 的对比(面试题) 三、多线程案例单例模式 四、总结-保证线程安全的思路五、对比线程和进程总结 前言…...
macOS上进行Ant Design Pro实战教程(一)
由于一个AI项目的前端使用了umi,本教程根据阿里官网上的 《Ant Design 实战教程(beta 版)》来实操一下,我使用macOS操作系统,VS Code 开发环境。 一、开发环境 1、安装nodejs, npm, yarn 官网上建议使用cnpm…...
摄影师网站制作/seo技术网网
本篇的思维导图: 数据预处理:数据的汇总 数据透视表pivot_table()函数 透视表功能该功能的主要目的就是实现数据的汇总统计。pandas模块中的pivot_table函数就是实现透视表功能的强大函数。 代码 import numpy as...
网站建设报告实训步骤/百度客服电话号码
统一项目管理平台(UMPlatForm.NET) 4.10 用户权限管理模块 统一项目管理平台(UMPlatForm.NET),基于.NET的快速开发、整合框架。 4.10用户权限管理模块 在实际应用中我们会发现,权限控制会经常变动,如&#…...
网站百度抓取/google安卓手机下载
需求如下: 有一个教学周历,每个月的第一天显示月份,比如九月、十月、十一月。该月其余的日期直接显示“日”即可。 举例: 原数据显示值2022-07-12122022-08-01八月2022-09-02022022-10-11112022-11-01十一月 方法一:…...
wap 网站 css学习/seo营销技巧培训班
一个朋友推荐使用的低代码开发平台,Joget,中文明捷得,来自马来,这两年已在国内陆续推广,这周抽空试用一段时间,感觉很不错,感觉和unifier有的一拼(某些领域),…...
深圳网站制作公司兴田德润信任高/重庆森林电影完整版
2019独角兽企业重金招聘Python工程师标准>>> Web服务器工作原理概述 很多时候我们都想知道,web容器或web服务器(比如Tomcat或者jboss)是怎样工作的?它们是怎样处理来自全世界的http请求的?它们在幕后做了什…...
网站嵌入js/广东百度seo关键词排名
本文主要向大家介绍了 C/C知识点之vs2015下配置MySQL,使之能使用c连接完美运行,通过具体的内容向大家展示,希望对大家学习C/C知识点有所帮助。这个网址安装及配置MySQL,里边内容很全,可是有一点不足,就是按…...