当前位置: 首页 > news >正文

【Kafka】SpringBoot整合Kafka详细介绍及代码示例

Kafka介绍

Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发,后来成为Apache软件基金会的一部分,并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。

  • Producer:生产者,负责将数据发送到Kafka集群。
  • Consumer:消费者,从Kafka集群中读取数据。
  • Broker:Kafka服务器实例,Kafka集群通常由多个Broker组成。
  • Topic:主题,数据按主题进行分类。
  • Partition:分区,每个主题可以有多个分区,用于实现并行处理和提高吞吐量。
  • Offset:偏移量,每个消息在其分区中的唯一标识。

使用场景

Kafka适用于以下场景:

  1. 日志收集:集中收集系统日志和应用日志,通过Kafka传输到大数据处理系统。
  2. 消息队列:作为高吞吐量、低延迟的消息队列系统。
  3. 数据流处理:实时处理数据流,用于实时分析、监控和处理。
  4. 事件源架构:将所有的变更事件存储在Kafka中,实现事件溯源和回放。
  5. 流数据管道:构建数据管道,连接数据源和数据存储系统。

Spring Boot整合Kafka 

项目结构

springboot-kafka
│
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com.example.kafka
│   │   │       ├── KafkaApplication.java
│   │   │       ├── config
│   │   │       │   └── KafkaConfig.java
│   │   │       ├── producer
│   │   │       │   └── KafkaProducer.java
│   │   │       ├── consumer
│   │   │       │   └── KafkaConsumer.java
│   │   │       └── controller
│   │   │           └── KafkaController.java
│   │   └── resources
│   │       ├── application.yml
│   │       └── logback-spring.xml (可选)
│   └── test
│       └── java
│           └── com.example.kafka
│               └── KafkaApplicationTests.java
└── pom.xml

1. 创建Spring Boot项目并添加依赖

pom.xml
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置Kafka

application.yml
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

3. 创建Kafka配置类

KafkaConfig.java
package com.example.kafka.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaConfig {@Beanpublic NewTopic myTopic() {return new NewTopic("my-topic", 1, (short) 1);}
}

4. 创建Kafka生产者

KafkaProducer.java
package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}

5. 创建Kafka消费者

KafkaConsumer.java
package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);}
}

6. 创建控制器发送消息

KafkaController.java
package com.example.kafka.controller;import com.example.kafka.producer.KafkaProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final KafkaProducer kafkaProducer;public KafkaController(KafkaProducer kafkaProducer) {this.kafkaProducer = kafkaProducer;}@GetMapping("/send")public String sendMessage(@RequestParam String message) {kafkaProducer.sendMessage("my-topic", message);return "Message sent";}
}

7. 创建Spring Boot主类

KafkaApplication.java
package com.example.kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}
}

8. 测试应用

通过访问以下URL来发送消息:

http://localhost:8080/send?message=HelloKafka

9. 日志配置(可选)

为了更好地查看Kafka的日志,可以添加logback-spring.xml配置:

logback-spring.xml
<configuration><springProfile name="default"><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss} - %msg%n</pattern></encoder></appender><logger name="org.apache.kafka" level="INFO"/><root level="INFO"><appender-ref ref="STDOUT"/></root></springProfile>
</configuration>

10. 测试类(可选)

KafkaApplicationTests.java
package com.example.kafka;import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class KafkaApplicationTests {@Testvoid contextLoads() {}
}

至此,你已经完成了Spring Boot整合Kafka的详细配置和代码示例。你可以根据实际需求进一步扩展和修改这个基础代码。

相关文章:

【Kafka】SpringBoot整合Kafka详细介绍及代码示例

Kafka介绍 Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发&#xff0c;后来成为Apache软件基金会的一部分&#xff0c;并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。 Producer&#xff1a;生产者&a…...

C++ 质数因子分解

描述 功能:输入一个正整数&#xff0c;按照从小到大的顺序输出它的所有质因子&#xff08;重复的也要列举&#xff09;&#xff08;如180的质因子为2 2 3 3 5 &#xff09; 输入描述&#xff1a; 输入一个整数 输出描述&#xff1a; 按照从小到大的顺序输出它的所有质数的…...

laravel版本≥ 8.1

laravel10 php ≥ 8.1 且 ≤ 8.3&#xff1f; 8.1 < php < 8.3PHP版本要求在 8.1 到 8.3 之间&#xff0c;包括这两个版本。具体来说&#xff1a;"≥ 8.1" 表示 PHP 的版本至少是 8.1&#xff0c;也就是说 8.1 及以上的版本都可以。 "≤ 8.3" 表示 P…...

【iOS】MRC下的单例模式批量创建单例

单例模式的介绍和ARC下的单例请见这篇&#xff1a;【iOS】单例模式 目录 关闭ARC环境MRC下的单例ARC下的单例批量创建单例Demo 关闭ARC环境 首先关闭ARC环境&#xff0c;即打开MRC&#xff1a; 或是指定某特定目标文件为非ARC环境&#xff1a; 双击某个类文件&#xff0c;指定…...

计算机网络期末复习

今天考专四&#xff0c;环境都蛮好的&#xff0c;试卷也很新&#xff0c;老师人也不错&#xff0c;明年再来。 又到期末考试咯&#xff0c;大家复习没有&#xff1f;还没复习啊&#xff1f;还不复&#xff1f;&#xff1f;&#xff1f;&#xff1f; 目录 第一章 1-02 试简述…...

python写一个获取竞品信息报告

要编写一个获取竞品信息报告的Python程序&#xff0c;首先需要明确您想要获取的竞品信息以及数据来源。在这个示例中&#xff0c;我将展示如何从网页提取竞品信息&#xff0c;并编写一个简单的报告。 假设您想要获取以下竞品信息&#xff1a; 1. 产品名称 2. 产品价格 3. 产品特…...

一文彻底理解机器学习 ROC-AUC 指标

在机器学习和数据科学的江湖中&#xff0c;评估模型的好坏是非常关键的一环。而 ROC&#xff08;Receiver Operating Characteristic&#xff09;曲线和 AUC&#xff08;Area Under Curve&#xff09;正是评估分类模型性能的重要工具。 这个知识点在面试中也很频繁的出现。尽管…...

【二】【动态规划NEW】91. 解码方法,62. 不同路径,63. 不同路径 II

91. 解码方法 一条包含字母 A-Z 的消息通过以下映射进行了 编码 &#xff1a; ‘A’ -> “1” ‘B’ -> “2” … ‘Z’ -> “26” 要 解码 已编码的消息&#xff0c;所有数字必须基于上述映射的方法&#xff0c;反向映射回字母&#xff08;可能有多种方法&#xff…...

Python闯LeetCode--第3题:无重复字符的最长子串

Problem: 3. 无重复字符的最长子串 文章目录 思路解题方法复杂度Code 思路 一上来马上想到两层for循环暴力枚举&#xff0c;但是又立马想到复杂度是 O ( n 2 ) O(n^2) O(n2)&#xff0c;思考了一下能否有更优解&#xff0c;于是想到用头尾两个指针来指定滑动窗口&#xff08;主…...

HTML DOM 对象

HTML DOM 对象 1. 概述 HTML DOM(文档对象模型)是一个跨平台和语言独立的接口,它允许程序和脚本动态地访问和更新文档的内容、结构和样式。在HTML DOM中,文档被表示为节点树,其中每个节点代表文档中的一个部分,例如元素、文本或属性。HTML DOM对象是构成这个节点树的基…...

如何解决 BeautifulSoup 安装问题:从 BeautifulSoup 3 到 BeautifulSoup 4

在使用 Python 的过程中&#xff0c;解析 HTML 和 XML 数据是一项常见任务。BeautifulSoup 是一个非常流行的解析库。然而&#xff0c;最近在安装 BeautifulSoup 时&#xff0c;遇到了一些问题。本文将介绍如何解决这些问题&#xff0c;并成功安装 BeautifulSoup 4。 问题描述 …...

原型模式--深复制/浅复制

原型模式用于克隆复杂对象&#xff0c;由于new一个实例对象会消耗大部分时间&#xff0c;所以原型模式可以节约大量时间 1 public class Sheep implements Cloneable{2 private String name;3 private Date birth;4 public Sheep(String name, Date birth) {5 …...

C# TextBox模糊查询及输入提示

在程序中&#xff0c;我们经常会遇到文本框中不知道输入什么内容&#xff0c;这时我们可以在文本框中显示提示词提示用户&#xff1b;或者需要查询某个内容却记不清完整信息&#xff0c;通常可以通过文本框列出与输入词相匹配的信息&#xff0c;帮助用户快速索引信息。 文本框…...

Node入门以及express创建项目

前言 记录学习NodeJS 一、NodeJS是什么&#xff1f; Node.js 是一个开源和跨平台的 JavaScript 运行时环境 二、下载NodeJs 1.下载地址(一直点击next即可&#xff0c;记得修改安装地址) https://nodejs.p2hp.com/download/ 2.查看是否安装成功&#xff0c;打开命令行 nod…...

Cheat Engine CE v7.5 安装教程(专注于游戏的修改器)

前言 Cheat Engine是一款专注于游戏的修改器。它可以用来扫描游戏中的内存&#xff0c;并允许修改它们。它还附带了调试器、反汇编器、汇编器、变速器、作弊器生成、Direct3D操作工具、系统检查工具等。 一、下载地址 下载链接&#xff1a;http://dygod/source 点击搜索&…...

【实例分享】访问后端服务超时,银河麒麟服务器操作系统分析及处理建议

1.服务器环境以及配置 【机型】 处理器&#xff1a; Intel 32核 内存&#xff1a; 128G 整机类型/架构&#xff1a; x86_64虚拟机 【内核版本】 4.19.90-25.22.v2101.kylin.x86_64 【OS镜像版本】 kylin server V10 SP2 【第三方软件】 开阳k8s 2.问题现象描述 …...

Java中和的区别

在Java中&#xff0c;& 和 && 都是逻辑运算符&#xff0c;但它们之间存在一些重要的区别&#xff0c;特别是在它们如何评估其操作数以及它们的性能影响方面。 短路评估&#xff08;Short-Circuit Evaluation&#xff09;&#xff1a; &&&#xff08;逻辑…...

深入理解计算机系统 CSAPP 家庭作业6.34

第一步先求(S,E,B,m) 题目说共C32个字节,块大小B为16个字节,那就是分为两组:0,1.然后每组存4个int 每个4字节 CB*E*S .B16 ,直接映射的E就是1,所以S2 m为啥等于7? 通过写出两个数组所有的地址可以得出m7. 得出高速缓存的参数:(S,E,B,m)(2,1,16,7),注意图6-26每个参数的定义…...

[leetcode 141环形链表]双指针解决环形链表

Problem: 141. 环形链表 文章目录 思路Code 思路 首先想到如果链表为空直接返回false 其次想到用双指针,一个一回走一步,另一个一回走两步 如果是环形,总有一个时刻,两指针会指向同一个节点,而且该结点不能为空(空是快指针遍历完单链表了) Code /*** Definition for singly-li…...

【深度学习】Precision、Accuracy的区别,精确率与准确率:深度学习多分类问题中的性能评估详解

在深度学习的多分类问题中&#xff0c;Precision&#xff08;精确率&#xff09;和Accuracy&#xff08;准确率&#xff09;是两种常用的性能评估指标&#xff0c;它们各自有不同的定义和用途。 Precision&#xff08;精确率&#xff09;的中文发音是&#xff1a;pǔ rēi xī…...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来&#xff0c;实在找不到&#xff0c;希望有大佬教一下我。 还有就会议时间&#xff0c;我感觉不是图片时间&#xff0c;因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

【HTTP三个基础问题】

面试官您好&#xff01;HTTP是超文本传输协议&#xff0c;是互联网上客户端和服务器之间传输超文本数据&#xff08;比如文字、图片、音频、视频等&#xff09;的核心协议&#xff0c;当前互联网应用最广泛的版本是HTTP1.1&#xff0c;它基于经典的C/S模型&#xff0c;也就是客…...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程&#xff1a;首先由HR先筛选一部分简历后&#xff0c;在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如&#xff1a;Boss直聘&#xff08;招聘方平台&#xff09; 直接按照条件进行筛选 例如&#xff1a…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

AI病理诊断七剑下天山,医疗未来触手可及

一、病理诊断困局&#xff1a;刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断"&#xff0c;医生需通过显微镜观察组织切片&#xff0c;在细胞迷宫中捕捉癌变信号。某省病理质控报告显示&#xff0c;基层医院误诊率达12%-15%&#xff0c;专家会诊…...