RabbitMQ深入 —— 死信队列
前言
前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~
文章目录
前言
死信队列
1 基本概念
2 设置消息时间TTL过期的死信队列
3 队列达到最大长度发生死信
4 消息被拒引发死信
总结
死信队列
1 基本概念
死信就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
死信具有一定的延迟性,它可以作为延迟消息来处理。
死信出现的原因:
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.I
2 设置消息时间TTL过期的死信队列
首先我们在消费者Consumer1中声明普通交换机、死信交换机、普通队列和死信队列之间的关系,同时在声明之后令Consumer1拒收消息,在RabbitMQ中观察消息生产者发出消息的流转情况。
设置死信队列的消费者1
在死信队列中我们设置了普通交换机、死信交换机、普通队列和死信队列。同时在正常队列中通过channel信道对象中的queueDeclare方法中的一个Map类型的参数,设置了死信交换机和普通交换机之间的关系,配置好TTL、RoutingKey并声明其死信交换机。
package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//设置过期时间arguments.put("x-message-ttl",10000);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_QUEUE,"dead");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}
需要注意的是,这里在正常队列中设置过期时间TTL一般不太常用,我们通常会在publish处设置消息的TTL,因此这里arguments对象有关 "x-message-ttl" 参数的配置可以注释掉。
实际处理消息的消费者2
在处理死信队列消息的消费者处,我们只需要设置消费者接收消息是来自死信队列即可。
package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer2 {//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();System.out.println("等待接收消息");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer2接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}
生产者
在这里我们借助AMQP. BasicProperties对象的build方法来设置相应的死信TTL。
package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;public class Publish {public static final String NORMAL_EXCHANGE = "normal";public static final String NORMAL_QUEUE = "normalQueue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//在Consumer已经声明过交换机了,所以在这里不能声明//死信消息,设置TTLAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String message = "info"+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,message.getBytes());}}
}
未运行Consumer2前我们看到普通队列在我们设置的TTL:10s之后将消息流转到死信队列中。
最后启动Consumer2后确实也收到了死信队列中的消息
3 队列达到最大长度发生死信
在这一部分中我们需要注释掉之前在生产者中设置的消息的TTL,同时在消费者1中开启正常队列的最大消息堆积容量。
arguments.put("x-max-length",6);
这样子我们就可以模拟队列达到最大长度后产生死信的情况了。
4 消息被拒引发死信
要想开启消费者拒收消息的功能,首先需要在消息接收的basicConsumer方法中关闭自动应答,同时自行设置手动应答的逻辑。在下面接收消息的回调函数中,在basicAck中设置应答,在basicReject实现消息拒收。
package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead1");System.out.println("等待接收消息》》》》》》》》》》》");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer1接收的消息是:"+msg+":此消息是被拒绝的");//这里第二个参数设置了是否要将拒收的消息塞回原队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));//成功应答,这里设置不批量操作channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};//开启手动应答//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,false,deliverCallback,(consumerTag)->{});}
}
总结
时间过期、消息被拒、队列容量限制这三个机制会引发消息被转发死信队列,那么死信队列除了在这三种情况下继续保存消息之外,还有什么作用呢?下一篇文章荔枝会梳理延时队列,相信看完下一篇文章大家能有所收获~
今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~
如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!
如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!
相关文章:
RabbitMQ深入 —— 死信队列
前言 前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~ 文章目录 前言 死信队…...
【React + Umi】自定义离开页面拦截弹框事件
在 react umi 中对离开页面的行为进行自定义弹窗拦截控制。以下为可选的方案分析。 wrapper 首先,因为项目框架是 umi,最先想到了 umi 路由的 wrapper 装饰器,但仔细一想又不太对, wrapper 争对于跳转到某个特定页面的前置行为…...
S1FD40A180H-ASEMI快恢复二极管S1FD40A180H
编辑:ll S1FD40A180H-ASEMI快恢复二极管S1FD40A180H 型号:S1FD40A180H 品牌:ASEMI 封装:TO-247 特性:大功率、快恢复二极管 正向电流:40A 反向耐压:1800V 恢复时间:<300n…...
网络编程 day1
1->x.mind网络编程基础 2->简述字节序的概念,并用共用体(联合体)的方式计算本机的字节序 1.字节序是指不同类型的CPU主机,内存存储多字节整数序列的方式 2.小端字节序:低序字节存储在低地址上 3.大端字节序&a…...
《深入PostgreSQL的存储引擎:原理与性能》
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🐅🐾猫头虎建议程序员必备技术栈一览表📖: 🛠️ 全栈技术 Full Stack: 📚…...
python开发之个微群聊机器人的开发
简要描述: 退出群聊 请求URL: http://域名地址/quitChatRoom 请求方式: POST 请求头Headers: Content-Type:application/jsonAuthorization:login接口返回 参数: 参数名必选类型说明wI…...
【Redis7】--4.事务、管道、发布和订阅
文章目录 事务1.Redis事务2.Redis事务特性3.Redis事务命令3.1MULTI3.2EXEC3.3DISCARD3.4WATCH3.5UNWATCH 4.不保证原子性4.1"全体连坐"4.2"冤头债主" 5.事务执行流程 管道1.pipeline的使用2.pipeline小总结 发布和订阅1.常用命令1.1SUBSCRIBE1.2PUBLISH1.3…...
【Vue】el 和 data短小精湛的细节!
hello,我是小索奇,精心制作的Vue教程持续更新哈,花费了大量的时间和精力,总结拓展了很多疑难点,想要学习&巩固&避坑就一起学习叭~ el 与 data 的两种写法 el共有2种写法 el表达式主要用来在模板中展示数据,它…...
前端screenfull实现界面全屏展示功能
还是先引入依赖 我们要先执行 npm config set registry https://registry.npmjs.org/将本地npm registry地址设置为官方的npm registry地址 不然这个东西安装会有点问题 然后我们执行命令安装 npm install screenfull安装完之后 我们终端执行一下 npm config delete registr…...
Dockerfile 制作常用命令总结
1.FROM( from ) : FROM : from 表示选择一个镜像作为基础镜像(在一个Dockerfile 中可以使用多条from,来构建多个镜像) 2.ENV : 用来在镜像创建出的容器中声明环境变量,如: ENV PYTHONIOENCODINGutf-8 …...
uniapp项目实践总结(十七)实现滚动触底加载
导语:在日测的开发过程中,经常会碰到页面需要渲染大量数据的情况,这时候就需要用到滚动加载功能,下面总结一下方法。 目录 原理分析实战演练案例展示 原理分析 使用scrolltolower事件来监听滚动到底部,然后加载下一…...
SAP入门到放弃系列之QM质量检验流程概述
目录 一、流程概述二、操作步骤概述2.1 主数据维护2.2 业务操作 一、流程概述 质量检验流程-Inspection Process Flow,通常由于预先设定的一些规则条件自动触发或者手工触发,例如库存地之间的调拨、生产完工入库检验、采购入库的检验、客户交货前检验等等。另外还有…...
Ansys Zemax | 光学系统设计中如何使用玻璃替换方法来优化玻璃
在光学系统中选择最优玻璃材料时,Conrady d-D以及模型玻璃等传统的玻璃选择方法提供的帮助有限。本文介绍了如何使用玻璃替换方法进行直接玻璃优化,以及在考虑玻璃的可用性、成本及耐候性等因素时,如何进一步严格挑选玻璃。 简介 玻璃替换方法…...
springboot基础--实现默认登录页面
1、搭建项目 依赖中 多加入thymeleaf依赖 <dependencies><!--thymeleaf的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><!--we…...
TDesign WXS语法
目录 一、输出函数返回值如何获取? 二、WXS语法 三、WXS案例 一、输出函数返回值如何获取? 写在js的方法中 wxml中{{方法名()}}输出: 发现不显示?? 所以不能使用这种方式!! 二、WXS语法 1.…...
Iterator设计模式
目录 1、示例 1.1 Aggregate接口 1.2 Iterator接口 1.3 Book类 1.4 BookShelf类 1.6 BookShelfIterator 类 1.7 Main类 2、解释Iterator模式中的角色 2.1 Iterator模式的存在意义 2.2 抽象类和接口 2.3 Aggregate 和 Iterator的对应 2.4 容易弄错"下一个"…...
ROS 入门
目录 简介 ROS诞生背景 ROS的设计目标 ROS与ROS2 安装ROS 1.配置ubuntu的软件和更新 2.设置安装源 3.设置key 4.安装 5.配置环境变量 安装可能出现的问题 安装构建依赖 卸载 ROS架构 1.设计者 2.维护者 3. 立足系统架构: ROS 可以划分为三层 ROS通信机制 话…...
第四章 Linux网络编程
ARP 协议 ARP 协议(Address Resolution Protocol)通过 IP 地址查找对应的 MAC 地址。 当一个主机需要发送数据给另一个主机时,它首先会检查本地的 ARP 缓存表(ARP cache)中是否存在目标主机的 MAC 地址。如果存在&…...
无涯教程-JavaScript - OFFSET函数
描述 OFFSET函数返回对范围的引用,该范围是一个单元格或单元格范围中指定的行数和列数。 返回的引用可以是单个单元格或单元格范围。您可以指定要返回的行数和列数。 语法 OFFSET (reference, rows, cols, [height], [width]) 争论 Argument描述Required/OptionalReferenc…...
rust切片
切片类型写为[T]。 切片是序列的一个片段。 它是动态大小类型,所以要使用切片类型,就必须使用它的指针类型。引用是最常用的指针类型。 [T; n]能隐式转换成[T]。 一、定义切片 (一)不可变切片 &[T],共享切片&…...
2023/9/18 -- C++/QT
作业 完善登录框 点击登录按钮后,判断账号(admin)和密码(123456)是否一致,如果匹配失败,则弹出错误对话框,文本内容“账号密码不匹配,是否重新登录”,给定两…...
vue柱状图+折线图组合
<template><div id"main" style"width: 100%;height: 500px; padding-top: .6rem"></div> </template>data() {return {weekData: ["1周","2周","3周","4周","5周","6周&…...
js中如何实现一个简单的防抖函数?
聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 防抖函数⭐ 使用示例⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个专栏是为那些对Web开发感兴趣、刚刚踏…...
mysq 主从同步错误之 Error_code 1032 handler error HA_ERR_KEY_NOT_FOUND
错误说明: MySQL主从同步的1032错误,一般是指要更改的数据不存在,SQL_THREAD提取的日志无法应用故报错,造成同步失败 (Update、Delete、Insert一条已经delete的数据)。 1032的错误本身对数据一致性没什么影…...
蓝桥杯 题库 简单 每日十题 day4
01 津津上初中了。妈妈认为津津应该更加用功学习,所以津津除了上学之外,还要参加妈妈为她报名的各科复习班。另外每周妈妈还会送她去学习朗诵、舞蹈和钢琴。但是津津如果一天上课超过八个小时就会不高兴,而且上得越久就会越不高兴。假设津津…...
l8-d21 域名解析与http服务器实现原理
一、域名解析gethostbyname函数 主机结构在 <netdb.h> 中定义如下: struct hostent { char *h_name; /* 官方域名 */ char **h_aliases; /* 别名*/ int h_addrtype; /* 地址族(地址类型) */ int h_l…...
网络安全(黑客技术)自学规划
一、什么是网络安全 网络安全可以基于攻击和防御视角来分类,我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 无论网络、Web、移动、桌面、云等哪个领域,都有攻与防两面性…...
阻止用邮件不停注册wordpress账户的方法
您可以使用多种不同的策略来阻止垃圾邮件注册。以下是一些策略供您参考:第1个最好用 1.完全禁用WordPress注册:如果您不需要在您的WordPress网站上公开注册,最好完全禁用注册,而不是试图打击垃圾邮件注册。要完全禁用WordPress上…...
低代码工具大比拼:哪个最适合你?
低代码开发平台正日益流行,成为企业和开发者们的首选。但是,面对市场上众多的低代码工具,你是否感到困惑呢?今天,就和数聚股份一起探讨一下,究竟应该选择哪个低代码工具才能最好地满足你的需求。 首先&…...
用Python实现链式调用
嗨喽,大家好呀~这里是爱看美女的茜茜呐 我们在使用Django的models查询数据库时,可以看到有这种写法: form app.models import XXX query XXX.objects.all() query query.filter(name123, age456).filter(salary999)在这种写法里面…...
onethink wordpress/网络营销是以什么为中心
在之前的文章中,生成namespace文件是使用open62541提供的nodeset_compiler.py,根据nodeset_compiler.rst(位于open62541/doc/)里的描述,有更好的方法:使用cmake命令ua_generate_nodeset_and_datatypes来生成…...
python web网站开发/媒体发稿平台
C程序设计实验报告 实验项目: 1、利用复化梯形公式计算定积分2、计算Ackerman函数3、编写计算x的y次幂的递归函数getpower(int x,int y),并在主程序中实现输入输出4、编写计算学生年龄的递归函数5、编写递归函数实现Ackman函数 姓名:张时锋 …...
国外在线crm酒店系统/南宁seo推广优化
目录1. 什么是设计模式?2. 设计原则概述3. 设计模式核心思想4. 设计模式分类1. 什么是设计模式? 设计模式是一套被反复使用、多数人知晓、经过分类编目的、代码设计经验的总结。它是为了可重用代码,让代码更容易的被他人理解并保证代码的可靠…...
要修改wordpress目录下的文件权限/北京百度竞价托管公司
问题如题:安装方法参考 http://www.cnblogs.com/shengulong/p/7887586.html ,安装完后,使用时出现如题的错误 解决办法: 1、zerorpc本身依赖很多三方包,请注意版本的兼容性,因此最佳方案是,把这…...
可信的大连网站建设/地推接单平台
centos7全面升级了引导程序和系统管理程序,使用grub2替代了grub来引导操作系统,使用systemd替换了init管理系统程序,systemd的升级比较激进,从架构上变更init管理程序。微服务docker却希望使用docker domean管理微服务中的进程&am…...
内贸在什么网站做/石家庄限号
支付宝沙箱环境支付demo 下载电脑网站的官方demo: 下载地址:https://docs.open.alipay.com/270/106291/ 解压导入demo项目 3.配置AlipayConfig 3.1 注册蚂蚁金服开发者账号 注册地址:https://open.alipay.com/platform/home.htm ÿ…...