当前位置: 首页 > news >正文

通过Redisson构建延时队列并实现注解式消费

目录

  • 一、序言
  • 二、延迟队列实现
    • 1、Redisson延时消息监听注解和消息体
    • 2、Redisson延时消息发布器
    • 3、Redisson延时消息监听处理器
  • 三、测试用例
  • 四、结语

一、序言

两个月前接了一个4万的私活,做一个线上商城小程序,在交易过程中不可避免的一个问题就是用户下单后的订单自动取消。

目前成熟的方案有通过RabbitMQ+死信队列RabbitMQ+延迟消息插件RocketMQ定时消息推送Redisson延时队列来实现。

考虑到商城的定位和用户体量,以及系统维护成本,其实完全没有必要引入消息中间件,借助Redis其实就可以轻松实现这个需求。

加上Redisson客户端本身就已经实现了很多分布式集合工具类,借助阻塞队列和延时队列就可轻松搞定。

当然,为了使用方便以及团队协作,顺便模仿@RabbitListener封装了一套基于注解的消息消费,废话不多说,直接上代码。


二、延迟队列实现

1、Redisson延时消息监听注解和消息体

延迟消息监听器定义:

/*** Redisson延时队列监听器** @author Nick Liu* @date 2024/11/13*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonDelayedQueueListener {/*** 队列名称* @return*/String queueName();
}

消息体定义:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息内容*/private String msg;/*** 队列名称*/private String queueName;/*** 延时时间*/private long delayTime;private TimeUnit timeUnit;
}

2、Redisson延时消息发布器

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 发布延时信息* @param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info("开始发布延迟消息: {}", FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}

这里我们借助RBlockingQueueRDelayedQueue来实现,只有当延迟消息快到期时,消费者才能从阻塞队列拉取到消息,否则消费者将一直阻塞。

3、Redisson延时消息监听处理器

这里我们定义了一个BeanPostProcessor 的实现,目的就是为了扫描Spring容器中所有带RedissonDelayedQueueListener注解的Bean实例和方法。

/*** Redisson延迟队列Bean后处理器* @author Nick Liu* @date 2025/1/3*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 获取最终的目标运行时对象Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);Method[] methods = clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson队列监听注解,则启动一个线程监听队列RedissonDelayedQueueListener annotation = m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() -> {log.info("开始监听Redisson延时队列[{}]消息", annotation.queueName());while (true) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg = blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info("监听到队列[{}]延时消息: {}", annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}

这里我们扫描到指定Bean的方法后,会开启一个异步线程,并轮询拉取延时消息,如果消息没过期,异步线程将会一直阻塞等待。


三、测试用例

/*** @author Nick Liu* @date 2025/2/2*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE = "redisson:delayed:queue";private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;@GetMapping("/delayed/msg")public ResponseEntity<RedisDelayedMsgDTO> publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO = new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg("This is a delayed msg");redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}@RedissonDelayedQueueListener(queueName = DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info("Received delayed msg: {}", msg);}
}

启动服务后,Bean后处理器会启动异步线程监听延时消息,如下:

2025-02-02 16:46:04.271 INFO  [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 开始监听Redisson延时队列[redisson:delayed:queue]消息

浏览器直接输入http://localhost:8000/delayed/msg发布延时消息,10s后消费者进行处理,如下:

2025-02-02 16:43:11.107 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 监听到队列[redisson:delayed:queue]延时消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg

四、结语

虽说通过Redisson实现的延迟队列也能实现支付订单的自动取消,但是可用性相比专业的消息中间件还是尚有不足的。

比如消息生产者发送消息没有确认机制,消息消费也没有确认机制,这两个环节都有可能导致消息丢失。

当然我们可以通过其它保障机制去补偿,比如再加上定时任务扫表,把扫描时间可以设置长一点,保证最终的一致性。

在大型项目中还是优先推荐专业的消息中间件去实现延时消息消费。

相关文章:

通过Redisson构建延时队列并实现注解式消费

目录 一、序言二、延迟队列实现1、Redisson延时消息监听注解和消息体2、Redisson延时消息发布器3、Redisson延时消息监听处理器 三、测试用例四、结语 一、序言 两个月前接了一个4万的私活&#xff0c;做一个线上商城小程序&#xff0c;在交易过程中不可避免的一个问题就是用户…...

SQL Server配置管理器无法连接到 WMI 提供程序

目录 第一步第二部 第一步 发现没有资源管理器 ​​​​ 在文件夹找到管理器 打开发现报这个错误 配置管理器无法连接到 WMI 提供程序第二部 https://blog.csdn.net/thb369208315/article/details/126954074...

Linux内核源码:ext4 extent详解

在 Linux 系统的庞大体系中&#xff0c;文件系统就像是一个井然有序的图书馆&#xff0c;而 ext4 文件系统则是这座图书馆中极为重要的 “藏书室”&#xff0c;它负责高效管理和存储数据。在 ext4 众多的奥秘中&#xff0c;ext4 extent 犹如一颗璀璨的明珠&#xff0c;起着关键…...

Maven jar 包下载失败问题处理

Maven jar 包下载失败问题处理 1.配置好国内的Maven源2.重新下载3. 其他问题 1.配置好国内的Maven源 打开⾃⼰的 Idea 检测 Maven 的配置是否正确&#xff0c;正确的配置如下图所示&#xff1a; 检查项⼀共有两个&#xff1a; 确认右边的两个勾已经选中&#xff0c;如果没有请…...

自指学习:AGI的元认知突破

文章目录 引言:从模式识别到认知革命一、自指学习的理论框架1.1 自指系统的数学定义1.2 认知架构的三重反射1.3 与传统元学习的本质区别二、元认知突破的技术路径2.1 自指神经网络架构2.2 认知效能评价体系2.3 知识表示的革命三、实现突破的关键挑战3.1 认知闭环的稳定性3.2 计…...

排序算法--希尔排序

希尔排序是插入排序的改进版本&#xff0c;适合中等规模数据排序&#xff0c;性能优于简单插入排序。 // 希尔排序函数 void shellSort(int arr[], int n) {// 初始间隔&#xff08;gap&#xff09;为数组长度的一半&#xff0c;逐步缩小for (int gap n / 2; gap > 0; gap …...

Java 2024年面试总结(持续更新)

目录 最近趁着金三银四面了五六家公司吧&#xff0c;也整理了一些问题供大家参考一下&#xff08;适合经验三年左右的&#xff09;。 面试问题&#xff08;答案是我自己总结的&#xff0c;不一定正确&#xff09;&#xff1a; 总结&#xff1a; 最近趁着金三银四面了五六家公…...

TensorFlow是个啥玩意?

TensorFlow是一个开源的机器学习框架&#xff0c;由Google开发。它可以帮助开发者构建和训练各种机器学习模型&#xff0c;包括神经网络和深度学习模型。TensorFlow的设计理念是使用数据流图来表示计算过程&#xff0c;其中节点表示数学运算&#xff0c;边表示数据流动。 Tens…...

不可信的搜索路径(CWE-426)

漏洞描述&#xff1a;程序使用关键资源时&#xff08;如动态链接库、执行文件、配置文件等&#xff09;没有明确的指定资源的路径&#xff0c;而是依赖操作系统去搜索资源&#xff0c;这种行为可能被攻击者利用&#xff0c;通过在搜索优先级较高的目录放置不良资源&#xff0c;…...

Linux——基础命令

$&#xff1a;普通用户 #&#xff1a;超级用户 cd 切换目录 cd 目录 &#xff08;进入目录&#xff09; cd ../ &#xff08;返回上一级目录&#xff09; cd ~ &#xff08;切换到当前用户的家目录&#xff09; cd - &#xff08;返回上次目录&#xff09; pwd 输出当前目…...

利用TensorFlow.js实现浏览器端机器学习:一个全面指南

引言 随着深度学习技术的不断发展&#xff0c;机器学习已从传统的服务器端运算逐渐转向了前端技术。TensorFlow.js 是 Google 推出的一个用于在浏览器中进行机器学习的开源库&#xff0c;它允许开发者在浏览器中直接运行机器学习模型&#xff0c;而无需依赖后端服务器。Tensor…...

利用HTML和css技术编写学校官网页面

目录 一&#xff0c;图例展示 二&#xff0c;代码说明 1&#xff0c;html部分&#xff1a; 【第一张图片】 【第二张图片】 【第三张图片】 2&#xff0c;css部分&#xff1a; 【第一张图片】 【第二张图片】 【第三张图片】 三&#xff0c;程序代码 一&#xff0c;…...

SpringSecurity密码编码器:使用BCrypt算法加密、自定义密码编码器

1、Spring Security 密码编码器 Spring Security 作为一个功能完备的安全性框架,一方面提供用于完成加密操作的 PasswordEncoder 组件,另一方面提供一个可以在应用程序中独立使用的密码模块。 1.1 PasswordEncoder 抽象接口 在 Spring Security 中,PasswordEncoder 接口代…...

笔记:新能源汽车零部件功率级测试怎么进行?

摘要:本文旨在梳理主机厂对新能源汽车核心零部件功率级测试需求,通过试验室的主流设备仪器集成,快速实现试验方案搭建,并体现测试测量方案的时效性、便捷性优势。目标是通过提升实现设备的有效集成能力、实现多设备测试过程的有效协同、流程化测试,可快速采集、分析当前数…...

ES6中的map和原生的对象有什么区别?

在 ES6 中&#xff0c;Map 和原生的对象&#xff08;Object&#xff09;都是用来存储键值对数据的集合&#xff0c;但它们有显著的区别。以下是它们之间的主要区别&#xff1a; 1. 键的类型 Object: 只允许使用字符串或符号作为键。其他类型的键&#xff08;如数字或对象&…...

2502vim,vim文本对象中文文档

介绍 文本块用户(textobj-user)是一个可帮助你毫不费力地创建自己的文本对象的Vim插件. 因为有许多陷阱需要处理,很难创建文本对象.此插件隐藏了此类细节,并提供了声明式定义文本对象的方法. 你可用正则式来定义简单的文本对象,或使用函数来定义复杂的文本对象.如… 文本对…...

spring security与gateway结合进行网关鉴权和授权

在Spring Cloud Gateway中集成Spring Security 6以实现鉴权和认证工作&#xff0c;可以在网关代理层完成权限校验和认证。这种架构通常被称为“边缘安全”或“API网关安全”&#xff0c;它允许你在请求到达后端服务之前进行集中式的安全控制。 以下是如何配置Spring Cloud Gat…...

LabVIEW在电机自动化生产线中的实时数据采集与生产过程监控

在电机自动化生产线中&#xff0c;实时数据采集与生产过程监控是确保生产效率和产品质量的重要环节。LabVIEW作为一种强大的图形化编程平台&#xff0c;可以有效实现数据采集、实时监控和自动化控制。详细探讨如何利用LabVIEW实现这一目标&#xff0c;包括硬件选择、软件架构设…...

log4j2日志配置文件

log4j2配置文件每个项目都会用到,记录一个比较好用的配置文件,方便以后使用时调取,日志输出级别为debug,也可以修改 <?xml version"1.0" encoding"UTF-8"?> <Configuration monitorInterval"180" packages""><prope…...

用Deepseek做EXCLE文件对比

背景是我想对比两个PO系统里的一个消息映射&#xff0c;EDI接口的mapping有多复杂懂的都懂&#xff0c;它还不支持跨系统版本对比&#xff0c;所以我费半天劲装NWDS&#xff0c;导出MM到excle&#xff0c;然后问题来了&#xff0c;我需要对比两个excel文件里的内容&#xff0c;…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

04-初识css

一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

C++:多态机制详解

目录 一. 多态的概念 1.静态多态&#xff08;编译时多态&#xff09; 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1&#xff09;.协变 2&#xff09;.析构函数的重写 5.override 和 final关键字 1&#…...

Vite中定义@软链接

在webpack中可以直接通过符号表示src路径&#xff0c;但是vite中默认不可以。 如何实现&#xff1a; vite中提供了resolve.alias&#xff1a;通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...

嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)

目录 一、网络编程--OSI模型 二、网络编程--TCP/IP模型 三、网络接口 四、UDP网络相关编程及主要函数 ​编辑​编辑 UDP的特征 socke函数 bind函数 recvfrom函数&#xff08;接收函数&#xff09; sendto函数&#xff08;发送函数&#xff09; 五、网络编程之 UDP 用…...

QT开发技术【ffmpeg + QAudioOutput】音乐播放器

一、 介绍 使用ffmpeg 4.2.2 在数字化浪潮席卷全球的当下&#xff0c;音视频内容犹如璀璨繁星&#xff0c;点亮了人们的生活与工作。从短视频平台上令人捧腹的搞笑视频&#xff0c;到在线课堂中知识渊博的专家授课&#xff0c;再到影视平台上扣人心弦的高清大片&#xff0c;音…...

Vue 3 + WebSocket 实战:公司通知实时推送功能详解

&#x1f4e2; Vue 3 WebSocket 实战&#xff1a;公司通知实时推送功能详解 &#x1f4cc; 收藏 点赞 关注&#xff0c;项目中要用到推送功能时就不怕找不到了&#xff01; 实时通知是企业系统中常见的功能&#xff0c;比如&#xff1a;管理员发布通知后&#xff0c;所有用户…...

鸿蒙HarmonyOS 5军旗小游戏实现指南

1. 项目概述 本军旗小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;采用DevEco Studio实现&#xff0c;包含完整的游戏逻辑和UI界面。 2. 项目结构 /src/main/java/com/example/militarychess/├── MainAbilitySlice.java // 主界面├── GameView.java // 游戏核…...

Xcode 16 集成 cocoapods 报错

基于 Xcode 16 新建工程项目&#xff0c;集成 cocoapods 执行 pod init 报错 ### Error RuntimeError - PBXGroup attempted to initialize an object with unknown ISA PBXFileSystemSynchronizedRootGroup from attributes: {"isa">"PBXFileSystemSynchro…...