flink学习(14)—— 双流join
概述
Join:内连接
CoGroup:内连接,左连接,右连接
Interval Join:点对面
Join
1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。
2、Join 可以支持处理时间(processing time)和事件时间(event time)两种时间特征。
3、Join 通用用法如下:stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)
滚动窗口
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 内连接* 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2*/
public class _01_双流join_join_内连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 双流joinDataStream<Tuple3<String, Integer, Integer>> rsSource = greenSource.join(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}// 滚动窗口}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}
滑动窗口
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @基本功能: 演示join的滑动窗口* @program:FlinkDemo* @author: 闫哥* @create:2024-05-20 09:11:13**/
public class Demo02Join {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将并行度设置为1,否则很难看到现象env.setParallelism(1);// 创建一个绿色的流DataStreamSource<String> greenSource = env.socketTextStream("localhost", 8899);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenDataStream = greenSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 创建一个橘色的流DataStreamSource<String> orangeSource = env.socketTextStream("localhost", 9988);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> orangeDataStream = orangeSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));//2. source-加载数据//3. transformation-数据处理转换DataStream<Tuple3<String, Integer, Integer>> resultStream = greenDataStream.join(orangeDataStream).where(tuple3 -> tuple3.f0).equalTo(tuple3 -> tuple3.f0)// 滑动窗口.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});//4. sink-数据输出greenDataStream.print("绿色的流:");orangeDataStream.print("橘色的流:");resultStream.print("最终的结果:");//5. execute-执行env.execute();}
}
CoGroup
1、优势:可以实现内连接,左连接,右连接
2、劣势:内存压力大
3、和上面的写法区别:将join换成coGroup,apply中实现的具体方法有区别
4、流程
stream.coGroup(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<CoGroupFunction>);
内连接
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 内连接*/
public class _02_双流join_CoGroup_内连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 连接DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {for (Tuple3<String, Integer, String> firesTuple3 : first) {for (Tuple3<String, Integer, String> secondTuple3 : second) {out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red"+secondTuple3.f1));}}}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}
外连接
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 外连接*/
public class _03_双流join_CoGroup_外连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {// 内连接,左连接,右连接的区别只在这里面存在,两层循环for (Tuple3<String, Integer, String> firesTuple3 : first) {boolean isExist = false;for (Tuple3<String, Integer, String> secondTuple3 : second) {isExist = true;out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red"+secondTuple3.f1));}if (!isExist){out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red null"));}}}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}
Interval Join

1、Join以及CoGroup 原因是 Join和CoGroup是窗口Join,必须给定窗口
2、Interval Join不需要给窗口。Interval Join 必须先分组才能使用。
3、先对数据源进行keyBy
4、 外流.intervalJoin(内流).between(-2,2).processbetween 左不包,右包
内部的流为下面的流(取单个值)
代码实现
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;public class _04_双流join_Interval_Join {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);KeyedStream<Tuple3<String, Integer, String>, String> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}// 水印}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}// keyBy})).keyBy(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}});// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);KeyedStream<Tuple3<String, Integer, String>, String> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}// 水印}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}// 分组})).keyBy(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}});// 实现SingleOutputStreamOperator<String> rsSource = greenSource.intervalJoin(redSource).between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void processElement(Tuple3<String, Integer, String> left, Tuple3<String, Integer, String> right, ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {out.collect("left中的key:"+left.f0+",value="+left.f1+",time="+left.f2+",right中的key:"+right.f0+",value="+right.f1+",time="+right.f2);}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();
/*** 红色的为下面的流* 范围:* 假如现在是10* 9 10 11 12*/}
}
相关文章:
flink学习(14)—— 双流join
概述 Join:内连接 CoGroup:内连接,左连接,右连接 Interval Join:点对面 Join 1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。 2、Join 可以支持处理时间(processing time)和事件时…...
HTTP协议详解:从HTTP/1.0到HTTP/3的演变与优化
深入浅出:从头到尾全面解析HTTP协议 一、HTTP协议概述 1.1 HTTP协议简介 HTTP(HyperText Transfer Protocol,超文本传输协议)是互联网上应用最广泛的通信协议之一。它用于客户端与服务器之间的数据传输,尤其是在Web…...
张量并行和流水线并行在Transformer中的具体部位
目录 张量并行和流水线并行在Transformer中的具体部位 一、张量并行 二、流水线并行 张量并行和流水线并行在Transformer中的具体部位 张量并行和流水线并行是Transformer模型中用于提高训练效率的两种并行策略。它们分别作用于模型的不同部位,以下是对这两种并行的具体说…...
WEB开发: 丢掉包袱,拥抱ASP.NET CORE!
今天的 Web 开发可以说进入了一个全新的时代,前后端分离、云原生、微服务等等一系列现代技术架构应运而生。在这个背景下,作为开发者,你一定希望找到一个高效、灵活、易于扩展且具有良好性能的框架。那么,ASP.NET Core 显然是一个…...
【论文阅读】Federated learning backdoor attack detection with persistence diagram
目的:检测联邦学习环境下,上传上来的模型是不是恶意的。 1、将一个模型转换为|L|个PD,(其中|L|为层数) 如何将每一层转换成一个PD? 为了评估第𝑗层的激活值,我们需要𝑐个输入来获…...
Gooxi Eagle Stream 2U双路通用服务器:性能强劲 灵活扩展 稳定易用
人工智能的高速发展开启了飞轮效应,实施数字化变革成为了企业的一道“抢答题”和“必答题”,而数据已成为现代企业的命脉。以HPC和AI为代表的新业务就像节节攀高的树梢,象征着业务创新和企业成长。但在树梢之下,真正让企业保持成长…...
【计算机网络】实验2:总线型以太网的特性
实验 2:总线型以太网的特性 一、 实验目的 加深对MAC地址,IP地址,ARP协议的理解。 了解总线型以太网的特性(广播,竞争总线,冲突)。 二、 实验环境 • Cisco Packet Tracer 模拟器 三、 实…...
如何在Spark中使用gbdt模型分布式预测
这目录 1 训练gbdt模型2 第三方包python环境打包3 Spark中使用gbdt模型3.1 spark配置文件3.2 主函数main.py 4 spark任务提交 1 训练gbdt模型 我们可以基于lightgbm快速的训练一个gbdt模型,训练相对比较简单,只要把训练样本处理好,几行代码可…...
Qt-5.14.2 example
官方历程很丰富,modbus、串口、chart图表、3D、视频 共享方便使用 Building and Running an Example You can test that your Qt installation is successful by opening an existing example application project. To run an example application on an Android …...
virtualbox给Ubuntu22创建共享文件夹
1.在windows上的操作,创建共享文件夹Share 2.Ubuntu22上的操作,创建共享文件夹LinuxShare 3.在virtualbox虚拟机设置里,设置共享文件夹 共享文件夹路径:选择Windows系统中你需要共享的文件夹 共享文件夹名称:挂载至wi…...
GPT打字机效果—— fetchEventSouce进行sse流式请求
EventStream基本用法 与 WebSocket 不同的是,服务器发送事件是单向的。数据消息只能从服务端到发送到客户端(如用户的浏览器)。这使其成为不需要从客户端往服务器发送消息的情况下的最佳选择。 const evtSource new EventSource(“/api/v1/…...
SpringBoot 在线家具商城:设计考量与实现细节聚焦
第4章 系统设计 市面上设计比较好的系统都有一个共同特征,就是主题鲜明突出。通过对页面简洁清晰的布局,让页面的内容,包括文字语言,或者视频图片等元素可以清晰表达出系统的主题。让来访用户无需花费过多精力和时间找寻需要的内容…...
每日速记10道java面试题07
其他资料: 每日速记10道java面试题01-CSDN博客 每日速记10道java面试题02-CSDN博客 每日速记10道java面试题03-CSDN博客 每日速记10道java面试题04-CSDN博客 每日速记10道java面试题05-CSDN博客 每日速记10道java面试题06-CSDN博客 目录 1.线程的生命周期在j…...
前端面试热门题(二)[html\css\js\node\vue)
Vue 性能优化的方法 Vue 性能优化的方法多种多样,以下是一些常用的策略: 使用v-show替换v-if:v-show是通过CSS控制元素的显示与隐藏,而v-if是通过操作DOM来控制元素的显示与隐藏,频繁操作DOM会导致性能下降。因此&am…...
mvc基础及搭建一个静态网站
mvc asp.net core mvc环境 .net8vscode * Asp.Net Core 基础* .net8* 前辈* .net 4.9 非跨平台版本 VC* 跨平台版本* 1.0* 2.0* 2.1* 3.1* 5* 语言* C#* F# * Visual Basic* 框架* web应用* asp应用* WebFrom* mvc应用* 桌面应用* Winform* WPF* Web Api api应用或者叫服务* …...
AOSP的同步问题
repo sync同步时提示出错: error: .repo/manifests/: contains uncommitted changesRepo command failed due to the following UpdateManifestError errors: contains uncommitted changes解决方法: 1、cd 进入.repo/manifests cd .repo/manifests2、执行如下三…...
HarmonyOS4+NEXT星河版入门与项目实战(23)------实现手机游戏摇杆功能
文章目录 1、案例效果2、案例实现1、代码实现2、代码解释4、总结1、案例效果 2、案例实现 1、代码实现 代码如下(示例): import router from @ohos.router import {ResizeDirection } from @ohos.UiTest import curves...
Logistic Regression(逻辑回归)、Maximum Likelihood Estimatio(最大似然估计)
Logistic Regression(逻辑回归)、Maximum Likelihood Estimatio(最大似然估计) 逻辑回归(Logistic Regression,LR)逻辑回归的基本思想逻辑回归模型逻辑回归的目标最大似然估计优化方法 逻辑回归…...
Vue文字转语音实现
在开发流程中,面对语音支持的需求,小规模语音内容或许可以通过预处理后播放来轻松应对,但当涉及大量语音时,这一方法就显得繁琐低效了。为此,智慧的开发者们总能找到便捷的解决方案——利用Web技术实现语音播放&#x…...
Docker快速部署RabbitMq
在外网服务器拉取镜像 docker pull arm64v8/rabbitmq:3.8.9-management或者拉去我的服务器的 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/linux_arm64_rabbitmq:3.8.9-management重新命名 docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/lin…...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
ETLCloud可能遇到的问题有哪些?常见坑位解析
数据集成平台ETLCloud,主要用于支持数据的抽取(Extract)、转换(Transform)和加载(Load)过程。提供了一个简洁直观的界面,以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...
使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...
搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...
Python竞赛环境搭建全攻略
Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型(算法、数据分析、机器学习等)不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...
6️⃣Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙
Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙 一、前言:离区块链还有多远? 区块链听起来可能遥不可及,似乎是只有密码学专家和资深工程师才能涉足的领域。但事实上,构建一个区块链的核心并不复杂,尤其当你已经掌握了一门系统编程语言,比如 Go。 要真正理解区…...
Java中HashMap底层原理深度解析:从数据结构到红黑树优化
一、HashMap概述与核心特性 HashMap作为Java集合框架中最常用的数据结构之一,是基于哈希表的Map接口非同步实现。它允许使用null键和null值(但只能有一个null键),并且不保证映射顺序的恒久不变。与Hashtable相比,Hash…...
6.计算机网络核心知识点精要手册
计算机网络核心知识点精要手册 1.协议基础篇 网络协议三要素 语法:数据与控制信息的结构或格式,如同语言中的语法规则语义:控制信息的具体含义和响应方式,规定通信双方"说什么"同步:事件执行的顺序与时序…...
C++中vector类型的介绍和使用
文章目录 一、vector 类型的简介1.1 基本介绍1.2 常见用法示例1.3 常见成员函数简表 二、vector 数据的插入2.1 push_back() —— 在尾部插入一个元素2.2 emplace_back() —— 在尾部“就地”构造对象2.3 insert() —— 在任意位置插入一个或多个元素2.4 emplace() —— 在任意…...
后端下载限速(redis记录实时并发,bucket4j动态限速)
✅ 使用 Redis 记录 所有用户的实时并发下载数✅ 使用 Bucket4j 实现 全局下载速率限制(动态)✅ 支持 动态调整限速策略✅ 下载接口安全、稳定、可监控 🧩 整体架构概览 模块功能Redis存储全局并发数和带宽令牌桶状态Bucket4j Redis分布式限…...
