Flink 滚动窗口、滑动窗口详解
1 滚动窗口(Tumbling Windows)
滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。
2 滑动窗口(Sliding Windows)
与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。
既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代
表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表
了计算频率。例如,我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据
个数定义。
我们可以看到,当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决
定。如图 6-18 所示,滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口里。比如我们定义的窗口长度为 1 小时、滑动步长为 30 分钟,那么对于 8 点 55 分的数据,应该同时属于[8 点, 9 点)和[8 点半, 9 点半)两个窗口;而对于 8 点 10 分的数据,则同时属于[8点, 9 点)和[7 点半, 8 点半)两个窗口。所以,滑动窗口其实是固定大小窗口的更广义的一种形式;换句话说,滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。当然,我们也可以定义滑动步长大于窗口大小,这样的话就会出现窗口不重叠、但会有间隔的情况;这时有些数据不
属于任何一个窗口,就会出现遗漏统计。所以一般情况下,我们会让滑动步长小于窗口大小,并尽量设置为整数倍的关系。
在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常报警。这时滑动窗口无疑就是很好的实现方式。
3 窗口API
3.1 按键分区窗口(Keyed Windows)
经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时
执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...) .window(...)
3.2 非按键分区(Non-Keyed Windows)
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。
在代码中,直接基于 DataStream 调用.windowAll()定义窗口。
stream.windowAll(...)
3.3 代码中窗口 API 的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>)
其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。
3.4 滚动处理时间窗口
窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)
这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量,用这个偏移量可以处理时区。
例如:我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了。
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
3.5 滑动处理时间窗口
窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。
这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。
滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。
stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)
4 窗口函数
定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗
口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream。
4.1 增量聚合函数(incremental aggregation functions)
4.1.1 归约函数(ReduceFunction)
将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。
统计每一小时用户的访问量:
package com.rosh.flink.pojo;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@AllArgsConstructor
@NoArgsConstructor
@Data
public class UserPojo {private Integer userId;private String name;private String uri;private Long timestamp;}
package com.rosh.flink.wartermark;import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;public class WindowTS {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<UserPojo> dataDS = env.fromCollection(getUserLists());//生成有序水位线SingleOutputStreamOperator<UserPojo> orderStreamDS = dataDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {@Overridepublic long extractTimestamp(UserPojo element, long recordTimestamp) {return element.getTimestamp();}}));//聚合SingleOutputStreamOperator<Tuple2<Integer, Long>> userDS = orderStreamDS.map(new MapFunction<UserPojo, Tuple2<Integer, Long>>() {@Overridepublic Tuple2<Integer, Long> map(UserPojo value) throws Exception {return Tuple2.of(value.getUserId(), 1L);}});//开窗统计每1小时用户访问了多少次SingleOutputStreamOperator<Tuple2<Integer, Long>> resultDS = userDS.keyBy(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.hours(1))).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {@Overridepublic Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {value1.f1 = value1.f1 + value2.f1;return value1;}});resultDS.print();env.execute("WarterMarkTest");}private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {List<UserPojo> lists = new ArrayList<>();Random random = SecureRandom.getInstanceStrong();for (int i = 1; i <= 1000; i++) {String uri = "/goods/" + i;int userId = random.nextInt(10);//有序时间UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, (long) (1000 * i));//无序时间lists.add(userPojo);}return lists;}}
4.1.2 聚合函数(AggregateFunction)
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数
据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。
例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。
于是自然可以想到,如果取消类型一致的限制,让输入数据、中间状态、输出结果三者类型都可以不同,不就可以一步直接搞定了吗?Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:
/**** The type of the values that are aggregated (input values)* The type of the accumulator (intermediate aggregate state).* The type of the aggregated result**/
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
{/*** 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。*/ ACC createAccumulator();/*** 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;* 返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。*/ACC add(IN value, ACC accumulator);/*** 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均* 值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。*/OUT getResult(ACC accumulator);/*** 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景* 就是会话窗口(Session Windows)。*/ACC merge(ACC a, ACC b);
}
所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的
结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输
出的类型可以不同,使得应用更加灵活方便。
·统计人均访问次数:
package com.rosh.flink.wartermark;import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.*;public class AggWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<UserPojo> userDS = env.fromCollection(getUserLists()).assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {@Overridepublic long extractTimestamp(UserPojo element, long recordTimestamp) {return element.getTimestamp();}}));//统计5秒内,人均访问次数SingleOutputStreamOperator<Double> resultDS = userDS.keyBy(key -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new PeopleHourAvgCount());resultDS.print("人均访问次数为:");env.execute("AggWindowTest");}private static class PeopleHourAvgCount implements AggregateFunction<UserPojo, Tuple2<HashSet<Integer>, Long>, Double> {/*** 初始化累加器*/@Overridepublic Tuple2<HashSet<Integer>, Long> createAccumulator() {return Tuple2.of(new HashSet<>(), 0L);}/****/@Overridepublic Tuple2<HashSet<Integer>, Long> add(UserPojo value, Tuple2<HashSet<Integer>, Long> accumulator) {//distinct userIdaccumulator.f0.add(value.getUserId());//次数+1accumulator.f1 = accumulator.f1 + 1;//返回累加器return accumulator;}@Overridepublic Double getResult(Tuple2<HashSet<Integer>, Long> accumulator) {return accumulator.f1 * 1.0 / accumulator.f0.size();}@Overridepublic Tuple2<HashSet<Integer>, Long> merge(Tuple2<HashSet<Integer>, Long> a, Tuple2<HashSet<Integer>, Long> b) {return null;}}/*** 获取随机人数的1000次访问*/private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {List<UserPojo> lists = new ArrayList<>();Random random = SecureRandom.getInstanceStrong();//获取随机人数int peopleCount = random.nextInt(20);System.out.println("随机人数为:" + peopleCount);for (int i = 1; i <= 1000; i++) {String uri = "/goods/" + i;int userId = random.nextInt(peopleCount);//有序时间UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());//无序时间lists.add(userPojo);}return lists;}
}
4.2 全窗口函数(full window functions)
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。
统计10秒访问UV:
package com.rosh.flink.wartermark;import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.sql.Timestamp;
import java.util.*;public class ProcessWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());//水位线SingleOutputStreamOperator<UserPojo> watermarks = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {@Overridepublic long extractTimestamp(UserPojo element, long recordTimestamp) {return element.getTimestamp();}}));//开窗10秒UV统计SingleOutputStreamOperator<String> resultDS = watermarks.keyBy(key -> true).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new UserUVCount());resultDS.print("UV:");env.execute("ProcessWindowTest");}private static class UserUVCount extends ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow> {@Overridepublic void process(Boolean aBoolean, ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow>.Context context, Iterable<UserPojo> elements, Collector<String> out) throws Exception {//用户集合HashSet<Integer> hashSet = new HashSet<>();for (UserPojo user : elements) {hashSet.add(user.getUserId());}//获取时间信息long start = context.window().getStart();long end = context.window().getEnd();String rs = "窗口信息,startTime:" + new Timestamp(start) + ",endTime: " + new Timestamp(end) + ",用户访问的次数为:" + hashSet.size();out.collect(rs);}}private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {List<UserPojo> lists = new ArrayList<>();Random random = SecureRandom.getInstanceStrong();int userCount = random.nextInt(100);for (int i = 1; i <= 1000; i++) {String uri = "/goods/" + i;int userId = random.nextInt(userCount);//有序时间UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());//无序时间lists.add(userPojo);}return lists;}}
4.3 增量聚合和全窗口函数的结合使用
增量聚合函数处理计算会更高效。全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。
// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) // ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输
出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。
统计10秒的url浏览量:
package com.rosh.flink.wartermark;import com.alibaba.fastjson.JSONObject;
import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;public class UrlWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取数据源DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());//水位线SingleOutputStreamOperator<UserPojo> waterDS = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {@Overridepublic long extractTimestamp(UserPojo element, long recordTimestamp) {return element.getTimestamp();}}));//url countSingleOutputStreamOperator<Tuple2<String, Long>> urlDS = waterDS.map(new MapFunction<UserPojo, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(UserPojo value) throws Exception {return Tuple2.of(value.getUri(), 1L);}});SingleOutputStreamOperator<JSONObject> resultDS = urlDS.keyBy(data -> data.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {value1.f1 = value1.f1 + value2.f1;return value1;}}, new WindowFunction<Tuple2<String, Long>, JSONObject, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<JSONObject> out) throws Exception {Tuple2<String, Long> tuple2 = input.iterator().next();JSONObject jsonObject = new JSONObject();jsonObject.put("url", tuple2.f0);jsonObject.put("count", tuple2.f1);new Timestamp(window.getStart());jsonObject.put("startTime", new Timestamp(window.getStart()).toString());jsonObject.put("endTime", new Timestamp(window.getEnd()).toString());out.collect(jsonObject);}});resultDS.print();env.execute("UrlWindowTest");}private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {List<UserPojo> lists = new ArrayList<>();Random random = SecureRandom.getInstanceStrong();for (int i = 1; i <= 1000; i++) {//随机生成userId、goodIdint userId = random.nextInt(100);int goodId = random.nextInt(50);String uri = "/goods/" + goodId;//有序时间UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());//无序时间lists.add(userPojo);}return lists;}}
相关文章:
Flink 滚动窗口、滑动窗口详解
1 滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不…...
想要精通算法和SQL的成长之路 - 柱状图中最大的矩形
想要精通算法和SQL的成长之路 - 柱状图中最大的矩形前言一. 柱状图中最大的矩形前言 想要精通算法和SQL的成长之路 - 系列导航 一. 柱状图中最大的矩形 原题链接 给定 n 个非负整数,用来表示柱状图中各个柱子的高度。每个柱子彼此相邻,且宽度为 1 。求…...
网络安全实验室5.上传关
5.上传关 1.请上传一张jpg格式的图片 url:http://lab1.xseclab.com/upload1_a4daf6890f1166fd88f386f098b182af/ 上传一张后缀名为jpg的图片,上传抓包修改后缀名为别的,s或者直接删掉,放包 得到key is IKHJL9786#$%^& 2.请…...
JavaScript 严格模式(use strict)
文章目录JavaScript 严格模式(use strict)使用 "use strict" 指令严格模式声明严格模式的限制保留关键字JavaScript 严格模式(use strict) JavaScript 严格模式(strict mode)即在严格的条件下运行。 使用 “use strict” 指令 “use strict”…...
硬件设计—高性能ADC前端电路
高性能模数转换器(ADC)一般对系统的性能有非常高的要求,而AD芯片的“前端”的输入电路设计对ADC系统的的性能有非常大的影响。以下主要介绍了ADC芯片前端输入使用放大器和变压器各自的优势。 1、放大器和变压器根本区别 放大器是有源器件&am…...
详讲常见的字符函数
👦个人主页:Weraphael ✍🏻作者简介:目前是C语言学习者 ✈️专栏:C语言航路 🐋 希望大家多多支持,咱一起进步!😁 如果文章对你有帮助的话 欢迎 评论💬 点赞&a…...
for循环中异步请求问题:循环里面使用异步函数,如何等所有的异步函数都执行完再进行下一步
场景是这样的: 在一个列表循环里,对数据进行赋值,调用接口,循环外后面的代码需等待所有请求执行完成后再去执行。 1. Promise.all实现 Promise.all() 方法接收一个 promise 的 iterable 类型(注:Array&am…...
【iOS-系统框架】
文章目录前言47.熟悉系统框架CoreFoundation框架其他框架要点48. 多用块枚举,少用for循环for循环NSEnumerator遍历快速遍历基于块的遍历方式要点49.对自定义其内存管理语义的collection使用无缝桥接要点50.构建缓存时选用NSCache而非NSDictionaryNSCacheNSCache实例…...
Android APK 签名打包原理分析(二)【Android签名原理】
说到签名,从这个词来理解,正常个人需要签名的时候,一般是用来证明这是某个人的特属认证。 大家是否有印象?还记得我们之前在学习、总结网络相关知识的时候,说到过,客户端和服务端虽然通信数据上,可以采用对称加密和非对称加密组合去进行数据的加密,但是这时还有一个问题…...
linux判断文件不存在退出jenkins编译流程
# linux判断文件不存在退出jenkins编译流程 file"${WORKSPACE}/mc/jenkins_arm64.sh" if [ ! -f "$file" ]; then echo "jenkins_arm64.sh not exist" exit 0 fi dir(charge){checkout([$class: GitSCM, branches: [[name: …...
shell脚本(语法)
一、什么是shell脚本 1.1、shell 的两层含义:既是一种应用程序,又是一种程序设计语言 1.1.1、shell是一种应用程序 交互式地解释、执行用户输入的命令,将用户的操作翻译成机器可以识别的语言,完成相应功能称之为 shell 命令解析器。 shell 是…...
java高频面试题(2023最新)
目录一.java基础1.八大基础类型2.java三大特性3.重载和重写的区别4.pubilc、protected、(dafault)不写、private修饰符的作用范围5.和equals的区别6.hashcode()值相同,equals就一定为true7.short s 1;s s 1;(程序1)和 short s 1ÿ…...
视觉感知(二):车位线检测
1. 简介 本期为大家带来车位线检测相关知识点,以及算法工程落地的全流程演示。车位线检测是自动泊车领域必不可缺的一环,顾名思义就是采用环视鱼眼相机对路面上的车位线进行检测,从而识别出车位进行泊车。 较为常规的做法是使用四颗鱼眼相机环视拼接然后在鸟瞰图上做停车位…...
2023.2.10学习记录Docker容器
Docker 必须跑在Linux内核上 镜像是一个轻量级可执行的独立软件包 新建一个docker容器只需要几秒钟 Docker常用命令 启动类命令 镜像命令 容器命令 docker images docker search --limit 5 redis docker pull redis:6.0.8 docker system df 查看镜像/容器/…...
扩散模型diffusion model用于图像恢复任务详细原理 (去雨,去雾等皆可),附实现代码
文章目录1. 去噪扩散概率模型2. 前向扩散3. 反向采样3. 图像条件扩散模型4. 可以考虑改进的点5. 实现代码1. 去噪扩散概率模型 扩散模型是一类生成模型, 和生成对抗网络GAN 、变分自动编码器VAE和标准化流模型NFM等生成网络不同的是, 扩散模型在前向扩散过程中对图像逐步施加噪…...
pytorch
PyTorch基础 import torch torch.__version__ #return 1.13.1cu116基本使用方法 矩阵 x torch.empty(5, 3)tensor([[1.4586e-19, 1.1578e27, 2.0780e-07],[6.0542e22, 7.8675e34, 4.6894e27],[1.6217e-19, 1.4333e-19, 2.7530e12],[7.5338e28, 8.1173e-10, 4.3861e-43],[2.…...
软件测试—对职业生涯发展的一些感想
目录:导读 职场生涯 1、短期规划 2、长期规划 自身定位 1、你在哪儿? 2、你想要什么? 3、你拥有什么? 4、你需要做什么?什么时候做? 5、淡定啊淡定 最近工作不是很忙,有空都是在看书&a…...
5年经验之谈:月薪3000到30000,测试工程师的变“行”记!
自我介绍下,我是一名转IT测试人,我的专业是化学,去化工厂实习才发现这专业的坑人之处,化学试剂害人不浅,有毒,易燃易爆,实验室经常用丙酮,甲醇,四氯化碳,接触…...
全价值链赋能,数字化助力营销价值全力释放 | 爱分析报告
报告编委 张扬 爱分析联合创始人&首席分析师 文鸿伟 爱分析高级分析师 王鹏 爱分析分析师 外部专家(按姓氏拼音排序) 黄洵 客易达 联合创始人 毛健 云徙科技 副总裁 & COO 特别鸣谢(按拼音排序) 报告摘要 在…...
【自学Docker 】Docker search命令
大纲 Docker search命令 docker search命令教程 docker search 命令用于从 Docker Hub 查找镜像。 docker search命令语法 haicoder(www.haicoder.net)# docker search [OPTIONS] TERMdocker search命令参数 参数描述docker search --filter设置过滤条件。docker search -…...
银行零售如何更贴近客户?是时候升级你的客户旅程平台了
随着数字化战略推进,各大银行持续加大对线上多渠道的建设投入,客户触达也愈发移动化、智能化。与此同时,手机银行飞速发展产生并累积了大量客户行为数据,呈多样化、海量化等特点,将在用户体验、客户经营、手机银行运营…...
零入门kubernetes网络实战-12->基于DNAT技术使得外网可以访问本宿主机上veth-pair链接的内部网络
视频地址(稍后上传) 本篇文章测试如何让veth pair链接的内网网络可以被本局域网的其他宿主机访问到? 1、测试环境介绍 一台centos虚拟机 # 查看操作系统版本 cat /etc/centos-release # 内核版本 uname -a uname -r # 查看网卡信息 ip a s eth02、网络拓扑 3、操…...
conda环境管理命令
conda环境管理命令 1.环境检查 1)查看安装了哪些包 conda list 2)查看当前存在哪些虚拟环境 conda env list conda info -e [rootoracledb anaconda3]# conda info -e # conda environments: # base * /home/anaconda33)检查更新当前conda con…...
ubuntu clion从0开始搭建一个风格转换ONNX推理网络 opencv cuda::dnn::net
系统搭建 系统搭建 OpenCV的安装 cmake sudo apt-get install cmake其他环境以来 sudo apt-get install build-essential libgtk2.0-dev libavcodec-dev libavformat-dev libjpeg.dev libtiff5.dev libswscale-dev libjasper-dev 不安装会报这个错误 OpenCV(4.6.0) /hom…...
1.十大排序算法
1.什么是排序算法? 在梳理十大排序算法之前,虽然知道排序算法是将数字或字母按增序排列的算法,但该理解过于片面,那排序算法的权威定义是什么呢。 一个排序算法(英语:Sorting algorithm)是一种…...
算法导论—SAT、NP、NPC、NP-Hard问题
算法导论—SAT、NP、NP-Hard、NPC问题SAT 问题基本定义问题复杂性P、NP、NP-Hard、NP-Complete(NPC)证明NP-Hard关系图NP问题的概念约化的定义NPC问题NP-Hard问题SAT 问题基本定义 SAT 问题 (Boolean satisfiability problem, 布尔可满足性问题,SAT): 给…...
linux入门---基础指令(上)
这里写目录标题前言ls指令pwd指令cd指令touch指令mkdirrmdirrmman指令cp指令mv指令前言 我们平时使用电脑主要是通过鼠标键盘以及操作系统中自带的图形来对电脑执行相应的命令,比如说我想打开D盘中的cctalk这个文件: 我就可以先用鼠标左键单击这个文件…...
大数据Kylin(一):基础概念和Kylin简介
文章目录 基础概念和Kylin简介 一、OLTP与OLAP 1、OLTP 2、OLAP 3、OLTP与OLAP的关系 二、数据分析模型 1、星型模型 2、雪花模型 …...
推进行业生态发展完善,中国信通院第八批RPA评测工作正式启动
随着人工智能、云计算、大数据等新兴数字技术的高速发展,数字劳动力应用实践步伐加快,以数字生产力、数字创造力为基础的数字经济占比逐年上升。近年来,机器人流程自动化(Robotic Process Automation,RPA)成…...
DOM编程-获取下拉列表选中项的value
<!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>获取下拉列表选中项的value</title> </head> <body> <script type"text/javascript"> …...
乐清做网站公司哪家好/一点优化
这几天重读了一遍,倒没什么特别意图,纯粹是放松自己,也顺便记录在案。书由英国作家蕾秋乔伊斯著,黄妙瑜译,译本读起来还是比较舒服的,只不过文中出现两次关于“北京”的词语,感觉挺突兀的&#…...
可以做3d电影网站/百度优化教程
git push git push如果直接使用,不加repository和refspec,那么首先根据当前branch的branch name,在配置文件中找到branch.branchName.remote(没有就是origin),然后push 所有的local-tracking branch(即有对应的remote-tracking br…...
网站建设申请方案/品牌广告语
消息队列排队过程中的消息。这第一条消息将首先被处理。但假设消息本身指定要处理的时间。我们必须等待,直到时间的消息处理能力。新闻MessageQueue正在使用Message类的表示,队列中的邮件保存结构清单,Message内部对象包括:next变…...
wordpress搬入域名/网站外链工具
jQuery效果 1.基本效果 (1)show([speed,[easing],[fn]]) 显示隐藏的匹配元素。 这个就是 ‘show( speed, [callback] )’ 无动画的版本。如果选择的元素是可见的,这个方法将不会改变任何东西。无论这个元素是通过hide()方法隐藏的还是在CSS里…...
如何判断网站被google k/今日头条网页版入口
//总组件!!!!!!!,这里引入其他组件的 /*组件名应该和文件名一样 */ const App()>{const onClickHandler(event)>{event.preventDefault();//取消默认行为event.stopPropagatio…...
动态ip代理/谷歌seo公司
在ENVI统计遥感多波段图像中每个波段的均值、方差、最大值、最小值是比较容易办到的,但是如果要处理多批的数据就没有那么方便了,这里转载一个MatLab读取ENVI图像(imghdr)的程序,并且计算了相关系数。之前我在利用MatLab读取ENVI图像里分享了…...