RabbitMQ-消息的可靠性投递
文章目录
- 0. 什么是消息的可靠性投递
- 1. confirm机制
- 2. return机制
- 3. 总结
0. 什么是消息的可靠性投递
在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息从交换机到队列的过程中出现意外,消息没有正常投放,我们需要消息回到交换机重新投放。
为了解决因为这些种种意外而产生的问题,就需要使用消息的可靠性投递。我们需要在三个过程保证消息传输的正常 :
- 消息
从生产者到交换机的过程 - 消息
从交换机到队列的过程 - 消息
从队列到消费者的过程
消息从生产者到交换机的过程中,我们可以使用confirm机制来保障消息的可靠性。
消息从交换机到队列的过程中,我们可以使用return机制来保障消息的可靠性。
消息从队列到消费者的过程中,我们可以使用ack这个参数来保障消息的可靠性。
其中confirm机制和return机制都是使用回调函数,当消息投放失败后在回调函数中将消息放入redis,用定时任务重新投放。需要我们自己编码。
ack参数是RabbitMQ实现的,我们需要手动ack。RabbitMQ将一条消息推送给消费者,如果没有接收到ack就会重发。
1. confirm机制
使用confirm机制需要在配置文件中将rabbitmq.publisher-confirm-type设置为correlated。表示使用confirm机制。
publisher-confirm-type: correlated
publisher-confirm-type有以下三种值:
simple:简单的执行ack的判断,在发布消息成功后使用 rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判断下一步的逻辑。但是要注意的是当 waitForConfirmsOrDie 方法如果返回false则会关闭channel。(同步confirm,性能较差)correlated:打开消息确认机制, 执行ack的时候还会携带消息的元数据。none:禁用发布确认模式,默认值。
最后在RabbitTemplate中设置回调函数。这个函数不管是消息发送成功与否都会执行。该函数接收一个接口 :ConfirmCallback,此接口只有一个方法 :
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
-
correlationData :保存消息id以及相关信息
-
ack :交换机是否收到消息,收到为true
-
cause :消息接收不到原因,成功接收消息则为null
在这个函数里,可以判断ack,如果为false则代表信息发送失败,可以存入redis,让定时任务扫描redis重新发送消息。(解决方案有很多,这里简单提个建议)
@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入数据库中定时重发.// ...});return rabbitTemplate;}
}
总结 :
confirm机制有两步 :
-
在yml配置文件中将
publisher-confirm-type设置为correlated,开启confirm机制publisher-confirm-type: correlated -
设置回调函数,setConfirmCallback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入数据库中定时重发.// ... });
2. return机制
在了解return 机制前先了解一个参数 :mandatory。
mandatory是AMQP协议中basic.publish方法中的标识位。
如果交换机根据路由键找不到对应的队列,那么此时它有两个选择 :将消息返回交换机、将消息丢弃,当mandatory的值为true时,消息将返回给交换机;当mandatory的值为false时,消息将被丢弃。
我们想要实现消息的可靠性投递,当消息投放错时让消息重新投放,那么我们就需要将mandatory的值设置为true。
所以实现return机制有三步 :
想使用return机制,首先要在配置文件中将publisher-returns的值设置为true,代表开启return机制
publisher-returns: true
接下来将mandatory这个值设置为true,代表如果消息丢失或者出现意外,将消息返回,而不是丢弃。
rabbitTemplate.setMandatory(true);
最后实现ReturnCallback这个接口 :
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
- message :此条消息
- replyCode :错误编码
- replyText :消息接收失败的原因
- exchange :此条消息的目标交换机
- routingKey :此条消息的路由键
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 打印日志log.info("ReturnCallback:" + "消息:" + message);log.info("ReturnCallback:" + "回应码:" + replyCode);log.info("ReturnCallback:" + "回应信息:" + replyText);log.info("ReturnCallback:" + "交换机:" + exchange);log.info("ReturnCallback:" + "路由键:" + routingKey);// 做其他处理...
});
执行到returnCallback函数就代表消息从交换机到队列的过程出现问题,例如路由键错误、网络问题、找不到相应队列…这些情况下可以先打印日志,再使用手动签收机制让交换机重新发送消息。
因为路由键错误这种就是代码写错了,重发消息也没用,只能打印日志等操作来尽可能提醒开发人员,但是网络问题可以重新发送消息。
3. 总结
实现消息的可靠性投递通过confirm机制和return机制。
-
在配置文件中开启这两种机制:
publisher-confirm-type: correlated publisher-returns: true -
注入RabbitTemplate时将mandatory设置为true,代表消息投递异常时将消息返回,而不是丢弃。
-
注入RabbitTemplate时实现ConfirmCallback和ReturnCallback。在里面完成消息投放错误后的保障工作。
@Slf4j @Configuration public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入redis数据库中定时重发.// ...});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 打印日志log.info("ReturnCallback:" + "消息:" + message);log.info("ReturnCallback:" + "回应码:" + replyCode);log.info("ReturnCallback:" + "回应信息:" + replyText);log.info("ReturnCallback:" + "交换机:" + exchange);log.info("ReturnCallback:" + "路由键:" + routingKey);});return rabbitTemplate;} }- 在保证confirm机制和return机制的同时别忘记让消费者手动ack。
注意 :
为什么return机制和ack机制可以重新发送,而confirm机制只能借用外部手段呢(例如redis、xxl-job)?
因为return机制触发时消息已经到了交换机,可以通过判断通过mandatory这个参数来确定是重新发送还是丢弃。(只是在发送给队列时出现错误,人家RabbitMQ自己设计的可以重发。)
但是confirm机制触发时消息刚从生产者发送给交换机,还没进入RabbitMQ,只能借用外部手段了。
相关文章:
RabbitMQ-消息的可靠性投递
文章目录0. 什么是消息的可靠性投递1. confirm机制2. return机制3. 总结0. 什么是消息的可靠性投递 在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息…...
华为OD机试题 - 最小叶子节点(JavaScript)| 含思路
华为OD机试题 最近更新的博客使用说明本篇题解:最小叶子节点题目输入输出示例一输入输出示例二输入输出Code解题思路华为OD其它语言版本最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD…...
嵌入式系统硬件设计与实践(开发过程)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 如果把电路设计看成是画板子的,这本身其实是狭隘了。嵌入式硬件设计其实是嵌入式系统中很重要的一个部分。里面软件做的什么样…...
入门vue(1-10)
正确学习方式:视频->动手实操->压缩提取->记录表述 1基础结构 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"&…...
C#开发的OpenRA的游戏主界面怎么样创建3
继续游戏主界面创建的主题, 我们知道游戏的主界面上有很多部件,比如显示文本的标签(LabelWidget), 显示按钮(ButtonWidget)。那么这些部件又是如何创建在主界面上的呢? 其实这些部件是否显示,都是来源于文件yaml,在这里就是文件mainmenu.yaml, 在这个文件里定义了所有…...
秒懂算法 | 基于主成分分析法、随机森林算法和SVM算法的人脸识别问题
本文的任务与手写数字识别非常相似,都是基于图片的多分类任务,也都是有监督的。 01、数据集介绍与分析 ORL人脸数据集共包含40个不同人的400张图像,是在1992年4月至1994年4月期间由英国剑桥的Olivetti研究实验室创建。 此数据集下包含40个目录,每个目录下有10张图像,每个…...
QML Loader(加载程序)
Loader加载器用于动态加载 QML 组件。加载程序可以加载 QML 文件(使用 source 属性)或组件对象(使用 sourceComponent 属性) 常用属性: active 活动asynchronous异步,默认为falseitem项目progress 进度so…...
C++——类型转换
目录 C语言中的类型转换 C强制类型转换 static_cast reinterpret_cast const_cast dynamic_cast 延伸问题 RTTI(了解) C语言中的类型转换 在C语言中,如果赋值运算符左右两侧类型不同,或者形参与实参类型不匹配,或…...
vue3:生命周期(onErrorCaptured)
一、背景 当项目如果发生报错,影响程序体验。如果能以捕获的方式得到错误信息,而且还能定位问题,这样就好了,本文介绍onErrorCaptured实现我们想要的效果。 vue2:errorCaptured。使用与vue3同理。 vue3:…...
vue过滤器
vue 过滤器 对要显示的数据进行特定格式化之后再显示 注册过滤器 Vue.filter(name,callback)new Vue({filters:{}}) 使用过滤器 {{ name | 过滤器名 }}v-band:属性“name | 过滤器名” 局部过滤器 <p>{{time | timeFormater }}</p> <!-- 过滤器可接受额外参…...
I/O模型
写在前面 前面聊完了IO方式, 也就意味着网络数据的收发通道是建立起来了。但业务场景中, 通道本身是不会发送数据的。在常见的网络应用中, server端会创建多个链接以服务更多client, 同时要求各个client尽可能互不影响。这是I/O模型(也就是IO方式线程模型)要解决的问题。由于加…...
前端必备技术之——AJAX
简介 AJAX 全称为 Asynchronous JavaScript And XML,就是异步的 JS 和 XML(现在已经基本被json取代)。通过 AJAX 可以在浏览器中向服务器发送异步请求,最大的优势:无刷新获取数据。AJAX 不是新的编程语言,而是一种将现有的标准组…...
MySQL数据库 各种指令操作大杂烩(DML增删改、DQL查询、SQL...)
文章目录前言一、DML 增删改添加数据修改数据删除数据二、DQL 查询基本查询条件查询聚合函数(count、max、min、avg、sum)分组查询(group by)排序查询(order by)分页查询(limit)DQL 语句练习三、SQLDCL 权限控制约束案例多表查询事务存储引擎字符串函数数值函数日期函数流程函数…...
Java分布式全局ID(一)
随着互联网的不断发展,互联网企业的业务在飞速变化,推动着系统架构也在不断地发生变化。 如今微服务技术越来越成熟,很多企业都采用微服务架构来支撑内部及对外的业务,尤其是在高 并发大流量的电商业务场景下,微服务…...
算法分析与设计之并查集详解
算法分析与设计之并查集1.前言2.并查集的基础2.1.关于动态连通性2.2.动态连通性的应用场景:2.3.对问题建模:2.4.建模思路:2.5.API2.7.Quick-Find算法:2.8.Quick-Union算法:3. 并查集的应用1.前言 本文主要介绍解决动态…...
Linux - 内存性能评估
文章目录概述free 命令指定的时间段内不间断地监控内存的使用情况通过watch与free相结合动态监控内存状况vmstat命令监控内存“sar –r”命令组合小结概述 内存的管理和优化是系统性能优化的一个重要部分,内存资源的充足与否直接影响应用系统的使用性能。在进行内存…...
00后初中辍学,转行程序员后,终于找到了女朋友
大家好,这里是程序员晚枫,今天继续分享我们的读者投稿,如需投稿赚稿费的朋友,请在后台私信我:投稿。下面我们进入正文吧~ 我是一位 00 后,从初一辍学,到目前为止已有 8 年的时间了,在…...
“Vue学习注意事项:掌握核心特性,注意性能优化和第三方库的使用“
Vue是一款易学易用的JavaScript框架,它可以帮助开发者构建动态、高性能的用户界面。Vue的核心概念包括数据绑定、指令、计算属性和组件化,学习Vue需要注意以下几个点:1. 理解Vue的基本概念和用法Vue的基本概念包括模板、组件、数据绑定、计算…...
计算机网络协议详解(二)
文章目录🔥HTTP协议介绍🔥HTTP协议特点🔥HTTP协议发展和版本🔥HTTP协议中URI、URL、URN🔥HTTP协议的请求分析🔥HTTP协议的响应分析🔥MIME类型🔥HTTP协议介绍 HTTP协议介绍 什么是超…...
【CSS】CSS 复合选择器 ② ( 子元素选择器 | 交集选择器 )
文章目录一、子元素选择器1、语法说明2、代码分析3、代码示例二、交集选择器1、语法说明2、代码示例一、子元素选择器 1、语法说明 子元素选择器 可以选择 某个基础选择器 选择出的 元素组 的 直接子元素 ( 亲儿子元素 ) 中 使用基础选择器 选择 元素 ; 子元素选择器语法 : 父选…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...
搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...
解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...
《信号与系统》第 6 章 信号与系统的时域和频域特性
目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...
从实验室到产业:IndexTTS 在六大核心场景的落地实践
一、内容创作:重构数字内容生产范式 在短视频创作领域,IndexTTS 的语音克隆技术彻底改变了配音流程。B 站 UP 主通过 5 秒参考音频即可克隆出郭老师音色,生成的 “各位吴彦祖们大家好” 语音相似度达 97%,单条视频播放量突破百万…...
