网页设计个人总结800/英文seo推广
Flink笔记整理(五)
文章目录
- Flink笔记整理(五)
- 七、处理函数(最底层最常用最灵活)
- 7.1基本处理函数(ProcessFunction)
- 处理函数的功能和使用
- ProcessFunction解析
- 7.2按键分区处理函数(KeyedProcessFunction)
- 定时器(Timer)和定时服务(TimeService)
- 7.3 窗口处理函数
- 窗口处理函数的使用
- ProcessWindowFunction解析
- 7.4 应用案例——Top N
- 总结
七、处理函数(最底层最常用最灵活)
之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。
在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。
7.1基本处理函数(ProcessFunction)
处理函数的功能和使用
之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。
这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。
处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。
stream.process(new MyProcessFunction())
这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。
ProcessFunction解析
在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...
}
ProcessFunction解析
7.2按键分区处理函数(KeyedProcessFunction)
在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。
ProcessFunction解析
定时器(Timer)和定时服务(TimeService)
定时器(Timer)和定时服务(TimeService)及例子
7.3 窗口处理函数
除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。
窗口处理函数的使用
进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。
窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。
stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())
ProcessWindowFunction解析
ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
ProcessWindowFunction解析
7.4 应用案例——Top N
案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
案例实现代码
总结
相关文章:

Flink笔记整理(五)
Flink笔记整理(五) 文章目录 Flink笔记整理(五)七、处理函数(最底层最常用最灵活)7.1基本处理函数(ProcessFunction)处理函数的功能和使用ProcessFunction解析 7.2按键分区处理函数&…...

数据分析概要【数据分析---偏企业】
各位大佬好 ,这里是阿川的博客,祝您变得更强 个人主页:在线OJ的阿川 大佬的支持和鼓励,将是我成长路上最大的动力 阿川水平有限,如有错误,欢迎大佬指正 数据分析概要前 必看 Python 初阶 Python–语言基础…...

PDF编辑器大分享,这三款加速PDF编辑!
嘿,各位办公室的小伙伴们,今儿咱们来聊聊那些让咱们文员生活变得更加轻松愉快的神器——PDF编辑器!作为每天跟文档打交道的“文字魔术师”,选对工具那可真是事半功倍啊。今天,我就从我的亲身体验出发,给大伙…...

Python --Pandas库基础方法(2)
文章目录 Pandas 变量类型的转换查看各列数据类型改变数据类型 重置索引删除行索引和切片seriesDataFrame取列按行列索引选择loc与iloc获取 isin()选择query()的使用排序用索引排序使用变量值排序 修改替换变量值对应数值的替换 数据分组基于拆分进行筛选 分组汇总引用自定义函…...

《Programming from the Ground Up》阅读笔记:p75-p87
《Programming from the Ground Up》学习第4天,p75-p87总结,总计13页。 一、技术总结 1.persistent data p75, Data which is stored in files is called persistent data, because it persists in files that remain on disk even when the program …...

Python面试整理-常用标准库
Python的标准库包含了大量的模块和包,支持各种编程任务,从文件处理、数据序列化,到网络编程等。这些模块预安装在Python中,无需额外安装就可以使用。以下是一些非常有用且常用的标准库模块: 1. os 用于与操作系统进行交互,包括文件和目录管理操作。 import os # 获取当前…...

halcon_C#联合halcon打开摄像头
1. 创建halcon项目 -> 2.测试连接 -> 3. 在halcon中打开摄像头成功 -> 4. 插入代码 -> 5. 导出为.cs文件 6. 创建VS项目 -> 7.将action部分代码嵌入winform -> 8. 编写代码 -> // 导入HalconDotNet命名空间,这是用于Halcon图像处理的…...

无标题栏窗口通过消息模拟拖动窗口时,无法拖动的一个原因
在使用DUI库或者web控件来做窗口和UI时,常常遇到一个问题:整个窗口如果设置了CAPTION区域,那么在CAPTION区域中,web页面的内容无法正常响应鼠标事件,如果不设置CAPTION区域,那么对于窗口的拖动又有影响。在…...

每天一个数据分析题(四百五十四)- 调研问卷
选择题是设计市场调查问卷时常用的题目类型,关于多选题和单选题的优缺点,以下说法不正确的是? A. 多选题相比单选题提供的信息量大。 B. 单选题提供的信息量相对较少,但比较便于后期编码和统计分析。 C. 单选题和多选题可以同时…...

红酒与家居:打造优雅生活空间
在繁忙的都市生活中,我们渴望拥有一处宁静而优雅的家居空间,那里不仅是我们休憩的港湾,更是我们品味生活、享受时光的地方。当定制红酒与家居设计相遇,它们便共同绘制出一幅充满韵味与格调的生活画卷。今天,就让我们一…...

未来生成式 AI 的发展方向,是 Chat 还是 Agent?
什么是生成式AI? 生成式人工智能(Generative AI)是一种人工智能技术,它能够基于已有的数据模式和结构生成新的数据实例,这些实例可以是文本、图像、音频、视频或任何其他类型的数据。这种技术通常依赖于复杂的算法&am…...

powershell@日期和时间命令和对象
文章目录 abstract获取当前日期和时间格式化日期和时间日期计算👺创建自定义日期和时间👺**[datetime] 类型**及其构造函数缺省值计算日期差异获取特定部分的日期和时间比较日期和时间 常用日期操作总结表时间间隔 TimeSpan 👺创建TimeSpan对…...

【Golang 面试 - 基础题】每日 5 题(八)
✍个人博客:Pandaconda-CSDN博客 📣专栏地址:http://t.csdnimg.cn/UWz06 📚专栏简介:在这个专栏中,我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞👍收藏…...

LeetCode 算法:在排序数组中查找元素的第一个和最后一个位置 c++
原题链接🔗:在排序数组中查找元素的第一个和最后一个位置 难度:中等⭐️⭐️ 题目 给你一个按照非递减顺序排列的整数数组 nums,和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标…...

会话存储、本地存储,路由导航守卫、web会话跟踪、JWT生成token、axios请求拦截、响应拦截
1、会话存储、本地存储 前端浏览器中存储用户信息,会话存储、本地存储、cookie 会话存储(sessionStorage):会话期间存储,关闭浏览器后,数据就会销毁 sessionStorage.setItem("account",resp.d…...

strcmp库函数原型
int strcmp(const char *str1, const char *str2) {unsigned const char *s1 (unsigned const char *) str1;unsigned const char *s2 (unsigned const char *) str2;while (*s1 && *s1 *s2) {s1;s2;}return *s1 - *s2; }while (*s1 && *s1 *s2) 一直循环&…...

在 Vue.js 项目中延迟加载子组件
在 Vue.js 中,当父组件渲染时,子组件的生命周期钩子函数会立即执行,即使这些子组件并未显示。这是因为 Vue.js 会在渲染父组件时实例化所有引用的子组件。为了避免不必要的函数执行,我们可以通过使用 v-if 指令和异步组件延迟加载…...

何时会用到设计模式、七大设计原则介绍
以下关于b站尚硅谷相关设计模式视频的总结 设计模式的重要性: 代码重用性(相同的代码,不用编写很多次)、 可读性(编程规范,便于其他程序员阅读和理解)、 可扩展性(增加新功能时&am…...

编程语言发展历史:赋值与相等运算符的变迁历程
本文摘取自笔者书稿《编程语言发展历史》 赋值运算符是编程语言最基础的运算符,其发展历史也非常有趣。最早的赋值语句就是使用等号“”来表示,一些语言为了让赋值运算在数学形式上更加严谨(形如“x x 1”的表达式在数学上不成立࿰…...

求职Leetcode题目(2)
1.柱状图中最大的矩形 据说这是2024年字节二面的题目,我感觉这道题跟接雨水有点类似,最重要的思路还是要找到什么时候能形成矩形的这么个情况,某个范围的矩形的高度,是由最短的柱形来决定的。 我们先整理一下,解决这道…...

深入探索 Postman:使用 API 性能测试优化你的 Web 服务
引言 在当今快速发展的互联网时代,Web 服务的性能至关重要。API 作为服务之间的桥梁,其性能直接影响到整个应用的响应速度和用户体验。Postman,作为一个多功能的 API 开发工具,提供了强大的性能测试功能,帮助开发者评…...

校车购票小程序的设计
管理员账户功能包括:系统首页,个人中心,学生管理,我的乘车信息管理,车辆信息管理,座位管理,系统管理 微信端账号功能包括:系统首页,车辆信息,我的 开发系统…...

拯救数据危机!2024年最受欢迎的数据恢复软件评测
现在大家快速传输资料的方式都变成了电子档,有些数据是存储在电脑上,有些存储在手机,有的存储在U盘甚至其他一些电子设备上。电子设备存储数据方便,丢失数据也总在意料之外。很多时候我们多学会一个工具,比如转转大师数…...

记一次因为在html两个地方引入vue.js导致组件注入失败的问题
这个问题我遇到两次了,是在恼火,不对,三次了,我如果不做这个笔记,我确定我还会遇到第三次。 尾部这个去掉就行 因为头部有了 遇到这种bu g好恼火,解决了又怎么样呢?重蹈覆辙的滋味不好受...

Postman中的智慧重试:API测试用例的错误处理与重试逻辑设置
Postman中的智慧重试:API测试用例的错误处理与重试逻辑设置 在API测试过程中,错误处理和重试逻辑是确保测试准确性和可靠性的重要环节。Postman提供了多种功能来处理测试中可能出现的错误,并允许自定义重试逻辑以适应不同的测试场景。本文将…...

docker部署本地词向量模型
开源项目:GitHub - huggingface/text-embeddings-inference: A blazing fast inference solution for text embeddings models 1. 下载词向量模型 参考我的另一篇博客:langchain 加载本地词向量模型 2. 部署词向量模型 就三行命令 model/data/BAAI/…...

接口自动化中对于文件上传的处理方法
正常的接口自动化基本都是json的格式,对于文件上传是一种特殊的格式是表单格式针对这种表单格式在接口自动化中怎么处理,主要通过工作中使用的一个实际的例子进行分享 举例:web上需要导入一个文件实现相关的功能,主要通过两个接口…...

Java高频面试题分享
文章目录 1. 策略模式怎么控制策略的选取1.1 追问:如果有100种策略呢?1.2 追问:什么情况下初始化Map 2. 什么是索引?什么时候用索引?2.1 追问:怎么判断系统什么时候用量比较少2.2 追问:如何实时…...

kvm虚拟化平台部署
kvm虚拟化平台部署 kvm概念简介 kvm自linux2.6版本以后就整合到内核中,因此可以看做是一个原生架构. kvm虚拟化架构 硬件底层提供物理层面的硬件支持 linux(host),就相当于这个架构中的宿主机,上面运行了多个虚拟机。…...

利用arthas热更新class文件
利用arthas热更新class文件 背景:发现一个bug,家里难以复现,需要在现场环境更新几行代码验证。 arthas-boot version: 3.7.1 java -jar arthas-boot.jar启动arthas 1、利用arthas的sc命令查找确定类名称 sc com.**2、反编译为java文件 …...