RabbitMQ消息可靠性(一)-- 生产者消息确认
前言
在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,消息的可靠性可以主要在三个方面去考虑:生产者消息确认,消费者消息确认,消息持久化,这篇文件说明生产者消息确认的。
一、消息确认流程图

由图可知,消息确认是分为生产者确认和消费者确认的,生产者和MQ之间的消息确认机制为生产者消息确认,MQ和消费者之间的消息确认机制为消费者消息确认
消息丢失的情景有三种情况:
- 发送消息过程中出现网络问题:生产者以为发送成功,但MQ没有收到;(需要生产者消息确认)
- 接收到消息后由于MQ服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;(需要消息持久化)
- 消费者接收到消息后处理消息出错,没有完成消息的处理,但是自动返回ack(这时候需要开启手动确认模式,消费者消息确认)
二、生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID,来区分开不同的消息,避免ack(消息确认参数)冲突。每当消息发送到MQ成功后,MQ都会返回一个结果给生产者,以保证生产者消息确认。在生产者消息确认时,又有两种返回结果方式(通常两个都要实现)来确保消息投递可靠性,分别为publisher-confirm和publisher-return,以下作出说明。
1、publisher-confirm(发送者确认)
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
2、publisher-return(发送者回执)
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
三、代码实现
1、配置文件
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true
publish-confirm-type有三个值,
- none:禁用发布确认模式,是默认值
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns:开启消息失败回调,回调函数ReturnCallback
2、配置ConfirmCallback函数和ReturnCallback函数
/*** 生产者消息回调配置类*/
@Configuration
@Slf4j
public class ProviderCallBackConfig {@Resourceprivate CachingConnectionFactory cachingConnectionFactory;@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);// 当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,//那么broker会调用basic.return方法将消息返还给生产者。// 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。rabbitTemplate.setMandatory(true);/*** TODO RabbitMQ生产者发送消息确认回调,解决消息可靠性问题* 消息确认回调,确认消息是否到达broker* data:消息唯一标识* ack:确认结果* cause:失败原因*/rabbitTemplate.setConfirmCallback((data, ack, cause) -> {if (ack) {//消息发送成功后,更新数据库消息状态等逻辑log.info("消息发送至exchange成功------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}",data, ack, cause);} else {//信息发送失败,打印日志后,可以根据业务选择是否重发消息log.info("消息发送至exchange失败------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}", data, ack, cause);}});/*** TODO RabbitMQ生产者发送消息失败回调,解决消息可靠性问题* message 消息* replyCode 回应码* replyText 回应信息* exchange 交换机* routingKey 路由键*/rabbitTemplate.setReturnsCallback((res) -> {//若发送失败,打印错误信息,然后可以根据业务选择重发消息log.error("消息发送至queue失败-------->res: {}", JSON.toJSONString(res));});return rabbitTemplate;}}
到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?
先从总体的情况分析,推送消息存在3种情况:
①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送成功
①消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):
@GetMapping("/testProviderMessageBack")@ApiOperation(value = "测试生产者消息回调")@ApiOperationSupport(order = 5)public String testProviderMessageBack() {CorrelationData data = new CorrelationData();data.setId("111");rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", "测试生产者消息回调",data);return "ok";}
调用接口,查看项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):
结论: ①这种情况触发的是 ConfirmCallback 回调函数
消息发送至exchange失败------>
消息唯一标识: CorrelationData [id=111],
确认状态: false,
造成原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
②消息推送到server,找到交换机了,但是没找到队列
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:
@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}
然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):
@GetMapping("/testProviderMessageBack2")@ApiOperation(value = "测试生产者消息回调2")@ApiOperationSupport(order = 6)public String testProviderMessageBack2() {CorrelationData data = new CorrelationData();data.setId("222");rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestLonelyDirectRouting", "测试生产者消息回调2",data);return "ok";}
消息发送至exchange成功------>
消息唯一标识: CorrelationData [id=222],
确认状态: true,
造成原因: null
消息发送至queue失败-------->
res: {"exchange":"lonelyDirectExchange","message":{"body":"5rWL6K+V55Sf5Lqn6ICF5raI5oGv5Zue6LCDMg==","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"spring_returned_message_correlation":"222"},"lastInBatch":false,"priority":0,"projectionUsed":false,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT"}},"replyCode":312,"replyText":"NO_ROUTE","routingKey":"TestLonelyDirectRouting"}
这种情况下,两个函数都被调用了,
消息是推送成功到交换机了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
③消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendDirectMessage接口,可以看到控制台输出:
结论:这种情况触发的是 ConfirmCallback 回调函数。
相关文章:
RabbitMQ消息可靠性(一)-- 生产者消息确认
前言 在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,…...
9 种方法使用 Amazon CodeWhisperer 快速构建应用
Amazon CodeWhisperer 是一款很赞的生成式人工智能编程工具。自从在工作中使用了 CodeWhisperer,我发现不仅代码编译的效率有所提高,应用开发的工作也变得快乐起来。然而,任何生成式 AI 工具的有效学习都需要初学者要有接受新工作方式的心态和…...
性能测试-性能工程落地的4个阶段(21)
性能工程按照不同的内容和目的划分为4个阶段,分别是线下单系统压测分析阶段、线下全链路压测分析阶段、生产只读业务压测及容量评估阶段、生产读写业务全链路压测及容量评估阶段。(也可以理解为一个企业性能测试体系的发展阶段) 线下单系统压测分析阶段 针对单系统的性能…...
小程序 navigateBack 携带参数返回的三种方式(详细)
如果觉着主图好看,点个赞,你早晚也会看到这么好看的景色! 第一种方式 getCurrentPages 获取当前页面栈。数组中第一个元素为首页,最后一个元素为当前页面。不要尝试修改页面栈,会导致路由以及页面状态错误。不要在 App.onLaunch 的时候调用 getCurrentPages(),此时 page …...
通过内网穿透实现远程连接群晖Drive,轻松实现异地访问群晖NAS
文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…...
vue3 + vite常用工具
1. plop 1.1 安装 yarn add plop -D1.2 使用 1.2.1 package.json 配置脚本命令 "scripts": {"dev": "vite --mode dev","build": "vue-tsc --noEmit && vite build","serve": "vite preview"…...
Vue框架分享与总结
总结开发中最常用的vue语法,以及对特定语法的理解。vue官网 文章目录 一、创建vue项目1、使用开发工具创建2、使用命令行创建3、vue框架结构4、Vue文件结构 二、Vue 常用模板语法1、v-if、v-show2、v-for3、v-on4、v-bind5、v-model 三、组件通信1、父组件给子组件传…...
声音生成评价指标——使用声音分类模型评价生成声音质量(基于resnetish、VGGish、AlexNet)
文章目录 引言正文数据预处理将wav转成log-mel频谱图进行保存创建dataset类保存数据 模型定义模型训练过程训练代码定义loss为nan从AlexNet到ResNetloss上下剧烈波动——使用学习率衰减策略学习率调整——根据准确率来调整学习率数据处理问题 模型的测试 总结 引言 这篇文章主要…...
HarmonyOS学习路之方舟开发框架—学习ArkTS语言(状态管理 六)
AppStorage:应用全局的UI状态存储 AppStorage是应用全局的UI状态存储,是和应用的进程绑定的,由UI框架在应用程序启动时创建,为应用程序UI状态属性提供中央存储。 和LocalStorage不同的是,LocalStorage是页面级的&…...
SPA首屏加载速度慢
什么是首屏加载 首屏时间(First Contentful Paint),指的是浏览器从响应用户输入网址地址,到首屏内容渲染完成的时间,此时整个网页不一定要全部渲染完成,但需要展示当前视窗需要的内容 首屏加载可以说是用…...
JVM执行流程
一、Java为什么是一种跨平台的语言? 通常,我们编写的java源代码会被JDK的编译器编译成字节码文件,再由JVM将字节码文件翻译成计算机读的懂得机器码进行执行;因为不同平台使用的JVM不一样,所以不同的JVM会把相同的字节码…...
laravel 凌晨0点 导出数据库
一、创建导出模型 <?php namespace App\Models;use Illuminate\Support\Facades\DB;class DbBackup {private $table;public function __construct(){$this->table env(DB_DATABASE);}public function run($file ){$file !$file ? public_path($this->t…...
mysql MVCC多版本并发控制
mvcc的概念 mvcc 的实现依赖于: 隐藏字段 行格式(row_id,trx_id,roll_ponter)UndologRead view innodb 存储引擎的表来说,聚集索引记录中都包含两个必要的隐藏字段,row_id(如果没有聚集索引,才会创建的) …...
new/delete, malloc/free 内存泄漏如何检测
区别: 首先new/delete是运算符,malloc/free是库函数。malloc/free只开辟内存不初始化;new/delete及开辟内存也初始化。抛出异常的方式:new/delete开辟失败使用抛出bad_alloc;malloc/free通过返回值判断。malloc和new区…...
Java开发推荐关注的网站
一、开发者社区 阿里云开发者社区:https://developer.aliyun.com/腾讯云开发者社区:https://cloud.tencent.com/developer 二、开发规范 阿里巴巴Java开发规范 github地址:https://github.com/alibaba/p3c gitcode地址:https:/…...
OpenHarmony社区运营报告(2023年8月)
本月快讯 2023年8月3日,OpenAtom OpenHarmony(以下简称“OpenHarmony”)发布了Beta2版本。OpenHarmony 4.0 Beta2在系统能力、应用框架、分布式通信、媒体功能、安全性等方面进行了全面升级。其中,ArkUI增强了界面组件能力&#x…...
Web学习笔记-React(路由)
笔记内容转载自 AcWing 的 Web 应用课讲义,课程链接:AcWing Web 应用课。 CONTENTS 1. Web分类2. Route组件3. URL中传递参数4. Search Params传递参数5. 重定向6. 嵌套路由 本节内容是如何将页面和 URL 一一对应起来。 1. Web分类 Web 页面可以分为两…...
MySQL无法查看系统默认字符集以及校验规则
show variables like character_set_database; show variables like collation_database;这个错误信息表示MySQL在尝试访问performance_schema.session_variables表时,发现该表不存在。这个问题可能是由于MySQL的版本升级导致的。解决这个问题的一种方法是运行mysql…...
不负昭华,前程似锦,新一批研发效能认证证书颁发丨IDCF
亲爱的认证学员, 恭喜你成功获得由国家工业和信息化部教育与考试中心颁发的职业技术证书——《研发效能(DevOps)工程师国家职业技术认证》。你的努力和才华得到了官方的认可,这是你职业生涯中的一个重要的里程碑。 这个证书不仅代表着你的专业知识和技…...
深入理解ES6模块化:语法、特性与最佳实践
目录 一、前言 二、ES6模块化基础 1. 模块的定义与导出 2. 模块的导入与使用 3. 模块默认导出与命名导出 4. 模块的循环引用与解决方案 三、模块化语法进阶 1. 模块的命名导出与默认导出的混合使用 2. 模块的别名导出与导入 3. 命名空间的使用与作用 4. 动态导入模块…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
