当前位置: 首页 > news >正文

RabbitMQ深入 —— 死信队列

前言

        前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~


文章目录

前言

死信队列

1 基本概念 

2 设置消息时间TTL过期的死信队列

3 队列达到最大长度发生死信 

4 消息被拒引发死信

总结


死信队列

1 基本概念 

     死信就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信具有一定的延迟性,它可以作为延迟消息来处理。

死信出现的原因:

  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.I 

2 设置消息时间TTL过期的死信队列

首先我们在消费者Consumer1中声明普通交换机、死信交换机、普通队列和死信队列之间的关系,同时在声明之后令Consumer1拒收消息,在RabbitMQ中观察消息生产者发出消息的流转情况。

设置死信队列的消费者1

        在死信队列中我们设置了普通交换机、死信交换机、普通队列和死信队列。同时在正常队列中通过channel信道对象中的queueDeclare方法中的一个Map类型的参数,设置了死信交换机和普通交换机之间的关系,配置好TTL、RoutingKey并声明其死信交换机。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//设置过期时间arguments.put("x-message-ttl",10000);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_QUEUE,"dead");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}

需要注意的是,这里在正常队列中设置过期时间TTL一般不太常用,我们通常会在publish处设置消息的TTL,因此这里arguments对象有关 "x-message-ttl" 参数的配置可以注释掉。

实际处理消息的消费者2

在处理死信队列消息的消费者处,我们只需要设置消费者接收消息是来自死信队列即可。 

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer2 {//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();System.out.println("等待接收消息");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer2接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}

​​​​生产者

在这里我们借助AMQP. BasicProperties对象的build方法来设置相应的死信TTL。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;public class Publish {public static final String NORMAL_EXCHANGE = "normal";public static final String NORMAL_QUEUE = "normalQueue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//在Consumer已经声明过交换机了,所以在这里不能声明//死信消息,设置TTLAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String message = "info"+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,message.getBytes());}}
}

未运行Consumer2前我们看到普通队列在我们设置的TTL:10s之后将消息流转到死信队列中。

最后启动Consumer2后确实也收到了死信队列中的消息

3 队列达到最大长度发生死信 

在这一部分中我们需要注释掉之前在生产者中设置的消息的TTL,同时在消费者1中开启正常队列的最大消息堆积容量。 

arguments.put("x-max-length",6);

 这样子我们就可以模拟队列达到最大长度后产生死信的情况了。

4 消息被拒引发死信

        要想开启消费者拒收消息的功能,首先需要在消息接收的basicConsumer方法中关闭自动应答,同时自行设置手动应答的逻辑。在下面接收消息的回调函数中,在basicAck中设置应答,在basicReject实现消息拒收。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead1");System.out.println("等待接收消息》》》》》》》》》》》");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer1接收的消息是:"+msg+":此消息是被拒绝的");//这里第二个参数设置了是否要将拒收的消息塞回原队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));//成功应答,这里设置不批量操作channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};//开启手动应答//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,false,deliverCallback,(consumerTag)->{});}
}

总结

        时间过期、消息被拒、队列容量限制这三个机制会引发消息被转发死信队列,那么死信队列除了在这三种情况下继续保存消息之外,还有什么作用呢?下一篇文章荔枝会梳理延时队列,相信看完下一篇文章大家能有所收获~

今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~

如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!

如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!

相关文章:

RabbitMQ深入 —— 死信队列

前言 前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识&#xff0c;在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列&#xff0c;主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~ 文章目录 前言 死信队…...

【React + Umi】自定义离开页面拦截弹框事件

在 react umi 中对离开页面的行为进行自定义弹窗拦截控制。以下为可选的方案分析。 wrapper 首先&#xff0c;因为项目框架是 umi&#xff0c;最先想到了 umi 路由的 wrapper 装饰器&#xff0c;但仔细一想又不太对&#xff0c; wrapper 争对于跳转到某个特定页面的前置行为…...

S1FD40A180H-ASEMI快恢复二极管S1FD40A180H

编辑&#xff1a;ll S1FD40A180H-ASEMI快恢复二极管S1FD40A180H 型号&#xff1a;S1FD40A180H 品牌&#xff1a;ASEMI 封装&#xff1a;TO-247 特性&#xff1a;大功率、快恢复二极管 正向电流&#xff1a;40A 反向耐压&#xff1a;1800V 恢复时间&#xff1a;<300n…...

网络编程 day1

1->x.mind网络编程基础 2->简述字节序的概念&#xff0c;并用共用体&#xff08;联合体&#xff09;的方式计算本机的字节序 1.字节序是指不同类型的CPU主机&#xff0c;内存存储多字节整数序列的方式 2.小端字节序&#xff1a;低序字节存储在低地址上 3.大端字节序&a…...

《深入PostgreSQL的存储引擎:原理与性能》

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f405;&#x1f43e;猫头虎建议程序员必备技术栈一览表&#x1f4d6;&#xff1a; &#x1f6e0;️ 全栈技术 Full Stack: &#x1f4da…...

python开发之个微群聊机器人的开发

简要描述&#xff1a; 退出群聊 请求URL&#xff1a; http://域名地址/quitChatRoom 请求方式&#xff1a; POST 请求头Headers&#xff1a; Content-Type&#xff1a;application/jsonAuthorization&#xff1a;login接口返回 参数&#xff1a; 参数名必选类型说明wI…...

【Redis7】--4.事务、管道、发布和订阅

文章目录 事务1.Redis事务2.Redis事务特性3.Redis事务命令3.1MULTI3.2EXEC3.3DISCARD3.4WATCH3.5UNWATCH 4.不保证原子性4.1"全体连坐"4.2"冤头债主" 5.事务执行流程 管道1.pipeline的使用2.pipeline小总结 发布和订阅1.常用命令1.1SUBSCRIBE1.2PUBLISH1.3…...

【Vue】el 和 data短小精湛的细节!

hello&#xff0c;我是小索奇&#xff0c;精心制作的Vue教程持续更新哈&#xff0c;花费了大量的时间和精力&#xff0c;总结拓展了很多疑难点&#xff0c;想要学习&巩固&避坑就一起学习叭~ el 与 data 的两种写法 el共有2种写法 el表达式主要用来在模板中展示数据,它…...

前端screenfull实现界面全屏展示功能

还是先引入依赖 我们要先执行 npm config set registry https://registry.npmjs.org/将本地npm registry地址设置为官方的npm registry地址 不然这个东西安装会有点问题 然后我们执行命令安装 npm install screenfull安装完之后 我们终端执行一下 npm config delete registr…...

Dockerfile 制作常用命令总结

1.FROM( from ) : FROM : from 表示选择一个镜像作为基础镜像&#xff08;在一个Dockerfile 中可以使用多条from&#xff0c;来构建多个镜像&#xff09; 2.ENV &#xff1a; 用来在镜像创建出的容器中声明环境变量&#xff0c;如&#xff1a; ENV PYTHONIOENCODINGutf-8 …...

uniapp项目实践总结(十七)实现滚动触底加载

导语&#xff1a;在日测的开发过程中&#xff0c;经常会碰到页面需要渲染大量数据的情况&#xff0c;这时候就需要用到滚动加载功能&#xff0c;下面总结一下方法。 目录 原理分析实战演练案例展示 原理分析 使用scrolltolower事件来监听滚动到底部&#xff0c;然后加载下一…...

SAP入门到放弃系列之QM质量检验流程概述

目录 一、流程概述二、操作步骤概述2.1 主数据维护2.2 业务操作 一、流程概述 质量检验流程-Inspection Process Flow,通常由于预先设定的一些规则条件自动触发或者手工触发&#xff0c;例如库存地之间的调拨、生产完工入库检验、采购入库的检验、客户交货前检验等等。另外还有…...

Ansys Zemax | 光学系统设计中如何使用玻璃替换方法来优化玻璃

在光学系统中选择最优玻璃材料时&#xff0c;Conrady d-D以及模型玻璃等传统的玻璃选择方法提供的帮助有限。本文介绍了如何使用玻璃替换方法进行直接玻璃优化&#xff0c;以及在考虑玻璃的可用性、成本及耐候性等因素时&#xff0c;如何进一步严格挑选玻璃。 简介 玻璃替换方法…...

springboot基础--实现默认登录页面

1、搭建项目 依赖中 多加入thymeleaf依赖 <dependencies><!--thymeleaf的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><!--we…...

TDesign WXS语法

目录 一、输出函数返回值如何获取&#xff1f; 二、WXS语法 三、WXS案例 一、输出函数返回值如何获取&#xff1f; 写在js的方法中 wxml中{{方法名()}}输出&#xff1a; 发现不显示&#xff1f;&#xff1f; 所以不能使用这种方式&#xff01;&#xff01; 二、WXS语法 1.…...

Iterator设计模式

目录 1、示例 1.1 Aggregate接口 1.2 Iterator接口 1.3 Book类 1.4 BookShelf类 1.6 BookShelfIterator 类 1.7 Main类 2、解释Iterator模式中的角色 2.1 Iterator模式的存在意义 2.2 抽象类和接口 2.3 Aggregate 和 Iterator的对应 2.4 容易弄错"下一个"…...

ROS 入门

目录 简介 ROS诞生背景 ROS的设计目标 ROS与ROS2 安装ROS 1.配置ubuntu的软件和更新 2.设置安装源 3.设置key 4.安装 5.配置环境变量 安装可能出现的问题 安装构建依赖 卸载 ROS架构 1.设计者 2.维护者 3. 立足系统架构: ROS 可以划分为三层 ROS通信机制 话…...

第四章 Linux网络编程

ARP 协议 ARP 协议&#xff08;Address Resolution Protocol&#xff09;通过 IP 地址查找对应的 MAC 地址。 当一个主机需要发送数据给另一个主机时&#xff0c;它首先会检查本地的 ARP 缓存表&#xff08;ARP cache&#xff09;中是否存在目标主机的 MAC 地址。如果存在&…...

无涯教程-JavaScript - OFFSET函数

描述 OFFSET函数返回对范围的引用,该范围是一个单元格或单元格范围中指定的行数和列数。 返回的引用可以是单个单元格或单元格范围。您可以指定要返回的行数和列数。 语法 OFFSET (reference, rows, cols, [height], [width]) 争论 Argument描述Required/OptionalReferenc…...

rust切片

切片类型写为[T]。 切片是序列的一个片段。 它是动态大小类型&#xff0c;所以要使用切片类型&#xff0c;就必须使用它的指针类型。引用是最常用的指针类型。 [T; n]能隐式转换成[T]。 一、定义切片 &#xff08;一&#xff09;不可变切片 &[T]&#xff0c;共享切片&…...

2023/9/18 -- C++/QT

作业 完善登录框 点击登录按钮后&#xff0c;判断账号&#xff08;admin&#xff09;和密码&#xff08;123456&#xff09;是否一致&#xff0c;如果匹配失败&#xff0c;则弹出错误对话框&#xff0c;文本内容“账号密码不匹配&#xff0c;是否重新登录”&#xff0c;给定两…...

vue柱状图+折线图组合

<template><div id"main" style"width: 100%;height: 500px; padding-top: .6rem"></div> </template>data() {return {weekData: ["1周","2周","3周","4周","5周","6周&…...

js中如何实现一个简单的防抖函数?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 防抖函数⭐ 使用示例⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏…...

mysq 主从同步错误之 Error_code 1032 handler error HA_ERR_KEY_NOT_FOUND

错误说明&#xff1a; MySQL主从同步的1032错误&#xff0c;一般是指要更改的数据不存在&#xff0c;SQL_THREAD提取的日志无法应用故报错&#xff0c;造成同步失败 &#xff08;Update、Delete、Insert一条已经delete的数据&#xff09;。 1032的错误本身对数据一致性没什么影…...

蓝桥杯 题库 简单 每日十题 day4

01 津津上初中了。妈妈认为津津应该更加用功学习&#xff0c;所以津津除了上学之外&#xff0c;还要参加妈妈为她报名的各科复习班。另外每周妈妈还会送她去学习朗诵、舞蹈和钢琴。但是津津如果一天上课超过八个小时就会不高兴&#xff0c;而且上得越久就会越不高兴。假设津津…...

l8-d21 域名解析与http服务器实现原理

一、域名解析gethostbyname函数 主机结构在 <netdb.h> 中定义如下&#xff1a; struct hostent { char *h_name; /* 官方域名 */ char **h_aliases; /* 别名*/ int h_addrtype; /* 地址族&#xff08;地址类型&#xff09; */ int h_l…...

网络安全(黑客技术)自学规划

一、什么是网络安全 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 无论网络、Web、移动、桌面、云等哪个领域&#xff0c;都有攻与防两面性…...

阻止用邮件不停注册wordpress账户的方法

您可以使用多种不同的策略来阻止垃圾邮件注册。以下是一些策略供您参考&#xff1a;第1个最好用 1.完全禁用WordPress注册&#xff1a;如果您不需要在您的WordPress网站上公开注册&#xff0c;最好完全禁用注册&#xff0c;而不是试图打击垃圾邮件注册。要完全禁用WordPress上…...

低代码工具大比拼:哪个最适合你?

低代码开发平台正日益流行&#xff0c;成为企业和开发者们的首选。但是&#xff0c;面对市场上众多的低代码工具&#xff0c;你是否感到困惑呢&#xff1f;今天&#xff0c;就和数聚股份一起探讨一下&#xff0c;究竟应该选择哪个低代码工具才能最好地满足你的需求。 首先&…...

用Python实现链式调用

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 我们在使用Django的models查询数据库时&#xff0c;可以看到有这种写法&#xff1a; form app.models import XXX query XXX.objects.all() query query.filter(name123, age456).filter(salary999)在这种写法里面&#xf…...

onethink wordpress/网络营销是以什么为中心

在之前的文章中&#xff0c;生成namespace文件是使用open62541提供的nodeset_compiler.py&#xff0c;根据nodeset_compiler.rst&#xff08;位于open62541/doc/&#xff09;里的描述&#xff0c;有更好的方法&#xff1a;使用cmake命令ua_generate_nodeset_and_datatypes来生成…...

python web网站开发/媒体发稿平台

C程序设计实验报告 实验项目&#xff1a; 1、利用复化梯形公式计算定积分2、计算Ackerman函数3、编写计算x的y次幂的递归函数getpower(int x,int y)&#xff0c;并在主程序中实现输入输出4、编写计算学生年龄的递归函数5、编写递归函数实现Ackman函数 姓名&#xff1a;张时锋 …...

国外在线crm酒店系统/南宁seo推广优化

目录1. 什么是设计模式&#xff1f;2. 设计原则概述3. 设计模式核心思想4. 设计模式分类1. 什么是设计模式&#xff1f; 设计模式是一套被反复使用、多数人知晓、经过分类编目的、代码设计经验的总结。它是为了可重用代码&#xff0c;让代码更容易的被他人理解并保证代码的可靠…...

要修改wordpress目录下的文件权限/北京百度竞价托管公司

问题如题&#xff1a;安装方法参考 http://www.cnblogs.com/shengulong/p/7887586.html &#xff0c;安装完后&#xff0c;使用时出现如题的错误 解决办法&#xff1a; 1、zerorpc本身依赖很多三方包&#xff0c;请注意版本的兼容性&#xff0c;因此最佳方案是&#xff0c;把这…...

可信的大连网站建设/地推接单平台

centos7全面升级了引导程序和系统管理程序&#xff0c;使用grub2替代了grub来引导操作系统&#xff0c;使用systemd替换了init管理系统程序&#xff0c;systemd的升级比较激进&#xff0c;从架构上变更init管理程序。微服务docker却希望使用docker domean管理微服务中的进程&am…...

内贸在什么网站做/石家庄限号

支付宝沙箱环境支付demo 下载电脑网站的官方demo&#xff1a; 下载地址&#xff1a;https://docs.open.alipay.com/270/106291/ 解压导入demo项目 3.配置AlipayConfig 3.1 注册蚂蚁金服开发者账号 注册地址&#xff1a;https://open.alipay.com/platform/home.htm &#xff…...