Flink的处理函数——processFunction
目录
一、处理函数概述
二、Process函数分类——8个
(1)ProcessFunction
(2)KeyedProcessFunction
(3)ProcessWindowFunction
(4)ProcessAllWindowFunction
(5)CoProcessFunction
(6)ProcessJoinFunction
(7)BroadcastProcessFunction
(8)KeyedBroadcastProcessFunction
三、KeyedProcessFunction案例
1.运行processElement方法中的事件时间
(1)输入数据
2.运行processElement方法中的处理时间
(1)先输入一条数据
(2)再快速输入两条数据
3.运行processElement方法中的水位线
(1)输入数据
4.总结:
四、应用案例——求TopN
(一)思路一:hashmap
(二)思路二:keyby
(三)思路三:使用侧输出流——推荐
一、处理函数概述
在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。
二、Process函数分类——8个
Flink提供了8个不同的处理函数:
(1)ProcessFunction
最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
(2)KeyedProcessFunction
对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。
(3)ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
(4)ProcessAllWindowFunction
同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。
(5)CoProcessFunction
合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
(6)ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
(7)BroadcastProcessFunction
广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。
(8)KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。
三、KeyedProcessFunction案例
基于keyBy之后的KeyedStream,直接调用.process()方法,这时需要传入的参数就是KeyedProcessFunction的实现类。
stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())
类似地,KeyedProcessFunction也是继承自AbstractRichFunction的一个抽象类,与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。同样地,我们必须实现一个.processElement()抽象方法,用来处理流中的每一个数据;另外还有一个非抽象方法.onTimer(),用来定义定时器触发时的回调操作。
代码示例:
import com.atguigu.bean.WaterSensor;
import com.atguigu.split.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node141", 9999).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 乱序// 提取watermark的时间戳.withTimestampAssigner((element, recordTimestamp) ->element.getTs() * 1000L);SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);// key 输入的类型 输出的类型SingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// TODO 1.定时器注册TimerService timerService = ctx.timerService();// 1、事件时间的案例
// Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间
// timerService.registerEventTimeTimer(5000L);
// System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");// 2、处理时间的案例long currentTs = timerService.currentProcessingTime();timerService.registerProcessingTimeTimer(currentTs + 5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");// 3、获取 process的 当前watermark
// long currentWatermark = timerService.currentWatermark();
// System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);// 注册定时器: 处理时间、事件时间
// timerService.registerProcessingTimeTimer();
// timerService.registerEventTimeTimer();// 删除定时器: 处理时间、事件时间
// timerService.deleteEventTimeTimer();
// timerService.deleteProcessingTimeTimer();// 获取当前时间进展: 处理时间-当前系统时间, 事件时间-当前watermark
// long currentTs = timerService.currentProcessingTime();
// long wm = timerService.currentWatermark();}/*** TODO 2.时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}
}
1.运行processElement方法中的事件时间
// 1、事件时间的案例
Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间
timerService.registerEventTimeTimer(5000L);
System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");
(1)输入数据
[root@node141 ~]# nc -lk 9999
s1,1,1
s1,2,2
s1,3,3
s2,4,4
s2,5,5
s3,9,9
运行结果:注册定时器后,5s后触发
2.运行processElement方法中的处理时间
// 2、处理时间的案例
long currentTs = timerService.currentProcessingTime();
timerService.registerProcessingTimeTimer(currentTs + 5000L);
System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");
(1)先输入一条数据
[root@node141 ~]# nc -lk 9999
s1,1,1
运行结果:相差5s
(2)再快速输入两条数据
[root@node141 ~]# nc -lk 9999
s1,1,1
s1,2,2
s2,2,2
运行结果:相差5s,5s后定时器触发
3.运行processElement方法中的水位线
// 3、获取 process的 当前watermark
long currentWatermark = timerService.currentWatermark();
System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);
(1)输入数据
[root@node141 ~]# nc -lk 9999
s1,1,1
s1,5,5
s1,9,9
运行结果:
数据经过map算子,然后再到process方法中。
当第一条数据输入时,进入map时的watermark是1s-3s-1ms=-2001ms,数据随后进入process方法,调用了processElement方法,在processElement方法中获取当前的watermark,此时-2001ms这一watermark还没有进入process中,所以当前process的watermark是long的最小值;
第一条数据处理完成后,第二条继续输入到map中,此时process中的watermark变为-2001ms,而map中的watermark是5s-3s-1ms=1999ms,同样,数据再进入process中,调用processElement方法,此时1999ms这一watermark仍然还没有进入process中,所以当前process中的watermark是之前的-2001ms;
第二条数据处理完成后,第三条数据继续输入到map中,此时process中的watermark变为1999ms,而map中的watermark是9s-3s-1ms=5999ms,同样,数据再进入process中,调用processElement方法,此时5999ms这一watermark也没有来得及进入process中,所以当前process中的watermark是之前的1999ms。
4.总结:
TODO 定时器
1、keyed才有
2、事件时间定时器,通过watermark来触发的
watermark >= 注册的时间
注意: watermark = 当前最大事件时间 - 等待时间 -1ms, 因为 -1ms,所以会推迟一条数据
比如, 5s的定时器,
如果 等待=3s, watermark = 8s - 3s -1ms = 4999ms,不会触发5s的定时器
需要 watermark = 9s -3s -1ms = 5999ms ,才能去触发 5s的定时器
3、在process中获取当前watermark,显示的是上一次的watermark
==> 因为process还没接收到这条数据对应生成的新watermark
四、应用案例——求TopN
(一)思路一:hashmap
使用所有数据到一起,用hashmap来存储,key=vc,value=count值
import com.atguigu.bean.WaterSensor;
import com.atguigu.split.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.*;public class TopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node141", 9999).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 乱序.withTimestampAssigner((element, recordTimestamp) ->element.getTs() * 1000L));// 最近10s=窗口长度 每5s输出=滑动步长// TODO 思路一:使用所有数据到一起,用hashmap来存储,key=vc,value=count值sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new MyTopNPawf()).print();env.execute();}public static class MyTopNPawf extends ProcessAllWindowFunction<WaterSensor, String, TimeWindow> {@Overridepublic void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {// 定义一个hashmap用来存,key=vc,value=count值Map<Integer, Integer> vcCountMap = new HashMap<>();// 1.遍历数据,统计各个vc出现的次数for (WaterSensor element : elements) {Integer vc = element.getVc();if (vcCountMap.containsKey(vc)) {// 1.1 key存在,不是这个key的第一条数据,直接累加vcCountMap.put(vc, vcCountMap.get(vc) + 1);} else {// 1.2 如果key不存在,则初始化vcCountMap.put(vc, 1);}}// 2.对count值进行排序,利用list来实现排序List<Tuple2<Integer, Integer>> datas = new ArrayList<>();// 2.1 对list进行排序,根据count值进行降序for (Integer vc : vcCountMap.keySet()) {datas.add(Tuple2.of(vc, vcCountMap.get(vc)));}// count值相减datas.sort((o1, o2) -> o2.f1 - o1.f1);// 3.取出count最大的2个vcStringBuffer outStr = new StringBuffer();outStr.append("\n=======================\n");for (int i = 0; i < Math.min(2, datas.size()); i++) {Tuple2<Integer, Integer> vcCount = datas.get(i);outStr.append("窗口开始时间:" + DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss.SSS"));outStr.append("\n");outStr.append("top:" + (i + 1));outStr.append("\n");outStr.append("vc=" + vcCount.f0);outStr.append("\n");outStr.append("count=" + vcCount.f1);outStr.append("\n");outStr.append("窗口结束时间:" + DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS\n"));}outStr.append("\n=======================\n");// 输出out.collect(outStr.toString());}}
}
输入数据:
[root@node141 ~]# nc -lk 9999
s1,1,1
s1,2,1
s1,3,3
s1,6,1
s1,8,2
s1,9,3
s1,10,2
s1,11,1
s1,13,2
运行结果:
思路一不推荐,因为要将数据攒到一起才会计算,效果不好。
(二)思路二:keyby
代码思路:
实现步骤:
1、按照vc做keyby,开窗,分别count
==》 增量聚合,计算 count
==》 全窗口,对计算结果 count值封装 , 带上 窗口结束时间的 标签
==》 为了让同一个窗口时间范围的计算结果到一起去
2、对同一个窗口范围的count值进行处理: 排序、取前N个
=》 按照 windowEnd做keyby
=》 使用process, 来一条调用一次,需要先存,分开存,用HashMap进行存储,key=windowEnd,value=List
=》 使用定时器,对 存起来的结果 进行 排序、取前N个
import com.atguigu.bean.WaterSensor;
import com.atguigu.split.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node141", 9999).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 乱序.withTimestampAssigner((element, recordTimestamp) ->element.getTs() * 1000L));// 最近10s=窗口长度 每5s输出=滑动步长// TODO 使用keyedProcessFunction实现// 1.按照vc分组、开窗、聚合(增量计算+全量打标签)// 开窗聚合后,就是普通的流,没有了窗口信息,需要自己打上窗口的标记 windowEndSingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS.keyBy(WaterSensor::getVc).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new VcCountAgg(),new WindowResult());// 2.按照窗口标签(窗口结束时间)keyby,保证同一个窗口时间范围的结果,到一起去,排序、取TopNwindowAgg.keyBy(r -> r.f2).process(new TopN(2)).print();env.execute();}public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator + 1;}@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return null;}}/*** 泛型如下:* 第一个:输入类型=增量函数的输出 count值,Integer* 第二个:输出类型=Tuple3<vc,count,windowEnd>,带上 窗口结束时间的标签* 第三个:key的类型:vc,Integer* 第四个:窗口类型*/public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {@Overridepublic void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {// 迭代器里面只有一条数据,next一次即可Integer count = elements.iterator().next();long windowEnd = context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}}/*** 泛型如下:* 第一个:key的类型,是windowEnd* 第二个:输入的类型,三元组Tuple3<vc,count,windowEnd>,带上 窗口结束时间的标签* 第三个:打印输出结果*/public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;// 要取的top数量private int threshold;public TopN(int threshold) {this.threshold = threshold;dataListMap = new HashMap<>();}@Overridepublic void processElement(Tuple3<Integer, Integer, Long> value, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.Context ctx, Collector<String> out) throws Exception {// 进入这个方法,只是一条数据,要排序,必须等数据到齐才行,将不同窗口的数据用hashmap分开存起来,// todo 1.存到hashmap中 注意此时的key是窗口的结束时间Long windowEnd = value.f2;if (dataListMap.containsKey(windowEnd)) {// 1.1 包含vc,不是该vc的第一条,直接添加到list中List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);dataList.add(value);// 也就是说只需要把value的值添加到dataListMap中即可} else {// 1.2 不包含,是该vc的第一条,需要初始化listList<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();dataList.add(value);dataListMap.put(windowEnd, dataList);}// todo 2.注册一个定时器,windowEnd+1ms即可// 同一个窗口范围,应该同时输出ctx.timerService().registerEventTimeTimer(windowEnd + 1);}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);// 定时器触发,同一个窗口范围的计算结果攒齐了,开始 排序、取TopNLong windowEnd = ctx.getCurrentKey();// 1.排序List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);dataList.sort((o1, o2) -> o2.f1 - o1.f1);// 2.取TopNStringBuffer outStr = new StringBuffer();outStr.append("\n=======================\n");for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);outStr.append("top:" + (i + 1));outStr.append("\n");outStr.append("vc=" + vcCount.f0);outStr.append("\n");outStr.append("count=" + vcCount.f1);outStr.append("\n");outStr.append("窗口结束时间:" + DateFormatUtils.format(vcCount.f2, "yyyy-MM-dd HH:mm:ss.SSS" + "\n"));outStr.append("=======================\n");}// 用完的List,及时清理dataList.clear();// 输出out.collect(outStr.toString());}}
}
输入数据:
[root@node141 ~]# nc -lk 9999
s1,1,1
s1,2,2
s1,8,1
s1,10,1
运行结果:
(三)思路三:使用侧输出流——推荐
import com.atguigu.bean.WaterSensor;
import com.atguigu.split.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.time.Duration;public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node141", 9999).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 乱序.withTimestampAssigner((element, recordTimestamp) ->element.getTs() * 1000L));OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);SingleOutputStreamOperator<WaterSensor> process = sensorDS.keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {@Overridepublic void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {// 使用侧输出流告警if (value.getVc() > 10) {ctx.output(warnTag, "当前水位=" + value.getVc() + ",大于阈值10!!!");}// 主流正常发送数据out.collect(value);}});process.print("主流");process.getSideOutput(warnTag).printToErr("warn");env.execute();}
}
输入数据:
[root@node141 ~]# nc -lk 9999
s1,1,5
s1,10,1
s1,6,11
s1,7,20
运行结果:
相关文章:
Flink的处理函数——processFunction
目录 一、处理函数概述 二、Process函数分类——8个 (1)ProcessFunction (2)KeyedProcessFunction (3)ProcessWindowFunction (4)ProcessAllWindowFunction ÿ…...
Linux系统中的ps命令详解及用法介绍
文章目录 一、介绍ps命令A. ps命令的作用B. ps命令的参数 二、常见的ps命令用法A. 显示所有进程信息B. 显示指定进程信息C. 显示指定用户的进程信息D. 按CPU使用率排序显示进程信息E. 按内存使用率排序显示进程信息 三、进一步了解ps命令A. 显示进程树信息B. 显示线程和进程关系…...
机器学习笔记 - 基于pytorch、grad-cam的计算机视觉的高级可解释人工智能
一、pytorch-gradcam简介 Grad-CAM是常见的神经网络可视化的工具,用于探索模型的可解释性,广泛出现在各大顶会论文中,以详细具体地描述模型的效果。Grad-CAM的好处是,可以在不额外训练的情况下,只使用训练好的权重即可获得热力图。 1、CAM是什么? CAM全称Class Activa…...
Python 编程基础 | 第五章-类与对象 | 5.1、定义类
一、类 1、定义类 Python中使用class关键字定义类,class之后为类的名称并以:结尾,类的结构如下: class 类名:多个(≥0)类属性...多个(≥0)类方法...下面定义一个Dog类,如…...
合宙Air780e+luatos+腾讯云物联网平台完成设备通信与控制(属性上报+4G远程点灯)
1.腾讯云物联网平台 首先需要在腾讯云物联网平台创建产品、创建设备、定义设备属性和行为,例如: (1)创建产品 (2)定义设备属性和行为 (3)创建设备 (4)准备参…...
c++系列之string的模拟实现
💗 💗 博客:小怡同学 💗 💗 个人简介:编程小萌新 💗 💗 如果博客对大家有用的话,请点赞关注再收藏 🌞 string() //注意事项: 1.初始化列表随声明的顺序进行初始化 2.cons…...
Spring的beanName生成器AnnotationBeanNameGenerator
博主介绍:✌全网粉丝4W,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验…...
FFmpeg 命令:从入门到精通 | ffmpeg 命令直播
FFmpeg 命令:从入门到精通 | ffmpeg 命令直播 FFmpeg 命令:从入门到精通 | ffmpeg 命令直播直播拉流直播推流 FFmpeg 命令:从入门到精通 | ffmpeg 命令直播 本节主要介绍了ffmpeg 命令进行直播拉流、推流的方法,并列举了一些例子…...
A (1087) : DS单链表--类实现
Description 用C语言和类实现单链表,含头结点 属性包括:data数据域、next指针域 操作包括:插入、删除、查找 注意:单链表不是数组,所以位置从1开始对应首结点,头结点不放数据 类定义参考 #include<…...
异常:找不到匹配的key exchange算法
目录 问题描述原因分析解决方案 问题描述 PC 操作系统:Windows 10 企业版 LTSC PC 异常软件:XshellPortable 4(Build 0127) PC 正常软件:PuTTY Release 0.74、MobaXterm_Personal_23.1 服务器操作系统:OpenEuler 22.03 (LTS-SP2)…...
Arcgis打开影像分析窗口没反应
Arcgis打开影像分析窗口没反应 问题描述 做NDVI计算的时候,一直点击窗口-影像分析,发现影像分析的小界面一直不跳出来。 原因 后来发现是被内容列表给遮住了,其实是已经出来了的。。 拖动内容列表就能找到。 解决方案 内容列表和影像分…...
Spring(JavaEE进阶系列1)
目录 前言: 1.Servlet与Spring对比 2.什么是Spring 2.1什么是容器 2.2什么是IoC 2.3SpringIoC容器的理解 2.4DI依赖注入 2.5IoC与DI的区别 3.Spring项目的创建和使用 3.1正确配置Maven国内源 3.2Spring的项目创建 3.3将Bean对象存储到Spring(…...
Flink状态管理与检查点机制
1.状态分类 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用: 具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State: 1.1 算子状态 算子状态 (Operator State):顾名思义…...
【threejs】基本编程概念及海岛模型展示逻辑
采用three封装模式完成的海岛动画(点击这里查看) 直接上代码吧 <template><div class"scene"><video id"videoContainer" style"position:absolute;top:0px;left:0px;z-index:100;visibility: hidden"&g…...
python小技巧:创建单链表及删除元素
目前只有单链表(无法查找上一个元素),后面再更新循环链表和双链表。 class SingleLinkedList:def createList(self, raw_list):if len(raw_list) 0:head ListNode()else:head ListNode(raw_list[0])cur headfor i in range(1, len(raw_l…...
ADuM1250 ADuM1251 模块 I2C IIC总线2500V电磁隔离 接口保护
功能说明: 1,2500V电磁隔离,2通道双向I2C; 2,支持电压在3到5.5V,最大时钟频率可达1000KHz; 3,将该隔离模块接入总线,可以保护主MCU引脚,降低I2C总线上的干…...
C# 把多个dll合成一个dll
Nuget 下载ILMerge两个工程 dog为测试工程 TestIlmerge为准备合并的类库 如下图所示, 由于我们引用下面4个库 正常生成后,会有TestIlmerge.dll和下面的这4个dll 只生成TestIlmerge.dll 打开工程文件 在最下方加入以下两段 <Target Name"ILMerge…...
scipy.sparse.coo_matrix.sum()关于axis的用法
以下面的矩阵为例 [1,2,0] [0,3,0] [0,0,0]示例代码 from scipy.sparse import coo_matrix# 创建一个稀疏矩阵 data [1, 2, 3] row [0, 0, 1] col [0, 1, 1] sparse_matrix coo_matrix((data, (row, col)), shape(3,3))# 计算稀疏矩阵中每行非零元素的总和 sum_of_column…...
C++类与对象(下)
文章目录 1.非类型模板2.模板特化2.1.类模板特化2.1.1.全特化2.1.2.偏特化 2.2.函数模板特化 3.函数模板声明定义分离 之前我们学习的模板能达到泛型的原因是:使用了“泛型的类型”,但是如果经过后面的“造轮子”(后面会尝试实现一下 STL的一…...
SpringBoot——》引入Redis
推荐链接: 总结——》【Java】 总结——》【Mysql】 总结——》【Redis】 总结——》【Kafka】 总结——》【Spring】 总结——》【SpringBoot】 总结——》【MyBatis、MyBatis-Plus】 总结——》【Linux】 总结——》【MongoD…...
C# newtonsoft序列化将long类型转化为字符串
/// <summary> /// 转化为json的时候long类型转为string /// </summary> public class LongJsonConverter: JsonConverter {public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer){try{return r…...
黑马点评-02使用Redis代替session,Redis + token机制实现
Redis代替session session共享问题 每个Tomcat中都有一份属于自己的session,所以多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时可能会导致数据丢失 用户第一次访问1号tomcat并把自己的信息存放session域中, 如果第二次访问到了2号tomcat就无法获取到在1号…...
arm 点灯实验代码以及现象
.text .global _start _start: 1.设置GPIOE寄存器的时钟使能 RCC_MP_AHB4ENSETR[4]->1 0x50000a28 LDR R0,0x50000A28 LDR R1,[R0] ORR R1,R1,#(0x1<<4) 第4位置1 STR R1,[R0] 1.设置GPIOF寄存器的时钟使能 RCC_MP_AHB4ENSETR[4]->1 0x50000a28 LDR R…...
选择适合普通公司的项目管理软件
不管是打工人还是学生党都适合使用Zoho Projects项目管理软件。利用项目概览功能,将整体项目尽收眼底,作为项目管理者,项目日程、进度都可见,Zoho Projects项目管理APP助推项目每一环节的进展,更便于管理者设计项目的下…...
E (1081) : DS堆栈--逆序输出(STL栈使用)
Description C中已经自带堆栈对象stack,无需编写堆栈操作的具体实现代码。 本题目主要帮助大家熟悉stack对象的使用,然后实现字符串的逆序输出 输入一个字符串,按字符按输入顺序压入堆栈,然后根据堆栈后进先出的特点࿰…...
访问者模式 行为型设计模式之九
1.定义 在不改变数据结构的前提下,增加作用于一组对象元素的新功能。 2.动机 访问者模式适用于数据结构相对稳定的系统它把数据结构和作用于数据结构之上的操作之间的耦合解脱开,使得操作集合可以相对自由的演化。访问者模式的目的是要把处理从数据结构…...
JVM垃圾回收之JVM GC算法探究
JVM垃圾回收之JVM GC算法探究 在Java虚拟机(JVM)中,垃圾回收(Garbage Collection,GC)是自动管理内存的重要机制,它负责回收程序中不再使用的对象所占用的内存。GC算法是垃圾回收的核心…...
Django 前端模板显示换行符、日期格式
linebreaksbr 显示换行符 <td>{{ data.sku_list|default:"无"|linebreaksbr }}</td> date:"Y年m月d日 H:i" 设置日期格式 <td>{{ data.submit_time|date:"Y年m月d日 H:i" }}</td> 其他语法 forloop 获取循环的索引 …...
Aurora中的策略模式和模板模式
Aurora中的策略模式和模板模式 在aurora中为了方便以后的扩展使用了策略模式和模板模式实现图片上传和搜索功能,能够在配置类中设置使用Oss或者minio上传图片,es或者mysql文章搜索。后续有新的上传方式或者搜索方式只需要编写对应的实现类即可ÿ…...
Ubuntu 22.04 安装系统 手动分区 针对只有一块硬盘 lvm 单独分出/home
自动安装的信息 参考自动安装时产生的分区信息 rootyeqiang-MS-7B23:~# fdisk /dev/sdb -l Disk /dev/sdb:894.25 GiB,960197124096 字节,1875385008 个扇区 Disk model: INTEL SSDSC2KB96 单元:扇区 / 1 * 512 512 字节 扇区大…...
室内设计案例分析/seo培训教程视频
课程亮点:一图胜千言,让文科生都能看得懂的python教程!!!另外加详细的笔记作为辅助工具,500多条笔记帮助学员学习Python500多个知识点课程内容:1. Python3语言总体介绍以及环境安装2. Python数据类型(一)-数…...
移动端网站搭建/竞价托管怎么做
Spring boot 是 Spring 的一套快速配置脚手架,可以基于spring boot 快速开发单个微服务,Spring Boot,看名字就知道是Spring的引导,就是用于启动Spring的,使得Spring的学习和使用变得快速无痛。不仅适合替换原有的工程结…...
网站构建/营销型网站的分类不包含
作为开发人员,每个人都会遇到有关在生产服务器上启用GC日志的问题。 建议在生产服务器上启用GC登录吗? 是的,建议在生产服务器上启用GC登录 。 通过在JVM上启用GC登录的开销很小。 根据标准性能评估公司(SPEC) &#x…...
c 网站开发/免费技能培训在哪里报名
/*** * A:案例演示* 集合嵌套之ArrayList嵌套ArrayList* 案例:* 我们学科,学科又分为若个班级* 整个学科一个大集合* 若干个班级分为每一个小集合*/Testpublic void twoArrary() {ArrayList<ArrayList<Person>> list new ArrayList<>();ArrayList<Person…...
怎样做免费外贸网站/seo排名怎么做
因为et模式需要循环读取,但是在读取过程中,如果有新的事件到达,很可能触发了其他线程来处理这个socket,那就乱了。 EPOLL_ONESHOT就是用来避免这种情况。注意在一个线程处理完一个socket的数据,也就是触发EAGAIN errno…...
简单网站开发/网站自动提交收录
一 概念 缓存的分类: 客户端缓存 (Client Caching) 代理缓存 (Proxy Caching) 反向代理缓存 (Reverse Proxy Caching) 服务器缓存(Web Server Caching) 使用缓存的好处&…...