死信队列详解
什么是死信队列?
在消息队列中,执行异步任务时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息可能无法正常地被处理和消耗,例如:格式错误、设备故障等,这些未成功处理的消息就被称为“死信”。
为了避免这些未成功处理的消息导致程序异常或对系统造成影响,我们需要使用死信队列(Dead Letter Queue)。当我们设置死信队列后,所有无法成功处理的消息将被捕获并重定向到指定的死信交换机中。消费者可以从该交换机中读取并处理这些“死信”。
死信队列的优点
使用死信队列有以下优点:
- 提高系统可靠性:避免因未处理的死信而导致程序异常,提高系统的可靠性。
- 实现延迟消息:可以通过设置TTL时间,将超时未消费的消息转移到死信队列中,实现延迟消息的功能。
- 防止滥用:当某些生产者恶意发送低质量的消息或进行滥用时,可以通过丢弃或重定向死信消息来防止滥用和恶意攻击。
死信队列的应用场景
死信队列在以下场景下是非常有用的:
- 消息格式错误:当消息格式错误时,可能会导致消费者无法正确地解析或处理该消息,这个问题通常与生产者的代码有关。为了避免消息失效,并提高系统可靠性,我们可以使用死信队列。
- 消费者故障:另一个常见的场景是消息处理者无法正确地处理或响应到推入到队列中的消息,例如消费者创建了一个协程并在逻辑执行完成后未正确地关闭该协程。由于该协程始终处于打开状态,它将一直阻止该消费者对其他消息进行正确消费。为了避免这种消息挂起并影响其他消息的正常处理,可以将其加入死信中心。
死信队列的实现方式
下面通过RabbitMQ和Spring Boot,演示如何实现死信队列。
RabbitMQ实现
创建交换机和队列
import pikadef main():credentials = pika.PlainCredentials('guest', 'guest')parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)# 创建死信交换机dlx_exchnage_name = 'my-dlx-exchange'with pika.BlockingConnection(parameters) as connection:channel = connection.channel()channel.exchange_declare(exchange=dlx_exchnage_name, exchange_type='fanout', durable=True)# 创建死信队列和交换机dead_letter_queue_name = 'dead-letter-queue'with pika.BlockingConnection(parameters) as connection:channel = connection.channel()channel.queue_declare(queue=dead_letter_queue_name, durable=True)channel.queue_bind(queue=dead_letter_queue_name, exchange=dlx_exchnage_name)# 创建消息队列,并将其绑定到死信队列上 queue_name = "job-queue"arguments = {"x-dead-letter-exchange": dlx_exchnage_name} with pika.BlockingConnection(parameters) as connection:channel = connection.channel()channel.queue_declare(queue=queue_name, durable=True,arguments=arguments)channel.queue_bind(exchange='', queue=queue_name, routing_key=queue_name)print("Queue is created")if __name__ == '__main__':main()
以上代码创建了两个队列,一个是my-dlx-exchange,一个是dead-letter-queue。同时创建另外一个名为job-queue的队列,它绑定了dead-letter-exchange这个交换机。
在发送消息时,需要提供一些属性来指定该队列应采取哪些步骤来防止该类丢失的消息。 这里我们可以使用x-dead-letter-exchange和x-message-ttl两个特殊属性,告诉RabbitMQ,如果消息在某段时间内无法正确处理,则将其放入死信队列中。
发送和接收消息
import pikadef send_message():credentials = pika.PlainCredentials('guest', 'guest')parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel()# 发送一个消息5秒后过期,并且未被消费端确认queue_name = "job-queue"properties = pika.BasicProperties(delivery_mode=2,expiration="5000")channel.basic_publish(exchange='', routing_key=queue_name, body='Hello World!', properties=properties)channel.close()connection.close()def receive_message():credentials = pika.PlainCredentials('guest', 'guest')parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel()dlx_exchnage_name = 'my-dlx-exchange'def callback(ch, method, properties, body):print("Receivedmessage: %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag)# 消费来自job-queue的消息queue_name = "job-queue"channel.basic_consume(queue_name, callback)channel.start_consuming()if __name__ == '__main__':send_message()receive_message()
以上代码是一个简单的生产者和消费者。send_message()函数发送一个消息,并且未被消费端确认;receive_message()函数从job-queue中接收消息。
Spring Boot实现
在Spring Boot中,我们可以使用RabbitMQ来实现死信队列。
添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
创建死信交换机、队列和绑定
@Configuration
public class RabbitConfig {// 死信交换机public static final String DLX_EXCHANGE_NAME = "my-dlx-exchange";// 死信队列public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";// 业务处理队列public static final String PROCESS_QUEUE_NAME = "process-queue";@Beanpublic TopicExchange dlxExchange() {return new TopicExchange(DLX_EXCHANGE_NAME);}@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME).withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME).build();}@Beanpublic Queue processQueue() {return QueueBuilder.durable(PROCESS_QUEUE_NAME).withArgument("x-message-ttl", 5000).withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME).build();}@Beanpublic Binding dlxBinding(Queue deadLetterQueue, TopicExchange dlxExchange) {return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("#");}}
以上代码创建了my-dlx-exchange死信交换机和dead-letter-queue死信队列。同时创建一个process-queue业务处理队列,该队列设置了消息的生存时间为5s,并在该时间内未被消费者消费,则将该消息转移到死信队列中。
发送和接收消息
@Component
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(String message) {rabbitTemplate.convertAndSend("", RabbitConfig.PROCESS_QUEUE_NAME, message);}}@Component
public class MessageReceiver {@RabbitListener(queues = "process-queue")public void receive(String message) throws Exception {System.out.println("Received message: " + message);Thread.sleep(10000);}}
以上代码是一个简单的生产者和消费者。生产者使用RabbitTemplate来发送消息到process-queue队列中;消费者通过使用注解@RabbitListener监听process-queue队列的消息并进行处理。
结语
通过本篇文章,我们详细介绍了什么是死信队列、死信队列的优点、应用场景以及如何实现死信队列。通过RabbitMQ和Spring Boot的实现方式,不难看出死信队列在项目中的重要性和实际价值。在工程实践中,我们可以根据具体业务需求,结合技术选型,灵活运用死信队列来提高系统的可靠性和稳定性。
相关文章:
死信队列详解
什么是死信队列? 在消息队列中,执行异步任务时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息可能无法正常地被处理和消耗,例如&…...
我用ChatGPT写2023高考语文作文(五):北京卷I
2023年 北京卷 I 适用地区:北京 “续航”一词,原指连续航行,今天在使用中被赋予了新的含义,如为青春续航、科技为经济发展续航等。 请以“续航”为题目,写一篇议论文。 要求:论点明确,论据充实&…...
《微服务实战》 第二十八章 分布式锁框架-Redisson
前言 Redisson 在基于 NIO 的 Netty 框架上,充分的利⽤了 Redis 键值数据库提供的⼀系列优势,在Java 实⽤⼯具包中常⽤接⼝的基础上,为使⽤者提供了⼀系列具有分布式特性的常⽤⼯具类。使得原本作为协调单机多线程并发程序的⼯具包获得了协调…...
局部搜索,变邻域搜索算法
目录 局部搜索 02 变邻域搜索算法 局部搜索 1.1 局部搜索是什么玩意儿? 官方一点:局部搜索是解决优化问题的一种启发式算法。对于某些计算起来非常复杂的优化问题,比如各种NP-难问题,要找到最优解需要的时间随问题规模呈指数增长,因此诞生了各种启发式算法来退而求其次…...
软件工程实训——第一天
第一天 前后分离 前端:android 后端:springbootmbatis-plus 高心星 软件工程的思维来开发项目 问题定义 可行性研究 需求分析 概要设计 详细设计 编码 测试 维护 需求分析 1.用户的信息管理 2.新增支出 3.新增收入 4.支出统计 5.收入…...
嵌入式C语言中if/else如何优化详解
观点一(灵剑): 前期迭代懒得优化,来一个需求,加一个if,久而久之,就串成了一座金字塔。 当代码已经复杂到难以维护的程度之后,只能狠下心重构优化。那,有什么方案可以优雅…...
【LSTM】读取时间序列数据 | 时间序列数据的小批量划分方法
由于序列数据本质上是连续的,因此我们在处理数据时需要解决这个问题。当序列过长而不能被模型一次性全部处理时,我们希望能拆分这样的序列以便模型方便读取。 Q:怎样随机生成一个具有n个时间步的mini batch的特征和标签? A&…...
K8s in Action 阅读笔记——【12】Securing the Kubernetes API server
K8s in Action 阅读笔记——【12】Securing the Kubernetes API server 12.1 Understanding authentication 在上一章中,我们提到API服务器可以配置一个或多个认证插件(授权插件也是同样的情况)。当API服务器接收到一个请求时,它…...
爆肝整理,3个月从功能进阶自动化测试,一跃成测试卷王...
目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 首先先了解自动化…...
人生这场概率游戏,怎么玩
只会标准答案,是不可救药的愚蠢 那么为了便于理解,我用一些典型的案例来讲解,什么是概率游戏,以及这个游戏,应该怎么玩。 比如典型的相亲,婚恋。人生大事,用标准答案来说,你的意中人…...
Redis笔记
缓存过期时间很重要!redis是单线程的 对于内存过多的3中方案: 惰性删除: 在定时删除的基础上,对于已经过期了的数据,redis的随机选择算法一直没有选中这个数据,所以导致它就一直没被删除,但是…...
centos 安装supervisor并运行网站
前言 之前一直用宝塔的**进程守护管理器【Supervisor】**来启动一些项目,如ThinkPHP、Hyperf的项目,或laravel的一些命令。如果不用宝塔怎么办呢? 一、简介[supervisor] [Supervisor] 是用Python开发的一个client/server服务,是Linux/Unix系统下的一个进程管理工具,不支…...
Hadoop面试题十道
问题 1:Hadoop是什么? 答案:Hadoop是一个开源的分布式计算框架,用于处理大规模数据集的存储和处理。它基于Google的MapReduce和Google文件系统(GFS)的思想,旨在解决大数据量的处理和分析问题。…...
使用Docker-Compose对Docker容器集群快速编排
目录 一、Docker-Compose1、Docker-Compose使用场景2、Docker-Compose简介3、Docker-Compose安装部署4、YAML 文件格式及编写注意事项5、Docker Compose配置常用字段6、Docker Compose 常用命令7、Docker Compose 文件结构8、docker Compose撰写nginx 镜像9、docker Compose撰写…...
React-Redux 对Todolist修改
在单独使用redux的时候 需要手动订阅store里面 感觉特别麻烦 不错的是react有一个组件可以帮我们解决这个问题, 那就是react-redux。 react-redux提供了Provider 和 connent给我们使用。 先说一下几个重点知道的知识 Provider 就是用来提供store里面的状态 自动getState()co…...
初识微信小程序
新建小程序 创建一个新的微信小程序项目: 打开微信开发者工具,点击“新建项目”。 在弹出的窗口中,填写小程序的 AppID、项目名称和项目目录等信息。 点击“确定”按钮,等待微信开发者工具自动下载并安装所需的依赖库和框架。 …...
我们该如何入门编程呢
提醒:以下内容仅做参考,可自行发散。在发布作品前,请把不需要的内容删掉。 随着信息技术的快速发展,编程已经成为一个越来越重要的技能。那么,我们该如何入门编程呢?选择编程语言:选择一种编程…...
App 软件开发《判断6》试卷及答案
App 软件开发《判断6》试卷及答案 文章目录 App 软件开发《判断6》试卷及答案判断题(对的打“√”,错的打“”;共0分)1.”ionic resources --icon"命令用于生成适应不同分辨率的App图标所应用的图片。(✔)2&#…...
MVC工作原理
MVC工作原理 有视图的情况 1.客户端(浏览器)发起请求,DispatcherServlet拦截请求。 2.DispatcherServlet根据请求信息调用HandlerMapping。HandlerMapping根据uri去匹配查询能处理的Handler(也就是我们所说的Controller&#x…...
使用 Redis 统计网站 UV 的方法
使用 Redis 统计网站 UV 的方法(概率算法) 文章目录 前言思路HyperLogLog 使用 Redis 命令操作使用 Java 代码操作 HyperLogLog 实现原理及特点使用 Java 实现 HyperLogLog小结 前言 网站 UV 就是指网站的独立用户访问量Unique Visitor,即相同用户的多次访问需要…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
tree 树组件大数据卡顿问题优化
问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...
GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...
