使用RabbitMQ实现延迟消息的完整指南
在分布式系统中,消息队列通常用于解耦服务,RabbitMQ是一个广泛使用的消息队列服务。延迟消息(也称为延时队列或TTL消息)是一种常见的场景应用,特别适合处理某些任务在一段时间后执行的需求,如订单超时处理、延时通知等。
本文将以具体代码为例,展示如何使用RabbitMQ来实现延迟消息处理,涵盖队列和交换机的配置、消息的发送与接收以及死信队列的处理。
什么是延迟消息?
延迟消息是指消息在发送到队列后,经过设定的时间延迟再被消费。RabbitMQ 本身没有直接支持延迟队列的功能,但可以通过 TTL(Time To Live)+ 死信队列(Dead Letter Queue, DLQ) 的组合来实现。当消息超过TTL(消息存活时间)后,不会被立即消费,而是会被转发到绑定的死信队列,从而实现延迟处理。
RabbitMQ中的延迟消息原理
在RabbitMQ中,我们可以通过以下几个概念来实现延迟消息:
- TTL(Time To Live):可以为队列设置TTL,消息超过该时间后会被标记为“死信”。
- 死信队列(Dead Letter Queue):当消息在正常队列中过期或处理失败时,RabbitMQ可以将它们路由到一个死信队列,死信队列可以用来处理这些过期或未处理的消息。
- x-dead-letter-exchange 和 x-dead-letter-routing-key:可以通过配置队列的参数,将过期消息发送到一个专门的死信交换器,并根据指定的路由键转发到死信队列。
消息来到ttl.queue消息队列,过期时间内无人消费,消息来到死信交换机hmall.direct,在direct.queue消息队列无需等待。
1. RabbitMQ的配置
首先,我们需要配置两个队列和两个交换机:一个用于存放延时消息,另一个用于处理超时的死信消息。
package com.heima.stroke.configuration;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 延迟时间 单位:毫秒 (这里设为30秒)private static final long DELAY_TIME = 1000 * 30;// 行程超时队列public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE";// 行程死信队列public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE";// 行程超时队列交换机public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE";// 行程死信队列交换机public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE";// 行程超时交换机 Routing Keypublic static final String STROKE_OVER_KEY = "STROKE_OVER_KEY";// 行程死信交换机 Routing Keypublic static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY";/*** 声明行程超时队列,并设置其参数* x-dead-letter-exchange:绑定的死信交换机* x-dead-letter-routing-key:死信路由Key* x-message-ttl:消息的过期时间*/@Beanpublic Queue strokeOverQueue() {Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE);args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY);args.put("x-message-ttl", DELAY_TIME); // 设置TTL为30秒return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();}@Beanpublic DirectExchange strokeOverQueueExchange() {return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);}@Beanpublic Binding bindingStrokeOverDirect() {return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);}
}
解释:
TTL设置:我们通过x-message-ttl
设置消息的过期时间为30秒。
死信队列绑定:通过x-dead-letter-exchange
和x-dead-letter-routing-key
设置,当消息过期时,它会被转发到死信交换机,再路由到死信队列。
2. 生产者发送延迟消息
接下来,我们通过生产者向超时队列发送消息,这些消息将在TTL过期后转发到死信队列。
package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MQProducer {private final static Logger logger = LoggerFactory.getLogger(MQProducer.class);@AutowiredRabbitTemplate rabbitTemplate;/*** 发送延时消息到行程超时队列** @param strokeVO 消息体*/public void sendOver(StrokeVO strokeVO) {String mqMessage = JSON.toJSONString(strokeVO);logger.info("send timeout msg:{}", mqMessage);rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);}
}
解释:
sendOver
方法将消息发送到超时队列,消息将在超时后进入死信队列。生产者不需要额外处理TTL或死信的配置,只需发送消息即可。
3. 消费者监听死信队列
当消息超过TTL后,将会被转发到死信队列。消费者需要监听死信队列并处理这些消息。
j
package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import com.heima.stroke.handler.StrokeHandler;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@Component
public class MQConsumer {private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class);@Autowiredprivate StrokeHandler strokeHandler;/*** 监听死信队列** @param message 消息体* @param channel RabbitMQ的Channel* @param tag 消息的Delivery Tag*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"),exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),key = RabbitConfig.STROKE_DEAD_KEY)})@RabbitHandlerpublic void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class);logger.info("get dead msg:{}", message.getBody());if (strokeVO == null) {return;}try {// 处理超时的行程消息strokeHandler.timeoutHandel(strokeVO);// 手动确认消息channel.basicAck(tag, false);} catch (Exception e) {e.printStackTrace();}}
}
解释:
@RabbitListener
注解绑定了死信队列的监听器。当消息被转发到死信队列时,该消费者会接收到消息。
使用 channel.basicAck(tag, false)
手动确认消息处理成功,确保消息不会重复消费。
4. 处理超时业务逻辑
在我们的业务中,当消息超时未处理时,将其状态设置为超时。
public void timeoutHandel(StrokeVO strokeVO) {// 获取司机行程ID和乘客行程IDString inviterTripId = strokeVO.getInviterTripId();String inviteeTripId = strokeVO.getInviteeTripId();// 检查邀请状态是否为未确认String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) &&String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {// 更新为超时状态redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));}
}
相关文章:
使用RabbitMQ实现延迟消息的完整指南
在分布式系统中,消息队列通常用于解耦服务,RabbitMQ是一个广泛使用的消息队列服务。延迟消息(也称为延时队列或TTL消息)是一种常见的场景应用,特别适合处理某些任务在一段时间后执行的需求,如订单超时处理、…...
阿里员工:阿里工作7年至少得P7吧,快的都P8了,年薪100W是正常的,80才算及格...
上一篇:一线体面男的收入 年薪64W的阿里蚂蚁员工爆料:在阿里,工作7年至少得P7,快的都P8了,年薪100W才正常,80分才算及格。 其实,在大厂工作,听起来风光无限,但个中滋味&a…...
Django进一步掌握(10月22日)
一、请求响应对象 请求对象request 响应对象HttpResponse 二、HttpResponse常用属性 status设置HTTP响应状态码 status_code查询HTTP响应状态码 content_type设置响应的类型 write()写入响应内容 三、重定向 1、实现URl访问的重定向 (1)使用Ht…...
C++从入门到起飞之——红黑树封装map和set 全方位剖析!
目录 1、map和set的整体框架 2、map和set迭代器的实现 3、map支持[] 4、完整源码 set.h map.h RBTree.h 1、map和set的整体框架 因为map和set的底层都是红黑树,所以我们考虑用一个红黑树的类模版去实例化map和set对象!不过,map节点中存…...
【javax maven项目缺少_Maven的依赖管理 引入依赖】
javax maven项目缺少_Maven的依赖管理 引入依赖 Maven的依赖管理 - 引入依赖依赖管理(引入依赖)导入依赖 https://blog.csdn.net/weixin_28932089/article/details/112381468 Maven的依赖管理 - 引入依赖 依赖管理(引入依赖) 能够掌握依赖引入的配置方式 导入依赖 导入依赖练…...
手搓一个定时器
目录 1.什么是定时器 2.计时器的使用 3.手搓定时器 3.1定义一个TimerTask类 3.2定义一个Timer类 3.3实现schedule方法 3.4实现Timer的构造方法 3.4.1随时随地查看优先级队列中是否有任务要执行 3.4.2获取队首任务,并判断是否到执行时间 3.4.3到达执行时间…...
AI提示词工程优化Prompt-GPT使用手册(科普一键收藏史上最强攻略)
Prompt(提示),最初是 NLP 研究者为下游任务设计出来的一种任务专属的输入形式或模板。在 ChatGPT 引发大语言模型新时代之后,Prompt 指与大模型交互输入的代称。 随着大模型的进展,Prompt Engineering是一个持久的探索过程。 目录 什么是提示…...
【数据结构】快速排序(三种实现方式)
目录 一、基本思想 二、动图演示(hoare版) 三、思路分析(图文) 四、代码实现(hoare版) 五、易错提醒 六、相遇场景分析 6.1 ❥ 相遇位置一定比key要小的原因 6.2 ❥ 右边为key,左边先走 …...
利用前向勾子获取神经网络中间层的输出并将其进行保存(示例详解)
代码示例: # 激活字典,用于保存每次的中间特征 activation {}# 将 forward_hook 函数定义在 upsample_v2 外部 def forward_hook(name):def hook(module, input, output):activation[name] output.detach()return hookdef upsample_v2(in_channels, o…...
CTF-RE 从0到N: S盒
S盒(Substitution Box) 是密码学中的一种替换表,用于对输入数据进行非线性变换,以增加加密过程的复杂性。它主要用于对称加密算法中(例如AES、DES),作为加密轮次的一部分,对输入字节…...
MT-Pref数据集:包含18种语言的18k实例,涵盖多个领域。实验表明它能有效提升Tower模型在WMT23和FLORES基准测试中的翻译质量。
2024-10-10,由电信研究所、里斯本大学等联合创建MT-Pref数据集,它包含18种语言方向的18k实例,覆盖了2022年后的多个领域文本。通过在WMT23和FLORES基准测试上的实验,我们展示了使用MT-Pref数据集对Tower模型进行对齐可以显著提高翻…...
【C++ 真题】B2099 矩阵交换行
矩阵交换行 题目描述 给定一个 5 5 5 \times 5 55 的矩阵(数学上,一个 r c r \times c rc 的矩阵是一个由 r r r 行 c c c 列元素排列成的矩形阵列),将第 n n n 行和第 m m m 行交换,输出交换后的结果。 输入格式 输入共 6 6 6 …...
AAPL: Adding Attributes to Prompt Learning for Vision-Language Models
文章汇总 当前的问题 1.元标记未能捕获分类的关键语义特征 如下图(a)所示, π \pi π在类聚类方面没有显示出很大的差异,这表明元标记 π \pi π未能捕获分类的关键语义特征。我们进行简单的数据增强后,如图(b)所示,效果也是如…...
MySQLDBA修炼之道-开发篇(一)
三、开发基础 1. 数据模型 1.1 关系数据模型介绍 关于NULL 如果某个字段的值是未知的或未定义的,数据库会提供一个特殊的值NULL来表示。NULL值很特殊,在关系数据库中应该小心处理。例如查询语句“select*from employee where 绩效得分<85 or>绩…...
Spring MVC 知识点全解析
Spring MVC 知识点全解析 Spring MVC 是一个基于 Java 的请求驱动的 Web 框架,属于 Spring 框架的一部分,广泛用于构建企业级 Web 应用程序。本文将详细阐述 Spring MVC 的核心知识点,包括其工作原理、关键组件、配置、请求处理、数据绑定、…...
python 基于FastAPI实现一个简易的在线用户统计 服务
简易在线用户统计服务 概述 这是一个基于Python的FastAPI框架实现的服务,用于统计客户端的心跳信息,并据此维护在线用户列表以及记录活跃用户数。 功能特性 心跳接收:接受来自客户端的心跳包,以更新客户端的状态。在线用户统计…...
glibc中xdr的一个bug
本人在64位linux服务器上(centos7),发现xdr_u_long这个函数有个bug,就是数字的范围如果超过unsigned int的最大值(4294967295)时,xdr_u_long失败。 这个场景主要用在unix时间戳上面,比如一款软件,设置有效期为100年。…...
Android Framework定制sim卡插入解锁pin码的界面
文章目录 手机设置SIM卡pin码一、安卓手机二、苹果手机 Android Framework中SIM卡pin码代码定位pin码提示文本位置定位pin码java代码位置 定制pin码framework窗口数字按钮 手机设置SIM卡pin码 设置 SIM 卡 PIN 码可以提高手机的安全性,防止他人在未经授权的情况下使…...
cc2530 Basic RF 讲解 和点灯讲解(1_1)
1. Basic RF 概述 Basic RF 是 TI 提供的一套简化版的无线通信协议栈,旨在帮助开发者快速搭建无线通信系统。它基于 IEEE 802.15.4 标准的数据包收发,但只用于演示无线设备数据传输的基本方法,不包含完整功能的协议。Basic RF 的功能限制包括…...
Android H5页面性能分析策略
文章目录 引言一、拦截资源加载请求以优化性能二、通过JavaScript代码监控资源下载速度三、使用vConsole进行前端性能调试四、使用Chrome DevTools调试Android端五、通过抓包分析优化网络性能六、总结 引言 在移动应用开发中,H5页面的性能直接影响到用户体验。本文…...
【前端面试】Typescript
Typescript面试题目回答 Typescript有哪些常用类型? Typescript的常用类型包括: 基本类型:boolean(布尔类型)、number(数字类型)、string(字符串类型)。特殊类型:nul…...
程序语言的内存管理:垃圾回收GC(Java)、手动管理(C语言)与所有权机制(Rust)(手动内存管理、手动管理内存)
文章目录 程序语言的内存管理:垃圾回收、手动管理与所有权机制引言一、垃圾回收机制(GC)(Java)1. 什么是垃圾回收机制2. 垃圾回收的工作原理3. 优点与缺点4. 示例代码 二、手动管理内存的分配和释放(C语言&…...
研究生论文学习记录
文献检索 检索论文的网站 知网:找论文,寻找创新点paperswithcode :这个网站可以直接找到源代码 直接再谷歌学术搜索 格式:”期刊名称“ 关键词 在谷歌学术搜索特定期刊的关键词相关论文,可以使用以下几种方法&#…...
毕业设计选题:基于Django+Vue的图书馆管理系统
开发语言:Python框架:djangoPython版本:python3.7.7数据库:mysql 5.7数据库工具:Navicat11开发软件:PyCharm 系统展示 系统首页 图书馆界面 图书信息界面 个人中心界面 后台登录界面 管理员功能界面 用户…...
#网络安全#NGSOC与传统SOC的区别
NGSOC是Next Generation Security Operation Center(下一代安全运营中心)的缩写。 NGSOC安全运营服务基于态势感知与安全运营平台来开展监测分析等一系列的服务工作,旨在通过专业、高效的运营服务工作,帮助用户尽可能发挥NGSOC作…...
GCN+BiLSTM多特征输入时间序列预测(Pytorch)
目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 GCNBiLSTM多特征输入时间序列预测(Pytorch) 可以做风电预测,光伏预测,寿命预测,浓度预测等。 Python代码,基于Pytorch编写 1.多特征输入单步预测…...
LinkedList和链表之刷题课(下)
1. 给定x根据x把链表分割,大的结点放在x后面,小的结点放在x前面 题目解析: 注意此时的pHead就是head(头节点的意思) 基本上就是给定一个链表,我们根据x的值来把这个链表分成俩部分,大的那部分放在x后面,小的那部分放在x前面,并且我们不能改变链表本来的顺序,比如下面的链表,我…...
ollama 在 Linux 环境的安装
ollama 在 Linux 环境的安装 介绍 他的存在在我看来跟 docker 的很是相似,他把市面上已经存在的大语言模型集合在一个仓库中,然后通过 ollama 的方式来管理这些大语言模型 下载 # 可以直接通过 http 的方式吧对应的 shell 脚本下载下来,然…...
C语言二刷指针篇
&取得变量的地址 printf("%p\n", &a); printf("%p\n", a); printf("%p\n", &a[0]); printf("%p\n", &a[1]); 前三个输出相同,a[0]和a[1]之间相差4 指针就是保存地址的变量,指针里放的是别的…...
LeetCode题练习与总结:回文对--336
一、题目描述 给定一个由唯一字符串构成的 0 索引 数组 words 。 回文对 是一对整数 (i, j) ,满足以下条件: 0 < i, j < words.length,i ! j ,并且words[i] words[j](两个字符串的连接)是一个回文…...
wordpress idstore/搜索引擎优化关键词的处理
1)将扫描枪电缆的一端与网络电缆的水晶头连接到扫描枪底部的插座。 当您听到“喀哒”声时,请再次将其拉回。 电缆不会滑出以表示已插入。另一端插入计算机上的相对端口号(此处以USB接口为例)。 此时,计算机的右下角将提醒您自动安装硬件配置驱动程序软件…...
家乐福官网/荥阳seo推广
昨天晚上在寝室写python多线程的时候,用了几个测试的程序,分别是递归方法求斐波那契数的值。分别采用单线程一个一个执行的方法和采用多线程调用的方法。观察所用的时间基本上差不多的。 然后我在每个函数内部加入sleep()函数以后,分别让它们…...
网站建设的功能定位/怎么从网上找客户
在LotusScript程序中如何计算程序所用的时间? 环境 产品: Lotus Domino Designer平台: Windows / PC软件: 5.x , 6.x 问题 在程序设计过程中,特别是在调试性能的时候,有时候需要计算某些操作所使用的时间,如何得到程序消耗的时间呢? 解答 可以通过NotesDateTime的TimeDiffer…...
云服务器快速安装wordpress/中国最新消息
在运行vue的项目时,报’vue-cli-service’ 不是内部或外部命令,也不是可运行的程序 或批处理文件。 原因是没有安装依赖:npm install,如果你下载的淘宝镜像,也可以cnpm install。 清理 node_modules 重新安装…...
唐山网站制作方案/电商项目策划书
二、 依赖属性的优先级 由于WPF 允许我们可以在多个地方设置依赖属性的值,所以我们就必须要用一个标准来保证值的优先级别。比如下面的例子中,我们在三个地方设置了按钮的背景颜色,那么哪一个设置才会是最终的结果呢?是Black、Red…...
目字形布局结构的网站/个人购买链接
拿到了自己阿里云服务器的日志,对其需要进行处理。class Read_Rizhi:def __init__(self,filename):self.filenamefilenamedef open_file(self):try:f open(self.filename, r, encodingutf-8)resuly {code: 1, result: f}except Exception as e:resuly {code: 0, …...