当前位置: 首页 > news >正文

Apache Kafka与Spring整合应用详解

引言

Apache Kafka是一种高吞吐量的分布式消息系统,广泛应用于实时数据处理、日志聚合和事件驱动架构中。Spring作为Java开发的主流框架,通过Spring Kafka项目提供了对Kafka的集成支持。本文将深入探讨如何使用Spring Kafka整合Apache Kafka,并通过详细的代码示例帮助新人理解和掌握这一技术。

环境准备

在开始之前,请确保你已经安装并配置好了以下环境:

  1. Apache Kafka集群
  2. Java JDK 8或更高版本
  3. Maven或Gradle构建工具
  4. Spring Boot 2.3.0或更高版本

项目依赖配置

首先,我们需要在pom.xml中添加Spring Kafka的依赖。

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

Kafka配置

在Spring Boot应用中,我们需要在application.properties中配置Kafka的相关信息。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest

生产者配置与实现

生产者用于将消息发送到Kafka主题中。我们首先定义一个配置类来配置Kafka生产者。

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

接着,我们创建一个生产者服务类,用于发送消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private static final String TOPIC = "my_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);}
}

消费者配置与实现

消费者用于从Kafka主题中读取消息。我们也需要定义一个配置类来配置Kafka消费者。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}
}

接着,我们创建一个消费者服务类,用于接收消息。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {@KafkaListener(topics = "my_topic", groupId = "my-group")public void consume(String message) {System.out.println("Consumed message: " + message);}
}

控制器实现

为了测试我们的Kafka生产者和消费者,我们可以创建一个简单的Spring Boot控制器。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducerService producerService;@GetMapping("/send")public String sendMessage(@RequestParam("message") String message) {producerService.sendMessage(message);return "Message sent to Kafka topic: " + message;}
}

运行应用

启动Spring Boot应用,打开浏览器,访问http://localhost:8080/send?message=HelloKafka。你应该会看到控制台输出:

Consumed message: HelloKafka

总结

本文详细介绍了如何使用Spring Kafka整合Apache Kafka,包括项目依赖配置、Kafka配置、生产者与消费者的实现以及简单的测试控制器。通过这些示例代码,新人可以快速上手,并且深入理解Spring与Kafka的集成方式。希望本文对你有所帮助,祝你在Java开发的路上越来越顺利!

相关文章:

Apache Kafka与Spring整合应用详解

引言 Apache Kafka是一种高吞吐量的分布式消息系统&#xff0c;广泛应用于实时数据处理、日志聚合和事件驱动架构中。Spring作为Java开发的主流框架&#xff0c;通过Spring Kafka项目提供了对Kafka的集成支持。本文将深入探讨如何使用Spring Kafka整合Apache Kafka&#xff0c…...

SpringBoot配置第三方专业缓存技术Redis

Redis缓存技术 Redis&#xff08;Remote Dictionary Server&#xff09;是一个开源的内存中数据结构存储系统&#xff0c;通常用作数据库、缓存和消息中间件。它支持多种数据结构&#xff0c;如字符串、哈希表、列表、集合、有序集合等&#xff0c;并提供了丰富的功能和灵活的…...

javascript的toFixed()以及使用

toFixed() 是 JavaScript 中数字类型&#xff08;Number&#xff09;的一个方法&#xff0c;用来将数字转换为指定小数位数的字符串表示形式。 使用方式和示例&#xff1a; let num 123.45678; let fixedNum num.toFixed(2); console.log(fixedNum); // 输出 "123.46&qu…...

软件功能测试和性能测试包括哪些测试内容?又有什么联系和区别?

软件功能测试和性能测试是保证软件质量和稳定性的重要手&#xff0c;无论是验证软件的功能正确性&#xff0c;还是评估软件在负载下的性能表现&#xff0c;这些测试都是必不可少的。 一、软件功能测试   软件功能测试是指对软件的各项功能进行验证和确认&#xff0c;确保软件…...

从工具产品体验对比spark、hadoop、flink

作为一名大数据开发&#xff0c;从工具产品的角度&#xff0c;对比一下大数据工具最常使用的框架spark、hadoop和flink。工具无关好坏&#xff0c;但人的喜欢有偏好。 目录 评价标准1 效率2 用户体验分析从用户的维度来看从市场的维度来看从产品的维度来看 3 用户体验的基本原则…...

【软件设计】详细设计说明书(word原件,项目直接套用)

软件详细设计说明书 1.系统总体设计 2.性能设计 3.系统功能模块详细设计 4.数据库设计 5.接口设计 6.系统出错处理设计 7.系统处理规定 软件全套资料&#xff1a;本文末个人名片直接获取或者进主页。...

java本地缓存(map,Guava,echcache,caffeine)优缺点,以及适用场景

前言 在高并发系统环境下&#xff0c;jvm本地缓存扮演着至关重要的角色&#xff0c;合理的应用能够使系统响应迅速&#xff0c;提高用户体验感&#xff0c;而分布式缓存redis则存在着网络io&#xff0c;以及流量消耗问题&#xff0c;需要和本地缓存搭配使用&#xff0c;才能使…...

Monica

在 《long long ago》中&#xff0c;我论述了on是一个刚出生的孩子的脐带连接在其肚子g上的形象&#xff0c;脐带就是long的字母l和字母n&#xff0c;l表脐带很长&#xff0c;n表脐带曲转冗余和连接之性&#xff0c;on表一&#xff0c;是孩子刚诞生的意思&#xff0c;o是身体&a…...

国产数据库中读写分离实现机制

在数据库高可用架构下会存在1主多备的部署&#xff0c;备节点可以根据业务场景分发一部分流量以充分利用资源&#xff0c;并减轻主库的压力&#xff0c;因此在数据库的功能上需要读写分离来实现。 充分利用备节点的资源&#xff0c;提升业务的吞吐量&#xff1b;防止运维等非业…...

kubernetes部署dashboard

kubernetes部署dashboard 1. 简介 Dashboard 是基于网页的 Kubernetes 用户界面。 你可以使用 Dashboard 将容器应用部署到 Kubernetes 集群中&#xff0c;也可以对容器应用排错&#xff0c;还能管理集群资源。 你可以使用 Dashboard 获取运行在集群中的应用的概览信息&#…...

FPGA早鸟课程第二弹 | Vivado 设计静态时序分析和实际约束

在FPGA设计领域&#xff0c;时序约束和静态时序分析是提升系统性能和稳定性的关键。社区推出的「Vivado 设计静态时序分析和实际约束」课程&#xff0c;旨在帮助工程师们掌握先进的设计技术&#xff0c;优化设计流程&#xff0c;提高开发效率。 课程介绍 关于课程 权威认证&…...

STM32项目分享:家庭环境监测系统

目录 一、前言 二、项目简介 1.功能详解 2.主要器件 三、原理图设计 四、PCB硬件设计 1.PCB图 2.PCB板打样焊接图 五、程序设计 六、实验效果 七、资料内容 项目分享 一、前言 项目成品图片&#xff1a; 哔哩哔哩视频链接&#xff1a; https://www.bilibili.…...

华为HCIP Datacom H12-821 卷5

1.单选题 下列哪种工具不能被 route-policy 的 apply 子句直接引用? A、IP-Prefix B、tag C、community D、origin 正确答案: A 解析: 因route-policy工具中, apply 后面跟的是路由的相关属性。 但是ip-prefix是用来匹配路由的工具。 2.单选题...

Mongodb数据库基本操作

本文为在命令行模式下Mongodb数据库的基本操作整理。 目录 数据库操作 创建数据库 查看所有数据 查看当前数据库 删除数据库 断开连接 查看命令api 集合操作 查看当前数据库下集合 创建集合 删除当前数据库中的集合 文档操作 插入文档 insertOne()方法 insertMa…...

【机器学习】基于Softmax松弛技术的离散数据采样

1.引言 1.1.离散数据采样的意义 离散数据采样在深度学习中起着至关重要的作用&#xff0c;它直接影响到模型的性能、泛化能力、训练效率、鲁棒性和解释性。 首先&#xff0c;采样方法能够有效地平衡数据集中不同类别的样本数量&#xff0c;使得模型在训练时能够更均衡地学习…...

.NET+Python量化【1】——环境部署和个人资金账户信息查询

前言&#xff1a;量化资料很少&#xff0c;.NET更少。那我就来开个先河吧~ 以下是使用QMT进行量化开发的环境部署和基础信息获取有关操作。 1、首先自己申请券商的QMT权限&#xff0c;此步骤省略。 2、登陆QMT&#xff0c;选择极简模式&#xff0c;或者独立交易模式之类的。会进…...

洛谷 P10584 [蓝桥杯 2024 国 A] 数学题(整除分块+杜教筛)

题目 思路来源 登录 - Luogu Spilopelia 题解 参考了两篇洛谷题解&#xff0c;第一篇能得出这个式子&#xff0c;第二篇有比较严格的复杂度分析 结合去年蓝桥杯洛谷P9238&#xff0c;基本就能得出这题的正确做法 代码 #include<bits/stdc.h> #include<iostream&g…...

深入讲解C++基础知识(一)

目录 一、基本内置类型1. 类型的作用2. 分类3. 整型3.1 内存描述及查询3.2 布尔类型 —— bool3.3 字符类型 —— char3.4 其他整型 4. 有符号类型和无符号类型5. 浮点型6. 如何选择类型7. 类型转换7.1 自动类型转换7.2 强制类型转换7.3 类型转换总结 8. 类型溢出8.1 注意事项 …...

Python爬虫实战:批量下载网站图片

1.获取图片的url链接 首先&#xff0c;打开百度图片首页&#xff0c;注意下图url中的index 接着&#xff0c;把页面切换成传统翻页版&#xff08;flip&#xff09;&#xff0c;因为这样有利于我们爬取图片&#xff01; 对比了几个url发现&#xff0c;pn参数是请求到的数量。…...

使用 JavaScript 获取电池状态

在现代的移动设备和笔记本电脑上&#xff0c;了解电池状态是一项非常有用的功能。使用 JavaScript 可以轻松地获取电池的充电状态、电量百分比等信息。本文将介绍如何使用 JavaScript 访问这些信息&#xff0c;并将其显示在网页上。 1. HTML 结构 首先&#xff0c;我们需要一…...

java—类反射机制

简述 反射机制允许程序在执行期间借助于Reflection API取得任何类的内部信息&#xff08;如成员变量&#xff0c;构造器&#xff0c;成员方法等&#xff09;&#xff0c;并能操作对象的属性及方法。反射机制在设计模式和框架底层都能用到。 类一旦加载&#xff0c;在堆中会产生…...

浏览器-服务器架构 (BS架构) 详解

目录 前言1. BS架构概述1.1 BS架构的定义1.2 BS架构的基本原理 2. BS架构的优势2.1 客户端简化2.2 易于更新和维护2.3 跨平台性强2.4 扩展性高 3. BS架构的劣势3.1 网络依赖性强3.2 安全性问题3.3 用户体验局限 4. BS架构的典型应用场景4.1 企业内部应用4.2 电子商务平台4.3 在…...

微型操作系统内核源码详解系列五(四):cm3下svc启动任务

系列一&#xff1a;微型操作系统内核源码详解系列一&#xff1a;rtos内核源码概论篇&#xff08;以freertos为例&#xff09;-CSDN博客 系列二&#xff1a;微型操作系统内核源码详解系列二&#xff1a;数据结构和对象篇&#xff08;以freertos为例&#xff09;-CSDN博客 系列…...

筛质数(暴力法、埃氏筛、欧拉筛)

筛质数&#xff08;暴力法、埃氏筛、欧拉筛&#xff09; 暴力法 思路分析&#xff1a; 直接双for循环来求解质数 如果不设置标记只是简单地执行了break会导致内部循环(由j控制)而不是立即打印i或者跳过它。如果打印语句写到内部循环中&#xff0c;也会导致每个 非素数也被打…...

使用USI作为主SPI接口

代码; lcd_drive.c //***************************************************************************** // // File........: LCD_driver.c // // Author(s)...: ATMEL Norway // // Target(s)...: ATmega169 // // Compiler....: AVR-GCC 3.3.1; avr-libc 1.0 // // D…...

AI播客下载:Eye on AI(AI深度洞察)

"Eye on A.I." 是一档双周播客节目&#xff0c;由长期担任《纽约时报》记者的 Craig S. Smith 主持。在每一集中&#xff0c;Craig 都会与在人工智能领域产生影响的人们交谈。该播客的目的是将渐进的进步置于更广阔的背景中&#xff0c;并考虑发展中的技术的全球影响…...

Flink 窗口触发器

参考&#xff1a; NoteWarehouse/05_BigData/09_Flink(1).md at main FGL12321/NoteWarehouse GitHub Flink系列 9. 介绍 Flink 窗口触发器、移除器和延迟数据等 | hnbian https://github.com/kinoxyz1/bigdata-learning-notes/blob/master/note/flink/Window%26%E6%97%B6…...

Java面试题:解释线程间如何通过wait、notify和notifyAll方法进行通信

在 Java 中&#xff0c;线程间的通信可以通过 wait()、notify() 和 notifyAll() 这三个方法实现。这些方法是 Java 线程 Thread 类的一部分&#xff0c;它们与 synchronized 关键字一起使用&#xff0c;以实现线程间的协调。 基本概念 wait()&#xff1a;当一个线程执行到 wa…...

【机器学习 复习】第9章 降维算法——PCA降维

一、概念 1.PCA &#xff08;1&#xff09;主成分分析&#xff08;Principal ComponentAnalysis&#xff0c;PCA&#xff09;一种经典的线性降维分析算法。 &#xff08;2&#xff09;原理&#xff0c;这里以二维转一维为例&#xff0c;原来的平面变成了一条直线 这是三维变二…...

Ubuntu系统docker gpu环境搭建

Ubuntu系统dockergpu环境搭建 安装步骤前置安装安装指定版本的依赖包用docker官方脚本安装Docker-ce添加稳定仓库和GPG秘钥更新源 安装docker安装nvidia-docker2重启docker服务阿里云镜像加速 相关命令网络 docker常用命令镜像容器 docker相关问题解决方案使用wsl时docker的容器…...

装修广告做哪个网站最好看/百度开放云平台

一个完整的正则使用过程 #调用re模块&#xff1a; In [11]: import re # re.match(pattern, string)第一个参数是你正则的规则, 第二个参数是检测的字符串&#xff1a; In [12]: are.match(r"redhat","redhathello") In [13]: print a.group() redhat #re…...

副食店年报在哪个网站做/北京网站优化排名

概述 Tornado 是 FriendFeed 使用的可扩展的非阻塞式 web 服务器及其相关工具的开源版本。这个 Web 框架看起来有些像web.py 或者 Google 的 webapp&#xff0c;不过为了能有效利用非阻塞式服务器环境&#xff0c;这个 Web 框架还包含了一些相关的有用工具 和优化。 Tornado 和…...

初学者求教怎样做网站/网店如何推广

这是我个人的一点小想法啊&#xff0c;不代表准确 在c语言中我经常会使用到这样的用法&#xff1a; int main() {int i0,j0;for(i0,j10;i<5&&j>3;i,j--){printf("%d %d\n",i,j);}}//然后就会得到我想要的输出结果&#xff0c;如下&#xff1a; 0 10 …...

福建中海建设有限公司网站/市场调研公司

具体信息在我们的石墨文档里&#xff0c;欢迎查看&#xff01; 文档链接&#xff1a;https://shimo.im/docs/FLWh6EGkzcA6U7co/ 点击链接查看「圈地自萌 Alpha冲刺&#xff01;&#xff01;&#xff01;」&#xff0c;或复制链接用石墨文档 App 打开 转载于:https://www.cnblog…...

成都网站制作工具/seo长沙

安装完成后&#xff0c;启动虚拟机时提示“未能初始化远程显示子系统&#xff1a; 右击我的电脑--管理---本地用户和组里面的用户一栏&#xff0c;查看右边&#xff0c;里面是否有个__vmware_user__的账户&#xff0c;没有的话可能是这个原因&#xff0c;创建一个试试&#xff…...

佛山做网站3lue/学计算机哪个培训机构好

众所周知&#xff0c;制造型企业产品原材料众多&#xff0c;生产工艺复杂&#xff0c;工艺变更频繁&#xff0c;生产流程长&#xff0c;加工设备众多。在整个生产线上&#xff0c;管理层们很难及时发现产品质量缺陷出现在哪个环节&#xff0c;对应的解决方案也是处于事后补救的…...