Flink四大基石之Time (时间语义) 的使用详解
目录
一、引言
二、Time 的分类及 EventTime 的重要性
Time 分类详述
EventTime 重要性凸显
三、Watermark 机制详解
核心原理
Watermark能解决什么问题,如何解决的?
Watermark图解原理
举例
总结
多并行度的水印触发
Watermark代码演示
需求
代码演示:
需求升级
代码演示
四、Flink对于迟到数据的处理
对于延迟数据的理解:
延迟数据的处理
1)allowedLateness
2) 侧输出-SideOutput
处理主要使用两个方式:
sideOutputLateData方法
DataStream.getSideOutput方法
代码演示
五、总结展望
本文聚焦于 Flink 中 Time 的分类(EventTime、IngestionTime、ProcessingTime),着重阐述 EventTime 的重要性,以及为应对基于 EventTime 处理流数据时因网络等因素导致的数据乱序、延迟问题所引入的 Watermark 机制。详细剖析 Watermark 的原理、功能、相关代码实现及与延迟数据处理机制(allowedLateness、侧输出 SideOutput)的协同运作,助力读者深入掌握 Flink 实时数据处理的核心要点。
一、引言
在 Flink 实时数据处理场景下,准确把握时间语义以及高效处理数据乱序、延迟问题至关重要。不同的时间语义各有侧重,而 EventTime 凭借能反映事件本质的特性成为关键,但也因其对数据顺序的 “严苛要求” 催生了 Watermark 等配套机制来保障处理的准确性与完整性。
二、Time 的分类及 EventTime 的重要性
Time 分类详述
EventTime:作为事件(数据)真正发生、产生之时的时间戳记录,它锚定了事件的源头时间,不受数据后续流转环节的干扰,比如用户在电商平台下单操作瞬间对应的时间,即便后续网络波动致数据传输受阻,该时间依旧锁定下单那一刻。
IngestionTime:侧重标记数据抵达流处理系统的时间节点,当数据跨越系统边界进入 Flink 体系时,此刻的系统时钟时间即为 IngestionTime,反映数据流入的 “入门时刻”。
ProcessingTime:聚焦数据在流处理系统内被具体处理、计算的当下系统时间,同一批数据在不同负载时段处理,ProcessingTime 会因处理时刻差异而不同。
EventTime 重要性凸显
假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分(EventTime)。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点01分了(ProcessingTime)。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。在上面这个场景中你可以看到,
支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。还可以通过钉钉打卡、饭卡机 等 举例子。
一条错误日志的内容为:
2020-11-11 23:59:58 error NullPointExcep --事件时间
进入Flink的时间为2020-11-11 23:59:59 --摄入时间
到达Window的时间为2020-11-12 00:00:01 --处理时间
问题:
对于业务来说,要统计每天的的故障日志个数,哪个时间是最有意义的?
答案:
EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质!某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。
A用户在 11:01:00 对 App 进行操作,B用户在 11:02:00 操作了 App,
但是A用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到B用户的消息,然后再接受到A用户的消息,消息乱序了。
问题:
如果这个是一个根据用户操作先后顺序,进行抢购的业务,那么是A用户成功还是B用户成功?
答案:
应该算A成功,因为A确实比B操作的早,但是实际中考虑到实现难度,可能直接按B成功算
也就是说,实际开发中希望基于事件时间来处理数据,但因为数据可能因为网络延迟等原因,出现了乱序,按照事件时间处理起来有难度!在实际环境中,经常会出现,因为网络原因,数据有可能会延迟一会才到达Flink实时处理系统。我们先来设想一下下面这个场景:
原本应该被该窗口计算的数据因为网络延迟等原因晚到了,就有可能丢失了总结:
1.事件时间确实重要, 因为它能够代表事件/数据的本质,是事件/数据真真正正发生/产生的时间
2.按照事件时间进去处理/计算,会存在一定的难度, 因为数据可能会因为网路延迟等原因, 发生乱序或延迟到达, 那么最后的计算结果就有可能错误或数据丢失
3.需要有技术来解决上面的问题,使用Watermark技术来解决!
三、Watermark 机制详解
当 Flink 依 EventTime 处理流数据,网络波动、分布式架构下节点通信延迟等因素,常使数据到达顺序背离事件发生顺序,陷入乱序困境。为化解此难题,Watermark 机制应运而生,犹如给无序数据流转引入 “秩序规则”。
核心原理
Watermark 本质是延迟触发机制,通过公式 “Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)” 动态生成。例如设定出行集合场景,09:00 集合且允许迟到 10 分钟,人员陆续到达时间对应不同 Watermark 值,能否上车依 Watermark 与集合时间对比判定,将时间规则量化应用到数据处理上,为数据窗口触发时机把控筑牢基础。
Watermark能解决什么问题,如何解决的?
有了Watermark 就可以在一定程度上解决数据乱序或延迟达到问题!
不添加watermark ,窗口如何触发:
1)窗口有数据
2)窗口的结束时间到了。
有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发:
1.窗口有数据
2.Watermark >= 窗口的结束时间
满足以上条件则触发窗口计算!
以前窗口触发:系统时间到了窗口结束时间就触发
现在窗口触发:Watermark >= 窗口的结束时间
而Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
就意味着, 通过Watermark改变了窗口的触发时机了, 那么接下来我们看如何改变的/如何解决前面的问题的
需要记住:
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
窗口触发时机 : Watermark >= 窗口的结束时间
水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印,一般人为添加的消息的水印都会比当前消息的事件时间小一些。
窗口是否关闭,按照水印时间来判断,但原有事件时间不会被修改,窗口的边界依旧是事件时间来决定。
- 水印并不会影响原有Eventtime
- 当数据流添加水印后,会按照水印时间来触发窗口计算
- 一般会设置水印时间,比Eventtime小一些(一般几秒钟)
- 当接收到的水印时间>= 窗口的endTime且窗口内有数据,则触发计算
水印(水印时间)的计算:事件时间– 设置的水印长度 = 水印时间
比如,事件时间是10分30秒, 水印长度是2秒,那么水印时间就是10分28秒
Watermark图解原理
举例
窗口5秒,延迟3秒,按照事件时间计算
数据事件时间3, 落入窗口0-5.水印时间0
来一条数据事件时间7, 落入窗口5-10,水印时间4
来一条数据事件时间4,落入窗口0-5,水印时间 4 (因为此时的数据中最大的事件时间是 7,延迟 3 秒,按照公式 7-3=4)
来一条数据事件时间8,落入窗口5-10,水印时间5
这一条数据水印时间大于等于窗口0-5的窗口结束时间。
满足了对窗口0-5的提交,这个窗口关闭,并触发数据计算
可以看出,第三条数据,其是延迟数据,它的事件时间是4,却来的比事件时间为7的数据还要晚。
但是因为水印的机制,这个数据未错过它的窗口,依旧成功进入属于它的窗口并且被计算
这就是水印的功能:在不影响按照事件时间判断数据属于哪个窗口的前提下,延迟某个窗口的关闭时间,让其等待一会儿延迟数据。
总结
Watermark 是一个单独计算出来的时间戳
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(乱序度)
Watermark可以通过改变窗口的触发时机 在 一定程度上解决数据乱序或延迟达到的问题
Watermark >= 窗口结束时间 时 就会触发窗口计算(窗口中得有数据)
延迟或乱序严重的数据还是丢失, 但是可以通过调大 最大允许的延迟时间(乱序度) 来解决, 或 使用后面要学习的侧道输出流来单独收集延迟或乱序严重的数据,保证数据不丢失!
多并行度的水印触发
在多并行度下,每个并行有一个水印
比如并行度是6,那么程序中就有6个watermark
分别属于这6个并行度(线程)
那么,触发条件以6个水印中最小的那个为准
比如, 有个窗口是0-5
其中5个并行度的水印都超过了5
但有一个并行度的水印是3
那么,不管另外5个并行度中的水印达到了多大,都不会触发
因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件
在测试水印的时候,记得把并行度设置为1 ,好看结果,否则,结果不太容易看出来。
Watermark代码演示
需求
实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
不使用水印的时候【不能使用eventtime时间语义】,进行开发:
package com.bigdata.day05;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.Random;
import java.util.UUID;/**** 实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)* 要求每隔5s,计算5秒内,每个用户的订单总金额* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。**/public class _01WatermarkDemo {@Data // set get toString@AllArgsConstructor@NoArgsConstructorpublic static class OrderInfo2{private String orderId;private int uid;private int money;private long timeStamp;}public static class MySource implements SourceFunction<OrderInfo2> {boolean flag = true;@Overridepublic void run(SourceContext ctx) throws Exception {// 源源不断的产生数据Random random = new Random();while(flag){OrderInfo2 orderInfo = new OrderInfo2();orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(random.nextInt(3));orderInfo.setMoney(random.nextInt(101));orderInfo.setTimeStamp(System.currentTimeMillis());ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublic void cancel() {flag = false;}}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<OrderInfo2> orderSourceStream = env.addSource(new MySource());//3. transformation-数据处理转换// 每个用户的订单总额KeyedStream<OrderInfo2, Integer> keyedStream = orderDSWithWatermark.keyBy(orderInfo2 -> orderInfo2.getUid());SingleOutputStreamOperator<OrderInfo2> result1 = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");result1.print();//4. sink-数据输出//5. execute-执行env.execute();}
}
假如出现如下错误:
Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.bigdata.day05.OrderInfo2>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:224)at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:53)at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:688)at com.bigdata.day05._01WatermarkDemo.main(_01WatermarkDemo.java:103)
说明这个pojo 必须是public 的,否则不解析。
代码演示:
package com.bigdata.time;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.Date;
import java.util.Random;
import java.util.UUID;@Data // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{private String orderId;private int uid;private int money;private long timeStamp;
}class MySource extends RichSourceFunction<OrderInfo> {boolean flag = true;@Overridepublic void run(SourceContext<OrderInfo> ctx) throws Exception {while(flag){OrderInfo orderInfo = new OrderInfo();Random random = new Random();int userId = random.nextInt(10);int money = random.nextInt(100);long timeStamp = System.currentTimeMillis() - random.nextInt(3000);orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(userId);orderInfo.setMoney(money);orderInfo.setTimeStamp(timeStamp);ctx.collect(orderInfo);Thread.sleep(20);}}@Overridepublic void cancel() {flag = false;}
}
public class _01_OrderDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());//3. transformation-数据处理转换streamSource.keyBy(new KeySelector<OrderInfo, Integer>() {@Overridepublic Integer getKey(OrderInfo orderInfo) throws Exception {return orderInfo.getUid();}}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.sum("money").print();.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");int sum = 0;for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(beginTime+","+endTime+","+userId +","+sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
需求升级
实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
* 要求每隔5s,计算5秒内,每个用户的订单总金额
* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
假如你添加了 eventTime 缺没有添加水印的代码,会报如下错误:
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:83)at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:302)
代码演示
生成 Watermark | Apache Flink
package com.bigdata.time;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;public class _02_OrderDemoWithWaterMark {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());//3. transformation-数据处理转换streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {// long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢?@Overridepublic long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {// 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒return orderInfo.getTimeStamp();}})).keyBy(new KeySelector<OrderInfo, Integer>() {@Overridepublic Integer getKey(OrderInfo orderInfo) throws Exception {return orderInfo.getUid();}}).window(TumblingEventTimeWindows.of(Time.seconds(5)))//.sum("money").print();.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");int sum = 0;for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(beginTime+","+endTime+","+userId +","+sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。
WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)
我们实现一个延迟3秒的固定延迟水印,可以这样做:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
四、Flink对于迟到数据的处理
水印:对于迟到数据不长
allowedLateness: 迟到时间很长
侧道输出:对于迟到时间特别长
对于延迟数据的理解:
水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。
这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。
但是水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口
这个等待不能一直等,因为会一直缓着数据不计算。
一般水印也就是几秒钟最多几分钟而已(看业务)
那么,在现实世界中,延迟数据除了有短期延迟外,长期延迟也是很常见的。
比如:
l 客户端断网,等了好几个小时才恢复
l 车联网系统进入隧道后没有信号无法上报数据
l 手机欠费没有网
等等,这些场景下数据的迟到就不是简单的网络堵塞造成的几秒延迟了
而是小时、天级别的延迟
对于水印来说,这样的长期延迟数据是无法很好处理的。
那么有没有什么办法去处理这些长期延迟的数据呢?让其可以找到其所属的窗口正常完成计算,哪怕晚了几个小时。
这个场景的解决方式就是:延迟数据处理机制(allowedLateness方法)。
水印:乱序数据处理(时间很短的延迟)
延迟处理:长期延迟数据的处理机制
- allowedLateness 机制剖析
Watermark 应对短期延迟后,对于长期延迟数据(如客户端断网数小时后恢复传输数据),allowedLateness 机制接力。对窗口对象设置 “allowedLateness (Time.seconds (4))”(以允许 4 秒长期延迟为例),区别于 Watermark 触发计算且关闭窗口,它仅触发计算、暂不关闭,在允许时长内后续同窗口数据入流可再次计算整合,灵活适配长延迟业务场景,延展窗口 “有效期”。 - SideOutput 侧输出机制运用
即便有 Watermark 和 allowedLateness 双重保障,仍有极度延迟数据 “漏网”。SideOutput 则像 “兜底方案”,借助 “sideOutputLateData (outputTag)” 将错过双重机制的数据收集至特定 DataStream,后续利用 “getSideOutput (tag)” 取出单独处理,赋予开发者自定义逻辑应对极端情况权限,如将严重迟到订单数据单独分析、告警,完善数据处理全流程可靠性。
延迟数据的处理
waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法,
主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据
设置允许延迟的时间是通过allowedLateness(lateness: Time)设置
保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存
获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取
1)allowedLateness
当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法
该方法传入一个Time值,设置允许的长期延迟(迟到)的时间。
和watermark不同。
未设置allowedLateness(为0),当watermark满足条件,会触发窗口的 执行 + 关闭
当设置了allowedLateness,当watermark满足条件后,只会触发窗口的执行,不会触发窗口关闭
也就是,watermark满足条件后会正常触发窗口计算,将已有的数据完成计算。
但是,不会关闭窗口。如果在allowedLateness允许的时间内仍有这个窗口的数据进来,那么每进来一条,会和已经计算过的(被watermark触发的)数据一起在计算一次。
水印:短期延迟,达到条件后触发计算并且关闭窗口(触发+关闭同时进行)
水印+allowedLateness : 短期延迟+ 等待长期延迟效果, 达到水印条件后,会触发窗口计算,但是不关闭窗口。事件时间延迟达到水印+allowedLateness之和后会关闭窗口。
2) 侧输出-SideOutput
Flink 通过watermark在短时间内允许了乱序到来的数据
通过延迟数据处理机制,可以处理长期迟到的数据。
但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。
对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?
不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来
保存成为一个DataStream,然后,交由开发人员自行处理。
那么这个机制就叫做侧输出机制(Side Output)
侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开发人员可以自定逻辑对这些超级迟到数据进行处理。
处理主要使用两个方式:
对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方
对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理的数据的DataStream
注意,取到的是一个DataStream,这意味着你可以对这些超级迟到数据继续写 如keyBy, window等处理逻辑。
sideOutputLateData方法
使用方式:
先定义OutputTag对象(注意,必须new一个匿名内部类形式的OutputTag对象的实例)
然后调用sideOutputLateData方法
// side output OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side output"){};
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> sideOutputLateData =
allowedLateness.sideOutputLateData(outputTag);
DataStream.getSideOutput方法
用法:
DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
// 对得到的保存超级迟到数据的DataStream进行处理
sideOutput.print("late>>>");
代码演示
前面的案例中已经可以使用Watermark 来解决一定程度上的数据延迟和数据乱序问题
但是对于延迟/迟到/乱序严重的数据还是会丢失,所以接下来
使用Watermark + AllowedLateness + SideOutput ,即使用侧道输出机制来单独收集延迟/迟到/乱序严重的数据,避免数据丢失!
package com.bigdata.day05;import com.bigdata.day04.OrderInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.UUID;class MyOrderSource2 implements SourceFunction<OrderInfo> {@Overridepublic void run(SourceContext<OrderInfo> ctx) throws Exception {Random random = new Random();while(true){OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString().replace("-",""));// 在这个地方可以模拟迟到数据long orderTime = System.currentTimeMillis() - 1000*random.nextInt(100);orderInfo.setOrdertime(orderTime);int money = random.nextInt(10);System.out.println("订单产生的时间:"+ DateFormatUtils.format(orderTime,"yyyy-MM-dd HH:mm:ss")+",金额:"+money);orderInfo.setMoney(money);orderInfo.setUserId(random.nextInt(2));ctx.collect(orderInfo);Thread.sleep(500);}}@Overridepublic void cancel() {}
}
public class Demo01 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// 每隔五秒统计每个用户的前面5秒的订单的总金额//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MyOrderSource2());//-2.告诉Flink最大允许的延迟时间/乱序时间为多少SingleOutputStreamOperator<OrderInfo> orderDSWithWatermark = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))//-3.告诉Flink哪一列是事件时间.withTimestampAssigner((order, time) -> order.getOrdertime()));OutputTag<OrderInfo> outputTag = new OutputTag<OrderInfo>("side output"){};//3. transformation-数据处理转换SingleOutputStreamOperator<String> result = orderDSWithWatermark.keyBy(orderInfo -> orderInfo.getUserId()).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(4)).sideOutputLateData(outputTag).apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer key, // 代表分组key值 五旬老太守国门TimeWindow window, // 代表窗口对象Iterable<OrderInfo> input, // 分组过之后的数据 [1,1,1,1,1]Collector<String> out // 用于输出的对象) throws Exception {SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long start = window.getStart();long end = window.getEnd();int sum = 0;// 专门存放迟到的订单时间for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(key + ",窗口开始:" + dateFormat.format(new Date(start)) + ",结束时间:" + dateFormat.format(new Date(end)) + "," + sum);//out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);}});result.print("流中的数据,包含迟到的数据:");result.getSideOutput(outputTag).print("严重迟到的数据:");//4. sink-数据输出//5. execute-执行env.execute();}
}
运行结果:
订单产生的时间:2024-05-16 11:19:00,金额:1
订单产生的时间:2024-05-16 11:18:13,金额:3
严重迟到的数据:> f96d34c438ce400eb21a25328fe772ee,1,3,2024-05-16 11:18:13
订单产生的时间:2024-05-16 11:19:10,金额:3
2024-05-16 11:19:00
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:00,结束时间:2024-05-16 11:19:05,1,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:19,金额:8
严重迟到的数据:> cf85339600c647c99856841021466c5d,1,8,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:18:19,金额:9
严重迟到的数据:> f087ee9c9eaa4e3f9eac06a907b47fb4,1,9,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:17:48,金额:3
严重迟到的数据:> 22f48bb693874a99b80177f6764f0912,0,3,2024-05-16 11:17:48
订单产生的时间:2024-05-16 11:19:23,金额:1
2024-05-16 11:19:10
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:10,结束时间:2024-05-16 11:19:15,3,迟到的订单时间:
订单产生的时间:2024-05-16 11:19:19,金额:7
2024-05-16 11:19:19
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:15,结束时间:2024-05-16 11:19:20,7,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:25,金额:2
严重迟到的数据:> 9b91cfd413784e4da9fb7f16b68186d0,0,2,2024-05-16 11:18:25
订单产生的时间:2024-05-16 11:18:48,金额:3
严重迟到的数据:> 62bdd621dd4d43b2b9f401949c1c5686,1,3,2024-05-16 11:18:48
订单产生的时间:2024-05-16 11:18:38,金额:1
严重迟到的数据:> e6fe834c88d043ef94b66853ad67dc46,1,1,2024-05-16 11:18:38
订单产生的时间:2024-05-16 11:18:39,金额:5
严重迟到的数据:> 472b35fc32744c39990d13bd490ae131,1,5,2024-05-16 11:18:39
订单产生的时间:2024-05-16 11:18:19,金额:0
严重迟到的数据:> 49270e8cf00445f38a4fbcfebedfdc0a,0,0,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:19:07,金额:8
严重迟到的数据:> a337af0e6f1c46e48e2ed2ec99d6dc53,0,8,2024-05-16 11:19:07
订单产生的时间:2024-05-16 11:19:22,金额:8
订单产生的时间:2024-05-16 11:19:01,金额:3
严重迟到的数据:> 5bf845744c9d486f97fbd8106deb22bb,0,3,2024-05-16 11:19:01
订单产生的时间:2024-05-16 11:18:42,金额:2
严重迟到的数据:> f861897e49a54b589863efd6866ce61f,0,2,2024-05-16 11:18:42
订单产生的时间:2024-05-16 11:18:15,金额:7
严重迟到的数据:> 37a8d94dc8b94854963ddcbefa3d00dc,0,7,2024-05-16 11:18:15
订单产生的时间:2024-05-16 11:17:56,金额:6
严重迟到的数据:> ab81cf409c604bb398d43b15f59d7e53,1,6,2024-05-16 11:17:56
订单产生的时间:2024-05-16 11:18:24,金额:3
严重迟到的数据:> f85598988f4b43599e719b873d930440,1,3,2024-05-16 11:18:24
订单产生的时间:2024-05-16 11:17:52,金额:2
严重迟到的数据:> 03ade6f3972d441289bd745f979c961a,1,2,2024-05-16 11:17:52
订单产生的时间:2024-05-16 11:19:20,金额:9
订单产生的时间:2024-05-16 11:18:07,金额:1
严重迟到的数据:> d8b8992ea2b74a5fb10bffb198917c8f,0,1,2024-05-16 11:18:07
订单产生的时间:2024-05-16 11:18:34,金额:2
严重迟到的数据:> a8855d90701449588efc4dfd48df1dd3,0,2,2024-05-16 11:18:34
订单产生的时间:2024-05-16 11:19:11,金额:3
严重迟到的数据:> a7e245864b2c4c03b463b121277309eb,0,3,2024-05-16 11:19:11
订单产生的时间:2024-05-16 11:18:11,金额:9
严重迟到的数据:> cb99f1b5c1e24e5bb6ccacb9f0f9ea78,0,9,2024-05-16 11:18:11
订单产生的时间:2024-05-16 11:18:36,金额:7Process finished with exit code 130
虽然我们添加了延迟的效果,就是说侧道输出,侧道输出不能触发窗口的执行,窗口的执行只能通过水印时间触发 ,允许迟到的数据,不放入到当前窗口中,而是作为一个触发条件看到,它需要放入到它对应的窗口中。
只考虑 1 个并行度的问题
订单发生的真实事件:窗口5秒,间隔5秒,允许迟到 3秒 最晚允许迟到4秒
10:44:00 区间就应该是10:44:00 10:44:05
10:44:01
10:44:02
10:44:03
10:44:04
10:44:05
10:44:07 区间就应该是10:44:05 10:44:10
10:44:22 区间就应该是10:44:20 10:44:25
10:44:30
10:44:28
10:44:20
通过上面这个图可以知道,44:07没有办法触发00~05的执行,但是07不放入00~05区间,而是放入10:44:05 10:44:10
44:22 一个数据触发了两个区间的执行 00~05 05~10
假如有一个订单时44:10产生的,应该放入10~15这个区间
五、总结展望
Flink 中 Time 与 Watermark 体系紧密交织,EventTime 定 “基准”,Watermark 破 “乱序”,allowedLateness 延 “时效”,SideOutput 兜 “底线”,协同赋能实时流数据精准、稳健处理。未来随着业务场景愈发复杂、数据规模持续膨胀,对这套机制性能优化、智能自适应调整等方面探索将不断深入,持续筑牢大数据实时处理 “基石”,解锁更多高效数据利用价值。
相关文章:
Flink四大基石之Time (时间语义) 的使用详解
目录 一、引言 二、Time 的分类及 EventTime 的重要性 Time 分类详述 EventTime 重要性凸显 三、Watermark 机制详解 核心原理 Watermark能解决什么问题,如何解决的? Watermark图解原理 举例 总结 多并行度的水印触发 Watermark代码演示 需求 代码演示ÿ…...
Spring WebFlux与Spring MVC
Spring WebFlux 是对 Spring Boot 项目中传统 Spring MVC 部分的一种替代选择,主要是为了解决现代 Web 应用在高并发和低延迟场景下的性能瓶颈。 1.WebFlux 是对 Spring MVC 的替代 架构替代: Spring MVC 使用的是基于 Servlet 规范的阻塞式模型…...
【深度学习基础】一篇入门模型评估指标(分类篇)
🌈 个人主页:十二月的猫-CSDN博客 🔥 系列专栏: 🏀深度学习_十二月的猫的博客-CSDN博客 💪🏻 十二月的寒冬阻挡不了春天的脚步,十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2. 模…...
D80【 python 接口自动化学习】- python基础之HTTP
day80 requests请求加入headers 学习日期:20241126 学习目标:http定义及实战 -- requests请求加入headers 学习笔记: requests请求加入headers import requestsurlhttps://movie.douban.com/j/search_subjects params{"type":…...
⽂件操作详解
⽬录 一 文件操作的引入 1 为什么使⽤⽂件? 2 什么是⽂件? 3 文件分类(1 从⽂件功能的⻆度来分类:程序⽂件/数据⽂件 2根据数据的组织形式:为⽂本⽂件/⼆进制⽂件) 二 ⽂件的打开和关闭 1 …...
双高(高比例新能源发电和高比例电力电子设备)系统宽频振荡研究现状
1 为什么会形成双高电力系统 (1)新能源发电比例增加 双碳计划,新能源革命,可再生能源逐步代替传统化石能源,未来新能源发电将成为最终能源需求的主要来源。 (2)电力电子设备数量增加 为了实…...
TorchMoji使用教程/环境配置(2024)
TorchMoji使用教程/环境配置(2024) TorchMoji简介 这是一个基于pytorch库,用于将文本分类成不同的多种emoji表情的库,适用于文本的情感分析 配置流程 从Anaconda官网根据提示安装conda git拉取TorchMoji git clone https://gi…...
使用 Python 中的 TripoSR 根据图像创建 3D 对象
使用 Python 中的 TripoSR 根据图像创建 3D 对象 1. 效果图2. 步骤图像到 3D 对象设置环境导入必要的库设置设备创建计时器实用程序上传并准备图像处理输入图像生成 3D 模型并渲染下载.stl 文件展示结果3. 源码4. 遇到的问题及解决参考这篇博客将引导如何使用Python 及 TripoSR…...
Spring 框架中AOP(面向切面编程)和 IoC(控制反转)
在 Spring 框架中,AOP(面向切面编程)和 IoC(控制反转)是两个核心概念,它们分别负责不同的功能。下面我将通过通俗易懂的解释来帮助你理解这两个概念。 IoC(控制反转) IoC 是 Inver…...
电机瞬态分析基础(7):坐标变换(3)αβ0变换,dq0变换
1. 三相静止坐标系与两相静止坐标系的坐标变换―αβ0坐标变换 若上述x、y坐标系在空间静止不动,且x轴与A轴重合,即,如图1所示,则为两相静止坐标系,常称为坐标系,考虑到零轴分量,也称为αβ0坐标…...
Open3D (C++) 生成任意3D椭圆点云
目录 一、算法原理1、几何参数2、数学公式二、代码实现三、结果展示一、算法原理 1、几何参数 在三维空间中,椭圆由以下参数定义: 椭圆中心点 c = ( x 0 , y 0 , z...
5.利用Pandas以及Numpy进行数据清洗
1、缺失值处理 import pandas as pd import numpy as np#创建一张7行5列带有缺失值的表,表中的数据0-100随机生成,索引是python1. df pd.DataFrame(datanp.random.randint(0,100,size(7,5)), index [i for i in pythonl])df.iloc[2,3] Nonedf.iloc[4…...
@Bean注解详细介绍以及应用
目录 一、概念二、应用(一)代码示例1、首先创建一个简单的 Java 类User2、然后创建一个配置类AppConfig3、在其他组件中使用Bean创建的 bean4、通过 Spring 的ApplicationContext来获取UserService并调用其方法 (二)bean的方法名详…...
基于SpringBoot的预制菜销售系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…...
从 EXCEL 小白到 EXCEL 高手的成长之路
在职场与日常生活中,Excel 作为一款强大的数据处理与分析工具,扮演着不可或缺的角色。无论是初学者还是资深职场人士,掌握 Excel 技能都能极大地提高工作效率。那么,从一个 Excel 小白蜕变成为 Excel 高手,究竟需要多久…...
【纸飞机串口调试工具】数值显示器及四则运算
目录 纸飞机串口工具介绍软件下载适用场合功能介绍 纸飞机串口工具介绍 纸飞机一款性能强劲且专业的串口/网络/HID调试助手,具有多窗口绘图、关键字高亮、数据分窗和数据过滤等众多功能,可以极大的方便嵌入式开发人员的调试过程。本文介绍数值显示器的四…...
浅谈volatile
volatile有三个特性: (1)可见性 (2)不保证原子性 (3)禁止指令重排 下面我们一一介绍 (一)可见性 volatile的可见性是说共享变量只要修改,就可以被其他线…...
Python3 爬虫 Scrapy的使用
安装完成Scrapy以后,可以使用Scrapy自带的命令来创建一个工程模板。 一、创建项目 使用Scrapy创建工程的命令为: scrapy startproject <工程名> 例如,创建一个抓取百度的Scrapy项目,可以将命令写为: scrapy s…...
多线程篇-4--重点概念1(volatile,Synchronized,内存屏障,MESI协议)
一、volatile (1)、简述 volatile是java提供的一个关键字,英文意思为不稳定的。 可以保障被声明对象的可见性和一定程度上的有序性,但不能保证操作的原子性。 当一个变量被声明为volatile时,意味着该变量的值会直接从…...
本地学习axios源码-如何在本地打印axios里面的信息
1. 下载axios到本地 git clone https://github.com/axios/axios.git 2. 下载react项目, 用vite按照提示命令配置一下vite react ts项目 npm create vite my-vue-app --template react 3. 下载koa, 搭建一个axios请求地址的服务端 a.初始化package.json mkdir koa-server…...
1、SpringBoo中Mybatis多数据源动态切换
我们以一个实例来详细说明一下如何在SpringBoot中动态切换MyBatis的数据源。 一、需求 1、用户可以界面新增数据源相关信息,提交后,保存到数据库 2、保存后的数据源需要动态生效,并且可以由用户动态切换选择使用哪个数据源 3、数据库保存了多个数据源的相关记录后,要求…...
【浏览器】缓存与存储
我是目录 浏览器缓存为什么需要浏览器缓存?对浏览器的缓存机制的理解协商缓存和强缓存的区别强缓存协商缓存 点击刷新按钮或者按 F5、按 CtrlF5 (强制刷新)、地址栏回车有什么区别? 浏览器本地存储前端储存的方式有哪些࿱…...
积鼎科技携手西北工业大学动力与能源学院共建复杂多相流仿真联合实验室
11月26日,复杂多相流仿真联合实验室揭牌仪式及技术研讨活动在西北工业大学动力与能源学院成功举办。复杂多相流仿真联合实验室是由西北工业大学动力与能源学院牵头,携手上海积鼎信息科技有限公司与三航铸剑(西安)科技发展有限公司…...
5. langgraph实现高级RAG (Adaptive RAG)
1. 数据准备 from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import WebBaseLoader from langchain_community.vectorstores import Chromaurls ["https://lilianweng.github.io/posts/2023-06-23-age…...
Postman设置接口关联,实现参数化
🍅 点击文末小卡片 ,免费获取软件测试全套资料,资料在手,涨薪更快 postman设置接口关联 在实际的接口测试中,后一个接口经常需要用到前一个接口返回的结果, 从而让后一个接口能正常执行,这…...
代码随想录day02--链表
移除链表元素 题目 地址:https://leetcode.cn/problems/remove-linked-list-elements/description/ 给你一个链表的头节点 head 和一个整数 val ,请你删除链表中所有满足 Node.val val 的节点,并返回 新的头节点 。 思路是使用虚拟节点的…...
杰发科技AC7803——不同晶振频率时钟的配置
计算公式 PLL_POSDIV [2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62] PLL_PREDIV_1 1 2 4 USE_XTAL 24M SYSCLK_FREQ 64M SYSCLK_DIVIDER 1 VCO USE_XTAL*…...
ArcGIS栅格影像裁剪工具
1、前言 在最近的栅格转矢量处理过程中,发现二值化栅格规模太大,3601*3601,并且其中的面元太过细碎,通过arcgis直接栅格转面有将近几十万的要素,拿这样的栅格数据直接运行代码,发现速度很慢还难以执行出来结…...
【查询目录】.NET开源 ORM 框架 SqlSugar 系列
.NET开源 ORM 框架 SqlSugar 系列 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列【Code First】.NET开源 ORM 框架 SqlSugar 系列【数据事务…...
docker快速安装zookeeper
一、拉取镜像 docker pull zookeeper:3.9.3 二、启动zookeeper docker run --restartalways -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper:3.9.3 如果需要挂载zookeeper文件及目录,则参数增加: -v /mydata/zookeeper/d…...
wordpress 分类目录描述显示/腾讯效果推广
铜灵 发自 凹非寺量子位 出品 | 公众号 QbitAI今天,又有新的自动驾驶数据集开源了。数据集来自Lyft,官方称作是目前同类产品中最大的公开数据集。这份L5数据集内容丰富,加入了原始传感摄像头和激光雷达收集到的内容,内含55000个人…...
wordpress的数据库主机/佛山做seo推广公司
问题 描述 Description 学校实行学分制。每门的必修课都有固定的学分,同时还必须获得相应的选修课程学分。学校开设了N(N<300)门的选修课程,每个学生可选课程的数量M是给定的。学生选修了这M门课并考核通过就能获得相应的学分。…...
企业法治建设第一责任人述职报告/seo公司网站推广
http://blog.csdn.net/goldfighter/article/details/6150309转载于:https://www.cnblogs.com/xiaohuo/p/3751713.html...
开发软件网站建设/网络推广工具有哪些
介绍编写一个应用程序并行运行很困难,对吧?我的意思是,它一定很难,否则我们会看到各处的并行程序。我们所看到的都是平滑的并行应用程序,可以毫不费力地使用每个可用的核心。相反,多线程应用程序是例外而不…...
自己做的网站网页打开速度慢/现在感染症状有哪些
LT项目使用的EIP是运行在JETTY上,此文供开发和实施参考 1、windows下 win下部署多个jetty8很简单,首先将jetty8复制多个文件夹,其次按分配的端口号修改[JETTY_HOME]/etc/jetty.xml和jetty-proxy.xml。如该文件夹下的jetty分配8888端口 jetty.…...
什么网站可以做高数/五年级下册数学优化设计答案
基于注解方式的Bean管理 AOP 写法一:匿名内部类 写法二: AOP操作(AspectJ注解) AOP操作(AspectJ配置文件) JdbcTemplate(概念和准备)...