当前位置: 首页 > news >正文

RabbitMQ --- 死信交换机

一、简介

1.1、什么是死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

 

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

 

如图,一个消息被消费者拒绝了,变成了死信:

因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:  

如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:  

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称

  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

 

 

1.2、利用死信交换机接收死信(拓展)

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

我们在consumer服务中,定义一组死信交换机、死信队列:  

// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化.deadLetterExchange("dl.direct") // 指定死信交换机.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

 

1.3、总结

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack

  • 消息超时未消费

  • 队列满了

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;

  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

 

 

二、TTL

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间

  • 消息本身设置了超时时间

 

2.1、接收超时死信的死信交换机

在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.ttl.queue", durable = "true"),exchange = @Exchange(name = "dl.ttl.direct"),key = "ttl"
))
public void listenDlQueue(String msg){log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}

 

2.2、声明一个队列,并且指定TTL

要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

@Bean
public Queue ttlQueue(){return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化.ttl(10000) // 设置队列的超时时间,10秒.deadLetterExchange("dl.ttl.direct") // 指定死信交换机.build();
}

注意,这个队列设定了死信交换机为dl.ttl.direct

声明交换机,将ttl与交换机绑定:

@Bean
public DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

发送消息,但是不要指定TTL:

@Test
public void testTTLQueue() {// 创建消息String message = "hello, ttl queue";// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);// 记录日志log.debug("发送消息成功");
}

发送消息的日志:

查看下接收消息的日志:  

因为队列的TTL值是10000ms,也就是10秒。可以看到消息发送与接收之间的时差刚好是10秒。  

 

2.3、发送消息时,设定TTL

在发送消息时,也可以指定TTL:

@Test
public void testTTLMsg() {// 创建消息Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration("5000").build();// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);log.debug("发送消息成功");
}

查看发送消息日志:

接收消息日志:  

这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。  

 

2.4、总结

消息超时的两种方式是?

  • 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信

  • 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息?

  • 给消息的目标队列指定死信交换机

  • 将消费者监听的队列绑定到死信交换机

  • 发送消息时给消息设置超时时间为20秒

 

 

三、延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信

  • 用户下单,如果用户在15 分钟内未支付,则自动取消

  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:Community Plugins — RabbitMQ

使用方式可以参考官网地址:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

 

3.1、安装DelayExchange插件

安装MQ


执行下面的命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management

安装DelayExchange插件


官方的安装指南地址为:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

 

因为我们之前是基于Docker安装RabbitMQ,所以下面我们会讲解基于Docker来安装RabbitMQ插件。


下载插件


RabbitMQ有一个官方的插件社区,地址为:Community Plugins — RabbitMQ

其中包含各种各样的插件,包括我们要使用的DelayExchange插件:

 

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub这个对应RabbitMQ的3.8.5以上版本。


上传插件


因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。如果不是基于Docker的,重新创建Docker容器。

我们之前设定的RabbitMQ的数据卷名称为mq-plugins,所以我们使用下面命令查看数据卷:

docker volume inspect mq-plugins

可以得到下面结果:

接下来,将插件上传到这个目录即可:

 


安装插件


最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

3.2、DelayExchange原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息

  • 判断消息是否具备x-delay属性

  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间

  • 返回routing not found结果给消息发送者

  • x-delay时间到期后,重新投递消息到指定队列

 

3.3、使用DelayExchange

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。

1)声明DelayExchange交换机

基于注解方式(推荐):

也可以基于@Bean的方式:  

 

2)发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间:

 

3.4、总结

延迟队列插件的使用步骤包括哪些?

•声明一个交换机,添加delayed属性为true

•发送消息时,添加x-delay头,值为超时时间

相关文章:

RabbitMQ --- 死信交换机

一、简介 1.1、什么是死信交换机 什么是死信? 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter): 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false…...

如何在个人web项目中使用会话技术(cookiesession)?

编译软件:IntelliJ IDEA 2019.2.4 x64 操作系统:win10 x64 位 家庭版 服务器软件:apache-tomcat-8.5.27 目录 一. 什么是会话?二. 为什么要使用会话技术?三. 如何使用会话技术?3.1 Cookie(客户端的会话技术…...

创建线索二叉树

创建线索二叉树 一、创建线索二叉树一、案例1、前序线索二叉树2、中序线索二叉树3、后序线索二叉树 一、创建线索二叉树 现将某结点的空指针域指向该结点的前驱后继,定义规则如下: 若结点的左子树为空,则该结点的左孩子指针指向其前驱结点。…...

HNU-操作系统OS-实验Lab2

OS_Lab2_Experimental report 湖南大学信息科学与工程学院 计科 210X wolf (学号 202108010XXX) 前言 实验一过后大家做出来了一个可以启动的系统,实验二主要涉及操作系统的物理内存管理。操作系统为了使用内存,还需高效地管理…...

如何使用HTML和CSS创建有方向感知的按钮

在互联网应用中,按钮是一种常见的控件,用户通过点击按钮来触发相应的操作。考虑到用户体验和交互设计,设计有方向感知的按钮可以使得用户更加易于理解按钮的功能和状态。 在本文中,我们将介绍如何使用HTML和CSS来创建具有方向感知…...

java 线程安全

内部锁 在 Java 中,每个对象都有一个内部锁,也称为监视器锁或对象锁。内部锁是通过在代码块或方法前加上 synchronized 关键字来实现的。当一个线程执行一个带有 synchronized 关键字的方法或代码块时,它必须先获得该对象的内部锁&#xff0…...

移动硬盘修复后文件丢失恢复方法

最近收到很多这样的咨询: 问1:移动硬盘目录损坏用chkdsk修复后,文件被删除,怎么才可以恢復文件? 问2:移动硬盘出错然后修复,然后文件都没有了怎么处理啊!!!&a…...

直线飙升到10万+star的AutoGpt,有多强?帮我写了个网页!

先来感受一下10万的star,到底有多强! 从4月2日开始,直线飙升到10万star Auto-GPT是一个实验性的开源应用程序,展示了GPT-4语言模型的功能。这个程序由GPT-4驱动,将LLM“思想”链接在一起,以自主实现您设定的…...

rk3568平台调试typec口实现uvc输出,网络共享等功能

一、修改kernel相关配置 注意:一定要知道主控接线,那个物理口是otg的,然后要找准与之所连接的phy和控制器。然后处理CC1 CC2识别芯片,fusb302。默认sdk自带有驱动,需要配上中断脚和提供VBUS 5V的脚。用来判断角色是DF…...

java基础知识——26.反射

这篇文章我们来讲一下java的代理与反射,这是很重要的一部分内容。 目录 1.什么是反射 2.获取class对象的三种方式 3.反射获取构造方法 4.利用反射来获取成员变量 5.利用反射来获取成员方法 6.反射的作用 7.反射小结 1.什么是反射 首先,我们来看…...

【容器化】Docker 简介和安装

【容器化】Docker 简介和安装 DockerDocker的应用场景Docker 的优点1、快速,一致地交付您的应用程序2、响应式部署和扩展3、在同一硬件上运行更多工作负载 Docker 架构Docker 安装Ubuntu Docker 、Debian Docker 安装使用官方安装脚本自动安装手动安装使用 Shell 脚…...

性能测试场景分析并设计?超细案例讲解,看这篇就够了

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 性能测试场景&…...

JAVA9新特性

JAVA9新特性 概述 ​ 经过4次推迟,历经曲折的Java9最终在2017年9月21日发布。因为里面加入的模块化系统,在最初设想的时候并没有想过那么复杂,花费的时间超出预估时间。距离java8大约三年时间。 ​ Java 9提供了超过150项新功能特性&#x…...

( 数组和矩阵) 645. 错误的集合 ——【Leetcode每日一题】

❓645. 错误的集合 难度:简单 集合 s 包含从 1 到 n 的整数。不幸的是,因为数据错误,导致集合里面某一个数字复制了成了集合里面的另外一个数字的值,导致集合 丢失了一个数字 并且 有一个数字重复 。 给定一个数组 nums 代表了…...

2023年全国最新道路运输从业人员精选真题及答案63

百分百题库提供道路运输安全员考试试题、道路运输从业人员考试预测题、道路安全员考试真题、道路运输从业人员证考试题库等,提供在线做题刷题,在线模拟考试,助你考试轻松过关。 119.在危险货物道路运输过程中,(&#x…...

Kettle安装与使用

一、Kettle简介 Kettle最早是一个开源的ETL(Extract-Transform-Load的缩写)工具,全称为KDE Extraction, Transportation, Transformation and Loading Environment。后来Kettle重命名为Pentaho Data Integration 。它由Java开发,…...

C51 - DS18B20

Thermometer 1> 实验概述2> 硬件设计3> DS18B203.1> 原理框图3.2> 数据格式 4> 单总线(1-Wire)通讯协议4.1> 初始化(复位)时序4.2> 写-DS18B20时序4.3> 读-DS18B20时序4.4> 命令 5> 程序设计5.1…...

手把手教你使用vue2搭建微前端micro-app

​ 简述 本文主要讲述新手小白怎么搭建micro-app,几乎是每一步都有截图说明。上手应该很简单。 研究背景 这段时间在网上找了很多有关微前端相关的知识,起初本来是想着先搭建一个single-spa,但是奈何网上能找到的内容都是千篇一律。我也是…...

DDR3(MIG核配置官方demoFPGA代码实现及仿真)

由于直接对 DDR3 进行控制很复杂,因此一般使用 MIG IP 来实现,同时为了更简单地使用 MIG IP,我们采用 AXI4 总线协议进行控制。下面首先介绍 MIG IP 的配置,然后看看官方 demo (里面包含一个仿真要用到的 DDR3 模型&am…...

传奇人物《周兴和》书连载之67 不辱神圣的使命

不辱神圣的使命 这里,先前还是一个十分神秘的地方。 外人和车辆要想进入这片区域,那是绝对不允许的。这片区域隐于群山之中,且戒备森严,外人若想进入,那是要经过好几道政治审查和随身检查的。近年来,随着…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

PAN/FPN

import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

C++ 设计模式 《小明的奶茶加料风波》

👨‍🎓 模式名称:装饰器模式(Decorator Pattern) 👦 小明最近上线了校园奶茶配送功能,业务火爆,大家都在加料: 有的同学要加波霸 🟤,有的要加椰果…...