RabbitMQ延迟队列
目录
一、概念
二、使用场景
三、RabbitMQ 中的 TTL
(一)队列设置 TTL
(二)消息设置 TTL
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目
(二)添加依赖
(三)修改配置文件
(四)添加Swagger配置类
五、队列TTL
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
六、延迟队列优化
(一)代码架构图
(二)配置文件类
(三)消息生产者
七、Rabbitmq 插件实现延迟队列
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
八、总结
一、概念
二、使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

三、RabbitMQ 中的 TTL
(一)队列设置 TTL
Map<String, Object> arguments = new HashMap<>();
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
(二)消息设置 TTL
另一种方式便是针对每条消息设置 TTL
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttl);return msg;
});
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目


(二)添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
(三)修改配置文件
spring.rabbitmq.host=192.168.23.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
(四)添加Swagger配置类
@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig() {return new Docket(DocumentationType.SWAGGER_2).groupName("webapi").apiInfo(webApiInfo()).select().build();}public ApiInfo webApiInfo() {return new ApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(new Contact("enjoy6288", "http://atguigu.com","1551388580@qq.com")).build();}
}
五、队列TTL
(一)代码架构图
(二)配置文件类
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";// 声明xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列A@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>();// 当前队列的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列B@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明死信队列@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}@Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with("YD");}}
(三)消息生产者
@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间是{},发送一条信息给两个 TTL 队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);}
(四)消息消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
六、延迟队列优化
(一)代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,而是由生产者设置过期时间

(二)配置文件类
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";public static final String QUEUE_C = "QC";// 声明xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列A@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>();// 当前队列的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列B@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明队列C@Bean("queueC")public Queue queueC() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}// 声明队列C绑定交换机X@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}// 声明死信队列@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}@Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with("YD");}}
(三)消息生产者
@GetMapping("/sendExpirationMsg/{message}/{ttl}")public void sendMsg(@PathVariable String message, @PathVariable String ttl) {log.info("当前时间是{},发送一条过期信息给两个 TTL 队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttl);return msg;});}
七、Rabbitmq 插件实现延迟队列
关于插件的安装可以查看这篇文章Docker安装RabbitMq延迟队列插件

(一)代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

(二)配置文件类
(三)消息生产者
/** 基于插件的延迟队列和延迟交换机*/
@Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";// 声明队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}// 声明自定义交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 声明队列和延迟交换机的绑定@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange exchange) {return BindingBuilder.bind(delayedQueue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();}
}
(四)消息消费者
@Component
@Slf4j
public class DelayedQueueConsumer {@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(String message) {log.info("当前时间:{}, 接收到消息: {}", new Date().toString(), message);}
}
第二个消息被先消费掉了,符合预期
八、总结
相关文章:
RabbitMQ延迟队列
目录 一、概念 二、使用场景 三、RabbitMQ 中的 TTL (一)队列设置 TTL (二)消息设置 TTL (三)两者的区别 四、整合SpringBoot实现延迟队列 (一)创建项目 (二&am…...
Java中常用的七种队列你了解多少?
文章目录Java中常用的七种队列你了解多少?ArrayBlockingQueue队列如何使用?添加元素到队列获取队列中的元素遍历队列LinkedBlockingQueue队列如何使用?1. 创建SynchronousQueue对象2. 添加元素到队列3. 获取队列中的元素4. 遍历队列SynchronousQueue队列…...
<Java获取时间日期工具类>常见八种场景(一)
一:自定义时间日期工具类常用的八种方式(整理): 0,getTimeSecondNum:时间日期转成秒数,常用于大小比较 1,getLastYearMonthLastDay:获取去年当月最后一天的时间日期 2,getLastYearM…...
接上一篇 对多个模型环形旋转进行优化 指定旋转位置
using System.Collections; using System.Collections.Generic; using UnityEngine; using DG.Tweening; public class ModelAnimal : MonoBehaviour { //记录鼠标滑动 public Vector2 lastPos;//鼠标上次位置 Vector2 currPos;//鼠标当前位置 Vector2 offset;//两次位置的偏移…...
Unity中获取地形的法线
序之前,生成了地形图:(42条消息) 从灰度图到地形图_averagePerson的博客-CSDN博客那末,地形的法线贴图怎么获取?大概分为两个部分吧,先拿到法线数据,再画到纹理中去。关于法线计算Unity - Scripting API: M…...
模型解释性:PFI、PDP、ICE等包的用法
本篇主要介绍几种其他较常用的模型解释性方法。 1. Permutation Feature Importance(PFI) 1.1 算法原理 置换特征重要性(Permutation Feature Importance)的概念很简单,其衡量特征重要性的方法如下:计算特征改变后模型预测误差的增加。如果打乱该特征的…...
spring常见面试题(2023最新)
目录前言1.spring是什么2.spring的设计核心是什么3.IOC和AOP面试题4.spring的优点和缺点5.spring中bean的作用域6.spring中bean的注入方式7.BeanFactory 和 ApplicationContext有什么区别?8.循环依赖的情况,怎么解决?9.spring中单例Bean是线程…...
华为OD机试题,用 Java 解【压缩报文还原】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...
机器学习-BM-FKNCN、BM-FKNN等分类器对比实验
目录 一、简介和环境准备 二、算法简介 2.1四种方法类: 2.1.1FKNN 2.1.2FKNCN 2.1.3BM-FKNN 2.1.3BM-FKNCN 2.2数据预处理 2.3输出视图 2.4调用各种方法看准确率 2.4.1BM-FKNCN 2.4.2BM-FKNN 2.4.3FKNCN 2.4.4FKNN 2.4.5KNN 一、简介和环境准备 k…...
ChatGPT火了,对话式人工智能还能干嘛?
身兼数职的ChatGPT 从2022火到了2023 连日来一直是各大平台的热议对象 其实除了写诗、敲代码、处理文档 以ChatGPT为代表的 对话式人工智能 还有更重要的工作要做 对话式AI与聊天机器人 相信大多数人…...
十一、操作数栈的特点(Operand Sstack)
1.每一个独立的栈帧中除了包含局部变量表以外,还包含一个后进先出的操作数栈,也可以称之为表达式栈。 2.操作数栈,在方法执行过程中,根据字节码指令,往栈中写入数据,或提取数据,即入栈ÿ…...
拆解瑞幸新用户激活流程,如何让用户“动”起来?
Aha时刻 一个产品的拉新环节,是多种方式并存的;新用户可能来自于商务搭建了新的渠道,运营策划了新的活动,企划发布了新的广告,销售谈下了新的客户,市场推广了新的群体,以及产品本身的口碑传播,功能更新带来的自然流量。 这是一个群策群力的环节,不同的团队背负不同的K…...
tkinter界面的TCP通信/开启线程等待接收数据
前言 用简洁的语言写一个可以与TCP客户端实时通信的界面。之前做了一个项目是要与PLC进行信息交互的界面,在测试的时候就利用TCP客户端来实验,文末会附上TCP客户端。本文分为三部分,第一部分是在界面向TCP发送数据,第二部分是接收…...
华为OD机试题,用 Java 解【任务混部】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...
看linux内核启动流程需要的汇编指令解释
一、指令 0.MRS 和MSR MRS 指令: 对状态寄存器CPSR和SPSR进行读操作。 MSR指令: 对状态寄存器CPSR和SPSR进行写操作。 1.adrp adrp x0, boot_args把boot_args的页基地址提取出来,放到x0中。 2.stp stp x21, x1, [x0]将 x21, x1 的值存入 x0寄存器记录的地址中…...
【巨人的肩膀】JAVA面试总结(二)
1、💪 目录1、💪1.0、什么是面向对象1.1、JDK、JRE、JVM之间的区别1.2、什么是字节码1.3、hashCode()与equals()之间的联系1.4、String、StringBuffer、StringBuilder的区别1.5、和equals方法的区别1.6、重载和重写的区别1.7、List和Set的区别1.8、Array…...
【网络安全入门】零基础小白必看!!!
看到很多小伙伴都想学习 网络安全 ,让自己掌握更多的 技能,但是学习兴趣有了,却发现自己不知道哪里有 学习资源◇瞬间兴致全无!◇ 😄在线找人要资料太卑微,自己上网下载又发现要收费0 🙃差点当…...
字节前端经典面试题(附答案)
有哪些可能引起前端安全的问题? 跨站脚本 (Cross-Site Scripting, XSS): ⼀种代码注⼊⽅式, 为了与 CSS 区分所以被称作 XSS。早期常⻅于⽹络论坛, 起因是⽹站没有对⽤户的输⼊进⾏严格的限制, 使得攻击者可以将脚本上传到帖⼦让其他⼈浏览到有恶意脚本的⻚⾯, 其注⼊⽅式很简…...
数据库管理工具的使用
目录 摘要 一、Navicat是什么? 二、使用步骤 1.如何下载与安装 2.如何连接远程数据库 总结 摘要 本文主要介绍数据库管理工具的使用 一、Navicat是什么? 它是一款数据库管理工具,将此工具连接数据库,你可以从中看到各种数据库的详细…...
让马斯克反悔的毫米波雷达,被国产雷达头部厂商木牛科技迭代到了5D时代
近日,特斯拉或将在其HW4.0硬件系统配置一枚高精度4D毫米波雷达的消息在外网刷屏。据分析,“纯视觉”信仰者马斯克之所以做出这样的决定,一方面是减配了雷达的特斯拉自动驾驶,表现不尽如人意;另一方面也跟毫米波雷达的技…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
转转集团旗下首家二手多品类循环仓店“超级转转”开业
6月9日,国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解,“超级…...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...
Spring AI 入门:Java 开发者的生成式 AI 实践之路
一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
无人机侦测与反制技术的进展与应用
国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机(无人驾驶飞行器,UAV)技术的快速发展,其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统,无人机的“黑飞”&…...

