当前位置: 首页 > news >正文

【2024】kafka streams的详细使用与案例练习(2)

目录

  • 前言
  • 使用
    • 1、整体结构
      • 1.1、序列化
    • 2、 Kafka Streams 常用的 API
      • 2.1、 StreamsBuilder
      • 2.2、 KStream 和 KTable
      • 2.3、 filter和 filterNot
      • 2.4、 map 和 mapValues
      • 2.5、 flatMap 和 flatMapValues
      • 2.6、 groupByKey 和 groupBy
      • 2.7、 count、reduce 和 aggregate
      • 2.8、 join 和 leftJoin
      • 2.9、 to 和 toTable
      • 2.10、 foreach
    • 3、简单案例练习
      • 3.1、通过streams实现数据流处理,把字符串装为大写
      • 3.2、map 使用,把内容转为key,把value转为内容长度
      • 3.3、filter和 filterNot使用过滤消息
      • 3.4、通过selectKey配置key
    • 4、复杂对象案例练习
      • 4.1、自定义序列化
      • 4.2、发送消息
      • 4.3、通过KafkaStreams接收复杂对象,并且使用split

前言

上一篇文章已经对kafka streams进行了大致的介绍以及简单案例使用,这一片文章主要是对kafka streams常用API的介绍以及使用,如果需要看概念介绍的可以看
Kafka Streams详细介绍与具体使用

下面直接开始进行kafka streams使用操作

使用

1、整体结构

在我们整体处理streams时,总共就分为三部分

  1. 第一部分是创建配置,告诉kafka我们的连接信息,通过StreamsConfig传递,然后创建一个StreamsBuilder类用于构建kafka streams拓扑的主要类。该类主要用于定义数据流处理的拓扑结构。
    还有就是通过Serde进行一个序列化

  2. 第二部分主要是构建流处理拓扑,通过StreamsBuilder类得到源Processor处理器也就是KStream类,然后通过KStream的API方法得到不同的Processor处理器(重点

  3. 第三部分通过配置流处理拓扑,创建流处理实例,然后通过kafkaStreams.start()方法启动,结束时再通过kafkaStreams.close()关闭

我们在使用时,1和3基本上都是相同的,一般我们要处理的就是2,根据不同的业务需求,我们得到不同的Processor处理器,然后发送到不同的topic

1.1、序列化

在使用Kafka Streams时,我们需要把从topic接收到的消息进行序列化,然后再把返回topic的消息进行反序列化,
可以和kafka发送一样使用Properties类直接定义全部的,但这样会有很大的局限性,因为我们发送给不同的处理过的topic消息往往都会是不同的类型,所以我们会使用到Serde进行该操作,再每次和topic进行读取或写入消息时通过Serde进行指定key和消息的类型。
基础数据类型都有默认的写好的通过Serdes方法可以获得,如果需要定义复杂的则需要自己定义一个序列化和反序列化的类。
如下:

  • Serde<String> stringSerde = Serdes.String():String类型
  • Serde<Purchase> propertiesSerde = Serdes.serdeFrom(serializer, deserializer):自定义复杂类型
    • JsonSerializer<Purchase> serializer = new JsonSerializer<>(); :自定义的序列化类
    • JsonDeserializer<Purchase> deserializer = new JsonDeserializer<>(Purchase.class);:反序列化类

kafka也有默认的可以直接用的:org.springframework.kafka.support.serializer.JsonSerializer,但一般建议自定义序列化

2、 Kafka Streams 常用的 API

2.1、 StreamsBuilder

StreamsBuilder 是构建 Kafka Streams 拓扑的主要类。它用于定义数据流处理的拓扑结构。

StreamsBuilder builder = new StreamsBuilder();

2.2、 KStream 和 KTable

  • KStream:表示一个无界的数据流,每个记录都是一个键值对。
KStream<String, String> stream = builder.stream("input-topic");
  • KTable:表示一个表,每个键对应一个最新的值。
KTable<String, Long> table = builder.table("input-topic");

2.3、 filter和 filterNot

  • filter:根据条件过滤记录。
KStream<String, String> filteredStream = stream.filter((key, value) -> value.contains("important"));
  • filterNot:过滤掉不符合条件的记录。
KStream<String, String> filteredStream = stream.filterNot((key, value) -> value.contains("unimportant"));

2.4、 map 和 mapValues

  • map:对每个记录的键和值进行转换。
KStream<String, Integer> mappedStream = stream.map((key, value) -> new KeyValue<>(key, value.length()));
  • mapValues:仅对记录的值进行转换。
KStream<String, Integer> mappedStream = stream.mapValues(value -> value.length());

2.5、 flatMap 和 flatMapValues

  • flatMap:将每个输入记录映射为零个或多个输出记录。
KStream<String, String> flatMappedStream = stream.flatMap((key, value) -> {List<KeyValue<String, String>> result = new ArrayList<>();for (String word : value.split(" ")) {result.add(new KeyValue<>(key, word));}return result;});
  • flatMapValues:仅将每个输入记录的值映射为零个或多个输出值。

****KStream<String, String> flatMappedStream = stream.flatMapValues(value -> Arrays.asList(value.split(" ")));

2.6、 groupByKey 和 groupBy

  • groupByKey:根据记录的键进行分组。
KGroupedStream<String, String> groupedStream = stream.groupByKey();
  • groupBy:根据新的键进行分组。
KGroupedStream<String, String> groupedStream = stream.groupBy((key, value) -> value);

2.7、 count、reduce 和 aggregate

  • count:计算每个分组中的记录数。
KTable<String, Long> countTable = groupedStream.count();
  • reduce:对每个分组中的记录进行归约操作。
KTable<String, String> reducedTable = groupedStream.reduce((aggValue, newValue) -> aggValue + newValue);
  • aggregate:自定义聚合操作。
  KTable<String, Integer> aggregatedTable = groupedStream.aggregate(() -> 0,(key, value, aggregate) -> aggregate + value.length(),Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-store"));

2.8、 join 和 leftJoin

  • join:对两个流或表进行内连接操作。
  KStream<String, String> joinedStream = stream.join(otherStream,(value1, value2) -> value1 + value2,JoinWindows.of(Duration.ofMinutes(5)));
  • leftJoin:对两个流或表进行左连接操作。
  KStream<String, String> leftJoinedStream = stream.leftJoin(otherStream,(value1, value2) -> value1 + (value2 == null ? "" : value2),JoinWindows.of(Duration.ofMinutes(5)));

2.9、 to 和 toTable

  • to:将流中的记录写入到Kafka主题。
**stream.to("output-topic");**
  • toTable:将流转换为表。
  KTable<String, String> table = stream.toTable();

2.10、 foreach

对每个记录执行一个操作,但不返回新的流。

stream.foreach((key, value) -> System.out.println(key + ": " + value));

上面这些就是KafkaStreams常用到的一些API,通过组合使用这些API,你可以构建复杂的流处理拓扑,以满足各种数据处理需求。

3、简单案例练习

使用脚本

#先进入容器
docker exec -it kafka-server /bin/bash#创建topic(把ip换为自己的)
/opt/kafka/bin/kafka-topics.sh --create --topic sell.purchase.transaction --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1# 进入生产者发消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic#进入消费者监听
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sell.purchase.transaction

3.1、通过streams实现数据流处理,把字符串装为大写

@Slf4j
public class KafkaStreamsYellingApp {
//    appidprivate final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "out-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {//1、配置客户端和序列化器
//        配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();
//        构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();//2、构建流处理拓扑
//        数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//KStream<String, String> upperStream = inputStream.peek((key, value) -> {log.info("[收集]key:{},value:{}", key, value);}).filter((key, value) -> value.length() > 5).mapValues(time -> time.toUpperCase()).peek((key, value) -> log.info("[过滤结束]key:{},value:{}", key, value));
//        日志打印upperStream处理器的数据upperStream.print(Printed.toSysOut());
//        把upperStream处理器的数据输出到指定的topic中upperStream.to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));//3、通过配置流处理拓扑,创建流处理实例KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
//        jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}
}

在这里插入图片描述

3.2、map 使用,把内容转为key,把value转为内容长度


@Slf4j
public class KafkaStreamsYellingApp2 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {//        配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
//        构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
//        数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//KStream<String, Integer> k1 = inputStream.map((noKey, value) -> KeyValue.pair(value, value.length()));k1.print(Printed.<String,Integer>toSysOut().withLabel("map"));//        k1.to(OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);//        jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}}

3.3、filter和 filterNot使用过滤消息

/*** filter和 filterNot使用*/
@Slf4j
public class KafkaStreamsYellingApp3 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {//        配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
//        构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
//        数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//inputStream.filter((key, value) -> (value.contains("kafka")), Named.as("filtering-processor")).print(Printed.<String,String>toSysOut().withLabel("filtering"));inputStream.filterNot((key, value) -> (value.contains("kafka")), Named.as("filtering-not-processor")).print(Printed.<String,String>toSysOut().withLabel("filtering-not"));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);//        jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}
}

3.4、通过selectKey配置key

@Slf4j
public class KafkaStreamsYellingApp4 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {//        配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
//        构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
//        数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//inputStream.flatMap( //把一个消息的内容通过转为多条消息以map的方式返回,(key, value) -> Arrays.stream(value.split(" ")) //通过使用java的stream把内容转为map.map(e -> KeyValue.pair(e, e.length())).collect(Collectors.toList())).print(Printed.<String,Integer>toSysOut().withLabel("flatMap"));inputStream.flatMapValues( //不改变key,直接转换valuevalue -> Arrays.stream(value.split(" ")).map(String::toUpperCase).toList()).print(Printed.<String,String>toSysOut().withLabel("flatMapValues"));//        配置keyinputStream.selectKey((key, value) -> value).print(Printed.<String,String>toSysOut().withLabel("selectKey"));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);//        jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}}

4、复杂对象案例练习

4.1、自定义序列化

对接收topic的消息到拓扑还是发送消息到topic都需要进行序列化和反序列化

  • 序列化
public class JsonSerializer<T> implements Serializer<T> {private Gson gson=  new Gson();public void configure(Map<String ,?> map, boolean b) {}public byte[] serialize(String topic, T t) {return gson.toJson(t).getBytes();}@Overridepublic void close() {Serializer.super.close();}
}
  • 反序列化
/*** 反序列化* @param <T>*/
public class JsonDeserializer<T> implements Deserializer<T> {private Gson gson=  new Gson();private Class<T> deserializeClass;public JsonDeserializer(Class<T> deserializeClass){this.deserializeClass=deserializeClass;}public JsonDeserializer(){}@Override@SuppressWarnings("unchecked")public void configure(Map<String,?> map, boolean b){if (deserializeClass == null){deserializeClass = (Class<T>) map.get("serializedClass");}}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null){return null;}return gson.fromJson(new String(data),deserializeClass);}@Overridepublic void close() {}
}

4.2、发送消息

/*** 生产消息到kafka*/
@Slf4j
public class MyProducer {private static final String BOOTSTRAP_SERVERS = "localhost:9092";private final static String TOPIC_NAME = "production-topic";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();
//        设置参数props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
//        设置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//        连接客户端KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 5; i++) {Purchase purchase = new Purchase();purchase.setId("12431234253");purchase.setDate(new Date().toString());purchase.setName("苹果");purchase.setPrice(100.0+i);String json = new JSONObject(purchase).toString();ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"my-keyValue3", json);//        同步send(producer,producerRecord);}}/*** @param producer: 客户端对象* @return void* 同步发送* @date 2024/3/22 17:09*/private static void send(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {//          等待发送成功的阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();log.info("同步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());}}

4.3、通过KafkaStreams接收复杂对象,并且使用split

split使用介绍

  • BranchedKStream<K, V> split(final Named named):split方法一个参数,该参数主要是用来定义分裂后的分支的一个统一前缀。会返回一个 BranchedKStream对象。

  • BranchedKStream:主要用来使用分裂校验的

    • branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched):会接收两个参数,前面的是Predicate用做校验判断作为判断条件的,后面的是一个Branched对象,用作处理满足该分支条件拆分出来的消息进行处理
    • defaultBranch(Branched<K, V> branched):作为结尾方法用作对不满足前面的全部branch条件的消息,进行一个最后处理
    • noDefaultBranch():结尾方法,表示对接下来的消息不做处理
  • Predicate:判断验证条件,满足该条件的消息会被拆分出来到当前分支

  • Branched:对分裂到当前分支的消息进行处理

    • as(final String name) :给该分支添加一个名字,不做处理。
    • withFunction():对该分支的消息进行处理,然后会把处理后的消息返回到结果的map中去
    • withConsumer():对该分支的消息进行处理,不会把消息返回回去

在这里插入图片描述

在这里插入图片描述

  • 具体代码

在测试使用前需要把几个topic先创建出来

@Slf4j
public class ZMartKafkaStreamsApp {private final static String APPLICATION_ID = "ZMart_app_id";private static final String BOOTSTRAP_SERVERS = "localhost:9092";private final static String TOPIC_NAME = "production-topic";private final static String APPLE_TOPIC_NAME = "apple-topic";private final static String WATERMELON_TOPIC_NAME = "watermelon-topic";private final static String OUT_TOPIC_NAME = "out-topic";public static void main(String[] args) throws InterruptedException {StreamsConfig streamsConfig = new StreamsConfig(getProperties());JsonSerializer<Purchase> serializer = new JsonSerializer<>();JsonDeserializer<Purchase> deserializer = new JsonDeserializer<>(Purchase.class);Serde<Purchase> propertiesSerde = Serdes.serdeFrom(serializer, deserializer);Serde<String> stringSerde = Serdes.String();StreamsBuilder builder = new StreamsBuilder();KStream<String, Purchase> inputStream = builder.stream(TOPIC_NAME, Consumed.with(stringSerde, propertiesSerde));//        谓词条件 判断把流输送到哪个分支上Predicate<String, Purchase> isWatermelon = (key, value) -> {String name = value.getName();return name.equals("西瓜");};//          split:分裂流Map<String, KStream<String, Purchase>> stringKStreamMap = inputStream.peek((k,v)-> log.info("[分裂消息===>] k:{},value:{}" ,k,v)).split(Named.as("split-"))//拼接的key前缀.branch(isWatermelon, Branched.withFunction(ks -> {  //对满足isWatermelon条件的分支的消息进行处理的处理器
//                    ks.print(Printed.<String, Purchase>toSysOut().withLabel("西瓜分支"));return  ks.mapValues(v -> {String modifiedNumber = v.getId().replaceAll("(?<=\\d)\\d(?=\\d)", "*");v.setId( modifiedNumber);return v;});},"watermelon")) //这个地方需要指定name,用作为返回的map的key 如该分支存放到返回的map的key为:split-watermelon.branch((key, value) -> value.getName().equals("苹果"),Branched.withConsumer( //不满足上面的条件的会继续向下匹配,满足条件直接发送ks -> ks.peek((k, v) -> log.info("[苹果分支] value:{}" ,v)).to(APPLE_TOPIC_NAME, Produced.with(stringSerde, propertiesSerde)),"apple")).defaultBranch(Branched.withConsumer( //把不满足前面所以branch条件的消息发送到默认主题ks -> ks.to(OUT_TOPIC_NAME, Produced.with(stringSerde, propertiesSerde)),"defaultBranch"));//   把上面的消息打印出来stringKStreamMap.forEach((s, kStream) ->kStream.print(Printed.<String, Purchase>toSysOut().withLabel(s)));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
//        jvm关闭时,把流也关闭CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();latch.countDown();log.info("The Kafka Streams 执行关闭!");}));kafkaStreams.start();log.info("kafka streams 启动成功!>>>>");latch.await();}@NotNullprivate static Properties getProperties() {Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);return properties;}
}

相关文章:

【2024】kafka streams的详细使用与案例练习(2)

目录 前言使用1、整体结构1.1、序列化 2、 Kafka Streams 常用的 API2.1、 StreamsBuilder2.2、 KStream 和 KTable2.3、 filter和 filterNot2.4、 map 和 mapValues2.5、 flatMap 和 flatMapValues2.6、 groupByKey 和 groupBy2.7、 count、reduce 和 aggregate2.8、 join 和 …...

qt 简单实验 读取json格式的配置文件

1.概要 2.代码 //#include "mainwindow.h"#include <QApplication> #include <QFile> #include <QJsonDocument> #include <QJsonObject> #include <QDebug> //读取json数据的配置文件QJsonObject readJsonConfigFile(const QString …...

Docker常用命令与实战示例

docker 1. 安装2. 常用命令3. 存储4. 网络5. redis主从复制示例6. wordpress示例7. DockerFile8. 一键安装超多中间件&#xff08;compose&#xff09; 1. 安装 以centOS系统为例 # 移除旧版本docker sudo yum remove docker \docker-client \docker-client-latest \docker-c…...

数据结构(基础知识)

基础概念&#xff1a; 数据&#xff1a;数据是信息的载体&#xff0c;是描述客观事物属性的数&#xff0c;字符及所有能输入到计算机中并被计算机程序识别和处理的符号的集合 数据元素&#xff1a;是数据的基本单位&#xff0c;在程序中常作为一个整体来考虑 数据对象&#…...

计算机网络:网络层 - 路由选择协议

计算机网络&#xff1a;网络层 - 路由选择协议 路由器的结构路由选择协议概述自治系统 AS内部网关协议路由信息协议 RIP距离向量算法RIP报文格式收敛问题 开放最短路径优先 OSPF基本工作原理自治系统分区 外部网关协议BGP-4 路由器的结构 如图所示&#xff0c;路由器被分为路由…...

JupyterLab使用指南(六):JupyterLab的 Widget 控件

1. 什么是 Widget 控件 JupyterLab 中的 Widget 控件是一种交互式的小部件&#xff0c;可以用于创建动态的、响应用户输入的界面。通过使用 ipywidgets 库&#xff0c;用户可以在 Jupyter notebook 中创建滑块、按钮、文本框、选择器等控件&#xff0c;从而实现数据的交互式展…...

OpenCV 特征点检测与匹配

一 OpenCV特征场景 ①图像搜索&#xff0c;如以图搜图&#xff1b; ②拼图游戏&#xff1b; ③图像拼接&#xff0c;将两长有关联得图拼接到一起&#xff1b; 1 拼图方法 寻找特征 特征是唯一的 可追踪的 能比较的 二 角点 在特征中最重要的是角点 灰度剃度的最大值对应的…...

css布局之flex应用

/*父 100*/.parent-div {/* 这里添加你想要的属性 */display: flex;flex-direction: row; //行justify-content: space-between; //左右对齐align-items: center;flex-wrap: wrap; //换行}/*中 90 10 */.middle-div {/* 这里添加你想要的属性 */display: flex;flex-direction:…...

树莓派4B设置AP热点步骤

树莓派4B设置AP热点步骤&#xff1a;先进入root模式 预先进行apt-get update 第1步&#xff1a;安装network-manager ​sudo apt-get install network-manager第2步&#xff1a;安装git apt-get install git apt-get install util-linux procps hostapd iproute2 iw haveged …...

Java程序之百鸡百钱问题

题目&#xff1a; 百钱买百鸡的问题算是一套非常经典的不定方程的问题&#xff0c;题目很简单&#xff1a;公鸡5文钱一只&#xff0c;母鸡3文钱一只&#xff0c;小鸡3只一文钱&#xff0c;用100文钱买一百只鸡,其中公鸡&#xff0c;母鸡&#xff0c;小鸡都必须要有&#xff0c;…...

Mybatis——动态sql

if标签 用于判断条件是否成立。使用test属性进行条件判断&#xff0c;如果条件为true&#xff0c;则拼接sql。 <where>标签用于识别语句是否需要连接词and&#xff0c;识别sql语句。 package com.t0.maybatisc.mapper;import com.t0.maybatisc.pojo.Emp; import org.a…...

可视化大屏开发系列——页面布局

页面布局是可视化大屏的基础&#xff0c;想要拥有一个基本美观的大屏&#xff0c;就得考虑页面整体模块的宽高自适应&#xff0c;我们自然就会想到具有强大灵活性flex布局&#xff0c;再借助百分比布局来辅助。至此&#xff0c;大屏页面布局问题即可得到解决。 可视化大屏开发系…...

Python statistics 模块

Python 的 statistics 模块提供了一组用于执行各种统计计算的函数&#xff0c;包括平均值、中位数、标准差、方差以及其他统计量。让我来简单介绍一下。 首先&#xff0c;你可以使用以下方式导入 statistics 模块&#xff1a; python import statistics 接下来&#xff0c;…...

wireshark常见使用表达式

目录 1. 捕获过滤器 (Capture Filters)基本捕获过滤器组合捕获过滤器 2. 显示过滤器 (Display Filters)基本显示过滤器复杂显示过滤器协议特定显示过滤器 3. 进阶显示过滤器技巧使用函数和操作符逻辑操作符 4. 常见网络协议过滤表达式示例HTTP 协议HTTPS 协议DNS 协议DHCP 协议…...

用Java获取键盘输入数的个十百位数

这段Java代码是一个简单的程序&#xff0c;用于接收用户输入的一个三位数&#xff0c;并将其分解为个位、十位和百位数字&#xff0c;然后分别打印出来。下面是代码的详细解释&#xff1a; 导入所需类库: import java.util.Scanner;&#xff1a;导入Scanner类&#xff0c;用于从…...

第10章 启动过程组 (制定项目章程)

第10章 启动过程组 9.1制定项目章程&#xff0c;在第三版教材第356~360页&#xff1b; 文字图片音频方式 视频12 第一个知识点&#xff1a;主要输出 1、项目章程&#xff08;重要知识点&#xff09; 项目目的 为了稳定与发展公司的客户群(抽象&#xff0c;非具体) 可测量的项目…...

html侧导航栏客服栏

ico 替换 ICO <html xmlns"http://www.w3.org/1999/xhtml"><head><meta http-equiv"Content-Type" content"text/html; charsetutf-8"><title>返回顶部</title><script src"js/jquery-2.0.3.min.js"…...

Clonable接口和拷贝

Hello~小伙伴们&#xff01;本篇学习Clonable接口与深拷贝&#xff0c;一起往下看吧~(画图水平有限&#xff0c;两张图&#xff0c;&#xff0c;我真的画了巨久&#xff0c;求路过的朋友来个3连~阿阿阿~~~) 目录 1、Clonable接口概念 2、拷贝 2、1浅拷贝 2、2深拷贝 1、Clon…...

关于小蛋の编程和小蛋编程为同一作者的说明

小蛋の编程和小蛋编程的作品为同一人制作&#xff0c;因前者为父母的手机号进行注册&#xff0c;现用本人手机号注册了新账号小蛋编程&#xff0c;后续文章将在新账号小蛋编程上进行刊登&#xff0c;同时在小蛋编程上对原账号文章进行转载。此账号不再发布帖子&#xff0c;请大…...

大数据平台之Spark

Apache Spark 是一个开源的分布式计算系统&#xff0c;主要用于大规模数据处理和分析。它由UC Berkeley AMPLab开发&#xff0c;并由Apache Software Foundation维护。Spark旨在提供比Hadoop MapReduce更快的处理速度和更丰富的功能&#xff0c;特别是在处理迭代算法和交互式数…...

How to use ModelSim

How to use ModelSim These are all written by a robot Remember, you can only simulate tb files....

【专业英语 复习】第8章 Communications and Networks

1. 单选题 One of the most dramatic changes in connectivity and communications in the past few years has been ____. A. widespread use of mobile devices with wireless Internet connectivity B. chat rooms C. satellite uplinks D. running programs on rem…...

运行vue3项目相关报错

1. VSCode打开TSVue3项目很多地方报错 报错内容 几乎所有文件都会出现未知飘红 error Delete CR prettier/prettier报错原因 插件冲突&#xff0c;Windows系统回车换行符与MAC不一致&#xff08;所以这个问题Windows系统才会出现&#xff09; 解决 需要安装Vue - Official…...

2024年6月计算机视觉论文推荐:扩散模型、视觉语言模型、视频生成等

6月还有一周就要结束了&#xff0c;我们今天来总结2024年6月上半月发表的最重要的论文&#xff0c;重点介绍了计算机视觉领域的最新研究和进展。 Diffusion Models 1、Autoregressive Model Beats Diffusion: Llama for Scalable Image Generation LlamaGen&#xff0c;是一个…...

Centos Stream9 和Centos Stream10的下载网址

Index of /https://mirror.stream.centos.org/...

chrome 录制器及性能分析工具的使用

需求背景&#xff1a; 对比不同VPN方案网络延迟的差异。 验证工具&#xff1a; chrome浏览器自带的录制器、性能插件可以完美的解决这个问题。 注意&#xff1a;录制的操作都在当前页面&#xff0c;不存在新开标签页的场景 解决方案&#xff1a; 使用chrome录制器&#xf…...

如何打造稳定、好用的 Android LayoutInspector?

速度极慢&#xff0c;遇到复杂的布局经常超时 某些情况无法选中指定的 View 本文将围绕 LayoutInspector 的痛点&#xff0c;分析问题并修复&#xff0c;最终将 LayoutInspector 变成一个稳定、好用的插件。 二、加速 Dump View Hierarchy 2.1 问题描述 开发复杂业务的同学…...

C++ Thead互斥量死锁,mutex如何防止死锁---C++11多线程快速学习

假设有两个线程 T1 和 T2&#xff0c;它们需要对两个互斥量 mtx1 和 mtx2 进行访问&#xff0c;而且需要按照以下顺序获取互斥量的所有权&#xff1a; - T1 先获取 mtx1 的所有权&#xff0c;再获取 mtx2 的所有权。 - T2 先获取 mtx2 的所有权&#xff0c;再获取 mtx1 的所有…...

Ubuntu 之Glade图形化设计器

演示环境说明&#xff1a;本机使用Windows 11 家庭版本搭载 Ubuntu 22.04.4 LTS 子系统&#xff0c;同时并安装Ubuntu桌面虚拟化软件XLaunch。 如果没有搭建好上述问题&#xff0c;请参考&#xff1a;windows11子系统Ubuntu 22.04.4子安装图形化界面 Glade是什么&#xff1f;…...

152. 乘积最大子数组

152. 乘积最大子数组 题目链接&#xff1a;152. 乘积最大子数组 代码如下&#xff1a; class Solution { public:int maxProduct(vector<int>& nums) {int resnums[0];vector<int> f(nums.size()1,0),g(nums.size()1,0);f[0]nums[0],g[0]nums[0];for(int i1…...

proactor模式

Proactor模式是一种异步I/O的设计模式&#xff0c;它允许程序直接发起一个异步I/O操作并立即返回&#xff0c;而不需要等待该操作完成。一旦I/O操作实际完成&#xff0c;系统会通知相应的完成处理程序&#xff08;Completion Handler&#xff09;&#xff0c;该处理程序随后执行…...

Charles抓包工具

一、charles简介 1&#xff0c;charles是什么 Charles中文名叫青花瓷&#xff0c;它是一款基于HTTP协议的代理服务器&#xff0c;通过成为电脑或者浏览器的代理&#xff0c;然后截取请求和请求结果达到分析抓包的目的。 特点:跨平台、半免费 2&#xff0c;charles工作原理 前…...

RabbitMQ如何保证消息可靠

解决办法&#xff1a; 1、做好消息确认机制&#xff08;pulisher、consumer[手动ACK]&#xff09; 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍 消息确认机制&#xff1a; 生产者确认模式&#xff1a;确认消息是否发送到broker&#xff0c;失败…...

学习笔记——路由网络基础——路由的高级特性

七、路由的高级特性 1、路由迭代(路由递归) 路由必须有直连的下一跳才能够指导转发&#xff0c;静态路由或BGP路由的下一跳可能不是直连的邻居&#xff0c;因此需要计算出一个直连的下一跳和对应的出接口&#xff0c;这个过程就叫做路由迭代(路由递归)。 添加一条去往20.1.1.…...

网络编程之XDP、TC和IO_URING以及DPDK

一、网络编程常见的技术 在前面已经分析过了XDP、TC和eBPF。也基本把三者间的关系理清了&#xff0c;但现在又有一个疑惑涌了上来。在前面提到过的IO_URING和DPDK与这些技术有什么关系呢&#xff1f;其实只要认真的看过分析文章可能大家心里都已经基本清楚了。 正如在前面不断…...

晶谷高温烧结导电浆料用低熔点玻璃粉 晶谷耐高温导电漆导电油墨高温玻璃粉

晶谷浆料玻璃粉是一种用于电子浆料的材料&#xff0c;它在电子浆料中起到粘结和降低烧结温度的作用&#xff0c;能够提高浆料与基材之间的结合力。 浆料玻璃粉的性能特点包括&#xff1a; - 软化点&#xff1a;软化点在350至650度之间。 - 热膨胀系数&#xff1a;热膨胀系数…...

【Mysql】DQL操作单表、创建数据库、排序、聚合函数、分组、limit关键字

DQL操作单表 1.1 创建数据库 •创建一个新的数据库 db2 CREATE DATABASE db2 CHARACTER SET utf8;•将db1数据库中的 emp表 复制到当前 db2数据库 ** 1.2 排序** 通过 ORDER BY 子句,可以将查询出的结果进行排序 (排序只是显示效果,不会影响真实数据) 语法结构&#xff1a;…...

Excel 常用技巧(四)

Microsoft Excel 是微软为 Windows、macOS、Android 和 iOS 开发的电子表格软件&#xff0c;可以用来制作电子表格、完成许多复杂的数据运算&#xff0c;进行数据的分析和预测&#xff0c;并且具有强大的制作图表的功能。由于 Excel 具有十分友好的人机界面和强大的计算功能&am…...

【Linux 基础】文件与目录管理

1. 文件和目录的基本概念 文件&#xff1a;是数据的集合&#xff0c;可以是文本、图像、视频等。 目录&#xff08;也称为文件夹&#xff09;&#xff1a;是文件和子目录的集合&#xff0c;用于组织文件。 2. 目录和路径 绝对路径&#xff1a;从根目录&#xff08;/&#x…...

C++系列-String(一)

&#x1f308;个人主页&#xff1a;羽晨同学 &#x1f4ab;个人格言:“成为自己未来的主人~” string是用于字符串&#xff0c;可以增删改查 首先&#xff0c;我们来看一下string的底层 接下来&#xff0c;我们来看一下string的常用接口有哪些&#xff1a; #define _CRT_S…...

服务器硬件的基础知识

引言 服务器是现代数据中心和企业IT基础设施的核心组成部分。了解服务器硬件的基本知识不仅有助于选择和维护服务器&#xff0c;还能提高系统性能和可靠性。本文将详细介绍服务器硬件的各个方面&#xff0c;包括处理器、内存、存储、网络、散热和电源等&#xff0c;帮助读者全…...

java基于ssm+jsp 汽车在线销售系统

1 前台功能模块 网站首页 网页首页汽车在线销售系统模块如下&#xff1a;首页、汽车信息、新闻资讯、留言反馈、我的收藏管理等功能图1 图1网页首页 网页前台车辆信息效果图如图2所示 图2 车辆信息界面图 2 管理员功能模块 管理员输入个人的账号、密码登录系统&#xff0c…...

【干货】Android中高级开发进阶必备资料(附:PDF+视频+源码笔记)

4、数据传输与序列化 5、Java虚拟机原理 6、高效IO 设计思想解读开源框架 随着互联网企业的不断发展&#xff0c;产品项目中的模块越来越多&#xff0c;用户体验要求也越来越高&#xff0c;想实现小步快跑、快速迭代的目的越来越难&#xff0c;插件化技术应用而生。如果没有…...

AI通用写作模版,可以在此基础上进行修改

指令 角色 作者 &#xff1a;你是一位自媒体爆文写作专家&#xff0c;负责撰写文章&#xff0c;具备对特定主题的深入理解和一定的写作技巧。读者 &#xff1a;25-55岁通用人群&#xff0c;对资讯新闻类感兴趣&#xff0c;需要易于理解且富有启发性的内容。 技能 研究能力&…...

openEuler2203SP3自定义ios

需求&#xff1a; 1、legacy启动 2、/boot分区1G&#xff0c;剩余给/&#xff0c;lvm分区 3、创建root密码和一个普通用户user&#xff0c;密码Hello2024 4、服务器安装&#xff08;选上development、legacy-unix、security-tools&#xff09; 5、关闭firewalld、selinux …...

一年又一年志愿

--第一篇 20220624十年苦读&#xff0c;青春飞扬&#xff0c;其道大光&#xff0c;来日方长。又是一年高考时&#xff0c;高考改变命运&#xff0c;但是后面还有更关键几步&#xff0c;跟大家一起聊聊。之前写我考状元的经历&#xff0c;堂弟考省前十的经历&#xff0c;有不少…...

NL2SQL进阶系列(1):DB-GPT-Hub、SQLcoder、Text2SQL开源应用实践详解

1. MindSQL(库) MindSQL 是一个 Python RAG&#xff08;检索增强生成&#xff09;库&#xff0c;旨在仅使用几行代码来简化用户与其数据库之间的交互。 MindSQL 与 PostgreSQL、MySQL、SQLite 等知名数据库无缝集成&#xff0c;还通过扩展核心类&#xff0c;将其功能扩展到 Sn…...

OpenGL3.3_C++_Windows(15)

理解glad&#xff1a; OpenGL只是一个标准/规范&#xff0c;具体的实现是由驱动开发商针对特定显卡实现的&#xff0c;由于OpenGL驱动版本众多&#xff0c;它大多数函数的位置都无法在编译时确定下来&#xff0c;需要在运行时查询&#xff0c;因此开发者需要在运行时获取函数…...

出海计划 | 赴马来西亚开展水环境项目考察暨2024中马水务合作论坛

报名咨询...

NeRF从入门到放弃5: Neurad代码实现细节

Talk is cheap, show me the code。 CNN Decoder 如patch设置为32x32,patch_scale设置为3&#xff0c;则先在原图上采样96x96大小的像素块&#xff0c;然后每隔三个取一个像素&#xff0c;降采样成32x32的块。 用这32x32个像素render feature&#xff0c;再经过CNN反卷积预测…...