当前位置: 首页 > news >正文

JAVA(SpringBoot)集成Kafka实现消息发送和接收。

SpringBoot集成Kafka实现消息发送和接收。

  • 一、Kafka 简介
  • 二、Kafka 功能
  • 三、POM依赖
  • 四、配置文件
  • 五、生产者
  • 六、消费者

君子之学贵一,一则明,明则有功

一、Kafka 简介

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - 订阅消息系统,以可持久化、高吞吐、低延迟、高容错等特性而著称。
Kafka 主要由生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件构成。生产者负责将数据发送到 Kafka 集群,消费者从集群中读取数据。主题是一种逻辑上的分类,数据被发送到特定的主题。每个主题又可以划分为多个分区,以实现数据的并行处理和提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。

二、Kafka 功能

消息队列功能:Kafka 可以作为消息队列使用,在应用程序之间传递消息。生产者将消息发送到主题,不同的消费者可以从主题中订阅并消费消息,实现应用程序解耦。例如,在电商系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从该主题消费订单消息,各自独立处理,降低模块间的耦合度。
数据存储功能:Kafka 具有持久化存储能力,它将消息数据存储在磁盘上,并且通过多副本机制保证数据的可靠性。即使某个节点出现故障,数据也不会丢失。这种特性使得 Kafka 不仅可以作为消息队列,还能用于数据的长期存储和备份,例如用于存储系统的操作日志,方便后续的数据分析和故障排查。
流处理功能:Kafka 可以与流处理框架(如 Apache Flink、Spark Streaming 等)集成,对实时数据流进行处理。通过将实时数据发送到 Kafka 主题,流处理框架可以从主题中读取数据并进行实时计算、分析和转换。例如,在实时监控系统中,通过 Kafka 收集服务器的性能指标数据,然后使用流处理框架对这些数据进行实时分析,及时发现性能异常并发出警报。

三、POM依赖

    <!-- kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>

四、配置文件

spring:# Kafka 配置kafka:# Kafka 服务器地址和端口 代理地址,可以多个bootstrap-servers: IP:9092# 生产者配置producer:# 发送失败时的重试次数retries: 3# 每次批量发送消息的数量,调整为较小值batch-size: 1# 生产者缓冲区大小buffer-memory: 33554432# 消息 key 的序列化器,将 key 序列化为字节数组key-serializer: org.apache.kafka.common.serialization.StringSerializer# 消息 value 的序列化器,将消息体序列化为字节数组value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 当没有初始偏移量或当前偏移量不存在时,从最早的消息开始消费auto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: true# 自动提交偏移量的时间间隔(毫秒),延长自动提交时间间隔auto-commit-interval: 1000# 消息 key 的反序列化器,将字节数组反序列化为 keykey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消息 value 的反序列化器,将字节数组反序列化为消息体value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

五、生产者


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 生产者** @author chenlei*/
@Slf4j
@Component
public class KafkaProducer {/*** KafkaTemplate*/@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到指定的 Kafka 主题,并可指定分组信息** @param topic   消息要发送到的 Kafka 主题* @param message 要发送的消息内容*/public void sendMessage(String topic, String message) {// 使用 KafkaTemplate 发送消息,将消息发送到指定的主题ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {// 消息发送成功后的处理逻辑,可根据需要添加log.info("已发送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");}@Overridepublic void onFailure(Throwable ex) {// 消息发送失败后的处理逻辑,使用日志记录异常log.error("发送消息=[" + message + "] 失败", ex);}});}
}

六、消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author 消费者* chenlei*/
@Slf4j
@Component
public class KafkaConsumer {/*** 监听 Kafka 主题方法。** @param record 从 Kafka 接收到的 ConsumerRecord,包含消息的键值对*/@KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")public void listen(ConsumerRecord<?, ?> record) {// 打印接收到的消息的详细信息log.info("接收到 Kafka 消息: 主题 = {}, 分区 = {}, 偏移量 = {}, 键 = {}, 值 = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}

相关文章:

JAVA(SpringBoot)集成Kafka实现消息发送和接收。

SpringBoot集成Kafka实现消息发送和接收。 一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者 君子之学贵一&#xff0c;一则明&#xff0c;明则有功。 一、Kafka 简介 Kafka 是由 Apache 软件基金会开发的一个开源流处理平台&#xff0c;最初由 Link…...

AI刷题-蛋糕工厂产能规划、优质章节的连续选择

挑两个简单的写写 目录 一、蛋糕工厂产能规划 问题描述 输入格式 输出格式 解题思路&#xff1a; 问题理解 数据结构选择 算法步骤 关键点 最终代码&#xff1a; 运行结果&#xff1a;​编辑 二、优质章节的连续选择 问题描述 输入格式 输出格式 解题思路&a…...

在线可编辑Excel

1. Handsontable 特点&#xff1a; 提供了类似 Excel 的表格编辑体验&#xff0c;包括单元格样式、公式计算、数据验证等功能。 支持多种插件&#xff0c;如筛选、排序、合并单元格等。 轻量级且易于集成到现有项目中。 具备强大的自定义能力&#xff0c;可以调整外观和行为…...

什么是词嵌入?Word2Vec、GloVe 与 FastText 的区别

自然语言处理(NLP)领域的核心问题之一,是如何将人类的语言转换成计算机可以理解的数值形式,而词嵌入(Word Embedding)正是为了解决这个问题的重要技术。本文将详细讲解词嵌入的概念及其经典模型(Word2Vec、GloVe 和 FastText)的原理与区别。 1. 什么是词嵌入(Word Em…...

WPS数据分析000010

基于数据透视表的内容 一、排序 手动调动 二、筛选 三、值显示方式 四、值汇总依据 五、布局和选项 不显示分类汇总 合并居中带标签的单元格 空单元格显示 六、显示报表筛选页...

Qt中QVariant的使用

1.使用QVariant实现不同类型数据的相加 方法&#xff1a;通过type函数返回数值的类型&#xff0c;然后通过setValue来构造一个QVariant类型的返回值。 函数&#xff1a; QVariant mainPage::dataPlus(QVariant a, QVariant b) {QVariant ret;if ((a.type() QVariant::Int) &a…...

Avalonia UI MVVM DataTemplate里绑定Command

Avalonia 模板里面绑定ViewModel跟WPF写法有些不同。需要单独绑定Command. WPF里面可以直接按照下面的方法绑定DataContext. <Button Content"Button" Command"{Binding DataContext.ClickCommand, RelativeSource{RelativeSource AncestorType{x:Type User…...

动态规划DP 数字三角型模型 最低通行费用(题目详解+C++代码完整实现)

最低通行费用 原题链接 AcWing 1018. 最低同行费用 题目描述 一个商人穿过一个 NN的正方形的网格&#xff0c;去参加一个非常重要的商务活动。 他要从网格的左上角进&#xff0c;右下角出。每穿越中间 1个小方格&#xff0c;都要花费 1个单位时间。商人必须在 (2N−1)个单位…...

deepseek R1的确不错,特别是深度思考模式

deepseek R1的确不错&#xff0c;特别是深度思考模式&#xff0c;每次都能自我反省改进。比如我让 它写文案&#xff1a; 【赛博朋克版程序员新春密码——2025我们来破局】 亲爱的代码骑士们&#xff1a; 当CtrlS的肌肉记忆遇上抢票插件&#xff0c;当Spring Boot的…...

Linux 常用命令 - sort 【对文件内容进行排序】

简介 sort 命令源于英文单词 “sort”&#xff0c;表示排序。其主要功能是对文本文件中的行进行排序。它可以根据字母、数字、特定字段等不同的标准进行排序。sort 通过逐行读取文件&#xff08;没有指定文件或指定文件为 - 时读取标准输入&#xff09;内容&#xff0c;并按照…...

MyBatis最佳实践:提升数据库交互效率的秘密武器

第一章&#xff1a;框架的概述&#xff1a; MyBatis 框架的概述&#xff1a; MyBatis 是一个优秀的基于 Java 的持久框架&#xff0c;内部对 JDBC 做了封装&#xff0c;使开发者只需要关注 SQL 语句&#xff0c;而不关注 JDBC 的代码&#xff0c;使开发变得更加的简单MyBatis 通…...

选择困难?直接生成pynput快捷键字符串

from pynput import keyboard# 文档&#xff1a;https://pynput.readthedocs.io/en/latest/keyboard.html#monitoring-the-keyboard # 博客(pynput相关源码)&#xff1a;https://blog.csdn.net/qq_39124701/article/details/145230331 # 虚拟键码(十六进制)&#xff1a;https:/…...

DeepSeek-R1:强化学习驱动的推理模型

1月20日晚&#xff0c;DeepSeek正式发布了全新的推理模型DeepSeek-R1&#xff0c;引起了人工智能领域的广泛关注。该模型在数学、代码生成等高复杂度任务上表现出色&#xff0c;性能对标OpenAI的o1正式版。同时&#xff0c;DeepSeek宣布将DeepSeek-R1以及相关技术报告全面开源。…...

国内优秀的FPGA设计公司主要分布在哪些城市?

近年来&#xff0c;国内FPGA行业发展迅速&#xff0c;随着5G通信、人工智能、大数据等新兴技术的崛起&#xff0c;FPGA设计企业的需求也迎来了爆发式增长。很多技术人才在求职时都会考虑城市的行业分布和发展潜力。因此&#xff0c;国内优秀的FPGA设计公司主要分布在哪些城市&a…...

3.日常英语笔记

screening discrepancies 筛选差异 The team found some screening discrepancies in the data. 团队在数据筛选中发现了些差异。 Don’t tug at it ,or it will fall over and crush you. tug 拉&#xff0c;拽&#xff0c;拖 He tugged the door open with all his might…...

基于RIP的MGRE实验

实验拓扑 实验要求 按照图示配置IP地址配置静态路由协议&#xff0c;搞通公网配置MGRE VPNNHRP的配置配置RIP路由协议来传递两端私网路由测试全网通 实验配置 1、配置IP地址 [R1]int g0/0/0 [R1-GigabitEthernet0/0/0]ip add 15.0.0.1 24 [R1]int LoopBack 0 [R1-LoopBack0]i…...

【开源免费】基于Vue和SpringBoot的美食推荐商城(附论文)

本文项目编号 T 166 &#xff0c;文末自助获取源码 \color{red}{T166&#xff0c;文末自助获取源码} T166&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...

Pandas DataFrame 拼接、合并和关联

拼接:使用 pd.concat(),可以沿着行或列方向拼接 DataFrame。 合并:使用 pd.merge(),可以根据一个或多个键进行不同类型的合并(左连接、右连接、全连接、内连接)。 关联:使用 join() 方法,通常在设置了索引的 DataFrame 上进行关联操作。 concat拼接 按列拼接 df1 = …...

【Redis】Redis修改连接数参数

1.重启操作背景 Redis数据库连接数上限&#xff0c;需要修改配置文件里maxclients参数&#xff0c;修改后需重启数据库 1.1、修改操作系统open files参数 1.2、修改redis连接数 2.登录操作系统 登录堡垒机 ssh {ip}3.查看当前状态 3.1、查看操作系统配置 ulimit -a3.2、…...

scratch变魔术 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析

目录 scratch变魔术 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、 推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、py…...

51单片机开发:点阵屏显示数字

实验目标&#xff1a;在8x8的点阵屏上显示数字0。 点阵屏的原理图如下图所示&#xff0c;点阵屏的列接在P0端口&#xff0c;行接在74HC595扩展的DP端口上。 扩展口的使用详见&#xff1a;51单片机开发&#xff1a;IO扩展(串转并)实验-CSDN博客 要让点阵屏显示数字&#xff0…...

mysql DDL可重入讨论

mysql的bug&#xff1a;当执行 MySQL online DDL 时&#xff0c;期间如有其他并发的 DML 对相同的表进行增量修改&#xff0c;比如 update、insert、insert into … on duplicate key、replace into 等&#xff0c;且增量修改的数据违背唯一约束&#xff0c;那么 DDL 最后都会执…...

DAY01 面向对象回顾、继承、抽象类

学习目标 能够写出类的继承格式public class 子类 extends 父类{}public class Cat extends Animal{} 能够说出继承的特点子类继承父类,就会自动拥有父类非私有的成员 能够说出子类调用父类的成员特点1.子类有使用子类自己的2.子类没有使用,继承自父类的3.子类父类都没有编译报…...

127周一复盘 (165)玩法与难度思考

1.上午测试&#xff0c;小改了点东西&#xff0c; 基本等于啥也没干。 匆忙赶往车站。 从此进入春节期间&#xff0c;没有开发&#xff0c;而思考与设计。 2.火车上思考玩法与难度的问题。 目前的主流作法实际上并不完全符合不同玩家的需求&#xff0c; 对这方面还是要有自…...

【C语言常见概念详解】

目录 -----------------------------------------begin------------------------------------- 什么是C语言&#xff1a; 1. 基本数据类型 2. 变量与常量 3. 运算符与表达式 4. 控制结构 5. 函数 6. 指针 7. 数组与字符串 8. 结构体与联合体 9. 文件操作 结语 ----…...

弹性分组环——RPR技术

高频考点&#xff0c;考查20次&#xff1a; RPR与FDDI一样使用双环结构RPR环中的每一个节点都会执行SRP公平算法&#xff08;非DPT、MPLS&#xff09;传统的FDDI环&#xff0c;当源节点成功向目的结点发送一个数据帧后&#xff0c;这个数据帧由源结点从环中回收。但RPR环&#…...

定制Centos镜像

环境准备&#xff1a; 一台最小化安装的干净的系统&#xff0c;这里使用Centos7.9,一个Centos镜像&#xff0c;镜像也使用Centos7.9的。 [rootlocalhost ~]# cat /etc/system-release CentOS Linux release 7.9.2009 (Core) [rootlocalhost ~]# rpm -qa | wc -l 306 [rootloca…...

Java---判断素数的三种方法

我们首先先来了解一下什么是素数 素数:一个整数只能被1和自身整除 , 注意:0与1不是素数 目录 方法一:暴力法 方法二:除二法(优化) 方法三.根号法(最优法) 方法一:暴力法 最简单最暴力的方法就是根据定义&#xff0c;判断n是不是素数,让n除以2到n-1的所有数,只要遇到能除开…...

多级缓存(亿级并发解决方案)

多级缓存&#xff08;亿级流量&#xff08;并发&#xff09;的缓存方案&#xff09; 传统缓存的问题 传统缓存是请求到达tomcat后&#xff0c;先查询redis&#xff0c;如果未命中则查询数据库&#xff0c;问题如下&#xff1a; &#xff08;1&#xff09;请求要经过tomcat处…...

代理模式 - 代理模式的应用

引言 代理模式&#xff08;Proxy Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许你提供一个代理对象来控制对另一个对象的访问。代理对象通常会在客户端和目标对象之间起到中介的作用&#xff0c;从而可以在不改变目标对象的情况下&#xff0c;增加额外的功能或控…...