当前位置: 首页 > news >正文

死信队列

死信队列

死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列.

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

造成死信的原因

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 MQ 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

死信架构图

在这里插入图片描述

代码实战

  • TTL过期

    package com.vmware.rabbit.demo8;import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;import java.util.HashMap;public class Consumer {//普通交换机private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机private static final String DEAD_EXCHANGE = "dead_exchange";//普通队列private static final String NORMAL_QUEUE = "normal_queue";//死信队列private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//创建普通队列死信分发参数HashMap<String,Object> arguments= new HashMap<>();arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);arguments.put("x-dead-letter-routing-key","lisi");//创建普通交换机与队列并绑定channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//创建死信交换机和队列并绑定channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_QUEUE,false,false,false,null);channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");DeliverCallback deliverCallback= (tag,msg)->{String message = new String(msg.getBody());System.out.println("接收到消息:"+message);}};//创建消费者channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});}
    }
    
    package com.vmware.rabbit.demo8;import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;public class Producer {private static final String EXCHANGE_NAME = "normal_exchange";private static final String ROUTING_KEY = "zhangsan";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();System.out.println("已连接到RabbitMQ服务器....");Channel channel = connection.createChannel();//设置超时为10秒AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "msg" + i;channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, basicProperties, message.getBytes());System.out.println("消息:"+message+"发送成功!");}}
    }
    
  • 队列达到最大长度

    package com.vmware.rabbit.demo8;import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;import java.util.HashMap;public class Consumer {//普通交换机private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机private static final String DEAD_EXCHANGE = "dead_exchange";//普通队列private static final String NORMAL_QUEUE = "normal_queue";//死信队列private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//创建普通队列死信分发参数HashMap<String,Object> arguments= new HashMap<>();arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);arguments.put("x-dead-letter-routing-key","lisi");//设置队列最大长度arguments.put("x-max-length",5);//创建普通交换机与队列并绑定channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//创建死信交换机和队列并绑定channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_QUEUE,false,false,false,null);channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");DeliverCallback deliverCallback= (tag,msg)->{String message = new String(msg.getBody());System.out.println("消息:"+message+"被拒绝");};//创建消费者channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});}
    }
    
    package com.vmware.rabbit.demo8;import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;public class Producer {private static final String EXCHANGE_NAME = "normal_exchange";private static final String ROUTING_KEY = "zhangsan";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();System.out.println("已连接到RabbitMQ服务器....");Channel channel = connection.createChannel();for (int i = 0; i < 10; i++) {String message = "msg" + i;channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());System.out.println("消息:"+message+"发送成功!");}}
    }
    
  • 消息被拒

    package com.vmware.rabbit.demo8;import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;import java.util.HashMap;public class Consumer {//普通交换机private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机private static final String DEAD_EXCHANGE = "dead_exchange";//普通队列private static final String NORMAL_QUEUE = "normal_queue";//死信队列private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//创建普通队列死信分发参数HashMap<String,Object> arguments= new HashMap<>();arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);arguments.put("x-dead-letter-routing-key","lisi");//创建普通交换机与队列并绑定channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//创建死信交换机和队列并绑定channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_QUEUE,false,false,false,null);channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");DeliverCallback deliverCallback= (tag,msg)->{String message = new String(msg.getBody());if (message.equals("msg5")){System.out.println("消息:"+message+"被拒绝");channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);}else {System.out.println("接收到消息:"+message);channel.basicAck(msg.getEnvelope().getDeliveryTag(),false);}};//创建消费者channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(tag)->{});}
    }
    
    package com.vmware.rabbit.demo8;import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String DEAD_QUEUE_NAME = "dead_queue";public static void main(String[] args)throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=(tag,msg)->{String message= new String(msg.getBody());System.out.println("队列:"+DEAD_QUEUE_NAME+"\t收到消息:"+message);};channel.basicConsume(DEAD_QUEUE_NAME,true,deliverCallback,(tag)->{});}
    }
    
    package com.vmware.rabbit.demo8;import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.vmware.rabbit.utils.RabbitUtil;public class Producer {private static final String EXCHANGE_NAME = "normal_exchange";private static final String ROUTING_KEY = "zhangsan";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();System.out.println("已连接到RabbitMQ服务器....");Channel channel = connection.createChannel();for (int i = 0; i < 10; i++) {String message = "msg" + i;channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());System.out.println("消息:"+message+"发送成功!");}}
    }
    

相关文章:

死信队列

死信队列 死信的概念 先从概念解释上搞清楚这个定义&#xff0c;死信&#xff0c;顾名思义就是无法被消费的消息&#xff0c;字面意思可以这样理解&#xff0c;一般来说&#xff0c;producer 将消息投递到 broker 或者直接到queue 里了&#xff0c;consumer 从 queue 取出消息…...

基于YOLOv5的目标检测系统详解(附MATLAB GUI版代码)

摘要&#xff1a;本文重点介绍了基于YOLOv5目标检测系统的MATLAB实现&#xff0c;用于智能检测物体种类并记录和保存结果&#xff0c;对各种物体检测结果可视化&#xff0c;提高目标识别的便捷性和准确性。本文详细阐述了目标检测系统的原理&#xff0c;并给出MATLAB的实现代码…...

使用ChatGPT工具阅读文献的实战教程

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…...

实训笔记1

实训笔记 第一天 1.安装tomcat或者其他大数据开发的路径不含中文及空格 2.和同开发 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FoApp1oX-1683039421826)(C:\Users\18249\AppData\Roaming\Typora\typora-user-images\image-20230422110823748…...

CCD视觉检测设备如何选择光源

CCD视觉检测设备的机器视觉系统对光源的要求很高&#xff0c;光源是决定图像质量的一个重要因素。那么&#xff0c;我们就来看看CCD图像加网设备和机器视觉系统光源的选择点——CCD图像加网设备。 CCD视觉检测设备机器视觉系统光源选择要点&#xff1a; 1. 对比度&#xff1a;…...

基于协同过滤的旅游推荐系统设计与实现(论文+源码)_kaic

1 绪论 1.1 研究背景及意义 1.2 国内外研究现状 1.3 研究目标与意义 1.4 主要研究工作 2 相关理论介绍 2.1HTML与JavaScript 2.2 MySQL数据库 2.3 协同过滤算法简介 3 系统分析与设计 3.1 系统需求分析 3.1.1 功能性需求 3.1.2 安全性需求 3.2 系统总体架构 3.3 功能模块设计 3…...

代码随想录补打卡 746 使用最小花费爬楼梯

代码如下 func minCostClimbingStairs(cost []int) int { dp : make([]int,len(cost)1) //思路&#xff1a;设置一个花费数组dp&#xff0c;dp数组的长度等于之前的cost在加上1&#xff08;1为楼顶元素&#xff09; dp[0] 0 dp[1] 0 for i : 2 ; i < len(c…...

有理函数的不定积分习题

前置知识&#xff1a;有理函数的不定积分 习题 计算 ∫ x 3 1 x 4 − 3 x 3 3 x 2 − x d x \int \dfrac{x^31}{x^4-3x^33x^2-x}dx ∫x4−3x33x2−xx31​dx 解&#xff1a; \qquad 将被积函数的分母因式分解得 x 4 − 3 x 3 3 x 2 − x x ( x − 1 ) 3 x^4-3x^33x^2-xx…...

PS滤镜插件-Nik Collection介绍

PS滤镜插件-Nik Collection介绍 什么是Nik CollectionNik Collection都包含什么&#xff1f; 什么是Nik Collection Nik Collection是一款PS滤镜插件套装&#xff0c;其包含了八款PS插件&#xff0c;功能涵盖修图、调色、降噪、胶片滤镜等方面。Nik Collection 作为很多摄影师…...

力扣刷题2023-05-04-1——题目:2614. 对角线上的质数

题目&#xff1a; 给你一个下标从 0 开始的二维整数数组 nums 。 返回位于 nums 至少一条 对角线 上的最大 质数 。如果任一对角线上均不存在质数&#xff0c;返回 0 。 注意&#xff1a; 如果某个整数大于 1 &#xff0c;且不存在除 1 和自身之外的正整数因子&#xff0c;…...

【Java笔试强训 2】

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点! 欢迎志同道合的朋友一起加油喔&#x1f93a;&#x1f93a;&#x1f93a; 目录 一、选择题 二、编程题 &#x1f525;排序子…...

术数基础背诵口诀整理

物象对应 五行方位天干神兽季节气候星宿生成数脏器木东甲乙青龙春风岁八肝火南丙丁朱雀夏热荧惑七心土中戊己&#xff1f;长夏湿镇五脾金西庚辛白虎秋燥太白九肺水北壬癸玄武冬寒辰六肾 口诀&#xff1a;东方甲乙青龙木&#xff0c;南方丙丁朱雀火&#xff0c;戊己勾陈腾蛇土&…...

Linux 基础语法 -2

如果我们以后再Linux当中 写了一些命名&#xff0c;导致程序我们不能进行操作了&#xff0c;如这个死循环&#xff1a; 他就会一直输出 "hello Linux" &#xff0c;我们就使用 ctrl c 来终止因为程序或者指令异常&#xff0c;而导致我们无法进行指令输入&#xff…...

深度学习框架发展趋势

深度学习方法的发展是推动深度学习框架进步的最大动力&#xff0c;因此深度学习框架的功能和设计应顺应 算法和模型的发展趋势&#xff1a; 第一&#xff0c;易用性。深度学习领域仍处于快速发展期&#xff0c;参与者和学习者不断增加&#xff0c;新模型大量提出。因 此&#…...

Mysql为json字段创建索引的两种方式

目录 一、前言二、通过虚拟列添加索引&#xff08;Secondary Indexes and Generated Columns&#xff09;三、多值索引&#xff08;Using multi-valued Indexes&#xff09;四、官网地址 一、前言 JSON 数据类型是在mysql5.7版本后新增的&#xff0c;同 TEXT&#xff0c;BLOB …...

cassandra数据库入门-4

插入数据 在表中创建数据 您可以使用命令 INSERT 将数据插入表中一行的列中。 下面给出了在表中创建数据的语法。 INSERT INTO <tablename> (<column1 name>, <column2 name>....) VALUES (<value1>, <value2>....) USING <option> 例子…...

微服务学习——分布式搜索

初识elasticsearch 什么是elasticsearch elasticsearch是一款非常强大的开源搜索引擎&#xff0c;可以帮助我们从海量数据中快速找到需要的内容。 elasticsearch结合kibana、Logstash、Beats&#xff0c;也就是elastic stack(ELK)。被广泛应用在日志数据分析、实时监控等领域…...

ChatGPT根据销售数据、客户反馈、财务报告,自动生成报告,并根据不同利益方的需要和偏好进行调整?

该场景对应的关键词库&#xff08;24个&#xff09;&#xff1a; 汇报对象身份&#xff08;下属、跨部门平级、领导&#xff09;、销售数据&#xff08;销售额、销售量、销售渠道&#xff09;、财务报告&#xff08;营业收入、净利润、成本费用&#xff09;、市场分析&#xf…...

Flask开发之环境搭建

目录 1、安装flask 2、创建Flask工程 ​编辑 3、初始化效果 4、运行效果 5、设置Debug模式 6、设置Host 7、设置Port 8、在app.config中添加配置 1、安装flask 如果电脑上从没有安装过flask&#xff0c;则在命令行界面输入以下命令&#xff1a; pip install flask 如果电…...

Java集合框架与ArrayList、LinkedList的区别

文章目录 Java集合框架与ArrayList、LinkedList的区别集合框架ArrayList特点操作 LinkedList特点操作 区别代码实践注意事项 Java集合框架与ArrayList、LinkedList的区别 在Java中&#xff0c;集合框架是非常重要的一部分。集合框架提供了各种数据结构和算法&#xff0c;可以方…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

Admin.Net中的消息通信SignalR解释

定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

Matlab | matlab常用命令总结

常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...