当前位置: 首页 > news >正文

RabbitMQ死信队列

目录

一、概念

二、出现死信的原因

三、实战

(一)代码架构图

(二)消息被拒

(三)消息TTL过期

(四)队列达到最大长度


一、概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

二、出现死信的原因

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq )
  • 消息被拒绝(basic.reject basic.nack)并且 requeue=false.

三、实战

(一)代码架构图

 

(二)消息被拒

生产者

public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}
C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 设置队列最大长度arguments.put("x-max-length", 6);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());if (msg.equals("info5")) {System.out.println("Consumer01接收到消息" + message + "并拒绝签收该消息");channel.basicReject(message.getEnvelope().getDeliveryTag(), false);} else {System.out.println("consumer01接收到消息:" + msg);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}

C2 消费者代码不变
启动消费者 1 然后再启动消费者 2

 

 

(三)消息TTL过期

生产者代码
public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());}}
}
消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println("consumer01接收到消息:" + msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}

 

消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收死信队列消息.....");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("Consumer02 接收死信队列的消息:" + new String(message.getBody()));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});}
}

 

 

(四)队列达到最大长度

我们在声明普通队列时添加一个参数x-max-length即可

生产者

public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}
C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息),这里设置了队列最多容纳6条消息,此时由于生产者发送了10条消息,所以有4条会进入死信队列。
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 设置队列最大长度arguments.put("x-max-length", 6);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println("consumer01接收到消息:" + msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}
注意此时需要把原先队列删除 因为参数改变了
C2 消费者代码不变(启动 C2 消费者)

 

总结

死信队列可能出现的三种情况为:

①消息被拒(消费者方拒绝签收该消息)

②消息设置的TTL过期(生产者方设置的过期时间)

③消息投放的队列内消息已经满了,放不进入时消息会进入死信队列

相关文章:

RabbitMQ死信队列

目录 一、概念 二、出现死信的原因 三、实战 &#xff08;一&#xff09;代码架构图 &#xff08;二&#xff09;消息被拒 &#xff08;三&#xff09;消息TTL过期 &#xff08;四&#xff09;队列达到最大长度 一、概念 先从概念解释上搞清楚这个定义&#xff0c;死信&…...

Word控件Spire.Doc 【书签】教程(1):在C#/VB.NET:在 Word 中插入书签

Spire.Doc for .NET是一款专门对 Word 文档进行操作的 .NET 类库。在于帮助开发人员无需安装 Microsoft Word情况下&#xff0c;轻松快捷高效地创建、编辑、转换和打印 Microsoft Word 文档。拥有近10年专业开发经验Spire系列办公文档开发工具&#xff0c;专注于创建、编辑、转…...

微服务框架-学习笔记

1 微服务架构介绍 1.1 系统架构演变历史 单体架构垂直应用架构&#xff1a;按照业务线垂直划分分布式架构&#xff1a;抽出业务无关的公共模块SOA架构&#xff1a;面向服务微服务架构&#xff1a;彻底的服务化1.2 微服务架构概览 1.3 微服务架构核心要素 服务治理&#xff1…...

实验心理学笔记01:引论

原视频链接&#xff1a; https://www.bilibili.com/video/BV1Qt41137Kv 目录 一、实验心理学&#xff1a;定义、内容及简要历史回顾 二、实验心理学和普通心理学、认知心理学的区别 三、实验方法与非实验方法 四、实验范式 五、实验中的各种变量 六、The science of psy…...

预备3-如何学习编程

如何学习编程 我说说曾经学习编程踩得坑 纠结字面上的意思 如纠结一个关键词的名称如何来 为什么叫这个名称... 只是一个简单的名称,该名称代表某一想象/行为,就好比你为啥叫张三, 千万别去深究这些...做笔记的时间比敲代码的时间还多 做笔记的原因是,自己总结归纳所学的知识, …...

操作系统权限提升(十七)之绕过UAC提权-Windows令牌概述和令牌窃取攻击

系列文章 操作系统权限提升(十二)之绕过UAC提权-Windows UAC概述 操作系统权限提升(十三)之绕过UAC提权-MSF和CS绕过UAC提权 操作系统权限提升(十四)之绕过UAC提权-基于白名单AutoElevate绕过UAC提权 操作系统权限提升(十五)之绕过UAC提权-基于白名单DLL劫持绕过UAC提权 操作系…...

【时间之外】系统管人,能行?(冷眼旁观连载之二)

上次写了在用的工具系统和痛点&#xff0c;基本情况都交待清楚了&#xff0c;春节假期很快就过去了。这次继续按照之前观察计划&#xff0c;谈谈对这些工具使用情况的感受&#xff0c;学而时习之&#xff0c;算是抛砖引玉&#xff0c;也算是个人对这项工作的总结和体会。 目录…...

【数据结构必会基础】关于树,你所必须知道的亿些概念

目录 1.什么是树 1.1浅显的理解树 1.2 数据结构中树的概念 2.树的各种结构概念 2.1 节点的度 2.2 根节点/叶节点/分支节点 2.3 父节点/子节点 2.4祖先节点/子孙节点 2.5兄弟节点 2.6树的度 2.7节点的层次 2.8森林 3. 如何用代码表示一棵树 3.1链式结构 3.1.1 树节…...

设计模式的应用(已在大型项目中使用)

说明:开发语言:在本文中,使用的是C# 一、目录 •1 、单例模式 •2 、简单工厂模式 •3 、代理模式 •4 、观察者模式 •5 、外观模式 •6 、享元模式 •7 、命令模式 •8 、状态模式 •9 、发布订阅模式...

Git的相关用法

1.全局设置自己的git提交用户名和邮箱git config --global user.name 张三 git config --global user.email zsgmail.com即所有的提交都会用这个姓名和邮箱。如果不知道自己配置的是什么&#xff0c;可以查询下git config --global user.name git config --global user.email 或…...

Linux服务:Nginx反向代理与负载均衡

目录 一、Nginx反向代理 1、什么是代理 2、实现反向代理实验 ①实验拓扑 ②实验目的 ③实验过程 二、反向代理负载均衡 1、反向代理负载均衡调度算法 ①轮询算法 ②加权轮询算法 ③最小连接数算法 ④ip、url 哈希算法 ⑤响应时间fair算法 2、实现反向代理负载均…...

数据结构与算法——2.算法概述

这篇文章&#xff0c;我们来讲一下算法的概述&#xff0c;大致理解一下什么是算法。 目录 1.定义 2.生活实例 3.算法目标 4.实际案例 4.1案例一 4.2案例二 5.小结 1.定义 官方解释&#xff1a; 算法是指解题方案的准确而完整的描述&#xff0c;是一系列解决问题的清…...

BPMN2.0是什么,BPMN能解决企业流程管理中哪些问题?

一、前言&#xff1a; 在任何行业和企业中&#xff0c;一定存在着各式各样的流程&#xff0c;请假流程、报销流程、入职流程、离职流程、出差流程、合同审批流程、出入库流程等等…… 无论是管理者、技术人员还是业务人员&#xff0c;每天肯定也在使用各种流程&#xff0c;但…...

Java线程池的基本工作原理及案例

一、线程池的优点 线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。 主要特点:线程复用;控制最大并发数;管理线程…...

Stacked hourglass networks for human pose estimation代码学习

Stacked hourglass networks for human pose estimation https://github.com/princeton-vl/pytorch_stacked_hourglass 这是一个用于人体姿态估计的模型&#xff0c;只能检测单个人 作者通过重复的bottom-up&#xff08;高分辨率->低分辨率&#xff09;和top-down&#xff0…...

SpringCloud(五)MQ消息队列

MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注…...

SQL语法基础汇总

三年前的存稿 默认端口号 3306 超级用户名 root 登录 mysql -uroot -p / mysql -uroot -proot 退出 exit / quit 服务器版本 SELECT VERSION(); 当前日期 SELECT NOW(); 当前用户 SELECT USER(); 备份 mysqldump -uroot -p 数据库名称 > 保存的路径 还原 create database1-…...

惠普星14Pro电脑开机不了显示错误代码界面怎么办?

惠普星14Pro电脑开机不了显示错误代码界面怎么办&#xff1f;有用户电脑开机之后&#xff0c;进入了一个错误界面&#xff0c;里面有一些错误代码。重启电脑之后依然是无法进入到桌面中&#xff0c;那么这个情况怎么去进行解决呢&#xff1f;我们可以重装一个新系统&#xff0c…...

顺序表的构造及功能

定义顺序表是一种随机存储都结构&#xff0c;其特点是表中的元素的逻辑顺序与物理顺序相同。假设线性表L存储起始位置为L(A)&#xff0c;sizeof(ElemType)是每个数据元素所占的存储空间的大小&#xff0c;则线性表L所对应的顺序存储如下图。顺序表的优缺点优点&#xff1a;随机…...

cesium: 绘制线段(008)

第008个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中绘制线段,左键点击开始绘制,右键点击取消绘制 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共139行)相关API参考:专栏目标示例效果 配置方式 1)…...

HTML、CSS学习笔记4(3D转换、动画)

目录 一、空间转换&#xff08;3D转换&#xff09; 1.空间位移 语法&#xff1a; 取值&#xff1a;&#xff08;正负均可&#xff09; 透视&#xff1a; 2.空间旋转 3.立体呈现 二、动画&#xff08;animation&#xff09; 1.动画的使用 先定义动画 再调用定义好的动画 …...

java的分布式锁

什么是分布式锁 分布式锁是指分布式环境下&#xff0c;系统部署在多个机器中&#xff0c;实现多进程分布式互斥的一种锁。为了保证多个进程能看到锁&#xff0c;锁被存在公共存储&#xff08;比如 Redis、Memcache、数据库等三方存储中&#xff09;&#xff0c;以实现多个进程并…...

17- TensorFlow实现手写数字识别 (tensorflow系列) (项目十七)

项目要点 模型创建: model Sequential()添加卷积层: model.add(Dense(32, activationrelu, input_dim100)) # 第一层需要 input_dim添加dropout: model.add(Dropout(0.2))添加第二次网络: model.add(Dense(512, activationrelu)) # 除了first, 其他层不要输入shape添加输出…...

Polkadot 基础

Polkadot Polkadot联合并保护了一个不断增长的专业区块链生态系统&#xff0c;称为parachains。Polkadot上的应用程序和服务可以安全地跨链通信&#xff0c;形成真正可互操作的去中心化网络的基础。 真正的互操作性 Polkadot支持跨区块链传输任何类型的数据或资产&#xff0c;…...

spring源码编译

spring源码编译1、安装gradle2、拉取源码3、配置gradle文件来源及镜像仓库4、预编译5、验证6、可能遇到的报错6.1、jdk.jfr不存在6.2、checkstyleMain6.3、org.gradle.api.artifacts.result.ComponentSelectionReason.getDescription()Ljava/lang/String6.4、其他jdk&#xff1…...

防盗链是什么?带你了解什么是防盗链

目录 什么是防盗链 防盗链的定义 防盗链的产生 防盗链的实现 什么是防盗链 防盗链其实就是采用服务器端编程&#xff0c;通过url过滤技术实现的防止盗链的软件。 比如&#xff1a;photo.abc.com/video.mp4 这个下载地址&#xff0c;如果没有装防盗链&#xff0c;别人就能轻…...

Linux基础命令-fdisk管理磁盘分区表

文章目录 fdisk 命令介绍 命令格式 基本参数 1&#xff09;常用参数 2&#xff09;fdisk菜单操作说明 创建一个磁盘分区 1&#xff09;创建分区 2&#xff09;创建交换分区 参考实例 1&#xff09; 显示当前分区的信息 2&#xff09; 显示每个磁盘的分区信息 命令…...

(四)K8S 安装 Nginx Ingress Controller

ingress-nginx 是 Kubernetes 的入口控制器&#xff0c;使用NGINX作为反向代理和负载均衡器 版本介绍 版本1&#xff1a;Ingress NGINX Controller(k8s社区的ingres-nginx) 以 NGINX 开源技术为基础&#xff08;kubernetes.io&#xff09;&#xff0c;可在GitHub的 kubernet…...

高频面试题

MyISAM和InnoDB是MySQL两种常见的存储引擎&#xff0c;它们之间有以下几点区别&#xff1a; 事务支持&#xff1a;MyISAM不支持事务处理&#xff0c;而InnoDB支持事务处理。 行级锁&#xff1a;MyISAM只支持表级锁&#xff0c;而InnoDB支持行级锁&#xff0c;可以避免并发访问…...

js 字节数组操作,TCP协议组装

js字节数组&#xff0c;进制转换js基础知识数组 Array扩展操作符三个点&#xff08;...&#xff09;ArrayBufferslice() 数组复制reduce 对数组中的每个元素执行一个提供的函数,将其结果汇总为单个返回值splice 数组删除&#xff0c;添加&#xff0c;替换js 字节数组转数字以及…...

阿里巴巴网站域名建设/深圳网络推广

1. 需要使用svnant&#xff0c;从SVN中获取源码 需要使用的扩展包&#xff1a;svnant-1.3.1.zip里所有的jar 下载地址&#xff1a;http://subclipse.tigris.org/files/documents/906/49042/svnant-1.3.1.zip build.xml中的写法 <!--定义SVN地址--><property name"…...

水源logo设计制作网/广州百度快速排名优化

在整理《全唐诗》的文本之前&#xff0c;我们首先需要完成以下两个步骤&#xff1a; 确定需求 了解文本 在完成以上步骤后&#xff0c;我们开始实际着手整理文本&#xff0c;在整理的过程中大体上也包含两个流程&#xff1a; 文本解析结果输出 全唐诗文本语料在“全唐诗.tx…...

专业的网站首页建设公司/阿里指数查询

Quartz 调度器以多线程的方式执行调度任务JobDetail,缺省线程池大小为10&#xff0c;也就是说若调度器中已有10个Job在工作&#xff08;线程没有结束&#xff09;&#xff0c;那么即使有JobDetail到了被触发的时间&#xff0c;新的JobDetail不会被执行&#xff0c;也就是说阻塞…...

站长工具网站备案查询/百度云网盘官网

在开始理解Linux文件管理和目录类命令之前&#xff0c;有必要先说一下&#xff0c;关于操作系统在计算机中都做了哪些工作。0、操作系统的工作1、文件管理&#xff0c;增删改查2、目录管理3、进程管理4、软件安装5、运行程序6、网络管理7、设备管理本次笔记介绍的是文件管理和目…...

翻书效果的网站/win7怎么优化最流畅

什么是指针&#xff1f; 指针是一种存储变量内存地址的变量。 如上图所示&#xff0c;变量 b 的值为 156&#xff0c;而 b 的内存地址为 0x1040a124。变量 a 存储了 b 的地址。我们就称 a 指向了 b。 指针的声明 指针变量的类型为 *T&#xff0c;该指针指向一个 T 类型的变量…...

兰州网站seo/厦门百度整站优化服务

【软件笔记I】 各类冷门函数细解 ■题外话■ 总觉得作为一个志向远大的 coder &#xff08;٩(◕‿◕&#xff61;)۶&#xff09;&#xff0c;我觉得单单只会做题是不够的所以我开始尝试自己编写软件&#xff01;初入道的我并不知道C其实并不太适合编写软件……但是我决定开始…...