RabbitMQ实现死信队列
目录
- 死信队列是什么
- 怎样实现一个死信队列
- 说明
- 实现过程
- 导入依赖
- 添加配置
- 编写mq配置类
- 添加业务队列的消费者
- 添加死信队列的消费者
- 添加消息发送者
- 添加消息测试类
- 测试
- 死信队列的应用场景
- 总结
死信队列是什么
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
怎样实现一个死信队列
说明
配置死信队列大概可以分为三个步骤:
1.配置业务队列,绑定到业务交换机上
2.为业务队列配置死信交换机和路由key
3.为死信交换机配置死信队列
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
实现过程
导入依赖
<!--RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
添加配置
spring: #rabbitmqrabbitmq:host: 83.136.16.134password: guestusername: guestlistener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual
编写mq配置类
代码里面有详细说明,这里不在赘述。
package com.miaosha.study.mq;import com.sun.org.apache.regexp.internal.RE;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @Author: laz* @CreateTime: 2023-02-27 09:16* @Version: 1.0*/
@Configuration
public class RabbitmqConfig {/*** 业务交换机*/public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";/*** 业务队列a*/public static final String BUSINESS_QUEUEA_NAME = "business.queue.a";/*** 业务交换机b*/public static final String BUSINESS_QUEUEB_NAME = "business.queue.b";/*** 死信交换机*/public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";/*** 死信队列a*/public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.queue.a";/*** 死信队列b*/public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.queue.b";/*** 死信队列路由键a*/public static final String DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME = "dead.letter.queue.a.rounting.key";/*** 死信队列路由键b*/public static final String DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME = "dead.letter.queue.b.rounting.key";/*** 申明业务交换机* @return*/@Beanpublic FanoutExchange businessExchange(){return new FanoutExchange(BUSINESS_EXCHANGE_NAME);}/*** 申明死信交换机* @return*/@Beanpublic DirectExchange deadletterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);}/*** 申明业务队列a* @return*/@Beanpublic Queue queuea(){Map<String,Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//绑定的死信路由键map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();}/*** 申明业务队列b* @return*/@Beanpublic Queue queueb(){Map<String,Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//绑定的死信路由键map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();}/*** 申明死信队列a* @return*/@Beanpublic Queue deadletterQueuea(){return new Queue(DEAD_LETTER_QUEUEA_NAME);}/*** 申明死信队列b* @return*/@Beanpublic Queue deadletterQueueb(){return new Queue(DEAD_LETTER_QUEUEB_NAME);}/*** 队列a绑定到业务交换机* @return*/@Beanpublic Binding businessBindinga(){return BindingBuilder.bind(queuea()).to(businessExchange());}/*** 队列b绑定到业务交换机* @return*/@Beanpublic Binding businessBindingb(){return BindingBuilder.bind(queueb()).to(businessExchange());}/*** 死信队列a绑定到死信交换机* @return*/@Beanpublic Binding deadletterBindinga(){return BindingBuilder.bind(deadletterQueuea()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);}/*** 死信队列b绑定到死信交换机* @return*/@Beanpublic Binding deadletterBindingB(){return BindingBuilder.bind(deadletterQueueb()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);}
}
添加业务队列的消费者
package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEA_NAME;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEB_NAME;/*** @Author: laz* @CreateTime: 2023-02-27 09:53* @Version: 1.0*/
@Slf4j
@Component
public class RabbitmqReceiver {/*** 监听业务队列a* @param message*/@RabbitListener(queues = BUSINESS_QUEUEA_NAME)public void queuea(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("业务队列A接受到消息【{}】",msg);boolean ack = true;Exception exception = null;try {//这里模拟业务逻辑出现异常的情况if (msg.contains("fail")){throw new RuntimeException("dead letter exception");}} catch (Exception e){ack = false;exception = e;}//当ack为false时(业务逻辑出现异常),说明当前消息消费异常,这里直接放入死信队列if (!ack){log.error("业务队列A消费发生异常,error msg:{}", exception.getMessage());/*** void basicNack(long deliveryTag, boolean multiple, boolean requeue)* 参数一:当前消息的唯一id* 参数二:是否针对多条消息* 参数三:是否从新入队列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}/*** 监听业务队列b* @param msg*/@RabbitListener(queues = BUSINESS_QUEUEB_NAME)public void queueb(Message msg,Channel channel) throws Exception{String str = new String(msg.getBody());log.info("业务队列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);}
}
添加死信队列的消费者
package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.*;/*** @Author: laz* @CreateTime: 2023-02-27 09:58* @Version: 1.0*/
@Slf4j
@Component
public class DeadLetterReceiver {/*** 监听业务队列a* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)public void queuea(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信队列A接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}/*** 监听业务队列b* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)public void queueb(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信队列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}
}
添加消息发送者
package com.miaosha.study.mq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_EXCHANGE_NAME;/*** @Author: laz* @CreateTime: 2023-02-27 09:49* @Version: 1.0*/
@Component
public class RabbitmqSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg){rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);}
}
添加消息测试类
package com.miaosha.study.controller;import com.miaosha.study.mq.RabbitmqSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author: laz* @CreateTime: 2023-02-27 09:59* @Version: 1.0*/
@RestController
@RequestMapping("mq")
public class TestController {@Autowiredprivate RabbitmqSender rabbitmqSender;@RequestMapping("testDeadLetterQueue/{msg}")public void testDeadLetterQueue(@PathVariable("msg")String msg){rabbitmqSender.sendMsg(msg);}
}
测试
运行项目,访问:http://localhost:8081/mq/testDeadLetterQueue/msg
可以看到,此时只有业务消费者消费了消息,死信队列并没有消费到消息。
然后根据消费者里面的逻辑,我们发送一条 ‘fail’的消息,再次测试
访问:http://localhost:8081/mq/testDeadLetterQueue/fail
可以看到,死信队列a已收到消息。到此实现死信队列的流程就通了。
注意:我们的死信消息MessageProperties
中的内容比较多,代表的含义分别是:
字段名 | 含义 |
---|---|
x-first-death-exchange | 第一次被抛入的死信交换机的名称 |
x-first-death-reason | 第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量 |
x-first-death-queue | 第一次成为死信前所在队列名称 |
x-death | 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新 |
死信队列的应用场景
一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。
总结
死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
死信消息的生命周期:
- 业务消息被投入业务队列
- 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
- 被nck或reject的消息由RabbitMQ投递到死信交换机中
- 死信交换机将消息投入相应的死信队列
- 死信队列的消费者消费死信消息
本篇文章到此结束!希望对您有所帮助。
相关文章:
RabbitMQ实现死信队列
目录死信队列是什么怎样实现一个死信队列说明实现过程导入依赖添加配置编写mq配置类添加业务队列的消费者添加死信队列的消费者添加消息发送者添加消息测试类测试死信队列的应用场景总结死信队列是什么 “死信”是RabbitMQ中的一种消息机制,当你在消费消息时&#…...
【Linux】安装Tomcat教程
目录 1.上传安装包 2.解压安装包 3.启动Tomcat 4.查看启动日志 5.查看进程 6.开放端口 7.停止Tomcat 1.上传安装包 使用FinalShell自带的上传工具将Tomcat的二进制发布包上传到Linux(与前面上传JDK安装包步骤 一致)。 2.解压安装包 将上传上来的安装包解压到指定目录…...
学习笔记之Vuex(五)
Vuex(五)Vuex一、什么是Vuex二、Vuex工作原理三、搭建Vuex环境四、求和案例分析4.1 求和案例——vue实现4.2 求和案例——vuex实现(五)Vuex 一、什么是Vuex 1.概念 在Vue中实现集中式状态(数据)管理的一…...
SSM知识快速复习
SSM知识快速复习SpringIOCDIIOC容器在Spring中的实现常用注解Autowired注解的原理AOP相关术语作用动态代理实现原理事务Transactional事务属性:只读事务属性:超时事务属性:回滚策略事务属性:事务隔离级别事务属性:事务…...
【Linux】安装MySQL
目录 1.检测当前系统是否安装过MySQL相关数据库 2. 卸载现有的MySQL数据库 3.上传解压 4.顺序安装rpm包 5.启动MySQL 6.查看临时密码 7.登录MySQL 8.开放端口 1.检测当前系统是否安装过MySQL相关数据库 需要通过rpm相关指令,来查询当前系统中是否存在已安…...
【深度学习】手把手教你开发自己的深度学习模板
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录前言1数据相关1.1 数据初探1.2.数据处理1.3 数据变形2 定义网络,优化函数3. 训练前言 入坑2年后,重新梳理之前的知识,发现其实需…...
一个诡异的 Pulsar InterruptedException 异常
背景 今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。 和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了&…...
Java岗面试题--Java并发(volatile 专题)
目录1. 面试题一:谈谈 volatile 的使用及其原理补充:内存屏障volatile 的原理2. 面试题二:volatile 为什么不能保证原子性3. 面试题三:volatile 的内存语义4. 面试题四:volatile 的实现机制5. 面试题五:vol…...
Java---打家劫舍ⅠⅡ
目录 打家劫舍Ⅰ 题目分析 代码一 代码二 打家劫舍Ⅱ 打家劫舍Ⅰ 你是一个专业的小偷,计划偷窃沿街的房屋。每间房内都藏有一定的现金,影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统,如果两间相邻的房屋在同一晚上被…...
MySQL Lesson4
1:关于查询结果集的去重(distinct) select distinct job from emp; **distinct只能出现在所有字段的最前面。所表示的含有是所有的结果联合起来去重。 select distinct deptno,job from emp order by deptno; select count(distinct job)from…...
浅谈权限获取方法之文件上传
概述 文件上传漏洞是发生在有上传功能的应用中,如果应用程序对用户的上传文件没有控制或者存在缺陷,攻击者可以利用应用上传功能存在的缺陷,上传木马、病毒等有危害的文件到服务器上面,控制服务器。 漏洞成因及危害 文件上传漏…...
资产设备防拆标签安全防护和资产定位解决方案
随着社会经济的发展和高新技术的日新月异,对各方面的安全要求也在不断地提高,以物联网安防、入侵报警和出入口控制、应急系统等为主的安全防范系统日益成为各类文物场所智能化弱电工程不可或缺的组成部分,是重点资产管理场所内加强管理和安全…...
企业电子招标采购源码之电子招标投标全流程!
随着各级政府部门的大力推进,以及国内互联网的建设,电子招投标已经逐渐成为国内主流的招标投标方式,但是依然有很多人对电子招投标的流程不够了解,在具体操作上存在困难。虽然各个交易平台的招标投标在线操作会略有不同࿰…...
【考研408】计算机网络笔记
文章目录计算机网络体系结构计算机网络概述计算机网络的组成计算机网络的功能计算机网络的分类计算机网络的性能指标课后习题计算机网络体系结构与参考模型计算机网络协议、接口、服务的概念ISO/OSI参考模型和TCP/IP模型课后习题物理层通信基础基本概念奈奎斯特定理与香农定理编…...
[C++]继承
🥁作者: 华丞臧 📕专栏:【C】 各位读者老爷如果觉得博主写的不错,请诸位多多支持(点赞收藏关注)。如果有错误的地方,欢迎在评论区指出。 推荐一款刷题网站 👉LeetCode 文章目录一、继承…...
优化知识管理方法丨整理零碎信息,提高数据价值
信息流时代,知识成集合倍数增长,看似我们学习了很多知识,但知识零碎无系统,知识之间缺乏联系,没有深度,所以虽然你很努力,但你发现自己的能力增长特别缓慢,你需要整理知识将零散的知…...
Windows操作系统的体系结构、运行环境和运行状态
我是荔园微风,作为一名在IT界整整25年的老兵,今天我们来重新审视一下Windows这个我们熟悉的不能再熟悉的系统。说Windows操作系统的运行环境和运行状态,首先要介绍一下Windows操作系统的体系结构,然后再要说到最重要的两个概念:核…...
【工作笔记】Http响应头过长
起因 突然有测试小伙伴反馈进公司官网主页会白屏,但只是个例不是普遍现象 查监控发现没监控到异常问题 查了很久(这个很久单指对于线上问题来说)才定位是请求的异常,因为这套系统的异常用的是 ExceptionHandler,这也导…...
hive建分区表,分桶表,内部表,外部表
hive建分区表,分桶表,内部表,外部表 一、概念介绍 Hive是基于Hadoop的一个工具,用来帮助不熟悉 MapReduce的人使用SQL对存储在Hadoop中的大规模数据进行数据提取、转化、加载。Hive数据仓库工具能将结构化的数据文件映射为一张数…...
【分享】灌溉制度设计小程序VB源代码
说明 根据作物需水特性和当地气候、土壤、农业技术及灌水技术等因素制定的灌水方案。主要内容包括灌水次数、灌水时间、灌水定额和灌溉定额。灌溉制度是规划、设计灌溉工程和进行灌区运行管理的基本资料,是编制和执行灌区用水计划的重要依据。 1—计划湿润土层允…...
PR9268/300-000库存现货振动传感器 雄霸工控
PR9268/300-000库存现货振动传感器 雄霸工控PR9268/300-000库存现货振动传感器 雄霸工控SDM010PR9670/110-100PR9670/010-100PR9670/003-000PR9670/002-000PR9670/001-000PR9670/000-000PR9600/014-000PR9600/011-000PR9376/010-021PR9376/010-011PR9376/010-011PR9376/010-001…...
浅谈模型评估选择及重要性
作者:王同学 来源:投稿 编辑:学姐 模型评估作为机器学习领域一项不可分割的部分,却常常被大家忽略,其实在机器学习领域中重要的不仅仅是模型结构和参数量,对模型的评估也是至关重要的,只有选择那…...
多线程的初识和创建
✨个人主页:bit me👇 ✨当前专栏:Java EE初阶👇 ✨每日一语:知不足而奋进,望远山而前行。 目 录💤一. 认识线程(Thread)🍎1. 线程的引入🍏2. 线程…...
一句话设计模式3:工厂模式
工厂模式:new多种对象的简单方式。 文章目录 工厂模式:new多种对象的简单方式。前言一、两种工厂模式二、如何实现工厂模式1. 简单工厂2. 抽象工厂总结前言 工厂模式可以说比较常见的设计模式,仔细观察在很多源码中都有此种模式的应用;用来解决创建对象的创建问题; 一、两种工…...
【Codeforces Round #853 (Div. 2)】C. Serval and Toxel‘s Arrays【题解】
题目 Toxel likes arrays. Before traveling to the Paldea region, Serval gave him an array aaa as a gift. This array has nnn pairwise distinct elements. In order to get more arrays, Toxel performed mmm operations with the initial array. In the iii-th opera…...
100天精通Python(数据可视化篇)——第77天:数据可视化入门基础大全(万字总结+含常用图表动图展示)
文章目录1. 什么是数据可视化?2. 为什么会用数据可视化?3. 数据可视化的好处?4. 如何使用数据可视化?5. Python数据可视化常用工具1)Matplotlib绘图2)Seaborn绘图3)Bokeh绘图6. 常用图表介绍及其…...
PMP考前冲刺2.27 | 2023新征程,一举拿证
题目1-2:1.在产品开发过程中,项目发起人向项目团队推荐了一种新材料,新材料比现有的材料更便宜而且性能更好。如果团队采用新材料,不但有利于提升产品质量,而且可以显著降低成本。项目经理应该怎么办?A.采用新材料&am…...
【C++】map和set的封装(红黑树)
map和set的封装一、介绍二、stl源码剖析三、仿函数获取数值四、红黑树的迭代器五、map的[]5.1 普通迭代器转const迭代器六、set源码七、map源码八、红黑树源码一、介绍 首先要知道map和set的底层都是用红黑树实现的 【数据结构】红黑树 set只需要一个key,但是map既…...
【批处理脚本】-1.14-移动文件(夹)命令move
"><--点击返回「批处理BAT从入门到精通」总目录--> 共10页精讲(列举了所有move的用法,图文并茂,通俗易懂) 在从事“嵌入式软件开发”和“Autosar工具开发软件”过程中,经常会在其集成开发环境IDE(CodeWarrior,S32K DS,Davinci,EB Tresos,ETAS…)中,…...
逻辑地址和物理地址转换
在操作系统的学习中,很多抵挡都会涉及虚拟地址转换为物理地址的计算,本篇就简单介绍一下在分页存储管理、分段存储管理、磁盘存储管理中涉及的地址转换问题。 虚拟地址与物理地址 编程一般只有可能和逻辑地址打交道,比如在 C 语言中&#x…...
番禺网站建设哪里好/短网址
前言 本文主要是讲解在Controller中的开发,主要的知识点有如下: 编码过滤器使用注解开发注解RequestMapping详解业务方法接收参数字符串转日期重定向和转发返回JSONSpringMVC过滤编码器 在SpringMVC的控制器中,如果没有对编码进行任何的操作&…...
cnetos 做网站服务/官网排名优化方案
文章目录加法运算用加法代替减法移码参考加法运算 用加法代替减法 10-37 和 (109) 19 ,然后 19 mod 127,从而达到减法和加法的效果一样 存储单元为8bit时,计算机作加减运算时,都可以看成 mod 2^8 移码…...
启东 网站开发/互联网舆情
<?xml version"1.0" encoding"utf-8"?> 功能模块 LinearMath功能模块 LinearMath Table of Contents btScalar 宏定义类型函数btMinMax 文件btAlignedAllocator 宏定义函数类特殊说明btAlignedObjectArray 宏定义btAlignedObjectArray 类btList bt…...
乐清网页制作公司哪家好/优化营商环境心得体会个人
Python的运算符和其他语言类似(我们暂时只了解这些运算符的基本用法,方便我们展开后面的内容,高级应用暂时不介绍)数学运算>>>print 19 # 加法>>>print 1.3-4 # 减法>>>print 3*5 …...
不用开源程序怎么做网站/域名站长工具
一:select模型 二:WSAAsyncSelect模型 三:WSAEventSelect模型 四:Overlapped I/O 事件通知模型 五:Overlapped I/O 完成例程模型 六:IOCP模型 本文简单介绍了当前Windows支持的各种Socket I/O模型&#x…...
成都做网站设/杭州seo中心
抽象类:对类的抽象,抽象其行为特征,分为数据抽象,即类的属性,过程抽象,即类的行为特征。抽象层次:是类的属性、行为抽象,全局进行抽象。是一种自下而上的设计。描述的是一继承关系&a…...