当前位置: 首页 > news >正文

flink学习(14)—— 双流join

概述

Join:内连接

CoGroup:内连接,左连接,右连接

Interval Join:点对面

Join

1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。
2、Join 可以支持处理时间(processing time)和事件时间(event time)两种时间特征。
3、Join 通用用法如下:stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)

滚动窗口

package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 内连接* 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2*/
public class _01_双流join_join_内连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 双流joinDataStream<Tuple3<String, Integer, Integer>> rsSource = greenSource.join(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}// 滚动窗口}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}

滑动窗口

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @基本功能: 演示join的滑动窗口* @program:FlinkDemo* @author: 闫哥* @create:2024-05-20 09:11:13**/
public class Demo02Join {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将并行度设置为1,否则很难看到现象env.setParallelism(1);// 创建一个绿色的流DataStreamSource<String> greenSource = env.socketTextStream("localhost", 8899);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenDataStream = greenSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 创建一个橘色的流DataStreamSource<String> orangeSource = env.socketTextStream("localhost", 9988);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> orangeDataStream = orangeSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));//2. source-加载数据//3. transformation-数据处理转换DataStream<Tuple3<String, Integer, Integer>> resultStream = greenDataStream.join(orangeDataStream).where(tuple3 -> tuple3.f0).equalTo(tuple3 -> tuple3.f0)// 滑动窗口.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});//4. sink-数据输出greenDataStream.print("绿色的流:");orangeDataStream.print("橘色的流:");resultStream.print("最终的结果:");//5. execute-执行env.execute();}
}

CoGroup

1、优势:可以实现内连接,左连接,右连接
2、劣势:内存压力大
3、和上面的写法区别:将join换成coGroup,apply中实现的具体方法有区别 
4、流程
stream.coGroup(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<CoGroupFunction>);

内连接

package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 内连接*/
public class _02_双流join_CoGroup_内连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 连接DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {for (Tuple3<String, Integer, String> firesTuple3 : first) {for (Tuple3<String, Integer, String> secondTuple3 : second) {out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red"+secondTuple3.f1));}}}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}

外连接

package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 外连接*/
public class _03_双流join_CoGroup_外连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {// 内连接,左连接,右连接的区别只在这里面存在,两层循环for (Tuple3<String, Integer, String> firesTuple3 : first) {boolean isExist = false;for (Tuple3<String, Integer, String> secondTuple3 : second) {isExist = true;out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red"+secondTuple3.f1));}if (!isExist){out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red null"));}}}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}

Interval Join

1、Join以及CoGroup 原因是 Join和CoGroup是窗口Join,必须给定窗口
2、Interval Join不需要给窗口。Interval Join 必须先分组才能使用。 
3、先对数据源进行keyBy
4、 外流.intervalJoin(内流).between(-2,2).processbetween 左不包,右包
内部的流为下面的流(取单个值)

 代码实现

package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;public class _04_双流join_Interval_Join {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);KeyedStream<Tuple3<String, Integer, String>, String> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}// 水印}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}// keyBy})).keyBy(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}});// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);KeyedStream<Tuple3<String, Integer, String>, String> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}// 水印}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}// 分组})).keyBy(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}});// 实现SingleOutputStreamOperator<String> rsSource = greenSource.intervalJoin(redSource).between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void processElement(Tuple3<String, Integer, String> left, Tuple3<String, Integer, String> right, ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {out.collect("left中的key:"+left.f0+",value="+left.f1+",time="+left.f2+",right中的key:"+right.f0+",value="+right.f1+",time="+right.f2);}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();
/*** 红色的为下面的流* 范围:* 假如现在是10* 9 10 11 12*/}
}

相关文章:

flink学习(14)—— 双流join

概述 Join:内连接 CoGroup&#xff1a;内连接&#xff0c;左连接&#xff0c;右连接 Interval Join&#xff1a;点对面 Join 1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。 2、Join 可以支持处理时间&#xff08;processing time&#xff09;和事件时…...

HTTP协议详解:从HTTP/1.0到HTTP/3的演变与优化

深入浅出&#xff1a;从头到尾全面解析HTTP协议 一、HTTP协议概述 1.1 HTTP协议简介 HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;是互联网上应用最广泛的通信协议之一。它用于客户端与服务器之间的数据传输&#xff0c;尤其是在Web…...

张量并行和流水线并行在Transformer中的具体部位

目录 张量并行和流水线并行在Transformer中的具体部位 一、张量并行 二、流水线并行 张量并行和流水线并行在Transformer中的具体部位 张量并行和流水线并行是Transformer模型中用于提高训练效率的两种并行策略。它们分别作用于模型的不同部位,以下是对这两种并行的具体说…...

WEB开发: 丢掉包袱,拥抱ASP.NET CORE!

今天的 Web 开发可以说进入了一个全新的时代&#xff0c;前后端分离、云原生、微服务等等一系列现代技术架构应运而生。在这个背景下&#xff0c;作为开发者&#xff0c;你一定希望找到一个高效、灵活、易于扩展且具有良好性能的框架。那么&#xff0c;ASP.NET Core 显然是一个…...

【论文阅读】Federated learning backdoor attack detection with persistence diagram

目的&#xff1a;检测联邦学习环境下&#xff0c;上传上来的模型是不是恶意的。 1、将一个模型转换为|L|个PD,&#xff08;其中|L|为层数&#xff09; 如何将每一层转换成一个PD&#xff1f; 为了评估第&#x1d457;层的激活值&#xff0c;我们需要&#x1d450;个输入来获…...

Gooxi Eagle Stream 2U双路通用服务器:性能强劲 灵活扩展 稳定易用

人工智能的高速发展开启了飞轮效应&#xff0c;实施数字化变革成为了企业的一道“抢答题”和“必答题”&#xff0c;而数据已成为现代企业的命脉。以HPC和AI为代表的新业务就像节节攀高的树梢&#xff0c;象征着业务创新和企业成长。但在树梢之下&#xff0c;真正让企业保持成长…...

【计算机网络】实验2:总线型以太网的特性

实验 2&#xff1a;总线型以太网的特性 一、 实验目的 加深对MAC地址&#xff0c;IP地址&#xff0c;ARP协议的理解。 了解总线型以太网的特性&#xff08;广播&#xff0c;竞争总线&#xff0c;冲突&#xff09;。 二、 实验环境 • Cisco Packet Tracer 模拟器 三、 实…...

如何在Spark中使用gbdt模型分布式预测

这目录 1 训练gbdt模型2 第三方包python环境打包3 Spark中使用gbdt模型3.1 spark配置文件3.2 主函数main.py 4 spark任务提交 1 训练gbdt模型 我们可以基于lightgbm快速的训练一个gbdt模型&#xff0c;训练相对比较简单&#xff0c;只要把训练样本处理好&#xff0c;几行代码可…...

Qt-5.14.2 example

官方历程很丰富&#xff0c;modbus、串口、chart图表、3D、视频 共享方便使用 Building and Running an Example You can test that your Qt installation is successful by opening an existing example application project. To run an example application on an Android …...

virtualbox给Ubuntu22创建共享文件夹

1.在windows上的操作&#xff0c;创建共享文件夹Share 2.Ubuntu22上的操作&#xff0c;创建共享文件夹LinuxShare 3.在virtualbox虚拟机设置里&#xff0c;设置共享文件夹 共享文件夹路径&#xff1a;选择Windows系统中你需要共享的文件夹 共享文件夹名称&#xff1a;挂载至wi…...

GPT打字机效果—— fetchEventSouce进行sse流式请求

EventStream基本用法 与 WebSocket 不同的是&#xff0c;服务器发送事件是单向的。数据消息只能从服务端到发送到客户端&#xff08;如用户的浏览器&#xff09;。这使其成为不需要从客户端往服务器发送消息的情况下的最佳选择。 const evtSource new EventSource(“/api/v1/…...

SpringBoot 在线家具商城:设计考量与实现细节聚焦

第4章 系统设计 市面上设计比较好的系统都有一个共同特征&#xff0c;就是主题鲜明突出。通过对页面简洁清晰的布局&#xff0c;让页面的内容&#xff0c;包括文字语言&#xff0c;或者视频图片等元素可以清晰表达出系统的主题。让来访用户无需花费过多精力和时间找寻需要的内容…...

每日速记10道java面试题07

其他资料&#xff1a; 每日速记10道java面试题01-CSDN博客 每日速记10道java面试题02-CSDN博客 每日速记10道java面试题03-CSDN博客 每日速记10道java面试题04-CSDN博客 每日速记10道java面试题05-CSDN博客 每日速记10道java面试题06-CSDN博客 目录 1.线程的生命周期在j…...

前端面试热门题(二)[html\css\js\node\vue)

Vue 性能优化的方法 Vue 性能优化的方法多种多样&#xff0c;以下是一些常用的策略&#xff1a; 使用v-show替换v-if&#xff1a;v-show是通过CSS控制元素的显示与隐藏&#xff0c;而v-if是通过操作DOM来控制元素的显示与隐藏&#xff0c;频繁操作DOM会导致性能下降。因此&am…...

mvc基础及搭建一个静态网站

mvc asp.net core mvc环境 .net8vscode * Asp.Net Core 基础* .net8* 前辈* .net 4.9 非跨平台版本 VC* 跨平台版本* 1.0* 2.0* 2.1* 3.1* 5* 语言* C#* F# * Visual Basic* 框架* web应用* asp应用* WebFrom* mvc应用* 桌面应用* Winform* WPF* Web Api api应用或者叫服务* …...

AOSP的同步问题

repo sync同步时提示出错: error: .repo/manifests/: contains uncommitted changesRepo command failed due to the following UpdateManifestError errors: contains uncommitted changes解决方法&#xff1a; 1、cd 进入.repo/manifests cd .repo/manifests2、执行如下三…...

HarmonyOS4+NEXT星河版入门与项目实战(23)------实现手机游戏摇杆功能

文章目录 1、案例效果2、案例实现1、代码实现2、代码解释4、总结1、案例效果 2、案例实现 1、代码实现 代码如下(示例): import router from @ohos.router import {ResizeDirection } from @ohos.UiTest import curves...

Logistic Regression(逻辑回归)、Maximum Likelihood Estimatio(最大似然估计)

Logistic Regression&#xff08;逻辑回归&#xff09;、Maximum Likelihood Estimatio&#xff08;最大似然估计&#xff09; 逻辑回归&#xff08;Logistic Regression&#xff0c;LR&#xff09;逻辑回归的基本思想逻辑回归模型逻辑回归的目标最大似然估计优化方法 逻辑回归…...

Vue文字转语音实现

在开发流程中&#xff0c;面对语音支持的需求&#xff0c;小规模语音内容或许可以通过预处理后播放来轻松应对&#xff0c;但当涉及大量语音时&#xff0c;这一方法就显得繁琐低效了。为此&#xff0c;智慧的开发者们总能找到便捷的解决方案——利用Web技术实现语音播放&#x…...

Docker快速部署RabbitMq

在外网服务器拉取镜像 docker pull arm64v8/rabbitmq:3.8.9-management或者拉去我的服务器的 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/linux_arm64_rabbitmq:3.8.9-management重新命名 docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/lin…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...

九天毕昇深度学习平台 | 如何安装库?

pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子&#xff1a; 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

【Go语言基础【13】】函数、闭包、方法

文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数&#xff08;函数作为参数、返回值&#xff09; 三、匿名函数与闭包1. 匿名函数&#xff08;Lambda函…...

【Linux】Linux 系统默认的目录及作用说明

博主介绍&#xff1a;✌全网粉丝23W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...

高防服务器价格高原因分析

高防服务器的价格较高&#xff0c;主要是由于其特殊的防御机制、硬件配置、运营维护等多方面的综合成本。以下从技术、资源和服务三个维度详细解析高防服务器昂贵的原因&#xff1a; 一、硬件与技术投入 大带宽需求 DDoS攻击通过占用大量带宽资源瘫痪目标服务器&#xff0c;因此…...

算法—栈系列

一&#xff1a;删除字符串中的所有相邻重复项 class Solution { public:string removeDuplicates(string s) {stack<char> st;for(int i 0; i < s.size(); i){char target s[i];if(!st.empty() && target st.top())st.pop();elsest.push(s[i]);}string ret…...

轻量级Docker管理工具Docker Switchboard

简介 什么是 Docker Switchboard &#xff1f; Docker Switchboard 是一个轻量级的 Web 应用程序&#xff0c;用于管理 Docker 容器。它提供了一个干净、用户友好的界面来启动、停止和监控主机上运行的容器&#xff0c;使其成为本地开发、家庭实验室或小型服务器设置的理想选择…...