当前位置: 首页 > news >正文

RabbitMQ死信队列

目录

一、概念

二、出现死信的原因

三、实战

(一)代码架构图

(二)消息被拒

(三)消息TTL过期

(四)队列达到最大长度


一、概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

二、出现死信的原因

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq )
  • 消息被拒绝(basic.reject basic.nack)并且 requeue=false.

三、实战

(一)代码架构图

 

(二)消息被拒

生产者

public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}
C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 设置队列最大长度arguments.put("x-max-length", 6);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());if (msg.equals("info5")) {System.out.println("Consumer01接收到消息" + message + "并拒绝签收该消息");channel.basicReject(message.getEnvelope().getDeliveryTag(), false);} else {System.out.println("consumer01接收到消息:" + msg);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}

C2 消费者代码不变
启动消费者 1 然后再启动消费者 2

 

 

(三)消息TTL过期

生产者代码
public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());}}
}
消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println("consumer01接收到消息:" + msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}

 

消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收死信队列消息.....");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("Consumer02 接收死信队列的消息:" + new String(message.getBody()));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});}
}

 

 

(四)队列达到最大长度

我们在声明普通队列时添加一个参数x-max-length即可

生产者

public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}
C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息),这里设置了队列最多容纳6条消息,此时由于生产者发送了10条消息,所以有4条会进入死信队列。
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 设置队列最大长度arguments.put("x-max-length", 6);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println("consumer01接收到消息:" + msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}
注意此时需要把原先队列删除 因为参数改变了
C2 消费者代码不变(启动 C2 消费者)

 

总结

死信队列可能出现的三种情况为:

①消息被拒(消费者方拒绝签收该消息)

②消息设置的TTL过期(生产者方设置的过期时间)

③消息投放的队列内消息已经满了,放不进入时消息会进入死信队列

相关文章:

RabbitMQ死信队列

目录 一、概念 二、出现死信的原因 三、实战 &#xff08;一&#xff09;代码架构图 &#xff08;二&#xff09;消息被拒 &#xff08;三&#xff09;消息TTL过期 &#xff08;四&#xff09;队列达到最大长度 一、概念 先从概念解释上搞清楚这个定义&#xff0c;死信&…...

Word控件Spire.Doc 【书签】教程(1):在C#/VB.NET:在 Word 中插入书签

Spire.Doc for .NET是一款专门对 Word 文档进行操作的 .NET 类库。在于帮助开发人员无需安装 Microsoft Word情况下&#xff0c;轻松快捷高效地创建、编辑、转换和打印 Microsoft Word 文档。拥有近10年专业开发经验Spire系列办公文档开发工具&#xff0c;专注于创建、编辑、转…...

微服务框架-学习笔记

1 微服务架构介绍 1.1 系统架构演变历史 单体架构垂直应用架构&#xff1a;按照业务线垂直划分分布式架构&#xff1a;抽出业务无关的公共模块SOA架构&#xff1a;面向服务微服务架构&#xff1a;彻底的服务化1.2 微服务架构概览 1.3 微服务架构核心要素 服务治理&#xff1…...

实验心理学笔记01:引论

原视频链接&#xff1a; https://www.bilibili.com/video/BV1Qt41137Kv 目录 一、实验心理学&#xff1a;定义、内容及简要历史回顾 二、实验心理学和普通心理学、认知心理学的区别 三、实验方法与非实验方法 四、实验范式 五、实验中的各种变量 六、The science of psy…...

预备3-如何学习编程

如何学习编程 我说说曾经学习编程踩得坑 纠结字面上的意思 如纠结一个关键词的名称如何来 为什么叫这个名称... 只是一个简单的名称,该名称代表某一想象/行为,就好比你为啥叫张三, 千万别去深究这些...做笔记的时间比敲代码的时间还多 做笔记的原因是,自己总结归纳所学的知识, …...

操作系统权限提升(十七)之绕过UAC提权-Windows令牌概述和令牌窃取攻击

系列文章 操作系统权限提升(十二)之绕过UAC提权-Windows UAC概述 操作系统权限提升(十三)之绕过UAC提权-MSF和CS绕过UAC提权 操作系统权限提升(十四)之绕过UAC提权-基于白名单AutoElevate绕过UAC提权 操作系统权限提升(十五)之绕过UAC提权-基于白名单DLL劫持绕过UAC提权 操作系…...

【时间之外】系统管人,能行?(冷眼旁观连载之二)

上次写了在用的工具系统和痛点&#xff0c;基本情况都交待清楚了&#xff0c;春节假期很快就过去了。这次继续按照之前观察计划&#xff0c;谈谈对这些工具使用情况的感受&#xff0c;学而时习之&#xff0c;算是抛砖引玉&#xff0c;也算是个人对这项工作的总结和体会。 目录…...

【数据结构必会基础】关于树,你所必须知道的亿些概念

目录 1.什么是树 1.1浅显的理解树 1.2 数据结构中树的概念 2.树的各种结构概念 2.1 节点的度 2.2 根节点/叶节点/分支节点 2.3 父节点/子节点 2.4祖先节点/子孙节点 2.5兄弟节点 2.6树的度 2.7节点的层次 2.8森林 3. 如何用代码表示一棵树 3.1链式结构 3.1.1 树节…...

设计模式的应用(已在大型项目中使用)

说明:开发语言:在本文中,使用的是C# 一、目录 •1 、单例模式 •2 、简单工厂模式 •3 、代理模式 •4 、观察者模式 •5 、外观模式 •6 、享元模式 •7 、命令模式 •8 、状态模式 •9 、发布订阅模式...

Git的相关用法

1.全局设置自己的git提交用户名和邮箱git config --global user.name 张三 git config --global user.email zsgmail.com即所有的提交都会用这个姓名和邮箱。如果不知道自己配置的是什么&#xff0c;可以查询下git config --global user.name git config --global user.email 或…...

Linux服务:Nginx反向代理与负载均衡

目录 一、Nginx反向代理 1、什么是代理 2、实现反向代理实验 ①实验拓扑 ②实验目的 ③实验过程 二、反向代理负载均衡 1、反向代理负载均衡调度算法 ①轮询算法 ②加权轮询算法 ③最小连接数算法 ④ip、url 哈希算法 ⑤响应时间fair算法 2、实现反向代理负载均…...

数据结构与算法——2.算法概述

这篇文章&#xff0c;我们来讲一下算法的概述&#xff0c;大致理解一下什么是算法。 目录 1.定义 2.生活实例 3.算法目标 4.实际案例 4.1案例一 4.2案例二 5.小结 1.定义 官方解释&#xff1a; 算法是指解题方案的准确而完整的描述&#xff0c;是一系列解决问题的清…...

BPMN2.0是什么,BPMN能解决企业流程管理中哪些问题?

一、前言&#xff1a; 在任何行业和企业中&#xff0c;一定存在着各式各样的流程&#xff0c;请假流程、报销流程、入职流程、离职流程、出差流程、合同审批流程、出入库流程等等…… 无论是管理者、技术人员还是业务人员&#xff0c;每天肯定也在使用各种流程&#xff0c;但…...

Java线程池的基本工作原理及案例

一、线程池的优点 线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。 主要特点:线程复用;控制最大并发数;管理线程…...

Stacked hourglass networks for human pose estimation代码学习

Stacked hourglass networks for human pose estimation https://github.com/princeton-vl/pytorch_stacked_hourglass 这是一个用于人体姿态估计的模型&#xff0c;只能检测单个人 作者通过重复的bottom-up&#xff08;高分辨率->低分辨率&#xff09;和top-down&#xff0…...

SpringCloud(五)MQ消息队列

MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注…...

SQL语法基础汇总

三年前的存稿 默认端口号 3306 超级用户名 root 登录 mysql -uroot -p / mysql -uroot -proot 退出 exit / quit 服务器版本 SELECT VERSION(); 当前日期 SELECT NOW(); 当前用户 SELECT USER(); 备份 mysqldump -uroot -p 数据库名称 > 保存的路径 还原 create database1-…...

惠普星14Pro电脑开机不了显示错误代码界面怎么办?

惠普星14Pro电脑开机不了显示错误代码界面怎么办&#xff1f;有用户电脑开机之后&#xff0c;进入了一个错误界面&#xff0c;里面有一些错误代码。重启电脑之后依然是无法进入到桌面中&#xff0c;那么这个情况怎么去进行解决呢&#xff1f;我们可以重装一个新系统&#xff0c…...

顺序表的构造及功能

定义顺序表是一种随机存储都结构&#xff0c;其特点是表中的元素的逻辑顺序与物理顺序相同。假设线性表L存储起始位置为L(A)&#xff0c;sizeof(ElemType)是每个数据元素所占的存储空间的大小&#xff0c;则线性表L所对应的顺序存储如下图。顺序表的优缺点优点&#xff1a;随机…...

cesium: 绘制线段(008)

第008个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中绘制线段,左键点击开始绘制,右键点击取消绘制 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共139行)相关API参考:专栏目标示例效果 配置方式 1)…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话&#xff1a; “利润不是赚出来的&#xff0c;是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业&#xff0c;很多企业看着销售不错&#xff0c;账上却没钱、利润也不见了&#xff0c;一翻库存才发现&#xff1a; 一堆卖不动的旧货…...

python执行测试用例,allure报乱码且未成功生成报告

allure执行测试用例时显示乱码&#xff1a;‘allure’ &#xfffd;&#xfffd;&#xfffd;&#xfffd;&#xfffd;ڲ&#xfffd;&#xfffd;&#xfffd;&#xfffd;ⲿ&#xfffd;&#xfffd;&#xfffd;Ҳ&#xfffd;&#xfffd;&#xfffd;ǿ&#xfffd;&am…...

大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计

随着大语言模型&#xff08;LLM&#xff09;参数规模的增长&#xff0c;推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长&#xff0c;而KV缓存的内存消耗可能高达数十GB&#xff08;例如Llama2-7B处理100K token时需50GB内存&a…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

HashMap中的put方法执行流程(流程图)

1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中&#xff0c;其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下&#xff1a; 初始判断与哈希计算&#xff1a; 首先&#xff0c;putVal 方法会检查当前的 table&#xff08;也就…...

【Go语言基础【12】】指针:声明、取地址、解引用

文章目录 零、概述&#xff1a;指针 vs. 引用&#xff08;类比其他语言&#xff09;一、指针基础概念二、指针声明与初始化三、指针操作符1. &&#xff1a;取地址&#xff08;拿到内存地址&#xff09;2. *&#xff1a;解引用&#xff08;拿到值&#xff09; 四、空指针&am…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...