当前位置: 首页 > news >正文

【微服务】springboot整合kafka-stream使用详解

目录

一、前言

二、kafka stream概述

2.1 什么是kafka stream

2.2 为什么需要kafka stream

2.2.1 对接成本低

2.2.2 节省资源

2.2.3 使用简单

2.3 kafka stream特点

2.4 kafka stream中的一些概念

2.5 Kafka Stream应用场景

三、环境准备

3.1 搭建zk

3.1.1 自定义docker网络

3.1.2 拉取zk镜像

3.1.3 启动zk容器

3.2 搭建kafka

3.2.1 下载kafka并解压

3.2.2 修改配置文件

3.2.3 启动kafka服务

3.3 kafka测试

3.3.1 创建topic

3.3.2 开启kafka生产端控制台

3.3.3 开启kafka消费端控制台

3.4 java客户端集成kafka测试

四、kafka stream 使用

4.1 前置准备

4.2 kafka stream应用开发步骤

4.2.1 步骤1:创建Kafka Streams 实例

4.2.2 步骤2:指定输入与输出topic

4.2.3 步骤3:启动Kafka Streams 实例

4.3 kafka stream操作案例

4.3.1 转换单词大小写

4.3.2 将topic1数据写入到topic2中

4.3.3 统计wordcount

4.4 kafka stream窗口函数使用

4.4.1 需求一,固定时间输出统计结果到另一个topic

4.4.2 需求二,统计topic1中10秒内的wordcount写到topic2

4.5 Kafka Streams使用场景拓展

4.5.1 事件日志监控

4.5.2 事用户行为统计分析

4.5.3 数据聚合与实时计算

4.5.4 实时推荐

4.5.5 实时告警

4.5.6 应用解耦

五、kafka stream整合springboot

5.1 整合过程

5.1.1 导入springboot相关依赖

5.1.2 配置kafka相关信息

5.1.3 添加Kafka Stream配置类

5.1.4 自定义Kafka Stream业务处理监听器

5.1.5 效果测试

六、写在文末


一、前言

随着大数据技术的发展越来越成熟,大数据涉及的领域也越来越多,从以往的T+1到如今的实时处理,得益于底层技术的强大支撑,尤其是流式计算技术的发展让众多的业务场景价值得以深度挖掘,聊到流式计算,涌入入脑海中的Spark Streaming,Flink等,本文接下来将介绍另一种流式计算技术kafka stream。

二、kafka stream概述

2.1 什么是kafka stream

Kafka Stream是一款开源、分布式和水平扩展的流处理平台,其在Apache Kafka之上进行构建,借助其高性能、可伸缩性和容错性,可以实现高效的流处理应用程序。

2.2 为什么需要kafka stream

在处理流式计算的场景中,发展到今天出现了很多成熟的性能高效的技术框架,比如老牌的Apache Storm,大数据处理框架Spark Streaming,Flink等,而且像Spark 与flink都能与SQL紧密结合,集成便捷,功能也很强大,为何还需要kafka stream呢?

2.2.1 对接成本低

kafka可以说在很多互联网公司都有着广泛的使用,只要维护了kafka的环境,即可集成和使用kafka stream。

2.2.2 节省资源

相比于部署spark,storm等这样的大数据处理框架需要的计算资源,部署kafka占用的服务器资源更少,而且维护起来也相对节省人力。

2.2.3 使用简单

相比与spark和flink这样的大数据框架,kafka在日常的开发中接触和使用会更多,学习和上手成本会低很多。  

2.3 kafka stream特点

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。具有如下特点:

  • Kafka Stream提供了一个非常简单而轻量的Library,可以方便的嵌入任意Java应用中,也可以任意方式打包和部署;

  • 充分利用Kafka分区机制实现水平扩展和顺序性保证;

  • 提供记录级的处理能力,从而实现毫秒级的低延迟;

  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records),这点与spark和flink中的时间窗口处理机制很像;

  • 提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce);

  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation);

  • 除了Kafka外,无任何外部依赖,且支持正好一次处理语义;

2.4 kafka stream中的一些概念

在kafka stream中,KStream和KTable是理解kafka stream时非常核心的两个概念。

KStream

KStream是一个数据流,是一段顺序的、可以无限长、不断更新的数据集,可以认为所有的记录都通过Insert only的方式插入进这个数据流中。

KTable

KTable代表一个完整的数据集,可对照mysql理解为数据库中的表。每条记录都有KV键值对,key可理解为数据库中的主键,是唯一的,而value代表一条记录,记录通常是一段可序列化的字符串。可以认为KTable中的数据时通过Update only的方式进入的。如果是相同的key,会覆盖掉原来那条记录。

综上来说:

  • KStream是数据流,即不断传输过来的流式数据记录,以Insert only的方式不断插入;

  • KTable是数据集(逻辑概念),相同key的数据只保留最新的记录,也就是Update only;

2.5 Kafka Stream应用场景

Kafka Streams主要用于以下应用场景:

  • 实时数据处理,通过实时流计算,对数据进行快速分析和处理,或者处理之后转交下游应用;
  • 流式ETL,将数据从一个数据源抽取到另一个数据源,或将数据进行转换、清洗和聚合操作;
  • 流-表格Join:将一条流数据与一个表进行关联查询,实现实时查询和联合分析;
  • 行为数据统计分析与推荐,在电商场景中,通过接收用户行为日志数据进行分析计算从而为用户推荐提供数据支撑;

三、环境准备

在开始使用kafka stream之前,先快速搭建起kafka的环境,参照下面的步骤快速部署kafka的环境。

3.1 搭建zk

3.1.1 自定义docker网络

docker network create zk-kafka --driver bridge

3.1.2 拉取zk镜像

docker pull zookeeper:3.8.1

3.1.3 启动zk容器

docker run -d --name zk-server -p 2181:2181 --network zk-kafka -e ALLOW_ANONYMOUS_LOGIN=yes zookeeper:3.8.1 

 

3.2 搭建kafka

3.2.1 下载kafka并解压

下载地址:Apache Kafka,这里我使用 kafka_2.12-3.1.1.tgz

tar -zxvf  kafka_2.12-3.1.1.tgzcd kafka_2.12-3.1.1mkdir logs

3.2.2 修改配置文件

进到config目录下,找到server.properties配置文件,主要修改下面几个核心配置即可(覆盖原有的默认的配置参数)

broker.id=0

listeners=PLAINTEXT://云服务器内网IP:9092
zookeeper.connect=内外网均可,如果不对外暴露使用内网IP:2181
log.dirs=/usr/local/kafka/kafka_2.12-3.1.1/logs
advertised.listeners=PLAINTEXT://外网IP:9092

参数说明:

  • listeners=PLAINTEXT://云服务器内网ip:9092,如果是云服务器,一定要配置成内网IP;
  • advertised.listeners=PLAINTEXT://云服务器公网ip:9092,若要远程访问需配置此项为云服务器的公网ip;

3.2.3 启动kafka服务

在主目录下,使用下面的命令启动kafka服务前台启动

./bin/kafka-server-start.sh ./config/server.properties

或者使用下面的命令后台启动

./bin/kafka-server-start.sh -daemon ./config/server.properties

3.3 kafka测试

kafka服务启动之后,接下来创建一个测试用的topic并测试是否能够正常生产和消费消息

3.3.1 创建topic

使用下面的命令创建一个名为zcy的topic

bin/kafka-topics.sh --create --topic zcy --bootstrap-server 公网IP:9092

3.3.2 开启kafka生产端控制台

使用下面的命令,开启一个生产者的控制台窗口,并发送一条消息

bin/kafka-console-producer.sh --broker-list 公网IP:9092 --topic zcy

3.3.3 开启kafka消费端控制台

使用下面的命令,开启一个消费端的控制台窗口,检查是否能够正常消费消息

bin/kafka-console-consumer.sh --bootstrap-server 公网IP:9092 --topic zcy
或者
bin/kafka-console-consumer.sh --bootstrap-server 公网IP:9092 --topic zcy --from-beginning

3.4 java客户端集成kafka测试

引入kafka的客户端依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

编写如下的测试代码,向上述kafka的zcy这个topic中发送一条消息

public static void main(String[] args) throws Exception {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "公网IP:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);System.out.println("开始发送数据");// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("zcy","congge " + i));}// 5. 关闭资源kafkaProducer.close();}

运行上面的代码,运行成功后,可以看到上面的kafka的消费端的控制台正确接收到了5条消息

四、kafka stream 使用

介绍了kafka stream的相关概念之后,接下来通过一些案例感受下如何使用

4.1 前置准备

创建一个maven工程,引入如下依赖

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

再创建另一个topic

bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic zcy-out

4.2 kafka stream应用开发步骤

使用kafka stream进行应用的业务开发,即相关的API使用,按照下面几步操作:

4.2.1 步骤1:创建Kafka Streams 实例

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "定义本次实例名称,保持全局唯一");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka连接IP地址:9092");
//... 更多其他的属性可以点击到StreamsConfig配置类进行查看
StreamsBuilder builder = new StreamsBuilder();
KafkaStreams streams = new KafkaStreams(builder.build(), props);

参数说明:

  • props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream") 指定本次流处理应用的唯一标识符;

  • props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") 指定连接的 Kafka 集群的地址;

  • StreamsBuilder builder = new StreamsBuilder() 创建 StreamsBuilder 实例,并用其构建 TOPOLOGY;

4.2.2 步骤2:指定输入与输出topic

final String inputTopic = "topic-input";
final String outputTopic = "topic-output";
KStream<String, String> inputStream = builder.stream(inputTopic);
//从input-topic中拿到数据进行逻辑处理
KStream<String, String> outputStream = inputStream.mapValues(value -> value.toUpperCase());
//将处理后的数据输出到其他的topic中
outputStream.to(outputTopic);

4.2.3 步骤3:启动Kafka Streams 实例

streams.start();

以上几步可以说是Kafka Streams编程的一种固定的方法模板,需重点关注。

4.3 kafka stream操作案例

4.3.1 转换单词大小写

业务场景如下,从topic1中接收到消息,将消息内容转换为大写之后,输出到topic2

完整的代码如下:

public static void main(String[] args) {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-convert-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();KStream<String, String> inputStream = builder.stream("zcy");KStream<String, String> outputStream = inputStream.mapValues(value -> value.toUpperCase());outputStream.to("zcy-out", Produced.with(Serdes.String(), Serdes.String()));KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();}

运行代码之前,我们将zcy-out这个topic的消费端的终端打开,便于看到程序中处理之后的结果

运行上面的程序,通过观察控制台日志可以发现当前处于等待接收消息输入的状态

由于之前zcy这个topic中已经有消息了,可以看到,经过程序的处理,窗口中能够获取到之前的消息,并且已经将消息转为大写了

此时通过生产端的控制台发送一条消息,然后再在zcy-out消息控制台中就能近乎实时看到被转换后的消息了

注意:如果实际业务中想适当节省计算资源,即不需要实时计算,而是间隔计算之后提交结果,可以通过设置下面的这个参数,即3秒提交一次结果

prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000);  //提交时间设置为3秒

4.3.2 将topic1数据写入到topic2中

业务场景如下,topic1接收外部消息,然后转发到topic2中

实际开发中,可能需要将原始的消息经过简单的处理之后发到另一个topic中,以供后面的业务使用,可以考虑使用下面这种方式

public class StreamCopy {public static void main(String[] args) {Properties prop =new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"copy-stream");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000);StreamsBuilder builder = new StreamsBuilder();KStream<String, String> inputStream = builder.stream("zcy");inputStream.to("zcy-out", Produced.with(Serdes.String(), Serdes.String()));KafkaStreams streams = new KafkaStreams(builder.build(), prop);streams.start();}}

运行代码之后,仍然采用上面的方式做测试,在zcy的生产者窗口发送一条消息,可以看到zcy-out

中接收到相同的消息

4.3.3 统计wordcount

需求场景如下,通过kafka stream将第一个topic中接收到的消息经过计算之后输出到topic2中

完整代码如下

public class KafkaStreamWordCount {public static void main(String[] args) {//kafka的配置Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-count");StreamsBuilder streamsBuilder = new StreamsBuilder();KStream<String, String> stream = streamsBuilder.stream("zcy");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//按照value进行聚合处理.groupBy((key, value) -> value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key, value) -> {System.out.println("key:" + key + " ,vlaue:" + value);return new KeyValue<>(key.key().toString(), value.toString());})//发送消息.to("zcy-out");KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);kafkaStreams.start();}}

4.4 kafka stream窗口函数使用

窗口函数在很多技术框架中都有着广泛的使用,比如spark,flink,hive,甚至在mysql8也开始支持窗口函数了,利用窗口函数可以对某个时间窗口内的数据进行统计、聚合和计算,接下来通过几个案例展示下在kafka stream中窗口函数的使用。

4.4.1 需求一,固定时间输出统计结果到另一个topic

这里每隔3秒输出一次从topic1中过去10秒的数据到topic2中

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class WindowStream1 {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "WindowCountStream");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();KStream<Object, Object> source = builder.stream("zcy");KTable<Windowed<String>, Long> countTable = source.flatMapValues(value -> Arrays.asList(value.toString().split("\\s+"))).map((x, y) -> {return new KeyValue<String, String>(y, "1");}).groupByKey()//加10秒窗口,按步长3秒滑动.windowedBy(TimeWindows.of(Duration.ofSeconds(10).toMillis()).advanceBy(Duration.ofSeconds(3).toMillis())).count();countTable.toStream().foreach((key, val) -> {System.out.println("key: " + key + "  val: " + val);});countTable.toStream().map((key, val) -> {return new KeyValue<String, String>(key.toString(), val.toString());}).to("zcy-out");final Topology topo = builder.build();final KafkaStreams streams = new KafkaStreams(topo, prop);streams.start();}
}

运行代码,按照上述相同的方式测试,然后再在控制台可以看到统计到的时间窗口内的单词数

4.4.2 需求二,统计topic1中10秒内的wordcount写到topic2

一个典型的场景就是,通过session会话的时间窗口统计用户访问网站的时长,对某个特定的用户来说,用户从登录开始,即该用户的窗口开始,直到发生退出或者会话超时,窗口期结束,可以统计在窗口期间发生的各种动作,比如点击某些按钮,浏览某个页面的时长等行为。

public class WindowStream2 {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "WindowCountStream");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();KStream<Object, Object> source = builder.stream("zcy");KTable<Windowed<String>, Long> countTable = source.flatMapValues(value -> Arrays.asList(value.toString().split("\\s+"))).map((x, y) -> {return new KeyValue<>(y, "1");}).groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis())).count();countTable.toStream().foreach((key, val) -> {System.out.println("key: " + key + "  val: " + val);});countTable.toStream().map((key, val) -> {return new KeyValue<String, String>(key.toString(), val.toString());}).to("zcy-out");KafkaStreams streams = new KafkaStreams(builder.build(), prop);streams.start();}}

4.5 Kafka Streams使用场景拓展

4.5.1 事件日志监控

假如现在需要对某系统中实时上报到topic-1的错误或告警日志进行转换,并输出到下游的topic-2中做大屏监控,如下为原始的从topic-1中获取到的日志数据格式

{"timestamp" : "2023-12-11 23:25:13","method": "GET","endpoint": "http://10.1.63.112:9098/fox/message/get","status_code": 500,"source_ip":"192.168.9.138","request_params":"type=5&status=3","operation_user":"6613"
}

假如下游的应用需要实时可视化用户请求日志,需要的数据格式如下:

{"ope_time": "2023-12-11 23:25:13","ope_user": [{"user_id": "6613", "source_ip": "192.168.9.138","endpoint":"http://10.1.63.112:9098/fox/message/get"}]
}

如果使用Kafka Stream来处理,可以考虑下面的思路

  • 根据业务需求对原始日志进行聚合和转换,重新组装结果的格式,并将结果写到下游的topic中;

  • 下游应用从topic中获取处理的结果,按照大屏的数据格式再次组装数据,最后展示到大屏;

4.5.2 事用户行为统计分析

比如某电商网站或app的后台需要统计用户某些指标的数据,从而分析用户的消费习惯为后续做促销提供数据决策依据,现在从原始的topic中可以拿到下面几类指标信息

{ "enter_type": app, "online_time": 16m, "user_type": "level_1" ,"buy_time_in_month":2,"user_id":1003
}

有了这些信息,就可以计算某种类型的用户,在过去一年内产生在app或网站来浏览的时长,购买的总次数,如果需要汇聚更多的信息,可以要求上游的topic中传入更详细的参数。

4.5.3 数据聚合与实时计算

kafka stream可以作为简单的实时计算框架,对数据进行准实时的聚合统计,快速汇总计算数据按业务维度进行数据分发,承载一部分大数据实时计算的功能。

4.5.4 实时推荐

基于现有的数据模型进行相关的指标计算,预测某些指标的行为,进一步指导业务决策,比如上面统计电商网站中用户的网站浏览动作。

4.5.5 实时告警

检测系统异常指标,通过准实时计算汇聚结果,将异常行为进行上报。

4.5.6 应用解耦

这个与消息中间件的作用类似,为了减少源系统的计算压力,可以通过kafka stream进行解耦,所有的计算动作在kafka stream中进行,然后再将计算结果推送到下游的topic进行后续的使用。

五、kafka stream整合springboot

有了上面对kafka stream的了解和使用,接下来演示下如何在springboot中整合kafka stream

5.1 整合过程

5.1.1 导入springboot相关依赖

 在上述已经导入的依赖的基础上补充下面几个依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

5.1.2 配置kafka相关信息

在配置文件中添加如下配置信息

server:port: 8088spring:application:name: kafka-sream-appkafka:bootstrap-servers: kafka连接IP:9092producer:retries: 5key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerkafka:hosts: kafka连接IP:9092group: ${spring.application.name}

5.1.3 添加Kafka Stream配置类


@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_id");props.put(StreamsConfig.RETRIES_CONFIG, 5);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

5.1.4 自定义Kafka Stream业务处理监听器

还记得在编写消息中间件客户端程序的时候添加的那些监听器吗,原理类似,这里自定义一个监听器处理类,接收上游的topic消息进行处理之后再发送到下一个topic中,相当于是把上面的代码搬过来放到spring的ioc容器中

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
public class StreamCountListener {@Beanpublic KStream<String, String> kStream(StreamsBuilder streamsBuilder) {KStream<String, String> stream = streamsBuilder.stream("zcy");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//按照value进行聚合处理.groupBy((key, value) -> value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key, value) -> {System.out.println("key:" + key + " ,vlaue:" + value);return new KeyValue<>(key.key().toString(), value.toString());})//发送消息.to("zcy-out");return stream;}}

5.1.5 效果测试

运行项目,运行之后,使用下面的代码,往zcy这个topic中发送一些消息

public static void main(String[] args) throws Exception {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);System.out.println("开始发送数据");// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("zcy","congge_" + i));}// 5. 关闭资源kafkaProducer.close();}

发送成功后,在控制台中可以看到经过上面的监听类处理得到的结果输出信息

六、写在文末

本篇通过较大得篇幅详细分享了kafka stream的使用,流式计算可以说是当下非常火热的技术之一,对于非大数据场景下的业务处理,kafka stream提供了一种很好的解决思路,希望对看到的同学有所帮助,本篇到此介绍,感谢观看。

相关文章:

【微服务】springboot整合kafka-stream使用详解

目录 一、前言 二、kafka stream概述 2.1 什么是kafka stream 2.2 为什么需要kafka stream 2.2.1 对接成本低 2.2.2 节省资源 2.2.3 使用简单 2.3 kafka stream特点 2.4 kafka stream中的一些概念 2.5 Kafka Stream应用场景 三、环境准备 3.1 搭建zk 3.1.1 自定义d…...

什么是动态代理?

目录 一、为什么需要代理&#xff1f; 二、代理长什么样&#xff1f; 三、Java通过什么来保证代理的样子&#xff1f; 四、动态代理实现案例 五、动态代理在SpringBoot中的应用 导入依赖 数据库表设计 OperateLogEntity实体类 OperateLog枚举 RecordLog注解 上下文相…...

【OAuth2】:赋予用户控制权的安全通行证--原理篇

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于OAuth2的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.什么是OAuth? 二.为什么要用OAuth?…...

【K8s】2# 使用kuboard管理K8s集群(kuboard安装)

文章目录 安装 Kuboard v3部署计划 安装登录测试 安装 Kuboard v3 部署计划 在正式安装 kuboard v3 之前&#xff0c;需做好一个简单的部署计划的设计&#xff0c;在本例中&#xff0c;各组件之间的连接方式&#xff0c;如下图所示&#xff1a; 假设用户通过 http://外网IP:80…...

爬虫是什么?起什么作用?

【爬虫】 如果把互联网比作一张大的蜘蛛网&#xff0c;数据便是放于蜘蛛网的各个节点&#xff0c;而爬虫就是一只小蜘蛛&#xff0c;沿着网络抓取自己得猎物&#xff08;数据&#xff09;。这种解释可能更容易理解&#xff0c;官网的&#xff0c;就是下面这个。 爬虫是一种自动…...

代码随想录27期|Python|Day24|回溯法|理论基础|77.组合

图片来自代码随想录 回溯法题目目录 理论基础 定义 回溯法也可以叫做回溯搜索法&#xff0c;它是一种搜索的方式。 回溯是递归的副产品&#xff0c;只要有递归就会有回溯。回溯函数也就是递归函数&#xff0c;指的都是一个函数。 基本问题 组合问题&#xff08;无序&…...

mysql(49) : 大数据按分区导出数据

代码 import com.alibaba.gts.flm.base.util.Mysql8Instance;import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.u…...

阿里云ECS配置IPv6后,如果无法访问该服务器上的网站,可检查如下配置

1、域名解析到这个IPv6地址,同一个子域名可以同时解析到IPv4和IPv6两个地址&#xff0c;这样就可以给网站配置ip4和ipv6双栈&#xff1b; 2、在安全组规则开通端口可访问&#xff0c;设定端口后注意授权对象要特殊设置“源:::/0” 3、到服务器nginx配置处&#xff0c;增加端口…...

基于SSM的双减后初小教育课外学习生活活动平台的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…...

HTTP前端请求

目录 HTTP 请求1.请求组成2.请求方式与数据格式get 请求示例post 请求示例json 请求示例multipart 请求示例数据格式小结 3.表单3.1.作用与语法3.2.常见的表单项 4.session 原理5.jwt 原理 HTTP 请求 1.请求组成 请求由三部分组成 请求行请求头请求体 可以用 telnet 程序测…...

前端性能优化二十四:花裤衩模板第三方库打包

(1). 工作原理: ①. externals配置在所创建bundle时:a. 会依赖于用户环境(consumers environment)中的依赖,防止将某些import的包(package)打包到bundle中b. 在运行时(runtime)再去从外部获取这些扩展依赖(external dependencies)②. webpack会检测这些组件是否在externals中注…...

多维时序 | MATLAB实现BiTCN-Multihead-Attention多头注意力机制多变量时间序列预测

多维时序 | MATLAB实现BiTCN-Multihead-Attention多头注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实现BiTCN-Multihead-Attention多头注意力机制多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 多维时序 | MATLAB实现BiTCN-Multihea…...

Qt的简单游戏实现提供完整代码

文章目录 1 项目简介2 项目基本配置2.1 创建项目2.2 添加资源 3 主场景3.1 设置游戏主场景配置3.2 设置背景图片3.3 创建开始按钮3.4 开始按钮跳跃特效实现3.5 创建选择关卡场景3.6 点击开始按钮进入选择关卡场景 4 选择关卡场景4.1场景基本设置4.2 背景设置4.3 创建返回按钮4.…...

SpringMVC之文件的下载

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 SpringMVC之文件的下载 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 系列文章目录前言一、文件下载实现…...

计算机组成原理第6章-(算术运算)【下】

移位运算 对于有符号数的移位称为算术移位,对于无符号数的移位称为逻辑移位。 算术移位规则【极其重要】 对于正数的算术移位,且不管是何种机器数【原码、反码、补码】,移位后出现的空位全部填0。 而对于负数的算术移位,机器数不同,移位后的规则也不同。 对于负数的原…...

【开题报告】基于微信小程序的校园资讯平台的设计与实现

1.选题背景与意义 随着移动互联网的快速发展&#xff0c;微信成为了人们日常生活中不可或缺的工具之一。在校园生活中&#xff0c;学生们对于校园资讯的获取和交流需求也越来越高。然而&#xff0c;传统的校园资讯发布方式存在信息不及时、传播范围有限等问题&#xff0c;无法…...

VUE前端导出文件之file-saver插件

VUE前端导出文件之file-saver插件 安装 npm install file-saver --save # 如使用TS开发&#xff0c;可安装file-saver的TypeScript类型定义 npm install types/file-saver --save-dev如果需要保存大于 blob 大小限制的非常大的文件&#xff0c;或者没有 足够的 RAM&#xff0…...

【Earth Engine】协同Sentinel-1/2使用随机森林回归实现高分辨率相对财富(贫困)制图

目录 1 简介与摘要2 思路3 效果预览4 代码思路5 完整代码6 后记 1 简介与摘要 最近在做一些课题&#xff0c;需要使用Sentinel-1/2进行机器学习制图。 然后想着总结一下相关数据和方法&#xff0c;就花半小时写了个代码。 然后再花半小时写下这篇博客记录一下。 因为基于多次拍…...

C++ 检测 是不是 com组件 的办法 已解决

在日常开发中&#xff0c;遇到动态库和 com组件库的调用 无法区分。检测是否com组件的办法 在头部文件&#xff0c;引入文件 如果能编译成功说明是 com组件&#xff0c;至于动态库如何引入&#xff0c;还在观察中 最简单办法 regsvr32 TerraExplorerX.dll 是com 组件 regs…...

linux buffer的回写的触发链路

mark_buffer_dirty中除了会标记dirty到buffer_head->state、page.flag、folio->mapping->i_pages外&#xff0c;还会调用inode所在文件系统的dirty方法&#xff08;inode->i_sb->s_op->dirty_inode&#xff09;。然后为inode创建一个它所在memory group的wri…...

Lambda表达式超详解

目录 背景 Lambda表达式的用法 函数式接口 Lambda表达式的基本使用 语法精简 变量捕获 匿名内部类 匿名内部类中的变量捕获 Lambda的变量捕获 Lambda表达式在类集中的使用 Collection接口 List接口 Map接口 总结 背景 Lambda表达式是Java SE 8中的一个重要的新特性.…...

西门子博途与菲尼克斯无线蓝牙模块通讯

菲尼克斯无线蓝牙模块 正常运行时,可以使用基站控制字0发送00E0(得到错误代码命令) 正常运行时,可以使用基站控制字0发送00E0(得到错误代码命令)得到各个无线I/O是否连 接的信号(状态字IN word 1的第2、6、10位) 小车1连接状态 小车2连接状态 小车3连接状态 1#小车自…...

vue2 之 实现pdf电子签章

一、前情提要 1. 需求 仿照e签宝&#xff0c;实现pdf电子签章 > 拿到pdf链接&#xff0c;移动章的位置&#xff0c;获取章的坐标 技术 : 使用fabric pdfjs-dist vuedraggable 2. 借鉴 一位大佬的代码仓亏 : 地址 一位大佬写的文章 &#xff1a;地址 3. 优化 在大佬的代码…...

什么是MVC?MVC框架的优势和特点

目录 一、什么是MVC 二、MVC模式的组成部分和工作原理 1、模型&#xff08;Model&#xff09; 2、视图&#xff08;View&#xff09; 3、控制器&#xff08;Controller&#xff09; 三、MVC模式的工作过程如下&#xff1a; 用户发送请求&#xff0c;请求由控制器处理。 …...

主从复制mysql-replication | Replication故障排除

主从复制mysql-replication 准备环境 #防火墙 selinux systemctl stop firewalld --now &&setenforce 0 #修改主机名&#xff1a;hostnamectl set-hostname 名字 tip&#xff1a;vim /etc/sysconfig/network-scripts/ifcfg-ens33 BOOTPRTOTstatic IPADDR192.168.100.…...

基于Java SSM框架实现教学质量评价评教系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现教学质量评价评教系统演示 摘要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;教学质量评价系统当然也不能排除在外。教学质量评价系统是以实际运用为…...

03|模型I/O:输入提示、调用模型、解析输出

03&#xff5c;模型I/O&#xff1a;输入提示、调用模型、解析输出 从这节课开始&#xff0c;我们将对 LangChain 中的六大核心组件一一进行详细的剖析。 模型&#xff0c;位于 LangChain 框架的最底层&#xff0c;它是基于语言模型构建的应用的核心元素&#xff0c;因为所谓 …...

springcloud-gateway-2-鉴权

目录 一、跨域安全设置 二、GlobalFilter实现全局的过滤与拦截。 三、GatewayFilter单个服务过滤器 1、原理-官方内置过滤器 2、自定义过滤器-TokenAuthGatewayFilterFactory 3、完善TokenAuthGatewayFilterFactory的功能 4、每一个服务编写一个或多个过滤器&#xff0c…...

实现一个最简单的内核

更好的阅读体验&#xff0c;请点击 YinKai s Blog | 实现一个最简单的内核。 ​ 这篇文章带大家实现一个最简单的操作系统内核—— Hello OS。 PC 机的引导流程 ​ 我们这里将借助 Ubuntu Linux 操纵系统上的 GRUB 引导程序来引导我们的 Hello OS。 ​ 首先我们得了解一下&a…...

2024华为OD机试真题指南宝典—持续更新(JAVAPythonC++JS)【彻底搞懂算法和数据结构—算法之翼】

PC端可直接搜索关键词 快捷键&#xff1a;CtrlF 年份关键字、题目关键字等等 注意看本文目录-快速了解本专栏 文章目录 &#x1f431;2024年华为OD机试真题&#xff08;马上更新&#xff09;&#x1f439;2023年华为OD机试真题&#xff08;更新中&#xff09;&#x1f436;新…...

【12.23】转行小白历险记-算法02

不会算法的小白不是好小白&#xff0c;可恶还有什么可以难倒我这个美女的&#xff0c;不做花瓶第二天&#xff01; 一、螺旋矩阵 59. 螺旋矩阵 II - 力扣&#xff08;LeetCode&#xff09; 1.核心思路&#xff1a;确定循环的路线&#xff0c;左闭右开循环&#xff0c;思路简…...

k8s部署nginx-ingress服务

k8s部署nginx-ingress服务 经过大佬的拷打&#xff0c;终于把这块的内容配置完成了。 首先去 nginx-ingress官网查看相关内容。 核心就是这个&#xff1a; kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.8.2/deploy/static/prov…...

SpringBoot Elasticsearch全文搜索

文章目录 概念全文搜索相关技术Elasticsearch概念近实时索引类型文档分片(Shard)和副本(Replica) 下载启用SpringBoot整合引入依赖创建文档类创建资源库测试文件初始化数据创建控制器 问题参考 概念 全文搜索&#xff08;检索&#xff09;&#xff0c;工作原理&#xff1a;计算…...

Python 常用模块re

Python 常用模块re 【一】正则表达式 【1】说明 正则表达式是一种强大的文本匹配和处理工具&#xff0c;主要用于字符串的模式匹配、搜索和替换。正则表达式测试网址&#xff1a;正则表达式在线测试 正则表达式手册&#xff1a;正则表达式手册 【2】字符组 字符转使用[]表…...

【华为OD题库-106】全排列-java

题目 给定一个只包含大写英文字母的字符串S&#xff0c;要求你给出对S重新排列的所有不相同的排列数。如:S为ABA&#xff0c;则不同的排列有ABA、AAB、BAA三种。 解答要求 时间限制:5000ms,内存限制:100MB 输入描述 输入一个长度不超过10的字符串S&#xff0c;确保都是大写的。…...

Three.js 详细解析(持续更新)

1、简介&#xff1b; Three.js依赖一些要素&#xff0c;第一是scene&#xff0c;第二是render&#xff0c;第三是carmea npm install --save three import * as THREE from "three"; import { GLTFLoader } from "three/examples/jsm/loaders/GLTFLoader.js&quo…...

Unity中Shader平移矩阵

文章目录 前言方式一&#xff1a;对顶点本地空间下的坐标进行相加平移1、在属性面板定义一个四维变量记录在 xyz 上平移多少。2、在常量缓冲区进行申明3、在顶点着色器中&#xff0c;在进行其他坐标转化之前&#xff0c;对模型顶点本地空间下的坐标进行转化4、我们来看看效果 方…...

python dash 的学习笔记1

dash 用python开发web界面 https://dash.plotly.com/ 官方上支持jula F# python一类。当然我只会python只学习python中使用dash. 要做一个APP&#xff0c;用php,java以及.net都可以写&#xff0c;只所有选择python是因为最近在用这一个。同时也发现python除了慢全是优点。 资料…...

SQLITE如何同时查询出第一条和最后一条两条记录

一个时间记录表&#xff0c;需要同时得到整个表或一段时间内第一条和最后一条两条记录&#xff0c;按如下方法会提示错误&#xff1a;ORDER BY clause should come after UNION not before select * from sdayXX order by op_date asc limit 1 union select * from sday…...

四、ensp配置ftp服务器实验

文章目录 实验内容实验拓扑操作步骤配置路由器为ftp server 实验内容 本实验模拟企业网络。PC-1为FTP 用户端设备&#xff0c;需要访问FTP Server&#xff0c;从服务器上下载或上传文件。出于安全角度考虑&#xff0c;为防止服务器被病毒文件感染&#xff0c;不允许用户端直接…...

VS2020使用MFC开发一个贪吃蛇游戏

背景&#xff1a; 贪吃蛇游戏 按照如下步骤实现:。初始化地图 。通过键盘控制蛇运动方向&#xff0c;注意重新设置运动方向操作。 。制造食物。 。让蛇移动&#xff0c;如果吃掉食物就重新生成一个食物&#xff0c;如果会死亡就break。用蛇的坐标将地图中的空格替换为 #和”将…...

【经典LeetCode算法题目专栏分类】【第9期】深度优先搜索DFS与并查集:括号生成、岛屿问题、扫雷游戏

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能AI、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推荐--…...

字符设备驱动开发-注册-设备文件创建

一、字符设备驱动 linux系统中一切皆文件 1、应用层&#xff1a; APP1 APP2 ... fd open("led驱动的文件"&#xff0c;O_RDWR); read(fd); write(); close(); 2、内核层&#xff1a; 对灯写一个驱动 led_driver.c driver_open(); driver_read(); driver_write(…...

TrustZone之可信操作系统

有许多可信内核&#xff0c;包括商业和开源的。一个例子是OP-TEE&#xff0c;最初由ST-Ericsson开发&#xff0c;但现在是由Linaro托管的开源项目。OP-TEE提供了一个功能齐全的可信执行环境&#xff0c;您可以在OP-TEE项目网站上找到详细的描述。 OP-TEE的结构如下图所示&…...

java定义三套场景接口方案

一、背景 在前后端分离开发的背景下&#xff0c;后端java开发人员现在只需要编写接口接口。特别是使用微服务开发的接口。resful风格接口。那么一般后端接口被调用有下面三种场景。一、不需要用户登录的接口调用&#xff0c;第二、后端管理系统接口调用&#xff08;需要账号密…...

idea连接数据库,idea连接MySQL,数据库驱动下载与安装

文章目录 普通Java工程先创建JAVA工程JDBC连接数据库测试连接 可视化连接数据库数据库驱动下载与安装常用的数据库驱动下载MySQL数据库Oracle数据库SQL Server 数据库PostgreSQL数据库 下载MySQL数据库驱动JDBC连接各种数据库的连接语句MySQL数据库Oracle数据库DB2数据库sybase…...

Redis-实践知识

转自极客时间Redis 亚风 原文视频&#xff1a;https://u.geekbang.org/lesson/535?article681062 Redis最佳实践 普通KEY Redis 的key虽然可以自定义&#xff0c;但是最好遵循下面几个实践的约定&#xff1a; 格式&#xff1a;[业务名称]:[数据名]:[id] 长度不超过44字节 不…...

多维时序 | MATLAB实现SSA-CNN-SVM麻雀算法优化卷积神经网络-支持向量机多变量时间序列预测

多维时序 | MATLAB实现SSA-CNN-SVM麻雀算法优化卷积神经网络-支持向量机多变量时间序列预测 目录 多维时序 | MATLAB实现SSA-CNN-SVM麻雀算法优化卷积神经网络-支持向量机多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 多维时序 | MATLAB实现…...

leetcode160相交链表思路解析

分别让tmp1以及tmp2的结点分别先指向headA以及headB&#xff0c;当遍历完成后&#xff0c;再让tmp1以及tmp2分别指向haedB和headA反转 此处有个问题&#xff1a;为什么if判断句中写tmp1&#xff01;&#xff1d;nullptr&#xff0c;能够编译通过&#xff0c;但是写tmp1->ne…...

在线分析工具-日志优化

一、概述 针对于大日志文件&#xff0c;统计分析出日志文件的相关指标&#xff0c;帮助开发测试人员&#xff0c;优化日志打印。减少存储成本 二、日志分析指标 重复打印日志&#xff1a;统一请求reqId的重复打印日志打印最多的方法&#xff1a;检测出打印日志最多的方法…...