当前位置: 首页 > news >正文

SpringBoot 整合 RabbitMQ 实现延迟消息

一、业务场景说明

用于解决用户下单以后,订单超时如何取消订单的问题。

  • 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
  • 生成订单,获取订单的id;
  • 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
  • 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
  • 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。

相关概念:(死信队列与延迟队列)

        死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列

        死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景。

死信队列(Dead Letter Queue, DLQ)

目的: 处理无法成功消费的消息。

特点:

  1. 目的: 用于接收无法成功处理的消息。这些消息可能因为多种原因无法被正常消费,例如消息处理失败、消息超时、队列满等。
  2. 触发条件: 当消息在主队列中无法被正常消费,并且达到一定的失败次数或过期时间后,会被转发到死信队列。
  3. 用途: 通过分析死信队列中的消息,开发者可以排查和解决系统中的问题,也可以进行后续的审计和处理。

实现方式:

  • 需要配置主队列和死信队列之间的关系。
  • 配置消息的重试策略和死信转发条件(如重试次数、过期时间等)。

延迟队列(Delayed Queue)

目的: 控制消息的处理时间。

特点:

  1. 目的: 在特定的时间点或延迟一段时间后再将消息发送到主队列进行处理。用于在系统中实现消息的延迟处理。
  2. 触发条件: 当消息被发送到延迟队列时,它会根据设定的延迟时间在未来某个时刻被转发到主队列。
  3. 用途: 常用于实现任务的延迟执行、定时任务、超时重试等功能。

实现方式:

  • 使用专门支持延迟队列的消息队列系统,或者通过设置消息的过期时间和重试机制来实现。
  • 可以配置延迟时间,确保消息在设定的时间后进入主队列。

总结

  • 死信队列: 处理无法消费的消息,通常用于异常处理和问题排查。
  • 延迟队列: 控制消息的处理时间,允许消息在未来某个时间点才被处理。

二、整合RabbitMQ实现延迟消息

整合依赖及配置

●在pom.xml中添加AMQP相关依赖;

<!--Spring AMQP相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 修改application.yml文件,在spring节点下添加RabbitMQ相关配置。
spring:rabbitmq:host: localhostport: 5672virtual-host: /mallusername: mallpassword: mallpublisher-returns: true #消息发送到队列确认publisher-confirm-type: simple #消息发送到交换器确认

实现延迟消息

  • 添加消息队列的枚举配置类QueueEnum,用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称;
/*** @description 消息队列枚举配置*/
@Getter
public enum QueueEnum {/*** 消息通知队列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl队列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交换机名称*/private String exchange;/*** 队列名称*/private String name;/*** 路由键*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}
  • 添加RabbitMQ的Java配置,用于配置交换机、队列及队列与交换机的绑定关系;
/*** @description 消息队列配置*/
@Configuration
public class RabbitMqConfig {/*** 订单消息实际消费队列所绑定的交换机*/@BeanDirectExchange orderDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单延迟队列所绑定的交换机*/@BeanDirectExchange orderTtlDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单实际消费队列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 订单延迟队列(死信队列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键.build();}/*** 将订单队列绑定到交换机*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 将订单延迟队列绑定到交换机*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}
  • 此时登录,在RabbitMQ管理页面可以看到以下交换机和队列;

  • 交换机及队列说明如下:mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。
  • 添加延迟消息的发送者CancelOrderSender,用于向订单延迟消息队(mall.order.cancel.ttl)里发送消息;
/*** @description 取消订单消息的发出者*/
@Component
public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//给延迟队列发送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//给消息设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);}
}
  • 添加取消订单消息的接收者CancelOrderReceiver,用于从取消订单的消息队列(mall.order.cancel)里接收消息;
/*** @description 取消订单消息的处理者*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);}
}
  • 添加OmsPortalOrderService接口;
/*** @description 前台订单管理Service*/
public interface OmsPortalOrderService {/*** 根据提交信息生成订单*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消单个超时订单*/@Transactionalvoid cancelOrder(Long orderId);
}
  • 添加OmsPortalOrderService的实现类OmsPortalOrderServiceImpl;
/*** @description 前台订单管理Service*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 执行一系类下单操作,具体参考mall项目LOGGER.info("process generateOrder");//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下单成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 执行一系类取消订单操作,具体参考mall项目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//获取订单超时时间,假设为60分钟long delayTimes = 30 * 1000;//发送延迟消息cancelOrderSender.sendMessage(orderId, delayTimes);}}
  • 添加OmsPortalOrderController定义接口;
/*** @description 订单管理Controller*/
@Controller
@Api(tags = "OmsPortalOrderController")
@Tag(name = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根据购物车信息生成订单")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);}
}

延迟消息功能演示

  • 运行项目,访问Swagger API文档,访问地址:http://localhost:8080/swagger-ui/

可以看到,经过了30秒后延迟消息发送出去了 

相关文章:

SpringBoot 整合 RabbitMQ 实现延迟消息

一、业务场景说明 用于解决用户下单以后&#xff0c;订单超时如何取消订单的问题。 用户进行下单操作&#xff08;会有锁定商品库存、使用优惠券、积分一系列的操作&#xff09;&#xff1b;生成订单&#xff0c;获取订单的id&#xff1b;获取到设置的订单超时时间&#xff0…...

Cilium:基于开源 eBPF 的网络、安全性和可观察性

基于 eBPF 的网络、安全性和可观察性 Cilium 是一种开源的云原生解决方案&#xff0c;它利用 Linux 内核中的 eBPF 技术来提供、保护和监控工作负载之间的网络连接。 什么是 eBPF&#xff1f; eBPF 是一项源自 Linux 内核的技术&#xff0c;允许沙盒程序在特权上下文&#x…...

Axios 详解与使用指南

Axios 详解与使用指南 1. Axios 简介 Axios 是一个基于 Promise 的 HTTP 客户端&#xff0c;能够在浏览器和 Node.js 环境中运行。它提供了一种简便的方式来执行 HTTP 请求&#xff0c;并支持多种请求方法&#xff0c;如 GET、POST、PUT、DELETE 等。Axios 的配置灵活&#x…...

深度学习 —— 个人学习笔记20(转置卷积、全卷积网络)

声明 本文章为个人学习使用&#xff0c;版面观感若有不适请谅解&#xff0c;文中知识仅代表个人观点&#xff0c;若出现错误&#xff0c;欢迎各位批评指正。 三十九、转置卷积 import torch from torch import nndef trans_conv(X, K):h, w K.shapeY torch.zeros((X.shape[…...

解决Mac系统Python3.12版本pip安装报错error: externally-managed-environment的问题

遇到的问题 在Mac安装了Python3.12.x版本&#xff08;3.12.3、3.12.4&#xff09;后&#xff0c;当尝试pip3 install xxx的时候&#xff0c;总是报错&#xff1a;error: externally-managed-environment error: externally-managed-environment This environment is external…...

lvm知识终结

、什么是 LVM LVM 是 Linux 下对磁盘分区进行管理的一种工具&#xff0c;适合管理大存储设备&#xff0c;并允许用户动态调整文件系统的大小 lvm 常用的命令 功能 PV 管理命令 VG 管理命令 LV 管理命令 scan 扫描 pvscan vgscan lvscan create 创建 pvcreate v…...

ESP32S3 IDF 对 16路输入输出芯片MCP23017做了个简单的测试

这次还是使用了idf老版本4.4.7&#xff0c;上次用了5.3&#xff0c;感觉不好用&#xff0c;官方的MCP23017芯片是英文版&#xff0c;真的很难读明白&#xff0c;可能是我英语水平不够吧。先看看每个寄存器的功能&#xff1a; IODIRA 和 IODIRB: 输入/输出方向寄存器 IPOLA 和 I…...

【技术前沿】Flux.1部署教程入门--Stable Diffusion团队最前沿、免费的开源AI图像生成器

项目简介 FLUX.1 是一种新的开源图像生成模型。它由 Stable Diffusion 背后的团队 Black Forest Labs 开发。 官网中有以下功能开源供大家参考&#xff1a; FLUX.1 擅长在图像中准确再现文字&#xff0c;因此非常适合需要清晰文字或短语的设计。无论是标牌、书籍封面还是品牌…...

Redis 的 STREAM 和 RocketMQ 是两种不同的消息队列和流处理解决方案,它们在设计理念、功能和用途上有显著区别。以下是它们的主要区别:

20240813 Redis 的 STREAM 和 RocketMQ 是两种不同的消息队列和流处理解决方案&#xff0c;它们在设计理念、功能和用途上有显著区别。以下是它们的主要区别&#xff1a;1. 使用 Redis 的 Sorted Set 数据结构连接到 Redis示例用法添加事件获取滑动窗口内的事件移除过期事件连接…...

Visual Studio Code安装与C/C++语言运行(上)

Visual Studio Code&#xff08;VS Code&#xff09;作为微软开发的一款轻量级但功能强大的源代码编辑器&#xff0c;广泛应用于各种编程语言的开发&#xff0c;包括C/C。以下将详细介绍VS Code的安装过程以及与C/C语言运行环境的配置。 一、Visual Studio Code的安装 1. 准备…...

探索数据可视化,数据看板在各行业中的应用

数据可视化是一种通过图形化手段将数据呈现出来的技术&#xff0c;它将复杂的数据和信息转化为易于理解的图表、地图、仪表盘等视觉元素&#xff0c;使得数据的模式、趋势和关系更加直观地展现出来。通过数据可视化&#xff0c;用户可以快速识别重要信息、发现潜在问题&#xf…...

haralyzer 半自动,一次性少量数据采集快捷方法

使用场景&#xff1a;半自动&#xff0c;一次性少量数据采集需求在工作中还是不少遇到的&#xff0c;无论使用模拟的方式&#xff0c;或者破解都不太划算。其实这种需求&#xff0c;使用半自动爬虫是最简单的。不需要考虑网站反爬虫的问题&#xff0c;因为你使用的就是真实的浏…...

mall-admin-web-master前端项目下载依赖失败解决

碰壁后的总结 pythone 环境 2.XX版本&#xff0c;切记不要3.0以上的。node 16.x不能太高 错误案例 npm ERR! code 1 npm ERR! path D:\workspace\springBootMall\mall-admin-web-master\node_modules\node-sass npm ERR! command failed npm ERR! command C:\windows\system…...

【07】JVM是怎么实现invokedynamic的

在Java中&#xff0c;方法调用会被编译为invokeStatic&#xff0c;invokeSpecial&#xff0c;invokVirtual以及invokeInterface四种指令。这些指令与包含目标方法类名、方法名以及方法描述符的符号引用捆绑&#xff0c;在实际运行之前&#xff0c;JVM根据这个符号引用链接到具体…...

使用API有效率地管理Dynadot域名,查看参与的拍卖列表

前言 Dynadot是通过ICANN认证的域名注册商&#xff0c;自2002年成立以来&#xff0c;服务于全球108个国家和地区的客户&#xff0c;为数以万计的客户提供简洁&#xff0c;优惠&#xff0c;安全的域名注册以及管理服务。 Dynadot平台操作教程索引&#xff08;包括域名邮箱&…...

Linux 基本指令讲解

linux 基本指令 clear 清屏 Alt Enter 全屏/退出全屏 pwd 显示当前用户所处路径 cd 改变目录 cd /root/mikecd … 返回上级目录cd - 返回最近所处的路径cd ~ 直接返回当前用户自己的家目 roor 中&#xff1a;/root普通用户中&#xff1a;/home/mike mkdir 创建一个文件夹(d) …...

PRE_EMPHASIS

PRE_EMPASIS属性用于提高高频信号的信号完整性 其通过传输线遭受高频损耗。发射机 预加重&#xff08;pre_EMPASIS&#xff09;功能允许对某些信号驱动器进行预加重 I/O标准。 提示&#xff1a;发射机的预加重可以与接收机的均衡相结合&#xff0c;以提高 整体信号完整性。 理想…...

【QT常用技术讲解】多线程处理+全局变量处理异步事件并获取多个线程返回的结果

前言 QTableView加入勾选项后&#xff08;参考【QT常用技术讲解】QTableView添加QCheckBox、QPushButton&#xff09;&#xff0c;如果支持右键菜单功能&#xff0c;此时就有统一执行多个异步事件&#xff0c;并且统一输出到界面的需求了&#xff0c;本篇结合多线程共享全局变量…...

数组列表中的最大距离

给定 m 个数组&#xff0c;每个数组都已经按照升序排好序了。现在你需要从两个不同的数组中选择两个整数&#xff08;每个数组选一个&#xff09;并且计算它们的距离。两个整数 a 和 b 之间的距离定义为它们差的绝对值 |a-b| 。你的任务就是去找到最大距离 示例 1&#xff1a;…...

C语言新手小白详细教程(7)指针和指针变量

希望文章能够给到初学的你一些启发&#xff5e; 如果觉得文章对你有帮助的话&#xff0c;点赞 关注 收藏支持一下笔者吧&#xff5e; 阅读指南&#xff1a; 开篇说明1、指针的定义接下来我们用图示的形式来解释一下 指针&#xff1a;2、申明指针变量3、取地址符 &4、为指针…...

Kafka保证消息不丢失

Kafka保证消息不丢失 生产者发送消息到Broker丢失 设置异步发送 回调方法中的参数Exception e如果为空 代表发送成功,如果不为空代表发送失败出现异常 消息在Broker中丢失 kafka集群中存在分区机制 分区中分为leader和follower副本 leader负责读写,而follower只负责数据…...

数据结构+基数排序算法

一、问题描述 实现英文单词按字典序排列的基数排序算法 编写一个程序&#xff0c;采用基数排序方法将一组英文单词按字典顺序排 列。假设单词均由小写字母或空格构成&#xff0c;最长的单词有 MaxLen 个 字母&#xff0c;用相关数据进行测试并输出各趟的排序结果。 用例&#…...

C++ list【常用接口、模拟实现等】

1. list的介绍及使用 1.1 list的介绍 1.list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。 2.list的底层是双向链表结构&#xff0c;双向链表中每个元素存储在互不相关的独立节点中&#xff0c;在节点中通过指针指向其前…...

12.面试题——Spring Boot

1.Spring Boot是什么&#xff1f; Spring Boot 是 Spring 开源组织下的子项目&#xff0c;是 Spring 组件一站式解决方案&#xff0c;主要是简化了使用 Spring 的难度&#xff0c;简省了繁重的配置&#xff0c;提供了各种启动器&#xff0c;开发者能快速上手。 2.为什么要用 …...

【前端VUE】npm i 出现版本错误等报错 简单直接解决命令

前端vue npm i 安装时出现 报错原因 在新版本的npm中&#xff0c;默认情况下&#xff0c;npm install遇到冲突的peerDependencies时将失败。 解决办法 使用--force或--legacy-peer-deps可解决这种情况。 --force 会无视冲突&#xff0c;并强制获取远端npm库资源&#xff0…...

精彩回顾 | 风丘科技亮相2024名古屋汽车工程博览会

2024年7月17日-19日&#xff0c;风丘科技联合德国IPETRONIK亮相日本名古屋汽车工程博览会。该展会面向汽车行业不同应用场景&#xff0c;包括新的eAxle、FCEV、ADAS、测试测量系统和ECU测试等相关技术&#xff0c;是一个专为活跃在汽车行业前线的工程师和研究人员举办的汽车技术…...

设计模式21-组合模式

设计模式21-组合模式&#xff08;Composite Pattern&#xff09; 写在前面 动机定义与结构定义结构主要类及其关系 C代码推导优缺点应用场景总结补充叶子节点不重载这三个方法叶子节点重载这三个方法结论 写在前面 数据结构模式 常常有一些组件在内部具有特定的数据结构。如何…...

如何选择深度学习的损失函数和激活函数

一概述 在深度学习中&#xff0c;损失函数&#xff08;Loss Function&#xff09;和激活函数&#xff08;Activation Function&#xff09;是两个至关重要的组件&#xff0c;它们共同影响着模型的训练效果和泛化能力。本文将简要介绍这两个概念&#xff0c;阐述选择它们的重要性…...

DATAX自定义KafkaWriter

因为datax目前不支持写入数据到kafka中&#xff0c;因此本文主要介绍如何基于DataX自定义KafkaWriter&#xff0c;用来同步数据到kafka中。本文偏向实战&#xff0c;datax插件开发理论宝典请参考官方文档&#xff1a; https://github.com/alibaba/DataX/blob/master/dataxPlug…...

Mybatis分页多表多条件查询

个人总结三种方式&#xff1a; Xml、queryWrapper、PageHelper第三方组件这三种方式进行查询&#xff1b; 方式一&#xff1a; xml中联表查询&#xff0c;在mapper中传参IPage<T>和条件Map&#xff08;这里用map装参数&#xff09;。 代码示例&#xff1a; Mapper层 M…...