当前位置: 首页 > news >正文

Flink时间和窗口

目录

时间语义

水位线(Watermarks) 

并行流中的水位线

窗口

滚动窗口—Tumbling Windows

滑动窗口—Sliding Windows

会话窗口—Session Windows

 全局窗口—Global Windows

例子


时间语义

        如图所示,由事件生成器(Event Producer)生成事件,生成的事件数据被收集起来,首先进入分布式消息队列(Message Queue),然后被 Flink 系统中的 Source 算子(Data Source)读取消费,进而向下游的窗口算子(Window Operator)传递,最终由窗口算子进行计算处理。

​ 有两个非常重要的时间点:

(1)一个是数据产生的时刻,我们把它叫作“事件时间”(Event Time);

(2)另一个是数据真正被Flink处理的时刻,叫作“处理时间”(Processing Time)。

我们所定义的窗口操作,到底是以哪种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中存在网络传输延迟和时钟漂移,事件的处理时间相对发生时间会有所滞后。

事件时间(Event Time): 指每个事件在其产生设备上发生的时间。

处理时间(Processing Time): 是指对事件执行相应操作的机器的系统时间。

        当流程序(a streaming program)基于处理时间(Processing Time)运行时,所有基于时间的操作(如时间窗口)将使用运行相应操作的机器的系统时钟。每小时处理时间窗口(An hourly processing time window)将包括在系统时钟指示整小时之间到达特定操作的所有记录。例如,如果应用程序在上午9:15开始运行,那么第一个小时处理时间窗口,也就是[9,10),将包括上午9:15到上午10:00之间处理的事件,下一个窗口,也就是[10,11)将包括上午10:00到上午11:00之间处理的事件,依此类推。无论事件是以有序的状态进入Flink事件窗口,还是无序的状态进入Flink事件窗口,窗口都将按照处理时间对已经到达的事件进行计算。

        基于处理时间来处理数据的方式是有时间标准的,这个时间标准就是运行相应操作的机器的系统时钟。到什么时间就做什么事。

Processing time 提供了最佳的性能和最低的延迟,但是不能提供确定性,即计算结果是不确定的。例如,时间窗口为5min的求和统计,应用程序在 9:00 开始运行,则第一个时间窗口处理 [9:00, 9:05) 的事件,下一个窗口处理 [9:05, 9:10) 的事件,依此类推。通信延迟、作业故障重启等问题,可能导致窗口的计算结果是不一样的。如下图所示,假设事件(事件时间, 数值) 遇到上述问题,场景一:事件B(9:03,2)有网络延迟落在[9:10, 9:15),场景二:作业故障重启导致事件B(9:03,2)和事件C(9:06,3)落在[9:10, 9:15)。都会导致求和的结果不正确。

        Processing Time是指数据被Operator处理时当前所在主机的系统时间。当用户选择使用Processing Time时,在Flink中所有和时间相关的操作都会按照当前系统时间进行处理,例如:Window窗口划分。使用这种语义时Flink中处理数据延迟较低、处理性能高,无论进入到Flink中的源头数据是否有乱序,只要被Flink应用接收的数据都会按照当前数据处理时的系统时间赋值时间语义,可见这种语义虽然处理数据性能高但不能解决数据乱序和延迟问题,从而导致数据统计不精准。Processing Time适合计算精度要求不高的计算场景。

        既然使用Processing Time时间语义,计算的结果的正确性无法得到保证,那么使用Event Time语义呢?

        Event Time是每个事件在其生成设备上发生的时间,这个时间往往是嵌入在事件记录中,例如一条数据中的时间戳记录了该事件数据的产生时间,该时间与下游Flink处理时系统时间无关。如果每个事件包含事件时间,当事件经过网络传输流转到Flink中处理时,理论上来说,先产生的事件会比后产生的事件先到达Flink系统中被处理,但实际情况往往由于网络传输延迟导致早先产生的事件后到达Flink系统被处理的情况(数据延迟到达),这就出现了数据乱序。但基于Event Time的时间概念,我们可以让Flink进行数据处理时基于事件产生的时间处理,这样就可以还原事件的先后关系,保证数据处理的准确性。Event Time 时间语义在实际生产环境中使用较多,该时间语义能保证乱序数据处理的准确性。

        当Flink应用使用Event Time作为时间的衡量标准。窗口计算什么时候触发计算呢?假设事件是有序到达窗口的,窗口每隔1个小时计算一次,当Flink应用程序接收到的第一个事件的事件时间为9:15,程序则认为现在的时间是9:15,可能机器的系统时间已经为10:00,但这并不重要,因为Flink程序已经使用事件时间语义。此时[9,10)窗口不会触发计算,Flink应用继续接收事件,当接收到事件时间为10:00或者之后的事件,则Flink应用程序认为现在时间已经到了10:00或者超过10:00点了,应该触发[9,10)窗口计算。此时就可以使用事件时间作为时间标记来触发窗口计算。

        那当事件因为延迟等原因无序到达窗口呢?比如此时10:00的事件已经到达窗口,是否要触发[9,10)窗口计算呢?还不可以,因为此时可能还有10:00之前的事件尚未到达,例如9:15的事件。那么Flink应用就需要“等一等”,但是这里我们又不能无限期的等待下去,所以这里需要有一个时间标记来决定何时触发窗口,就是要告诉Flink程序要等多久,这个时间标记就是Watermark。所以Watermark是基于Event Time语义给出的。

        其实除了上面两个时间语义,还有一个时间语义叫Ingestion Time,它指的是事件进入Flink的时间。


水位线(Watermarks) 

        Flink中测量Event Time进展的机制是水位线(Watermarks)。Watermarks作为数据流的一部分,并携带时间戳t。实际上Watermarks的本质就是一个时间戳t,它度量了Event Time到底进展到什么时候了。Watermarks(t)表明事件时间在该流中已经达到时间t,这意味着后面的流中不应该再有时间戳小于等于t的事件。

        下图显示了有序事件流。图中每个方块表示一个事件,方块中的数字表示事件时间,在本例中,事件是按顺序排列的(相对于它们的时间戳),这意味着Watermark只是流中的周期性标记。所谓周期性标记指的是Flink每隔200ms计算一次Watermark。

图中有两个Watermak,w(11)和w(20),它们分别表示事件时间已经达到11和20。事实上,对于有序流,Watermark可以使用事件的事件时间即可。在流中周期性的标记即可。

        Watermark对于乱序流( out-of-order streams)是至关重要的,如下图所示,其中事件不是按事件时间戳排序的。一般来说,Watermark(t)是一种声明,声明到流中的那个点,在时间戳t之前的所有事件都应该已经到达。例如w(11)声明到流中的这个位置,时间已经是11了。一旦Watermark到达一个算子,算子可以将其内部事件时间时钟的时间调整到Watermark的值。

Watermark是如何计算的呢?还记得之前说的“等一等”吗?“等一等”其实是事件到达Flink延迟的一个时间。例如上图中,当事件到达后,再等待4s。

w= max(事件时间)-延迟的时间

例如上图中,当事件7到达时,w=max(7)-4=3,

当事件11到达时,w=max(7,11)-4=7,

当事件15到达时,w=max(7,11,15)-4=11,

当事件9到达时,w=max(7,11,15,9)-4=11,

当事件12到达时,w=max(7,11,15,12)-4=11,

此时到了Watermarks每隔200ms计算一次,此时时间已经走到了w(11)。这是Flink认为小于等于11的事件已经全部到达。

如果是基于事件时间语义的有序流,Watermask计算时延迟的时间为0,即w=max(事件时间)。


并行流中的水位线

         水位线是在源函数(source functions)处或直接在源函数之后生成的。源函数的每个并行子任务通常独立地生成其水位线。这些水位线定义了特定并行源处的事件时间。

        当水位线在流处理项目中流动时,它们会提前到达算子处的事件时间。每当一个算子将其事件时间向前推进时,它就会在其后续算子的下游生成一个新的水位线。

        一些算子(Operation)使用多个输入流;例如,联合,或keyBy(…)或partition(…)函数后面的算子。这样,一个算子的当前事件时间是其输入流事件时间的最小值。当它的输入流更新它们的事件时间时,算子也会更新。

        下图显示了在并行流中流动的事件和水位线的示例,以及跟踪事件时间的操作。

图中绘制了并行度为2的流处理。流处理有source、map和window操作。操作右上角黄色方块中数字表示该算子的事件时间。 

例如图中window算子的事件时间为14,原因是map(1)算子的事件时间为29,同时map(2)算子的事件时间为14,两者同时向下游window算子推进,到当前window算子时,window算子取两者事件时间最小值作为当前算子的事件时间,所以window(1)和window(2)两个并行的算子的事件时间为14。


窗口

        聚合事件(例如计数、求和)在流处理中的工作方式与批处理中的不同。例如,计算流中的所有元素是不可能的,因为流通常是无限的。相反,流上的聚合(计数,总和等)是由窗口限定的,例如“过去5分钟的计数”或“最近100个元素的总和”。

        Windows可以是时间驱动的(例如:每30秒)或数据驱动的(例如:每100个元素)。通常可以区分不同类型的窗口,例如时间驱动窗口分为滚动窗口(没有重叠)、滑动窗口(有重叠)、会话窗口(被不活动的间隙打断)和全局窗口。数据驱动窗口有计数窗口

 图中黄色箭头表示事件流方向,每隔灰色方块表示一个事件。Time windows表示时间驱动的窗口,每隔多长时间统计一次,Count(3) windows表示数据驱动的窗口,每3个事件统计一次。

滚动窗口—Tumbling Windows

        滚动窗口赋值器将每个元素赋给指定窗口大小的窗口。滚动窗口有固定的大小,不重叠。例如,如果指定大小为5分钟的滚动窗口,则将评估当前窗口,并每五分钟启动一个新窗口,如下图所示。

滑动窗口—Sliding Windows

        滑动窗口赋值器将元素赋给固定长度的窗口。与滚动窗口分配器类似,窗口的大小由窗口大小参数配置。另一个窗口滑动参数控制滑动窗口启动的频率。因此,如果滑动块小于窗口大小,则滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。

        例如,您可以设置大小为10分钟的窗口,每隔5分钟滑动一次。这样,每隔5分钟就会出现一个窗口,其中包含最近10分钟内到达的事件,如下图所示。

当window slide<window size时,滑动后的窗口与滑动前的窗口之间会有重叠部分,例如图中window 1和window 2之间有重叠;

当window slide>=window size,滑动后的窗口与滑动前的窗口之间可能还会有间隙部分。

会话窗口—Session Windows

        会话窗口分配器按活动的会话对元素进行分组。会话窗口不重叠,也没有固定的开始和结束时间,这与滚动窗口和滑动窗口不同。相反,当会话窗口在一段时间内没有接收到元素时,即当出现不活动间隙时,会话窗口关闭。会话窗口分配器可以配置一个静态会话间隙,也可以配置一个会话间隙提取器函数,该函数定义了不活动的时间长度。当此期限到期时,当前会话关闭,并将后续元素分配给新的会话窗口。

 全局窗口—Global Windows

        全局窗口赋值器将具有相同键的所有元素赋给同一个全局窗口。此窗口方案仅在指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有自然的结束。


例子

        这里以单词统计为例来说明水位线生成以及窗口使用,本例的需求每隔5s中,每个单词的数量,代码如下:

package com.leboop;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import javax.annotation.Nullable;
import javax.sound.midi.Soundbank;/*** Description TODO.* Date 2024/7/24 11:10** @author leb* @version 2.0*/
public class OrderStreamWatermarkDemo {public static void main(String[] args) throws Exception {// 1. 获取流执行环境.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();System.out.println("默认并行度parallelism=" + env.getParallelism());env.setParallelism(1); // 为了方便测试,这里并行度设置为1,默认并行度为8.// 2. 获取默认的时间语义. 本api对应的flink版本1.9.3,默认时间语义为:ProcessingTimeTimeCharacteristic streamTimeCharacteristic = env.getStreamTimeCharacteristic();System.out.println("默认时间语义streamTimeCharacteristic=" + streamTimeCharacteristic);long autoWatermarkInterval = env.getConfig().getAutoWatermarkInterval();System.out.println("默认水位线生成时间间隔autoWatermarkInterval=" + autoWatermarkInterval);// 3. 设置时间语义为事件时间.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);autoWatermarkInterval = env.getConfig().getAutoWatermarkInterval();System.out.println("默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=" + autoWatermarkInterval);// 设置水位线生成时间间隔.env.getConfig().setAutoWatermarkInterval(200);// 4. 监听socket数据源,每行输入格式:单词,事件时间,例如:hello,1000.final DataStreamSource<String> sourceDS = env.socketTextStream("bigdata111", 9999);// 5. 将socket数据源流转换成Word对象流SingleOutputStreamOperator<Word> wordDs = sourceDS.map(new MapFunction<String, Word>() {public Word map(String s) throws Exception {String[] wordArr = s.split(",");return new Word(wordArr[0], Long.valueOf(wordArr[1]), 1);}});// 6. 设置watermark.SingleOutputStreamOperator<Word> watermarkDS = wordDs.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Word>() {private Long lateTime = 5 * 1000L;private Long maxEventTime = 0L;@Nullablepublic Watermark getCurrentWatermark() {// 生成Watermark.Watermark watermark = new Watermark(maxEventTime - lateTime);
//                System.out.println("当前水位线watermark=" + watermark.getTimestamp());return watermark;}public long extractTimestamp(Word word, long previousElementTimestamp) {// 抽取事件时间.long eventTime = word.getTime() * 1000;// 计算最大事件时间.maxEventTime = Math.max(maxEventTime, eventTime);// 此处代码仅仅是为了打印当前word的水位线.Watermark eventWatermark = new Watermark(maxEventTime - lateTime);word.setWatermark(eventWatermark); // 当前word的水位线.System.out.println("抽取" + word);return eventTime;}});// 7. 将word对象流转换成key流.KeyedStream<Word, String> wordKS = watermarkDS.keyBy(new KeySelector<Word, String>() {public String getKey(Word word) throws Exception {return word.getWord();}});// 设置window计算:滚动窗口每隔5s计算一次.WindowedStream<Word, String, TimeWindow> wordWS = wordKS.window(TumblingEventTimeWindows.of(Time.seconds(5)));// 窗口中事件统计.SingleOutputStreamOperator<Word> wordCountResult = wordWS.sum("count");// 输出窗口中事件.wordWS.apply(new WindowFunction<Word, Word, String, TimeWindow>() {public void apply(String s, TimeWindow window, Iterable<Word> input, Collector<Word> out) throws Exception {System.out.println("窗口window=" + window + ",窗口中数据input=" + input);}});// 8.打印结果并执行.wordCountResult.print();env.execute();}}

 下面是Word实体类代码:

package com.leboop;import org.apache.flink.streaming.api.watermark.Watermark;/*** Description TODO.* Date 2024/7/24 13:05** @author leb* @version 2.0*/
public class Word {/*** 单词.*/private String word;/*** 单词产生的事件时间.*/private Long time;/*** 统计单词个数.*/private Integer count;private Watermark watermark;public Word() {}public Word(String word, Long time, Integer count) {this.word = word;this.time = time;this.count = count;}public String getWord() {return word;}public void setWord(String word) {this.word = word;}public Long getTime() {return time;}public void setTime(Long time) {this.time = time;}public Integer getCount() {return count;}public void setCount(Integer count) {this.count = count;}public Watermark getWatermark() {return watermark;}public void setWatermark(Watermark watermark) {this.watermark = watermark;}@Overridepublic String toString() {return "Word{" +"word='" + word + '\'' +", time=" + time +", count=" + count +", watermark=" + watermark.getTimestamp() +'}';}
}

代码中有几点需要注意的:

(1)代码的并行度最好设置为1,默认并行度为8,这样后面通过socket输入的单词都进去该并行度中进行计算,加快触发,否则并行度太多,需要在socket中输入更多的单词,才能触发。

env.setParallelism(1);

(2)本文使用的是flink 1.9.3的api,该版本默认时间语义是ProcessingTime,后面flink新版本默认的时间语义是EventTime,查看TimeCharacteristic源码时间语义总共有三种:

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api;import org.apache.flink.annotation.PublicEvolving;/*** The time characteristic defines how the system determines time for time-dependent* order and operations that depend on time (such as time windows).*/
@PublicEvolving
public enum TimeCharacteristic {/*** Processing time for operators means that the operator uses the system clock of the machine* to determine the current time of the data stream. Processing-time windows trigger based* on wall-clock time and include whatever elements happen to have arrived at the operator at* that point in time.** <p>Using processing time for window operations results in general in quite non-deterministic* results, because the contents of the windows depends on the speed in which elements arrive.* It is, however, the cheapest method of forming windows and the method that introduces the* least latency.*/ProcessingTime,/*** Ingestion time means that the time of each individual element in the stream is determined* when the element enters the Flink streaming data flow. Operations like windows group the* elements based on that time, meaning that processing speed within the streaming dataflow* does not affect windowing, but only the speed at which sources receive elements.** <p>Ingestion time is often a good compromise between processing time and event time.* It does not need any special manual form of watermark generation, and events are typically* not too much out-or-order when they arrive at operators; in fact, out-of-orderness can* only be introduced by streaming shuffles or split/join/union operations. The fact that* elements are not very much out-of-order means that the latency increase is moderate,* compared to event* time.*/IngestionTime,/*** Event time means that the time of each individual element in the stream (also called event)* is determined by the event's individual custom timestamp. These timestamps either exist in* the elements from before they entered the Flink streaming dataflow, or are user-assigned at* the sources. The big implication of this is that it allows for elements to arrive in the* sources and in all operators out of order, meaning that elements with earlier timestamps may* arrive after elements with later timestamps.** <p>Operators that window or order data with respect to event time must buffer data until they* can be sure that all timestamps for a certain time interval have been received. This is* handled by the so called "time watermarks".** <p>Operations based on event time are very predictable - the result of windowing operations* is typically identical no matter when the window is executed and how fast the streams* operate. At the same time, the buffering and tracking of event time is also costlier than* operating with processing time, and typically also introduces more latency. The amount of* extra cost depends mostly on how much out of order the elements arrive, i.e., how long the* time span between the arrival of early and late elements is. With respect to the* "time watermarks", this means that the cost typically depends on how early or late the* watermarks can be generated for their timestamp.** <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the* event's original time, rather than the time assigned at the data source. Practically, that* means that event time has generally more meaning, but also that it takes longer to determine* that all elements for a certain time have arrived.*/EventTime
}

本案例中使用EventTime语义。

(3) 默认情况下,EventTime时间语义的水位线生成时间间隔为200ms,可查看StreamExecutionEnvironment类中如下方法看到:

	@PublicEvolvingpublic void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}}

(4)输入的数据格式如下:

hello,1
spring,1
java,1
hello,6
spring,11
hello,7
java,14
java,20
hello,3
hello,5
hello,10
java,21
java,31

 格式为单词,事件时间,例如第一行表示hello这个单词在1s时刻产生的。从数据中可以看到单词是无序的,例如hello,3在后面才出现。

(5)将每行输入数据通过英文逗号切分,转换成一个Word对象,代码如下:

        SingleOutputStreamOperator<Word> wordDs = sourceDS.map(new MapFunction<String, Word>() {public Word map(String s) throws Exception {String[] wordArr = s.split(",");return new Word(wordArr[0], Long.valueOf(wordArr[1]), 1);}});

 (6)窗口设置

可通过assignTimestampsAndWatermarks为事件抽出时间戳和生成水位线。代码中设置了延迟时间为5s,代码如下:

private Long lateTime = 5 * 1000L;

(7)Word对象流转换成键流

        为了统计每个单词的数量,需要将单词按照单词分流,因此需要一单词为键来统计,代码如下:

        KeyedStream<Word, String> wordKS = watermarkDS.keyBy(new KeySelector<Word, String>() {public String getKey(Word word) throws Exception {return word.getWord();}});

(8)为键流设置窗口大小为5s的滚动窗口

WindowedStream<Word, String, TimeWindow> wordWS = wordKS.window(TumblingEventTimeWindows.of(Time.seconds(5)));

此时键流转换成的窗口流。此时,划分的滚动窗口为[0,5000),[5000,10000),[10000,15000),……。每个窗口含头不含尾,窗口时间单位为ms。

(9)窗口流按照单词对象Word的count字段来统计单词数量,代码如下:

SingleOutputStreamOperator<Word> wordCountResult = wordWS.sum("count");

这里需要Word类必须是POJO,例如该实体类不是public修饰,在单词统计wordWS.sum("count")处会报错如下: 

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.leboop.Word>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:55)at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:1367)at com.leboop.OrderStreamWatermarkDemo.main(OrderStreamWatermarkDemo.java:101)

 (10)打印每个窗口中有哪些事件

        为了知道统计背后的原理,这里打印出每个窗口中的事件,代码如下:

wordWS.apply(new WindowFunction<Word, Word, String, TimeWindow>() {public void apply(String s, TimeWindow window, Iterable<Word> input, Collector<Word> out) throws Exception {System.out.println("窗口window=" + window + ",窗口中数据input=" + input);}});

 (11)最后打印出统计结果并执行程序

代码如下:

wordCountResult.print();env.execute();

         下面打开bigdata111的socket,并逐行输入数据,如下:

程序输出结果如下: 

默认并行度parallelism=8
默认时间语义streamTimeCharacteristic=ProcessingTime
默认水位线生成时间间隔autoWatermarkInterval=0
默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=200
抽取Word{word='hello', time=1, count=1, watermark=-4000}
抽取Word{word='spring', time=1, count=1, watermark=-4000}
抽取Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=6, count=1, watermark=1000}
抽取Word{word='spring', time=11, count=1, watermark=6000}
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='hello', time=1, count=1, watermark=-4000}]
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='spring', time=1, count=1, watermark=-4000}]
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='java', time=1, count=1, watermark=-4000}]
Word{word='hello', time=1, count=1, watermark=-4000}
Word{word='spring', time=1, count=1, watermark=-4000}
Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=7, count=1, watermark=6000}
抽取Word{word='java', time=14, count=1, watermark=9000}
抽取Word{word='java', time=20, count=1, watermark=15000}
窗口window=TimeWindow{start=5000, end=10000},窗口中数据input=[Word{word='hello', time=6, count=1, watermark=1000}, Word{word='hello', time=7, count=1, watermark=6000}]
Word{word='hello', time=6, count=2, watermark=1000}
窗口window=TimeWindow{start=10000, end=15000},窗口中数据input=[Word{word='spring', time=11, count=1, watermark=6000}]
窗口window=TimeWindow{start=10000, end=15000},窗口中数据input=[Word{word='java', time=14, count=1, watermark=9000}]
Word{word='spring', time=11, count=1, watermark=6000}
Word{word='java', time=14, count=1, watermark=9000}
抽取Word{word='hello', time=3, count=1, watermark=15000}
抽取Word{word='hello', time=5, count=1, watermark=15000}
抽取Word{word='hello', time=10, count=1, watermark=15000}
抽取Word{word='java', time=21, count=1, watermark=16000}
抽取Word{word='java', time=31, count=1, watermark=26000}
Word{word='java', time=20, count=2, watermark=15000}
窗口window=TimeWindow{start=20000, end=25000},窗口中数据input=[Word{word='java', time=20, count=1, watermark=15000}, Word{word='java', time=21, count=1, watermark=16000}]

 结果解释:

(1)程序运行后,即会输出如下结果:

默认并行度parallelism=8
默认时间语义streamTimeCharacteristic=ProcessingTime
默认水位线生成时间间隔autoWatermarkInterval=0
默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=200

(2)当输入前四行数据

hello,1
spring,1
java,1
hello,6

会调用抽取时间戳extractTimestamp方法,然后会输出

抽取Word{word='hello', time=1, count=1, watermark=-4000}
抽取Word{word='spring', time=1, count=1, watermark=-4000}
抽取Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=6, count=1, watermark=1000}

此时水位线为1000ms,这表示程序认为当前的事件时间才到1000ms,此时窗口[0,5000)还不能触发计算,接着输入下一条数据:

spring,11

此时继续抽取时间戳,输出

抽取Word{word='spring', time=11, count=1, watermark=6000}

注意当前水位线已经到达6000,程序认为事件时间已经到了6000ms,既会触发[0,5000)窗口的计算,此时窗口中的数据有

hello,1
spring,1
java,1

这里注意已经输入的如下两条数据并不在该窗口中:

hello,6
spring,11

 数据会按照输入数据的事件时间正确地分配到每个窗口。因此[0,5000)窗口统计结果为:

Word{word='hello', time=1, count=1, watermark=-4000}
Word{word='spring', time=1, count=1, watermark=-4000}
Word{word='java', time=1, count=1, watermark=-4000}

也即hello、spring、java各出现1次。以此类推。

相关文章:

Flink时间和窗口

目录 时间语义 水位线&#xff08;Watermarks&#xff09; 并行流中的水位线 窗口 滚动窗口—Tumbling Windows 滑动窗口—Sliding Windows 会话窗口—Session Windows 全局窗口—Global Windows 例子 时间语义 如图所示&#xff0c;由事件生成器&#xff08;Event Pr…...

LLaMA模型量化方法优化:提高性能与减小模型大小

LLaMA模型量化方法优化:提高性能与减小模型大小 LLaMA模型量化方法优化:提高性能与减小模型大小引言新增量化方法性能评估7B模型13B模型 结果分析结论 LLaMA模型量化方法优化:提高性能与减小模型大小 引言 在大型语言模型(LLM)的应用中,模型大小和推理速度一直是关键的挑战。…...

前端CSS实现卡片抽奖效果

引言 在网页设计中&#xff0c;互动元素能够显著提升用户体验&#xff0c;吸引用户的注意力。其中&#xff0c;卡片抽奖效果常用于营销活动、游戏或娱乐场景&#xff0c;通过随机展示不同的卡片来增加趣味性和参与度。本文将详细介绍如何使用HTML和CSS来实现一个简单的卡片抽奖…...

Java在for循环中修改集合

前天看到一篇文章什么&#xff1f;for循环也会出问题&#xff1f;&#xff0c;里面涉及到在for循环中修改集合&#xff0c;想起来自己刚入行的时候就碰到过类似的问题&#xff0c;于是复现了一下文章中的问题&#xff0c;并试验了其它在循环中修改集合的方法。 底层原理参考什…...

Java小白入门到实战应用教程-运算符详解

Java小白入门到实战应用教程-运算符 上节回顾 在上节的内容中我们了解了变量和基本数据类型的内容&#xff0c;现在回顾一下上节课的内容。 声明变量的语法为&#xff1a; 数据类型 变量名&#xff1b; 其中在java中一共有8中基本数据类型&#xff0c;分别是&#xff1a;b…...

secureCRT同时在所有已打开窗口执行命令、mac-os下使用的SecureCRT版本 以及 SecureCRT一段时间不操作没有响应的问题

一、secureCRT命令行工具一次性同时在所有已打开窗口执行命令 公司的服务器比较多&#xff0c;最近因为opcache&#xff0c;上线发布后&#xff0c;需要重启所有的WEB服务器上的php。目前使用的jenkins发布&#xff0c;不过账号安全问题&#xff0c;给jenkins的账号权限受限不能…...

增材制造与智能制造关系

在撰写的增材制造技术与装备书籍中有着明确的描述&#xff0c;增材制造是智能制造的典型范例&#xff0c;是智能制造“类”的实例化过程。这种借助于计算机编程面向对象思想的解释可以更全面的理解增材制造和智能制造的关系。增材制造实例具备了智能制造类的属性&#xff0c;智…...

Google Test 学习笔记(简称GTest)

文章目录 一、介绍1.1 介绍1.2 教程 二、使用2.1 基本使用2.1.1 安装GTest &#xff08;下载和编译&#xff09;2.1.2 编写测试2.1.3 运行测试2.1.4 高级特性2.1.5 调试和分析 2.2 源码自带测试用例2.3 TEST 使用2.3.1 TestCase的介绍2.3.2 TEST宏demo1demo2 2.3.3 TEST_F宏2.3…...

不可变集合

定义&#xff1a;就是集合中的内容不可以被修改。 如何获取不可变集合&#xff1f; List、Set、Map类中提供的静态方法of可用来获取不可变集合。 特点&#xff1a;一旦创建完成只可以进行查询&#xff0c;不可以增删改。 细节&#xff1a;Map集合中的of方法只能添加10个键值…...

景区AR导航营销系统:技术解决方案与实施效益分析

随着旅游市场的竞争日益激烈&#xff0c;景区需要不断创新以吸引游客。景区 AR 导航将虚拟画面与现实场景相结合&#xff0c;为游客提供了更加直观、生动的导航服务。对于景区而言&#xff0c;这一创新技术无疑是吸引游客目光、提升景区知名度的有力武器。通过独特的 AR 导航体…...

MATLAB的基础知识

matlab的基本小常识 1. 在每行语句后面加上英文分号表示不在命令行窗口显示运行结果。 a 3; a 5 2. 多行注释快捷键&#xff0c;CTRLR。 3. 取消多行注释&#xff0c;CTRLT。 4. 清空工作区的所有变量使用clear。 5. 清空命令行窗口的所有变量使用clc。 6. clc和clear一起使…...

Redis-高级实战案例

文章目录 Redis集群崩溃时如何保证秒杀系统高可用1. 冗余与备份2. 故障检测与自动切换3. 降级策略4. 数据一致性5. 客户端缓存6. 异常处理与通知7. 测试与演练8. 服务降级与回滚Redis主从切换导致库存同步异常以及超卖问题主从切换导致的库存同步异常原因:解决方案:秒杀链路中…...

d3d12.dll 文件缺失如何解决?五种修复丢失问题的方法

d3d12.dll 文件缺失如何解决&#xff1f;它为什么会不见呢&#xff1f;今天&#xff0c;我们将探讨 d3d12.dll 文件的重要性、原因以及丢失时的解决策略。本文将全面介绍 d3d12.dll 文件&#xff0c;并提供五种修复丢失问题的方法。 d3d12.dll文件是什么的详细介绍 d3d12.dll …...

Linux下如何设置系统定时任务

在Linux系统中&#xff0c;用户可以使用cron工具来设置定时任务。cron是一个守护进程&#xff0c;用于在指定的时间间隔执行指定的命令或脚本。下面是在Linux系统中设置系统定时任务的步骤。 使用crontab命令编辑定时任务列表&#xff1a; crontab -e该命令会打开一个文本编辑…...

【React】JSX 实现列表渲染

文章目录 一、基础语法1. 使用 map() 方法2. key 属性的使用 二、常见错误和注意事项1. 忘记使用 key 属性2. key 属性的选择 三、列表渲染的高级用法1. 渲染嵌套列表2. 条件渲染列表项3. 动态生成组件 四、最佳实践 在 React 开发中&#xff0c;列表渲染是一个非常常见的需求。…...

写一个简单的兼容GET/POST请求的登录接口

本文目录 安装JDK17安装或者更新Intelij Idea 2024SpringBoot生成项目压缩包下载maven&#xff0c;idea添加maven写POST接口浏览器访问GET接口PostMan安装及访问POST接口 安装JDK17 参考&#xff1a;https://blog.csdn.net/tiehou/article/details/129575138 安装或者更新Int…...

【好玩的经典游戏】Docker环境下部署赛车小游戏

【好玩的经典游戏】Docker环境下部署赛车小游戏 一、小游戏介绍1.1 小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose 版本四、构建容器镜像4.1 下…...

物理机 gogs+jenkins+sonarqube 实现CI/CD

一、部署gogs_0.11.91_linux_amd64.tar.gz gogs官网下载&#xff1a;https://dl.gogs.io/ yum -y install mariadb-serversystemctl start mariadbsystemctl enable mariadbuseradd gittar zxvf gogs_0.11.91_linux_amd64.tar.gzcd gogsmysql -u root -p < scripts/mysql.…...

前端表格解析方法

工具类文件 // fileUtils.tsimport { ref } from vue; import * as xlsx from xlsx;interface RowData {[key: string]: any; }export const tableData ref<RowData[]>([]);export async function handleFileSelect(url: string): Promise<void> {try {const res…...

Leetcode 3227. Vowels Game in a String

Leetcode 3227. Vowels Game in a String 1. 解题思路2. 代码实现 题目链接&#xff1a;3227. Vowels Game in a String 1. 解题思路 这一题稍微分析一下之后就会发现&#xff0c;这个游戏有且只有一种情况Bob才能够赢&#xff0c;即原始字符串当中不存在元音字母的情况&…...

树莓派4B从装系统raspbian到vscode远程编程(python)

1、写在前面 前面用的一直是Ubuntu系统&#xff0c;但是遇到一个奇葩的问题&#xff1a; 北通手柄在终端可以正常使用&#xff0c;接收到数据 但在python程序中使用pygame库初始化时总是报错&#xff1a;Invalid device number&#xff0c;检测不到手柄 经过n次重装系统&am…...

vue上传Excel文件并直接点击文件列表进行预览

本文主要内容&#xff1a;用elementui的Upload 组件上传Excel文件&#xff0c;上传后的列表采用xlsx插件实现点击预览表格内容效果。 在项目中可能会有这样的需求&#xff0c;有很多种方法实现。但是不想要跳转外部地址&#xff0c;所以用了xlsx插件来解析表格&#xff0c;并展…...

OpenCV 像素操作—证件照换底色详细原理 C++纯手写实现

文章目录 总体步骤1.RGB转HSV2.找出要换的底色3.取反&#xff0c;黑白颠倒4.将原图像的非背景部分复制到新背景上 完整代码1.C纯手写版2.官方API版本 总体步骤 1.RGB转HSV 为什么一定要转为HSV 颜色空间&#xff1f; 将图像从BGR颜色空间转换为HSV颜色空间是因为HSV颜色空间更…...

tinygrad框架简介;MLX框架简介

目录 tinygrad框架简介 MLX框架简介 LLaMA​编辑 Stable Diffusion​编辑 tinygrad框架简介 极简主义与易扩展性 tinygrad 的设计理念是极简主义。与 XLA 类比,如果 XLA 是复杂指令集计算 (CISC),那么 tinygrad 就是精简指令集计算 (RISC)。这种简约的设计使得它成为添加…...

服务器重启了之后就卡在某个页面了,花屏,如何解决??

&#x1f3c6;本文收录于《CSDN问答解惑-专业版》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收…...

Hospital 14.6.0全开源医院管理预约系统源码

InfyHMS 具有 60 种功能和 9 种不同类型的用户类型&#xff0c; 他们可以登录系统并根据他们的角色访问他们的数据。 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/89580674 更多资源下载&#xff1a;关注我。...

C/C++樱花树代码

目录 写在前面 系列文章 C简介 完整代码 代码分析 写在后面 写在前面 C实现精美的樱花树&#xff0c;只需这100行代码&#xff01; 系列文章 序号目录直达链接1爱心代码https://want595.blog.csdn.net/article/details/1363606842李峋同款跳动的爱心https://want595.b…...

sklearn基础学习

1. 简介 1.1 什么是sklearn sklearn&#xff0c;或者更正式地称为scikit-learn&#xff0c;是一个基于Python的开源机器学习库。它建立在NumPy、SciPy和matplotlib之上&#xff0c;提供了简单而有效的工具用于数据挖掘和数据分析。sklearn支持监督学习和无监督学习算法&#…...

SpringBoot 自动配置原理

一、Condition Condition 是在 Spring 4.0 增加的条件判断功能&#xff0c;通过这个可以功能可以实现选择性的创建 Bean 操 作。 思考&#xff1a; SpringBoot 是如何知道要创建哪个 Bean 的&#xff1f;比如 SpringBoot 是如何知道要创建 RedisTemplate 的&#xff1f; …...

Redisson中RQueue的使用场景附一个异步的例子

RQueue 是一个基于 Redis 的分布式作业队列系统&#xff0c;它允许开发者在 Ruby 应用程序中实现异步任务处理和计划任务调度。由于 Redis 提供了高性能的内存数据结构存储&#xff0c;RQueue 可以快速地存储和检索队列中的任务&#xff0c;这使得它非常适合于高并发和低延迟的…...

SpringMVC 控制层框架-下

五、SpringMVC其他扩展 1. 异常处理机制 1.1 异常处理概念 开发过程中是不可避免地会出现各种异常情况&#xff0c;例如网络连接异常、数据格式异常、空指针异常等等。异常的出现可能导致程序的运行出现问题&#xff0c;甚至直接导致程序崩溃。因此&#xff0c;在开发过程中&a…...

(四)js前端开发中设计模式之工厂方法模式

工厂方法模式,通过对产品类的抽象&#xff0c;使其创建业务主要用于负责创建多类产品的实例 const Java function (content) {this.content content;(function () {let oDiv document.createElement(div)oDiv.innerHTML contentoDiv.style.color greendocument.getElement…...

新版GPT-4omini上线!快!真TM快!

大半夜&#xff0c;OpenAI突然推出了GPT-4o mini版本。 当我看到这条消息时&#xff0c;正准备去睡觉。mini版本质上是GPT-4o模型的精简版本&#xff0c;没有什么革命性的创新&#xff0c;因此我并没有太在意。 结果今天早上一觉醒来发现伴随GPT-4o mini上线&#xff0c;官网和…...

【Unity】RPG2D龙城纷争(十七)敌方常规AI(Normal)的实现

更新日期:2024年7月24日。 项目源码:第五章发布(正式开始游戏逻辑的章节) 索引 简介一、AI_Normal类二、AI调遣策略第一阶段:收集1.提供战场数据收集方法2.收集战场数据三、AI调遣策略第二阶段:评估四、AI调遣策略第三阶段:行动简介 AI_Normal定位为框架自带的最基础的…...

Tracy 小笔记:微信小程序 mpx 雷达图的实现

使用文档&#xff1a; https://www.kancloud.cn/xchhhh/wx-chart/399337 https://github.com/xiaolin3303/wx-charts https://gitee.com/mirrors/wx-charts/#wx-charts 参数说明&#xff1a; https://github.com/xiaolin3303/wx-charts/issues/56 下载 dist 里的 wx-charts-…...

Unity UGUI 之 Input Field

本文仅作学习笔记与交流&#xff0c;不作任何商业用途 本文包括但不限于unity官方手册&#xff0c;唐老狮&#xff0c;麦扣教程知识&#xff0c;引用会标记&#xff0c;如有不足还请斧正 1.Input Field是什么&#xff1f; 给玩家提供输入的输入框 2.重要参数 中英文对照着看…...

SpringBoot接入mongodb例子,并有增删改查功能

1&#xff0c;首先&#xff0c;在pom.xml中添加依赖&#xff1a; <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><!--上面这…...

类和对象(三)

目录 一. 构造函数初始化列表 二. 类型转换 三. static成员 四. 友元 五. 内部类 六. 匿名对象 七. 对象拷贝时的编译器优化 一. 构造函数初始化列表 1. 之前我们实现构造函数时&#xff0c;初始化成员变量主要使用函数体内赋值&#xff0c;构造函数初始化还有一种方式&…...

Android SurfaceFlinger——GraphicBuffer初始化(二十九)

在 SurfaceFlinger 中,GraphicBuffer 是一个关键的数据结构,用于封装和管理图形数据的内存缓冲区。它不仅在 SurfaceFlinger 内部使用,也被其他组件如 GPU 驱动、摄像头服务、视频解码器等广泛利用,以实现高效的数据交换和图形渲染。 一、概述 GraphicBuffer 对象封装了一…...

pytest:4种方法实现 - 重复执行用例 - 展示迭代次数

简介&#xff1a;在软件测试中&#xff0c;我们经常需要重复执行测试用例&#xff0c;以确保代码的稳定性和可靠性。在本文中&#xff0c;我们将介绍四种方法来实现重复执行测试用例&#xff0c;并显示当前迭代次数和剩余执行次数。这些方法将帮助你更好地追踪测试执行过程&…...

一文入门SpringSecurity 5

目录 提示 Apache Shiro和Spring Security 认证和授权 RBAC Demo 环境 Controller 引入Spring Security 初探Security原理 认证授权图示​编辑 图中涉及的类和接口 流程总结 提示 Spring Security源码的接口名和方法名都很长&#xff0c;看源码的时候要见名知意&am…...

IPython的HTML魔法:%%html_header命令全解析

IPython的HTML魔法&#xff1a;%%html_header命令全解析 在IPython和Jupyter Notebook中&#xff0c;%%html_header是一个魔术命令&#xff0c;它允许用户在Notebook的单元格中添加HTML头部&#xff08;head&#xff09;内容。这个功能特别有用&#xff0c;当你需要定制Notebo…...

将SQL中的占位符替换成参数

将SQL中的占位符替换成参数 描述 描述 此方法是将SQL中的${}或#{}替换为直接拼接到SQL中或直接替换为?的形式。具体详情看下面代码。 import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern;/*** author HuYu* date 2023-09-21* since 1.0**…...

锁相环 vivado FPGA

原理 同步状态/跟踪状态&#xff1a;相位差在2kπ附近&#xff0c;频率差为0到达上述状态的过程称为捕获过程锁相环的捕获带&#xff1a;delta w的最大值&#xff0c;大于这个值的话就不能捕获鉴相器&#xff08;PD-phase discriminator&#xff09;&#xff1a;相乘加LPF&…...

英语科技写作 希拉里·格拉斯曼-蒂(英文版)pdf下载

下载链接&#xff1a; 链接1&#xff1a;https://pan.baidu.com 链接2&#xff1a;/s/1fxRUGnlJrKEzQVF6k1GmBA 提取码&#xff1a;b69t 由于是英文版&#xff0c;可能有些看着不太方便&#xff0c;可以在网页版使用以下软件中英文对照着看&#xff0c;看着更舒服&#xff0c;…...

《Dynamic Statistical Learning in Massive Datastreams》论文阅读笔记

论文地址: https://www3.stat.sinica.edu.tw/ss_newpaper/SS-2023-0195_na.pdf 论文题目翻译&#xff1a;《在大规模数据流中的动态统计学习》 核心观点&#xff1a; 动态跟踪和筛选框架&#xff08;DTS&#xff09;&#xff1a;论文提出了一个在线学习和模型更新的新框架&…...

【数据分享】2008-2022年我国省市县三级的逐日NO2数据(excel\shp格式)

空气质量数据是在我们日常研究中经常使用的数据&#xff01;之前我们给大家分享了2000-2022年的省市县三级的逐日PM2.5数据、2013-2022年的省市县三级的逐日CO数据和2013-2022年的省市县三级的逐日SO2数据&#xff08;均可查看之前的文章获悉详情&#xff09;&#xff01; 本次…...

JavaEE (1)

web开发概述 所谓web开发,指的是从网页中向后端程序发送请求,与后端程序进行 交互. 流程图如下 Web服务器是指驻留于因特网上某种类型计算机的程序. 可以向浏览器等Web客户端提供文档&#xff0c;也可以放置网站文件&#xff0c;让全世界浏览&#xff1b; 它是一个容器&…...

事务、函数和索引

什么是事务&#xff1f; 事务&#xff08;Transaction&#xff09;&#xff0c;就是将一组SQL语句放在同一批次内去执行&#xff0c;如果一个SQL语句出错&#xff0c;则该批次内 的所有SQL都将被取消执行。 特点 一个事务中如果有一个数据库操作失败&#xff0c;那么整个事务…...

Android APP 基于RecyclerView框架工程(知识体系积累)

说明&#xff1a;这个简单的基于RecyclerView的框架作用在于自己可以将平时积累的一些有效demo整合起来&#xff08;比如音视频编解码的、opengles的以及其他也去方向的、随着项目增多&#xff0c;工程量的增加&#xff0c;后期想高效的分析和查找并不容易&#xff09;&#xf…...