当前位置: 首页 > news >正文

Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动

为什么要使用MQ?

在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

image-20231227120405997

源码地址:Gitee

整合RocketMQ

依赖版本

  • JDK 17
  • Spring Boot 3.2.0
  • RocketMQ-Client 5.0.4
  • RocketMQ-Starter 2.2.0

可以参考这篇进行RocketMQ安装

Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:
image-20231227062105302

参考配置文件

# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group,保证唯一group: event-mq-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false

参考Issue

  • 方法一 :通过@Import(RocketMQAutoConfiguration.class)在配置类中引入

  • 方法二:在resources资源目录下创建文件夹及文件META-INF/springorg.springframework.boot.autoconfigure.AutoConfiguration.imports
    文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题

import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;/*** 启动类*/
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);}
}

RocketMQ操作工具

RocketMQ Message实体

import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;
import java.util.List;/*** RocketMQ 消息*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {/*** 消息队列主题*/@NotBlank(message = "MQ Topic 不能为空")private String topic;/*** 延迟级别*/@Builder.Defaultprivate DelayLevel delayLevel = DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private List<T> messages;/*** 使用有序消息发送时,指定发送到队列*/private String hashKey;/*** 任务Id,用于日志打印相关信息*/@Builder.Defaultprivate String taskId = IdUtil.fastSimpleUUID();
}

RocketMQTemplate 二次封装

import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/
@Slf4j
@Component
public class RocketMQService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.sendMessageTimeout}")private int sendMessageTimeout;/*** 异步发送消息回调** @param taskId 任务Id* @param topic  消息主题* @return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());}};}/*** 发送同步消息,使用有序发送请设置HashKey** @param message 消息参数*/public <T> void syncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** @param message 消息参数*/public <T> void syncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息,异步返回消息结果** @param message 消息参数*/public <T> void asyncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** @param message 消息参数*/public <T> void asyncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;** @param message 消息参数*/public <T> void sendOneWay(RocketMQMessage<T> message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** @param message 消息体* @param batch   是否为批量操作*/public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]": "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}}
}

定义RocketMQ消费者

import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** MQ消息监听*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("MQListener 接收消息 : {}", message);}
}

定义测试类发送消息

import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/
@SpringBootTest
public class MQTest {@Resourceprivate RocketMQService rocketMQService;@Testpublic void sendMessage() {int count = 1;while (count <= 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count++).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);}
}

测试

springboot3-RocketMQ

相关文章:

Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动

为什么要使用MQ&#xff1f; 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动&#xff0c;已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢&#xff1f; 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事…...

oracle ORA-01704: string literal too long ORACLE数据库clob类型

当oracle数据表中有clob类型字段时候&#xff0c;insert或update的sql语句中&#xff0c;超过长度就会报错 ORA-01704: string literal too long update xxx set xxx <div><h1>123</h1></div> where id 100;可以修改为 DECLAREstr varchar2(10000…...

微星主板强刷BIOS(以微星X370gaming plus 为例)

(前两天手欠&#xff0c;用U盘通过微星的M-flash升级BIOS 升级过程中老没动静就强制关机了 然后电脑就打不开了) 几种强刷主板BIOS的方式 在网上看到有三种强刷BIOS的方式分别是: 使用夹子编程器 (听说不太好夹)使用微星转接线编程器&#xff08;只能用于微星主板&#xff0…...

matlab 图像上生成指定中心,指定大小的矩形窗

用matlab实现在图像上生成指定中心,指定大小的矩形窗(奇数*奇数) function PlaneWin PlaneWindow(CentreCoorX,CentreCoorY,RadiusX,RadiusY,SizeImRow,SizeImColumn) % 在图像上生成指定中心,指定大小的矩形窗(奇数*奇数) % % Input: % CentreCoorX(1*1) % CentreCoorY(1*1)…...

❀My学习小记录之算法❀

目录 算法:) 一、定义 二、特征 三、基本要素 常用设计模式 常用实现方法 四、形式化算法 五、复杂度 时间复杂度 空间复杂度 六、非确定性多项式时间&#xff08;NP&#xff09; 七、实现 八、示例 求最大值算法 求最大公约数算法 九、分类 算法:) 一、定义 …...

Hive-high Avaliabl

hive—high Avaliable ​ hive的搭建方式有三种&#xff0c;分别是 ​ 1、Local/Embedded Metastore Database (Derby) ​ 2、Remote Metastore Database ​ 3、Remote Metastore Server ​ 一般情况下&#xff0c;我们在学习的时候直接使用hive –service metastore的方式…...

码住!8个小众宝藏的开发者学习类网站

1、simplilearn simplilearn是全球排名第一的在线学习网站&#xff0c;它的课程由世界知名大学、顶级企业和领先的行业机构通过实时在线课程设计和提供&#xff0c;其中包括顶级行业从业者、广受欢迎的培训师和全球领导者。 2、VisuAlgo VisuAlgo是一个免费的在线学习算法和数…...

Postman常见问题及解决方法

1、网络连接问题 如果Postman无法发送请求或接收响应&#xff0c;可以尝试以下操作&#xff1a; 检查网络连接是否正常&#xff0c;包括检查网络设置、代理设置等。 确认请求的URL是否正确&#xff0c;并检查是否使用了正确的HTTP方法&#xff08;例如GET、POST、PUT等&#…...

ubuntu图形化登录默认只有guest session账号解决方法

新安装的ubuntu16.x 图形化界面登录默认只有guest账号&#xff0c;只有进入guest账号之后再去手动切换root账号很麻烦&#xff0c;但是这样确实很安全。为了方便希望能够在登录图形化界面的时候以root身份/或者自定义其他身份登录。做一下简单的记录。 使用终端命令行编辑文件…...

全国计算机等级考试| 二级Python | 真题及解析(1)

一、选择题 1. 按照“后进先出”原则组织数据的数据结构是____ A栈 B双向链表 C二叉树 D队列 正确答案: A 2. 以下选项的叙述中,正确的是 A在循环队列中,只需要队头指针就能反映队列中元素的动态变化情况 B在循环队列中,只需要队尾指针就能反映队列中元素的动态变…...

Java开发框架和中间件面试题(9)

目录 102.你了解秒杀吗&#xff1f;怎么设计&#xff1f; 103.什么是缓存穿透&#xff1f;怎么解决&#xff1f; 102.你了解秒杀吗&#xff1f;怎么设计&#xff1f; 1.设计难点&#xff1a;并发量大&#xff0c;应用&#xff0c;数据库都承受不了。另外难控制超卖。 2.设计…...

【ARMv8M Cortex-M33 系列 2 -- Cortex-M33 JLink 连接 及 JFlash 烧写介绍】

文章目录 Jlink 工具JLink 命令行示例JFlash 烧写问题Jlink 工具 J-Link 是 SEGGER 提供的一款流行的 JTAG 调试器,它支持多个平台和处理器。JLink.exe 是 J-Link 调试器的命令行接口,它允许用户通过命令行执行一系列操作,例如编程、擦除、调试等。 工具链接: https://ww…...

react pwa应用示例

创建一个基于React的PWA应用&#xff0c;你可以使用create-react-app&#xff0c;它自带PWA支持&#xff0c;但默认是关闭的。以下是创建React PWA应用的步骤&#xff1a; 安装create-react-app 如果你还没有安装&#xff0c;你可以通过npm来安装&#xff1a; npm install -…...

python如何通过日志分析加入黑名单

python通过日志分析加入黑名单 监控nginx日志&#xff0c;若有人攻击&#xff0c;则加入黑名单&#xff0c;操作步骤如下&#xff1a; 1.读取日志文件 2.分隔文件&#xff0c;取出ip 3.将取出的ip放入list&#xff0c;然后判读ip的次数 4.若超过设定的次数&#xff0c;则加…...

RabbitMq知识概述

本文来说下RabbitMq相关的知识与概念 文章目录 概述AMQP协议Exchange 消息如何保证100&#xff05;投递什么是生产端的可靠性投递可靠性投递保障方案 消息幂等性高并发的情况下如何避免消息重复消费confirm 确认消息、Return返回消息如何实现confirm确认消息return消息机制 消费…...

专业级A链接测试特有

A链接普通 A链接添加链接描述带有blank...

Spring Boot 入参校验及全局异常处理

版本依赖 JDK 17 Spring Boot 3.2.0 源码地址&#xff1a;Gitee Spring Boot validation spring-boot-starter-validation是基于hibernate-validator的实现&#xff0c;在Spring Boot项目中直接导入spring-boot-starter-validation即可。 Valid 和 Validated 的区别 适用范围…...

MySQL 和 MySQL2 的区别

MySQL是最流行的开源关系型数据库管理系统,拥有大量的使用者和广泛的应用场景。而MySQL2是MySQL官方团队推出的新一代MySQL驱动&#xff0c;用于取代老版的MySQL模块&#xff0c;提供更好的性能和更丰富的功能。 本文将介绍MySQL2相较于MySQL有哪些优势以及具体的技术区别。 …...

AutoCAD图纸打印后内容不见

用户反映&#xff0c;在CAD里的对象打印出来不显示。其实&#xff0c;这是在CAD的打印对象颜色的问题。&#xff08;在9.2以下版本有这种问题&#xff0c;9.2及以上版本已默认此种颜色&#xff09; 1.当背景色为黑色的时候&#xff0c;这里的颜色是白&#xff0c;如下图 2.当C…...

ASUS华硕ROG幻16 2023款GU603VU VV VI笔记本电脑原厂Win11.22H2系统

链接&#xff1a;https://pan.baidu.com/s/1AgevUZleCHBJgCBcIp5CFQ?pwdhjxy 提取码&#xff1a;hjxy 华硕笔记本2023款幻16原厂Windows11系统自带所有驱动、出厂主题壁纸、Office办公软件、MyASUS华硕电脑管家、Armoury Crate奥创控制中心等预装程序 文件格式&#xff1…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;支持 Python 3.6。它基于标准 Python 类型提示&#xff0c;易于学习且功能强大。以下是一个完整的 FastAPI 入门教程&#xff0c;涵盖从环境搭建到创建并运行一个简单的…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台

🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

Rust 开发环境搭建

环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行&#xff1a; rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu ​ 2、Hello World fn main() { println…...

【FTP】ftp文件传输会丢包吗?批量几百个文件传输,有一些文件没有传输完整,如何解决?

FTP&#xff08;File Transfer Protocol&#xff09;本身是一个基于 TCP 的协议&#xff0c;理论上不会丢包。但 FTP 文件传输过程中仍可能出现文件不完整、丢失或损坏的情况&#xff0c;主要原因包括&#xff1a; ✅ 一、FTP传输可能“丢包”或文件不完整的原因 原因描述网络…...

【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统

Kafka从入门到实战:构建高吞吐量分布式消息系统 一、Kafka概述 Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。 Kafka核心特…...