当前位置: 首页 > news >正文

springboot整合kafka

springboot整合kafka

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.lhz</groupId><artifactId>kafka</artifactId><version>0.0.1</version><name>kafka</name><description>kafka</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>21</java.version><jdk.version>21</jdk.version><maven.compiler.source>${jdk.version}</maven.compiler.source><maven.compiler.target>${jdk.version}</maven.compiler.target><maven.compiler.compilerVersion>${jdk.version}</maven.compiler.compilerVersion><maven.compiler.encoding>utf-8</maven.compiler.encoding><project.build.sourceEncoding>utf-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><maven.test.failure.ignore>true</maven.test.failure.ignore><maven.test.skip>true</maven.test.skip></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency>--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId><version>4.5.0</version></dependency></dependencies><build><finalName>${project.name}</finalName><plugins><!-- 编译级别 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.13.0</version><configuration><!-- 设置编译字符编码 --><encoding>UTF-8</encoding><!-- 设置编译jdk版本 --><source>${jdk.version}</source><target>${jdk.version}</target></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build><repositories><repository><id>public</id><name>aliyun nexus</name><url>https://maven.aliyun.com/repository/public</url><releases><enabled>true</enabled></releases></repository></repositories><pluginRepositories><pluginRepository><id>public</id><name>aliyun nexus</name><url>https://maven.aliyun.com/repository/public</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories>
</project>
application.yml配置kafka

本案例使用最少的配置,在application.yml中添加Kafka的连接配置:

spring:application:name: springboot-kafkakafka:bootstrap-servers: lihaozhe01:9092,lihaozhe02:9092,lihaozhe03:9092consumer:group-id: lihaozheauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
#是否激活 swagger true or false
springdoc:swagger-ui:path: /swagger-uitags-sorter: alphaoperations-sorter: alphaapi-docs:path: /v3/api-docsgroup-configs:- group: 'default'paths-to-match: '/**'packages-to-scan: cn.lhz.controller
# knife4j的增强配置,不需要增强可以不配
knife4j:enable: truesetting:language: zh_cnbasic:# 启用基本认证enable: true# 设置用户名username: admin# 设置密码password: lihaozhe
Kafka配置类

创建KafkaConfig.java配置类,用于创建Kafka主题:

package cn.lhz.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @author 李昊哲* @version 1.0.0*/
@Configuration
@EnableKafka
public class KafkaConfig {
//  @Bean
//  public ProducerFactory<String, String> producerFactory() {
//    Map<String, Object> configProps = new HashMap<>();
//    // 配置属性
//    return new DefaultKafkaProducerFactory<>(configProps);
//  }
//
//  @Bean
//  public KafkaTemplate<String, String> kafkaTemplate() {
//    return new KafkaTemplate<>(producerFactory());
//  }//  @Bean
//  public NewTopic myTopic() {
//    return new NewTopic("lihaozhe", 1, (short) 1);
//  }
}
Kafka消费者

创建KafkaConsumer.java消费者类,用于从Kafka主题接收消息:

以下两段代码二选一

  • 第一个案例代码只获取 value
  • 第二个案例代码获取了 ConsumerRecord 完整信息
获取value
package cn.lhz.service;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
public class KafkaConsumer {/*** 从 topic 中获取 value** @param value topic 中获取 value*/@KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")public void consume(String value) {System.out.printf("Consumed: %s\n", value);}
}
获取ConsumerRecord
package cn.lhz.service;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
public class KafkaConsumer {/*** 从 topic 中获取 ConsumerRecord** @param record topic 中获取 ConsumerRecord*/@KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")public void consume(ConsumerRecord<String, String> record) {System.out.printf("Consumed: %s-%d-%s:%s\n", record.topic(), record.partition(), record.key(), record.value());}
}
Kafka生产者
package cn.lhz.service;import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
@RequiredArgsConstructor
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;/*** 或者 topic 中的 value** @param topic kafka topic* @param value kafka topic 中的 value*/public void sendMessage(String topic, String value) {kafkaTemplate.send(topic, value);}/*** 或者 topic 中的 value** @param topic kafka topic* @param key   kafka topic 中的 key* @param value kafka topic 中的 value*/public void sendMessage(String topic, String key, String value) {kafkaTemplate.send(topic, key, value);}
}
创建控制器发送消息
package cn.lhz.controller;import cn.lhz.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;/*** @author 李昊哲* @version 1.0.0*/
@RestController
@RequiredArgsConstructor
public class KafkaController {private final KafkaProducer kafkaProducer;@GetMapping("/send/{message}")public String sendMessage(@PathVariable(value = "message") String message) {// 只发送 valuekafkaProducer.sendMessage("lihaozhe", message);return "消息:" + message;}@GetMapping("/send/{key}/{message}")public String sendMessage(@PathVariable(value = "key") String key,@PathVariable(value = "message") String message) {// 只发送 key 和 valuekafkaProducer.sendMessage("lihaozhe", key, message);return "消息:" + message;}
}
配置OpenApi
package cn.lhz.config;import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.net.Inet4Address;
import java.net.UnknownHostException;/*** @author 李昊哲* @version 1.0.0*/
@Configuration
@Slf4j
public class OpenApiConfig implements ApplicationListener<WebServerInitializedEvent> {@Beanpublic OpenAPI springOpenAPI() {Contact contact = new Contact();contact.setName("李昊哲");contact.setUrl("https://space.bilibili.com/480308139");contact.setEmail("646269678@qq.com");// 访问路径:http://localhost:8080/doc.html// 访问路径:http://localhost:8080/swagger-ui/index.htmlreturn new OpenAPI().info(new Info().title("SpringBoot Kafka API").description("SpringBoot Kafka Simple Application").contact(contact).version("1.0.0"));}@Overridepublic void onApplicationEvent(WebServerInitializedEvent event) {try {//获取IPString hostAddress = Inet4Address.getLocalHost().getHostAddress();//获取端口号int port = event.getWebServer().getPort();//获取应用名String applicationName = event.getApplicationContext().getApplicationName();// TODO:这个localhost改成host地址log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/doc.html", hostAddress, port, applicationName);log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/swagger-ui/index.html", hostAddress, port, applicationName);} catch (UnknownHostException e) {e.printStackTrace();}}
}
启动项目测试

springboot kafka

控制台输出

Consumed: hello world

springboot kafka

控制台输出

Consumed: lihaozhe-0-hello:world

相关文章:

springboot整合kafka

springboot整合kafka pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven…...

Python深度学习框架:PyTorch、Keras、Scikit-learn、TensorFlow如何使用?学会轻松玩转AI!

前言 我们先简单了解一下PyTorch、Keras、Scikit-learn和TensorFlow都是什么。 想象一下你要盖一座大房子。你需要砖头、水泥、工具等等&#xff0c;对吧&#xff1f;机器学习也是一样&#xff0c;需要一些工具来帮忙。PyTorch、Keras、Scikit-learn和TensorFlow就是四种不同的…...

【Linux】安装cuda

一、安装nvidia驱动 # 添加nvidia驱动ppa库 sudo add-apt-repository ppa:graphics-drivers/ppa sudo apt update# 查找推荐版本 sudo ubuntu-drivers devices# 安装推荐版本 sudo apt install nvidia-driver-560# 检验nvidia驱动是否安装 nvidia-smi 二、安装cudatoolkit&…...

为什么DDoS防御很贵?

分布式拒绝服务攻击&#xff08;DDoS攻击&#xff09;是一种常见的网络安全威胁&#xff0c;通过大量恶意流量使目标服务器无法提供正常服务。DDoS防御是一项复杂且昂贵的服务&#xff0c;本文将详细探讨为什么DDoS防御如此昂贵&#xff0c;并提供一些实用的代码示例和解决方案…...

将WPS的PPT 无损的用微软的PowerPoint打开

用WPS做了PPT&#xff0c;但是用用PowerPoint打开的时候&#xff0c;老是会有几张图错位。 解决方案&#xff1a;将wps做的PPT另存为PowerPoint的格式 参考博客&#xff1a;解决office的PPT和WPS的PPT不兼容的问题_office ppt和wps中代码不通用-CSDN博客 另存为的时候&#…...

【汇编】uniapp开发

UniApp是一款基于Vue.js构建的跨平台开发框架&#xff0c;可以用于快速开发同时运行在多个平台&#xff08;包括iOS、Android、H5和小程序&#xff09;的应用程序。UniApp的目标是提供一套代码即可在不同平台上运行的开发模式&#xff0c;从而节省开发者的时间和精力。本文将介…...

详解Oracle表的类型(二)

1.引言&#xff1a; Oracle数据库提供了多种表类型&#xff0c;以满足不同的数据存储和管理需求。本博文将对Oracle分区表及使用场景进行详细介绍。 2. 分区表 分区表是Oracle数据库中一种重要的表类型&#xff0c;它通过将表数据分割成多个逻辑部分来提高查询性能、管理灵活…...

Docker--通过Docker容器创建一个Web服务器

Web服务器 Web服务器&#xff0c;一般指网站服务器&#xff0c;是驻留于因特网上某种类型计算机的程序。 Web服务器可以向浏览器等Web客户端提供文档&#xff0c;也可以放置网站文件以供全世界浏览&#xff0c;或放置数据文件以供全世界下载。 Web服务器的主要功能是提供网上…...

Next.js-样式处理

#题引&#xff1a;我认为跟着官方文档学习不会走歪路 Next.js 支持多种为应用程序添加样式的方法&#xff0c;包括&#xff1a; CSS Modules&#xff1a;创建局部作用域的 CSS 类&#xff0c;避免命名冲突并提高可维护性。全局 CSS&#xff1a;使用简单&#xff0c;对于有传统…...

整合Springboot shiro jpa mysql 实现权限管理系统(附源码地址)

一、在开发企业级应用时,权限管理是一个至关重要的功能。本文将围绕 Spring Boot、JPA、MySQL 和 Apache Shiro,构建一个基础的权限管理系统,涵盖用户认证与授权等核心功能。 一、技术选型及框架介绍 Spring Boot:简化 Spring 应用的配置和开发。JPA:实现数据持久化,提供…...

极智嘉嵌入式面试题及参考答案

对于交叉编译器的理解 交叉编译器是一种在一个计算机平台上为另一个不同架构的计算机平台生成可执行代码的编译器。它在嵌入式系统开发中起着关键作用。 从其必要性来看&#xff0c;嵌入式系统通常使用的处理器架构与我们日常使用的 PC 等通用计算机不同&#xff0c;如 ARM、MI…...

【MySQL】数据库核心技术与应用指南

数据库的各种概念 1. 指一门学科《数据库原理与应用》。&#xff08;研究如何设计实现一个数据库&#xff09; 2. 指一类用来管理数据的软件。 3. 指某一个具体的数据库软件。 4. 指部署了某个数据库软件的电脑。 数据库软件 关系型数据库 1. 使用 “表” 的结构来组织数据。…...

23省赛区块链应用与维护(房屋租凭)

23省赛区块链应用与维护(房屋租凭) 背景描述 随着异地务工人员的增多,房屋租赁成为一个广阔市场。目前,现有技术中的房屋租赁是由房主发布租赁信息,租赁信息发布在房屋中介或租赁软件,租客获取租赁信息后,现场看房,并签订纸质的房屋租赁合同,房屋租赁费用通过中介或…...

深度学习4

一、手动构建模型 epoch 一次完整数据的训练过程&#xff08;可细分多次训练&#xff09;&#xff0c;称为 一代训练 Batch 小部分样本对权重的更新&#xff0c;称为 一批数据 iteration 使用一个 Batch 的过程&#xff0c;称为 一次训练 步骤&#xff1a; 1、生成 x,y 的…...

跳绳视觉计数方案

产品概述 提供基于摄像头视觉技术的跳绳计数解决方案&#xff0c;可精准完成跳绳动作的实时计数&#xff0c;效果完全满足考试水平的要求。方案采用先进的计算机视觉算法&#xff0c;结合高效的模型架构&#xff0c;确保计数的准确性和稳定性。适用场景 学校体育考试&#xff…...

TEA加密逆向

IDA伪代码 do{if ( v15 )v17 v38; // x120x0->0x79168ba790&#xff0c; 输入字符串经过check1处理后字符串elsev17 v40;v18 (unsigned int *)&v17[v16]; // 0x78cbbd47fc add x12, x12, x8 ; x120x79168ba790->…...

LeetCode 404.左叶子之和

题目&#xff1a;给定二叉树的根节点 root &#xff0c;返回所有左叶子之和。 思路&#xff1a;一个节点为「左叶子」节点&#xff0c;当且仅当它是某个节点的左子节点&#xff0c;并且它是一个叶子结点。因此我们可以考虑对整 node 时&#xff0c;如果它的左子节点是一个叶子…...

01-go入门

文章目录 Go语言学习1. 简介安装windows安装linux安装编译工具安装-goland 2. 入门2.1 Helloworld注释 2.2 变量初始化打印内存地址变量交换匿名变量作用域局部变量全局变量 2.3 常量iota 2.4 数据类型布尔整数浮点类型复数字符串定义字符串字符串拼接符定义多行字符串 map数据…...

【经典】抽奖系统(HTML,CSS、JS)

目录 1、添加参与者 2、多次添加 3、点击抽奖 功能介绍&#xff1a; 使用方法&#xff1a; 完整代码&#xff1a; 一个简单但功能强大的抽奖系统的示例&#xff0c;用于在网页上实现抽奖。 1、添加参与者 2、多次添加 3、点击抽奖 功能介绍&#xff1a; 参与者添加&…...

GoF设计模式——结构型设计模式分析与应用

文章目录 UML图的结构主要表现为&#xff1a;继承&#xff08;抽象&#xff09;、关联 、组合或聚合 的三种关系。1. 继承&#xff08;抽象&#xff0c;泛化关系&#xff09;2. 关联3. 组合/聚合各种可能的配合&#xff1a;1. 关联后抽象2. 关联的集合3. 组合接口4. 递归聚合接…...

Java后端如何进行文件上传和下载 —— 本地版

简介&#xff1a; 本文详细介绍了在Java后端进行文件上传和下载的实现方法&#xff0c;包括文件上传保存到本地的完整流程、文件下载的代码实现&#xff0c;以及如何处理文件预览、下载大小限制和运行失败的问题&#xff0c;并提供了完整的代码示例。 大体思路 1、文件上传 …...

json格式数据集转换成yolo的txt格式数据集

这个代码是参考了两个博客 我是感觉第一篇博客可能有问题&#xff0c;然后自己做了改进&#xff0c;如果我是错误的或者正确的&#xff0c;请各位评论区说一下&#xff0c;感谢 Json格式的数据集标签转化为有效的txt格式(data_coco)_train.json-CSDN博客 COCO&#xff08;.j…...

什么是Three.js,有什么特点

什么是 Three.js&#xff1f; Three.js 是一个基于 WebGL 技术的 JavaScript 3D 库。它允许开发者在网页上创建和展示 3D 图形内容&#xff0c;而无需用户安装任何额外的插件或软件。Three.js 简化了 WebGL 的复杂性&#xff0c;使得即便是对图形编程不太熟悉的人也能快速上手…...

Linux笔记--基于OCRmyPDF将扫描件PDF转换为可搜索的PDF

1--官方仓库 https://github.com/ocrmypdf/OCRmyPDF 2--基本步骤 # 安装ocrmypdf库 sudo apt install ocrmypdf# 安装简体中文库 sudo apt-get install tesseract-ocr-chi-sim# 转换 # -l 表示使用的语言 # --force-ocr 防止出现以下错误&#xff1a;ERROR - PriorOcrFoundE…...

Unity 导出 Xcode 工程 修改 Podfile 文件

Unity 导出 Xcode 工程 修改 Podfile 文件 在 Editor 文件夹下新建 xxx.cs 脚本 实现静态方法 [PostProcessBuild]public static void OnPostprocessBuild(BuildTarget target, string pathToBuiltProject){// Unity 导出 Xcode 工程自动调用这个方法 }using System.IO; using…...

UE5 slate BlankProgram独立程序系列

源码版Engine\Source\Programs\中copy BlankProgram文件夹&#xff0c;重命名为ASlateLearning&#xff0c;修改所有文件命名及内部名称。 ASlateLearning.Target.cs // Copyright Epic Games, Inc. All Rights Reserved.using UnrealBuildTool; using System.Collections.Ge…...

内存不足引发C++程序闪退崩溃问题的分析与总结

目录 1、内存不足一般出现在32位程序中 2、内存不足时会导致malloc或new申请内存失败 2.1、malloc申请内存失败&#xff0c;返回NULL 2.2、new申请内存失败&#xff0c;抛出异常 3、内存不足项目实战案例中相关细节与要点说明 3.1、内存不足导致malloc申请内存失败&#…...

C++ —— 以真我之名 如飞花般绚丽 - 智能指针

目录 1. RAII和智能指针的设计思路 2. C标准库智能指针的使用 2.1 auto_ptr 2.2 unique_ptr 2.3 简单模拟实现auto_ptr和unique_ptr的核心功能 2.4 shared_ptr 2.4.1 make_shared 2.5 weak_ptr 2.6 shared_ptr的缺陷&#xff1a;循环引用问题 3. shared_ptr 和 unique_…...

Linux中安装InfluxDB

什么是InfluxDB InfluxDB是一个开源的时间序列数据库&#xff0c;专为处理时间序列数据而设计。时间序列数据是指带有时间戳的数据点&#xff0c;例如传感器数据、应用程序日志、服务器指标等。InfluxDB 由 InfluxData 公司开发&#xff0c;广泛应用于物联网&#xff08;IoT&am…...

nginx服务器实现上传文件功能_使用nginx-upload-module模块

目录 conf文件内容如下html文件内容如下上传文件功能展示 conf文件内容如下 #user nobody; worker_processes 1;error_log /usr/logs/error.log; #error_log /usr/logs/error.log notice; #error_log /usr/logs/error.log info;#pid /usr/logs/nginx.pid;events …...