崇明网站怎么做seo/色盲悖论
按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作
1.定时器(Timer)和定时服务(TimerService)
- 定时器(timers)是处理函数中进行时间相关操作的主要机制
- 定时服务(TimerService)提供了注册定时器的功能
TimerService
是 Flink 关于时间和定时器的基础服务接口:
// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线(事件时间)
long currentWatermark();
// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的事件时间定时器
void deleteEventTimeTimer(long time);
六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器
尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间
对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳保存起来,排队等待执行;可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次
基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的;利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次:
long coalescedTime = time / 1000 * 1000; //时间戳(定时器默认的区分精度是毫秒)
ctx.timerService().registerProcessingTimeTimer(coalescedTime); //注册定时器
2.KeyedProcessFunction 的使用
基础用法:
stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())
这里的MyKeyedProcessFunction
即是KeyedProcessFunction
的一个实现类;
源码解析
KeyedProcessFunction
源码如下:
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a* {@link TimerService} for registering timers and querying the time. The context is only* valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link* TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for* registering timers and querying the time. The context is only valid during the invocation* of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);/** Get key of the element being processed. */public abstract K getCurrentKey();}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();/** Get key of the firing timer. */@Overridepublic abstract K getCurrentKey();}
}
可以看到和ProcessFunction
类似,都有一个processElement()
和onTimer()
方法,并且定义了一个Context
抽象类;不同点在于类型参数多了一个K,也就是key的类型;
代码示例
①处理时间语义
public class ProcessingTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 处理时间语义,不需要分配时间戳和watermarkSingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 要用定时器,必须基于KeyedStreamstream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {Long currTs = ctx.timerService().currentProcessingTime();out.collect("数据到达,到达时间:" + new Timestamp(currTs));// 注册一个10秒后的定时器ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));}}).print();env.execute();}
}
通过ctx.timerService().currentProcessingTime()
获取当前处理时间;
通过ctx.timerService().registerProcessingTimeTimer
来设置一个定时器;
运行结果如下:
由于定时器是处理时间的定时器,不用考虑水位线延时问题,因此10s后能够准时触发定时操作;
②事件时间语义:
public class EventTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 基于KeyedStream定义事件时间定时器stream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {out.collect("数据到达,时间戳为:" + ctx.timestamp());out.collect("数据到达,水位线为:" + ctx.timerService().currentWatermark() + "\n -------分割线-------");// 注册一个10秒后的定时器ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect("定时器触发,触发时间:" + timestamp);}}).print();env.execute();}// 自定义测试数据源public static class CustomSource implements SourceFunction<Event> {@Overridepublic void run(SourceContext<Event> ctx) throws Exception {// 直接发出测试数据ctx.collect(new Event("Mary", "./home", 1000L));// 为了更加明显,中间停顿5秒钟Thread.sleep(5000L);// 发出10秒后的数据ctx.collect(new Event("Mary", "./home", 11000L));Thread.sleep(5000L);// 发出10秒+1ms后的数据ctx.collect(new Event("Alice", "./cart", 11001L));Thread.sleep(5000L);}@Overridepublic void cancel() { }}
}
运行结果如下:
运行结果解释:
①第一条数据到来时,时间戳为1000,但由于水位线的生成是周期性的(默认200ms),因此水位线不会立即发送改变,仍然是Long.MIN_VALUE,之后只要到了水位线生成的时间周期,就会依据当前最大的时间戳来生成水位线(默认减1)
②第二条数据到来时,显然水位线已经推进到了999,但仍然不会立即改变;
③在事件时间语义下,定时器触发的条件就是水位线推进到设定的时间;第一条数据到来之后,设定的定时器时间为11000,而当时间戳为11000的数据到来时,水位线还停留在999的位置,因此不会立即触发定时器;之后水位线会推进到10999(11000-1),同样无法触发定时器;
④第三条数据到来时,时间戳为11001,此时水位线推进到了10999,等到水位线周期性更新后,推进到11000(11001-1),这样第一个定时器就会触发
⑤然后等待5s后,没有新的数据到来,整个程序结束,将要退出,此时会将水位线推进到Long.MAX_VALUE,所以所有没有触发的定时器统一触发;
学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili
相关文章:

Flink处理函数(2)—— 按键分区处理函数
按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作 1.定时器(Timer)和定时服务(TimerService) 定时器(timers)是处理函数中进行时间相关…...

服务器数据恢复—服务器进水导致阵列中磁盘同时掉线的数据恢复案例
服务器数据恢复环境: 数台服务器数台存储阵列柜,共上百块硬盘,划分了数十组lun。 服务器故障&检测: 外部因素导致服务器进水,进水服务器中一组阵列内的所有硬盘同时掉线。 北亚数据恢复工程师到达现场后发现机房内…...

npm或者pnpm或者yarn安装依赖报错ENOTFOUND解决办法
如果报错说安装依赖报错,大概率是因为npm源没有设置对,比如我这里安装protobufjs的时候报错:ENOTFOUND npm ERR! code ENOTFOUND npm ERR! syscall getaddrinfo npm ERR! errno ENOTFOUND npm ERR! network request to https://registry.cnpm…...

学会使用ubuntu——ubuntu22.04使用Google、git的魔法操作
ubuntu22.04使用Google、git的魔法操作 转战知乎写作 https://zhuanlan.zhihu.com/p/679332988...

【机组】计算机组成原理实验指导书.
🌈个人主页:Sarapines Programmer🔥 系列专栏:《机组 | 模块单元实验》⏰诗赋清音:云生高巅梦远游, 星光点缀碧海愁。 山川深邃情难晤, 剑气凌云志自修。 目录 第一章 性能特点 1.1 系…...

解决Sublime Text V3.2.2中文乱码问题
目录 中文乱码出现情形通过安装插件来解决乱码问题 中文乱码出现情形 打开一个中文txt文件,显示乱码,在File->Reopen With Encoding里面找不到支持简体中文正常显示的编码选项。 通过安装插件来解决乱码问题 安装Package Control插件 打开Tool->…...

Oracle 12CR2 RAC部署翻车,bug避坑经历
📢📢📢📣📣📣 哈喽!大家好,我是【IT邦德】,江湖人称jeames007,10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】!😜&am…...

情绪共享机器:潜力与挑战
在设想的未来科技世界中,有一种神奇的机器,它能够让我们戴上后即刻感知并体验他人当下的情绪。这种情绪共享机器无疑将深刻地改变我们对人际关系、社会交互乃至人性本质的理解。然而,这一科技创新所带来的影响并非全然积极,也伴随…...

docker 安装python3.8环境镜像并导入局域网
一、安装docker yum -y install docker docker version #显示 Docker 版本信息 可以看到已经下载下来了 拉取镜像python3镜像 二、安装docker 中python3环境 运行本地镜像,并进入镜像环境 docker run -itd python-38 /bin/bash docker run -itd pyth…...

修复“电脑引用的账户当前已锁定”问题的几个方法,看下有没有能帮助到你的
面对“电脑引用的账户当前已锁定,且可能无法登录”可能会让你感到焦虑。这是重复输入错误密码后出现的登录错误。当帐户锁定阈值策略配置为限制未经授权的访问时,就会发生这种情况。 但是,如果你在等待半小时后输入正确的密码,你可以重新访问你的帐户。同样,如果你有一个…...

vp9协议笔记
vp9协议笔记📒 本文主要是对vp9协议的梳理,协议的细节参考官方文档:VP9协议链接(需要加速器) vp9协议笔记 vp9协议笔记📒1. 视频编码概述2. 超级帧superframe(sz):2. fr…...

信息检索与数据挖掘 | (九)Link Analysis(链接分析)
文章目录 📚链接分析📚随机矩阵📚random walk📚Google formulation 📚链接分析 将链接看做投票,从重要的网站来的链接其权重更高,所以是递归定义的。 如果网页j权重为rj,有n个出边&…...

yarn的安装及使用教程
Yarn 是一个快速、可靠、安全的包管理工具,用于管理 JavaScript 项目的依赖项。下面是关于 Yarn 的安装和基本使用的详细教程: 安装 Yarn 访问 Yarn 官网 并按照指示下载适合你操作系统的安装程序。安装程序会自动安装 Yarn,并将其添加到系…...

最新AI系统ChatGPT网站H5系统源码,支持Midjourney绘画,GPT语音对话+ChatFile文档对话总结+DALL-E3文生图
一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图文教程吧。已支持GPT…...

学会使用ubuntu——ubuntu22.04使用WebCatlog
Ubuntu22.04使用WebCatlog WebCatlog是适用于Gnu / Linux,Windows或Mac OS X系统的桌面程序。 引擎基于铬,它用于在我们的桌面上处理Web服务。简单点就是把网页单独一个窗口出来显示,当一个app用。本文就是利用WebCatlog安装后的notion编写的…...

(Arcgis)Python3.8批量裁剪利用shp文件裁剪tif栅格影像数据
使用环境: pycharm2020 arcgis pro 中的python3.8 一、pycharm中设置python编译器。左上角“文件”——“设置”——找到python interpreter——找到arcgis pro安装文件夹中的python D:\ArcGIS Pro\bin\Python\envs\arcgispro-py3\python.exe使用arcgis pro原因&a…...

漏洞补丁修复之openssl版本从1.1.1q升级到1.1.1t以及python版本默认2.7.5升级到2.7.18新版本和Nginx版本升级到1.24.0
一、Openssl升级 1、查看Openssl安装的版本 openssl version 2、查看Openssl路径 which openssl 3、上传openssl安装包到服务器:openssl-1.1.1t.tar.gz,并且解压,安装: mv /usr/local/openssl /usr/local/backup_openssl_1.1.1q_20240120 mkdir /usr/local/openssl tar…...

HCIP-BGP实验4
搭建实验拓扑图 要求 1.全网可达 2.isp只能配置IP地址 实验开始 配置IP地址及环回 r1,r2,r9,r10配ipv4地址(以r1为例) [Huawei]sysname r1 [r1]interface g0/0/0 [r1-GigabitEthernet0/0/0]ip address 12.1.1.1 24 [r1-GigabitEthernet0/0/0]q [r1]interface LoopBack 0…...

数据挖掘笔记1
课程:清华大学-数据挖掘:理论与算法(国家级精品课)_哔哩哔哩_bilibili 一、Learning Resources 二、Data 数据是最底层的一种表现形式。数据具有连续性。从存储上来讲,数据分为逻辑上的和物理层的。大数据࿱…...

Spring RabbitMQ那些事(3-消息可靠传输和订阅)
目录 一、序言二、生产者确保消息发送成功1、为什么需要Publisher Confirms2、哪些消息会被确认处理成功 三、消费者保证消息被处理四、Spring RabbitMQ支持代码示例1、 application.yml2、RabbigtMQ配置3、可靠生产者配置4、可靠消费者配置5、测试用例 一、序言 在有些业务场…...

揭秘 Kafka 高性能之谜:一文读懂背后的设计精粹与技术实现
Kafka在性能方面有着显著的优势,这也使得Kafka的应用非常广泛,那kakfa的性能为何如此优异呢?本文将带你探寻kafka高性能之谜。 kafka的高性能概括起来有如下几点:顺序写入磁盘与I/O优化、批量处理、页缓存、零拷贝技术、分区并行处…...

canvas绘制美国国旗(USA Flag)
查看专栏目录 canvas实例应用100专栏,提供canvas的基础知识,高级动画,相关应用扩展等信息。canvas作为html的一部分,是图像图标地图可视化的一个重要的基础,学好了canvas,在其他的一些应用上将会起到非常重…...

Python中的`__all__`魔法函数使用详解
概要 Python是一门灵活而强大的编程语言,提供了各种机制来控制模块的导入和访问。其中,__all__魔法函数是一种用于限制模块导入的机制,可以明确指定哪些变量、函数或类可以被导入。本文将深入探讨__all__的作用、用法以及示例,以…...

Studio One 6 mac 6.5.2 激活版 数字音乐编曲创作
PreSonus Studio One是PreSonus出品的一款功能强大的音乐创作软件。主要为用户提供音乐创作、录音、编辑、制作等功能。它可以让你创造音乐,无限的轨道,无限的MIDI和乐器轨道,虚拟乐器和效果通道,这些都是强大和完美的。 软件下载…...

GitHub图床TyporaPicGo相关配置
本文作者: slience_me 文章目录 GitHub图床&Typora&PicGo相关配置1. Github配置2. picGo配置3. Typora配置 GitHub图床&Typora&PicGo相关配置 关于Typora旧版的百度网盘下载路径 链接:https://pan.baidu.com/s/12mq-dMqWnRRoreGo4MTbKg?…...

FireAlpaca:轻量级、免费的Mac/Win绘图软件,让你的创意如火燃烧!
FireAlpaca是一款轻量级、免费的绘图软件,适用于Mac和Win系统,让你的创作过程更加快捷、简便。无论是绘制漫画、插图、设计作品还是进行简单的图片编辑,FireAlpaca都能满足你的需求。 首先,FireAlpaca具有直观友好的用户界面&…...

用 Python 制作可视化 GUI 界面,一键实现自动分类管理文件!
经常杂乱无章的文件夹会让我们找不到所想要的文件,因此小编特意制作了一个可视化GUI界面,通过输入路径一键点击实现文件分门别类的归档。 不同的文件后缀归类为不同的类别 我们先罗列一下大致有几类文件,根据文件的后缀来设定,大…...

【STM32】USB程序烧录需要重新上电 软件复位方法
文章目录 一、问题二、解决思路2.1 直接插拔USB2.2 给芯片复位 三、解决方法3.1 别人的解决方法3.2 在下载界面进行设置 一、问题 最近学习STM32的USB功能,主要是想要使用虚拟串口功能(VCP),发现每次烧录之后都需要重新上电才可以…...

Java数据结构与算法:图算法之深度优先搜索(DFS)
Java数据结构与算法:图算法之深度优先搜索(DFS) 大家好,我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编,一个热爱编程的程序猿。今天,让我们一起探索图算法中的深度优先搜索(DFS&…...

SpringBoot整合QQ邮箱发送验证码
一、QQ开启SMTP 打开QQ邮箱,点击设置,进入账号,往下滑后,看见服务状态后,点击管理服务 进入管理服务后,打开服务,然后获取授权码 二 、导入依赖 <!-- 邮箱--><dependency>&…...