网站开发网页前置开发/seo交流论坛
为什么要使用MQ?
在Spring Boot Event这篇文章中已经通过Guava
或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?
首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker
中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。
源码地址:Gitee
整合RocketMQ
依赖版本
- JDK 17
- Spring Boot 3.2.0
- RocketMQ-Client 5.0.4
- RocketMQ-Starter 2.2.0
可以参考这篇进行RocketMQ安装
Spring Boot 3.0+ 取消了对spring.factories
的支持。所以在导入时需要手动引入RocketMQ的配置类。
引入RocketMQ依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>
解决Spring Boot3+不兼容 spring.factories
rocketmq-spring-boot-starter:2.2.2
版本中:
参考配置文件
# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group,保证唯一group: event-mq-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false
参考Issue
-
方法一 :通过
@Import(RocketMQAutoConfiguration.class)
在配置类中引入 -
方法二:在
resources
资源目录下创建文件夹及文件META-INF/spring
,org.springframework.boot.autoconfigure.AutoConfiguration.imports
。
文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
RocketMQ 使用
解决Spring Boot3+不支持spring.factories的问题
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;/*** 启动类*/
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);}
}
RocketMQ操作工具
RocketMQ Message实体
import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;
import java.util.List;/*** RocketMQ 消息*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {/*** 消息队列主题*/@NotBlank(message = "MQ Topic 不能为空")private String topic;/*** 延迟级别*/@Builder.Defaultprivate DelayLevel delayLevel = DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private List<T> messages;/*** 使用有序消息发送时,指定发送到队列*/private String hashKey;/*** 任务Id,用于日志打印相关信息*/@Builder.Defaultprivate String taskId = IdUtil.fastSimpleUUID();
}
RocketMQTemplate 二次封装
import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/
@Slf4j
@Component
public class RocketMQService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.sendMessageTimeout}")private int sendMessageTimeout;/*** 异步发送消息回调** @param taskId 任务Id* @param topic 消息主题* @return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());}};}/*** 发送同步消息,使用有序发送请设置HashKey** @param message 消息参数*/public <T> void syncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** @param message 消息参数*/public <T> void syncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息,异步返回消息结果** @param message 消息参数*/public <T> void asyncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** @param message 消息参数*/public <T> void asyncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;** @param message 消息参数*/public <T> void sendOneWay(RocketMQMessage<T> message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** @param message 消息体* @param batch 是否为批量操作*/public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]": "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}}
}
定义RocketMQ消费者
import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** MQ消息监听*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("MQListener 接收消息 : {}", message);}
}
定义测试类发送消息
import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/
@SpringBootTest
public class MQTest {@Resourceprivate RocketMQService rocketMQService;@Testpublic void sendMessage() {int count = 1;while (count <= 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count++).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);}
}
测试
相关文章:

Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
为什么要使用MQ? 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢? 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事…...

oracle ORA-01704: string literal too long ORACLE数据库clob类型
当oracle数据表中有clob类型字段时候,insert或update的sql语句中,超过长度就会报错 ORA-01704: string literal too long update xxx set xxx <div><h1>123</h1></div> where id 100;可以修改为 DECLAREstr varchar2(10000…...

微星主板强刷BIOS(以微星X370gaming plus 为例)
(前两天手欠,用U盘通过微星的M-flash升级BIOS 升级过程中老没动静就强制关机了 然后电脑就打不开了) 几种强刷主板BIOS的方式 在网上看到有三种强刷BIOS的方式分别是: 使用夹子编程器 (听说不太好夹)使用微星转接线编程器(只能用于微星主板࿰…...

matlab 图像上生成指定中心,指定大小的矩形窗
用matlab实现在图像上生成指定中心,指定大小的矩形窗(奇数*奇数) function PlaneWin PlaneWindow(CentreCoorX,CentreCoorY,RadiusX,RadiusY,SizeImRow,SizeImColumn) % 在图像上生成指定中心,指定大小的矩形窗(奇数*奇数) % % Input: % CentreCoorX(1*1) % CentreCoorY(1*1)…...
❀My学习小记录之算法❀
目录 算法:) 一、定义 二、特征 三、基本要素 常用设计模式 常用实现方法 四、形式化算法 五、复杂度 时间复杂度 空间复杂度 六、非确定性多项式时间(NP) 七、实现 八、示例 求最大值算法 求最大公约数算法 九、分类 算法:) 一、定义 …...

Hive-high Avaliabl
hive—high Avaliable hive的搭建方式有三种,分别是 1、Local/Embedded Metastore Database (Derby) 2、Remote Metastore Database 3、Remote Metastore Server 一般情况下,我们在学习的时候直接使用hive –service metastore的方式…...

码住!8个小众宝藏的开发者学习类网站
1、simplilearn simplilearn是全球排名第一的在线学习网站,它的课程由世界知名大学、顶级企业和领先的行业机构通过实时在线课程设计和提供,其中包括顶级行业从业者、广受欢迎的培训师和全球领导者。 2、VisuAlgo VisuAlgo是一个免费的在线学习算法和数…...

Postman常见问题及解决方法
1、网络连接问题 如果Postman无法发送请求或接收响应,可以尝试以下操作: 检查网络连接是否正常,包括检查网络设置、代理设置等。 确认请求的URL是否正确,并检查是否使用了正确的HTTP方法(例如GET、POST、PUT等&#…...

ubuntu图形化登录默认只有guest session账号解决方法
新安装的ubuntu16.x 图形化界面登录默认只有guest账号,只有进入guest账号之后再去手动切换root账号很麻烦,但是这样确实很安全。为了方便希望能够在登录图形化界面的时候以root身份/或者自定义其他身份登录。做一下简单的记录。 使用终端命令行编辑文件…...

全国计算机等级考试| 二级Python | 真题及解析(1)
一、选择题 1. 按照“后进先出”原则组织数据的数据结构是____ A栈 B双向链表 C二叉树 D队列 正确答案: A 2. 以下选项的叙述中,正确的是 A在循环队列中,只需要队头指针就能反映队列中元素的动态变化情况 B在循环队列中,只需要队尾指针就能反映队列中元素的动态变…...

Java开发框架和中间件面试题(9)
目录 102.你了解秒杀吗?怎么设计? 103.什么是缓存穿透?怎么解决? 102.你了解秒杀吗?怎么设计? 1.设计难点:并发量大,应用,数据库都承受不了。另外难控制超卖。 2.设计…...

【ARMv8M Cortex-M33 系列 2 -- Cortex-M33 JLink 连接 及 JFlash 烧写介绍】
文章目录 Jlink 工具JLink 命令行示例JFlash 烧写问题Jlink 工具 J-Link 是 SEGGER 提供的一款流行的 JTAG 调试器,它支持多个平台和处理器。JLink.exe 是 J-Link 调试器的命令行接口,它允许用户通过命令行执行一系列操作,例如编程、擦除、调试等。 工具链接: https://ww…...

react pwa应用示例
创建一个基于React的PWA应用,你可以使用create-react-app,它自带PWA支持,但默认是关闭的。以下是创建React PWA应用的步骤: 安装create-react-app 如果你还没有安装,你可以通过npm来安装: npm install -…...

python如何通过日志分析加入黑名单
python通过日志分析加入黑名单 监控nginx日志,若有人攻击,则加入黑名单,操作步骤如下: 1.读取日志文件 2.分隔文件,取出ip 3.将取出的ip放入list,然后判读ip的次数 4.若超过设定的次数,则加…...

RabbitMq知识概述
本文来说下RabbitMq相关的知识与概念 文章目录 概述AMQP协议Exchange 消息如何保证100%投递什么是生产端的可靠性投递可靠性投递保障方案 消息幂等性高并发的情况下如何避免消息重复消费confirm 确认消息、Return返回消息如何实现confirm确认消息return消息机制 消费…...

专业级A链接测试特有
A链接普通 A链接添加链接描述带有blank...

Spring Boot 入参校验及全局异常处理
版本依赖 JDK 17 Spring Boot 3.2.0 源码地址:Gitee Spring Boot validation spring-boot-starter-validation是基于hibernate-validator的实现,在Spring Boot项目中直接导入spring-boot-starter-validation即可。 Valid 和 Validated 的区别 适用范围…...

MySQL 和 MySQL2 的区别
MySQL是最流行的开源关系型数据库管理系统,拥有大量的使用者和广泛的应用场景。而MySQL2是MySQL官方团队推出的新一代MySQL驱动,用于取代老版的MySQL模块,提供更好的性能和更丰富的功能。 本文将介绍MySQL2相较于MySQL有哪些优势以及具体的技术区别。 …...

AutoCAD图纸打印后内容不见
用户反映,在CAD里的对象打印出来不显示。其实,这是在CAD的打印对象颜色的问题。(在9.2以下版本有这种问题,9.2及以上版本已默认此种颜色) 1.当背景色为黑色的时候,这里的颜色是白,如下图 2.当C…...

ASUS华硕ROG幻16 2023款GU603VU VV VI笔记本电脑原厂Win11.22H2系统
链接:https://pan.baidu.com/s/1AgevUZleCHBJgCBcIp5CFQ?pwdhjxy 提取码:hjxy 华硕笔记本2023款幻16原厂Windows11系统自带所有驱动、出厂主题壁纸、Office办公软件、MyASUS华硕电脑管家、Armoury Crate奥创控制中心等预装程序 文件格式࿱…...

学习笔记 k8s常用kubectl命令
k8s常用kubectl命令 pod 相关强制删除pod查看 Pod 中指定容器的日志pod 扩容 etcd 备份集群设置集群上下文配置文件切换集群 节点cordondrain pod 相关 强制删除pod pod 状态terminal了,需要强制删除 kubectl delete pod <pod_name> --grace-period0 --force…...

企业数据可视化-亿发数据化管理平台提供商,实现一站式数字化运营
近些年来,国内企业数据化管理升级进程持续加速,以物联网建设、人工智能、大数据和5G网络等新技术的发展,推动了数字经济的蓬勃发展,成为维持经济持续稳定增长的重要引擎。如今许多国内中小型企业纷纷摒弃传统管理模式,…...

网络通信-Linux 对网络通信的实现
Linux 网络 IO 模型 同步和异步,阻塞和非阻塞 同步和异步 关注的是调用方是否主动获取结果 同步:同步的意思就是调用方需要主动等待结果的返回 异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知࿰…...

mysql修改密码
mysql -u root -p ALTER USER USER() IDENTIFIED BY root; FLUSH PRIVILEGES; 其它cmd: ①查看端口,找到占用3306端口的进程:命令行输入 netstat -aon ,找到端口号为3306的对应的PID ②结束占用端口3306的进程:命令…...

深入解析C语言中void (*signal(int ,void(*)(int) ) ) (int)
目录 深入解析C语言中的signal函数声明 1. signal函数声明 1.1 signal是一个函数 1.2 返回类型是一个函数指针 2. 函数指针的理解 3. 简化声明使用typedef 为啥不这么写typedef void (*)(int) acc? 代码: 结论 深入解析C语言中的signal函数声明…...

网站显示不安全警告怎么办?消除网站不安全警告超全指南
网站显示不安全警告怎么办?当用户访问你的网站,而您的网站没有部署SSL证书实现HTTPS加密时,网站就会显示不安全警告,这种警告,不仅有可能阻止用户继续浏览网站,影响网站声誉,还有可能影响网站在…...

[SWPUCTF 2021 新生赛]finalrce
[SWPUCTF 2021 新生赛]finalrce wp 注:本文参考了 NSSCTF Leaderchen 师傅的题解,并修补了其中些许不足。 此外,参考了 命令执行(RCE)面对各种过滤,骚姿势绕过总结 题目代码: <?php highlight_file(__FILE__); …...

如何底层调用最快地复制OPC数据到关系数据库
计算机上的二大应用,一是从WEB服务器上获得数据,另一种是向关系数据库中写入数据。在上集我已提出了一个从WEB上获得OPC数据的独创方法,现在谈谈第二种如何快速地把OPC数据写进到数据库中,这也是Calssic OPC最典型的一个应用场景。…...

接口测试工具——ApiFox使用初体验 postman导出和ApiFox导入
目录 ApiFox使用初体验初步使用从postman导出到apifox导入 IDEA简单测试Postman测试工具post请求 接口测试工具swaggerKnife4j1.引入依赖2.配置3.常用注解4.接口测试 JMeter什么是JMeter?JMeter安装配置1.官网下载2.下载后解压3.汉语设置 JMeter的使用方法1.新建线程组2.设置参…...

搜维尔科技:经脉腧穴虚拟针灸VR虚拟教学平台AcuMap软件案例分享
北京中医药大学经脉腧穴VR虚拟教学平台案例 主要产品 HTCvive ,AcuMap; 实施内容 一、项目说明 (1)穴位取穴与体表解剖标志关系;(2)穴下层次解剖及周围解剖结构展示; …...