当前位置: 首页 > news >正文

RabbitMQ消息可靠性(一)-- 生产者消息确认

前言

在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,消息的可靠性可以主要在三个方面去考虑:生产者消息确认,消费者消息确认,消息持久化,这篇文件说明生产者消息确认的。


一、消息确认流程图

由图可知,消息确认是分为生产者确认和消费者确认的,生产者和MQ之间的消息确认机制为生产者消息确认,MQ和消费者之间的消息确认机制为消费者消息确认

消息丢失的情景有三种情况:

  1. 发送消息过程中出现网络问题:生产者以为发送成功,但MQ没有收到;(需要生产者消息确认)
  2. 接收到消息后由于MQ服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;(需要消息持久化)
  3. 消费者接收到消息后处理消息出错,没有完成消息的处理,但是自动返回ack(这时候需要开启手动确认模式,消费者消息确认)

二、生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID,来区分开不同的消息,避免ack(消息确认参数)冲突。每当消息发送到MQ成功后,MQ都会返回一个结果给生产者,以保证生产者消息确认。在生产者消息确认时,又有两种返回结果方式(通常两个都要实现)来确保消息投递可靠性,分别为publisher-confirm和publisher-return,以下作出说明。

1、publisher-confirm(发送者确认)

消息成功投递到交换机,返回ack

消息未投递到交换机,返回nack

2、publisher-return(发送者回执)

消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。


三、代码实现

1、配置文件

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true

publish-confirm-type有三个值,

  1. none:禁用发布确认模式,是默认值
  2. simple:同步等待confirm结果,直到超时
  3. correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

publisher-returns:开启消息失败回调,回调函数ReturnCallback

2、配置ConfirmCallback函数和ReturnCallback函数

/*** 生产者消息回调配置类*/
@Configuration
@Slf4j
public class ProviderCallBackConfig {@Resourceprivate CachingConnectionFactory cachingConnectionFactory;@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);// 当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,//那么broker会调用basic.return方法将消息返还给生产者。// 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。rabbitTemplate.setMandatory(true);/*** TODO RabbitMQ生产者发送消息确认回调,解决消息可靠性问题* 消息确认回调,确认消息是否到达broker* data:消息唯一标识* ack:确认结果* cause:失败原因*/rabbitTemplate.setConfirmCallback((data, ack, cause) -> {if (ack) {//消息发送成功后,更新数据库消息状态等逻辑log.info("消息发送至exchange成功------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}",data, ack, cause);} else {//信息发送失败,打印日志后,可以根据业务选择是否重发消息log.info("消息发送至exchange失败------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}", data, ack, cause);}});/*** TODO RabbitMQ生产者发送消息失败回调,解决消息可靠性问题* message      消息* replyCode    回应码* replyText    回应信息* exchange     交换机* routingKey   路由键*/rabbitTemplate.setReturnsCallback((res) -> {//若发送失败,打印错误信息,然后可以根据业务选择重发消息log.error("消息发送至queue失败-------->res: {}", JSON.toJSONString(res));});return rabbitTemplate;}}

到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?

先从总体的情况分析,推送消息存在3种情况:

①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送成功

①消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):

@GetMapping("/testProviderMessageBack")@ApiOperation(value = "测试生产者消息回调")@ApiOperationSupport(order = 5)public String testProviderMessageBack() {CorrelationData data = new CorrelationData();data.setId("111");rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", "测试生产者消息回调",data);return "ok";}

调用接口,查看项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):

结论: ①这种情况触发的是 ConfirmCallback 回调函数

消息发送至exchange失败------>

消息唯一标识: CorrelationData [id=111],

确认状态: false,

造成原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

②消息推送到server,找到交换机了,但是没找到队列  
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:

    @BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}

然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):

@GetMapping("/testProviderMessageBack2")@ApiOperation(value = "测试生产者消息回调2")@ApiOperationSupport(order = 6)public String testProviderMessageBack2() {CorrelationData data = new CorrelationData();data.setId("222");rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestLonelyDirectRouting", "测试生产者消息回调2",data);return "ok";}

消息发送至exchange成功------>

消息唯一标识: CorrelationData [id=222],

确认状态: true,

造成原因: null

消息发送至queue失败-------->

res: {"exchange":"lonelyDirectExchange","message":{"body":"5rWL6K+V55Sf5Lqn6ICF5raI5oGv5Zue6LCDMg==","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"spring_returned_message_correlation":"222"},"lastInBatch":false,"priority":0,"projectionUsed":false,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT"}},"replyCode":312,"replyText":"NO_ROUTE","routingKey":"TestLonelyDirectRouting"}

这种情况下,两个函数都被调用了,

消息是推送成功到交换机了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

③消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendDirectMessage接口,可以看到控制台输出:

结论:这种情况触发的是 ConfirmCallback 回调函数。

相关文章:

RabbitMQ消息可靠性(一)-- 生产者消息确认

前言 在项目中&#xff0c;引入了RabbitMQ这一中间件&#xff0c;必然也需要在业务中增加对数据安全性的一层考虑&#xff0c;来保证RabbitMQ消息的可靠性&#xff0c;否则一个个消息丢失可能导致整个业务的数据出现不一致等问题&#xff0c;对系统带来巨大的影响&#xff0c;…...

9 种方法使用 Amazon CodeWhisperer 快速构建应用

Amazon CodeWhisperer 是一款很赞的生成式人工智能编程工具。自从在工作中使用了 CodeWhisperer&#xff0c;我发现不仅代码编译的效率有所提高&#xff0c;应用开发的工作也变得快乐起来。然而&#xff0c;任何生成式 AI 工具的有效学习都需要初学者要有接受新工作方式的心态和…...

性能测试-性能工程落地的4个阶段(21)

性能工程按照不同的内容和目的划分为4个阶段,分别是线下单系统压测分析阶段、线下全链路压测分析阶段、生产只读业务压测及容量评估阶段、生产读写业务全链路压测及容量评估阶段。(也可以理解为一个企业性能测试体系的发展阶段) 线下单系统压测分析阶段 针对单系统的性能…...

小程序 navigateBack 携带参数返回的三种方式(详细)

如果觉着主图好看,点个赞,你早晚也会看到这么好看的景色! 第一种方式 getCurrentPages 获取当前页面栈。数组中第一个元素为首页,最后一个元素为当前页面。不要尝试修改页面栈,会导致路由以及页面状态错误。不要在 App.onLaunch 的时候调用 getCurrentPages(),此时 page …...

通过内网穿透实现远程连接群晖Drive,轻松实现异地访问群晖NAS

文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…...

vue3 + vite常用工具

1. plop 1.1 安装 yarn add plop -D1.2 使用 1.2.1 package.json 配置脚本命令 "scripts": {"dev": "vite --mode dev","build": "vue-tsc --noEmit && vite build","serve": "vite preview"…...

Vue框架分享与总结

总结开发中最常用的vue语法&#xff0c;以及对特定语法的理解。vue官网 文章目录 一、创建vue项目1、使用开发工具创建2、使用命令行创建3、vue框架结构4、Vue文件结构 二、Vue 常用模板语法1、v-if、v-show2、v-for3、v-on4、v-bind5、v-model 三、组件通信1、父组件给子组件传…...

声音生成评价指标——使用声音分类模型评价生成声音质量(基于resnetish、VGGish、AlexNet)

文章目录 引言正文数据预处理将wav转成log-mel频谱图进行保存创建dataset类保存数据 模型定义模型训练过程训练代码定义loss为nan从AlexNet到ResNetloss上下剧烈波动——使用学习率衰减策略学习率调整——根据准确率来调整学习率数据处理问题 模型的测试 总结 引言 这篇文章主要…...

HarmonyOS学习路之方舟开发框架—学习ArkTS语言(状态管理 六)

AppStorage&#xff1a;应用全局的UI状态存储 AppStorage是应用全局的UI状态存储&#xff0c;是和应用的进程绑定的&#xff0c;由UI框架在应用程序启动时创建&#xff0c;为应用程序UI状态属性提供中央存储。 和LocalStorage不同的是&#xff0c;LocalStorage是页面级的&…...

SPA首屏加载速度慢

什么是首屏加载 首屏时间&#xff08;First Contentful Paint&#xff09;&#xff0c;指的是浏览器从响应用户输入网址地址&#xff0c;到首屏内容渲染完成的时间&#xff0c;此时整个网页不一定要全部渲染完成&#xff0c;但需要展示当前视窗需要的内容 首屏加载可以说是用…...

JVM执行流程

一、Java为什么是一种跨平台的语言&#xff1f; 通常&#xff0c;我们编写的java源代码会被JDK的编译器编译成字节码文件&#xff0c;再由JVM将字节码文件翻译成计算机读的懂得机器码进行执行&#xff1b;因为不同平台使用的JVM不一样&#xff0c;所以不同的JVM会把相同的字节码…...

laravel 凌晨0点 导出数据库

一、创建导出模型 <?php namespace App\Models;use Illuminate\Support\Facades\DB;class DbBackup {private $table;public function __construct(){$this->table env(DB_DATABASE);}public function run($file ){$file !$file ? public_path($this->t…...

mysql MVCC多版本并发控制

mvcc的概念 mvcc 的实现依赖于&#xff1a; 隐藏字段 行格式&#xff08;row_id,trx_id,roll_ponter&#xff09;UndologRead view innodb 存储引擎的表来说&#xff0c;聚集索引记录中都包含两个必要的隐藏字段&#xff0c;row_id(如果没有聚集索引&#xff0c;才会创建的) …...

new/delete, malloc/free 内存泄漏如何检测

区别&#xff1a; 首先new/delete是运算符&#xff0c;malloc/free是库函数。malloc/free只开辟内存不初始化&#xff1b;new/delete及开辟内存也初始化。抛出异常的方式&#xff1a;new/delete开辟失败使用抛出bad_alloc&#xff1b;malloc/free通过返回值判断。malloc和new区…...

Java开发推荐关注的网站

一、开发者社区 阿里云开发者社区&#xff1a;https://developer.aliyun.com/腾讯云开发者社区&#xff1a;https://cloud.tencent.com/developer 二、开发规范 阿里巴巴Java开发规范 github地址&#xff1a;https://github.com/alibaba/p3c gitcode地址&#xff1a;https:/…...

OpenHarmony社区运营报告(2023年8月)

本月快讯 2023年8月3日&#xff0c;OpenAtom OpenHarmony&#xff08;以下简称“OpenHarmony”&#xff09;发布了Beta2版本。OpenHarmony 4.0 Beta2在系统能力、应用框架、分布式通信、媒体功能、安全性等方面进行了全面升级。其中&#xff0c;ArkUI增强了界面组件能力&#x…...

Web学习笔记-React(路由)

笔记内容转载自 AcWing 的 Web 应用课讲义&#xff0c;课程链接&#xff1a;AcWing Web 应用课。 CONTENTS 1. Web分类2. Route组件3. URL中传递参数4. Search Params传递参数5. 重定向6. 嵌套路由 本节内容是如何将页面和 URL 一一对应起来。 1. Web分类 Web 页面可以分为两…...

MySQL无法查看系统默认字符集以及校验规则

show variables like character_set_database; show variables like collation_database;这个错误信息表示MySQL在尝试访问performance_schema.session_variables表时&#xff0c;发现该表不存在。这个问题可能是由于MySQL的版本升级导致的。解决这个问题的一种方法是运行mysql…...

不负昭华,前程似锦,新一批研发效能认证证书颁发丨IDCF

亲爱的认证学员&#xff0c; 恭喜你成功获得由国家工业和信息化部教育与考试中心颁发的职业技术证书——《研发效能(DevOps)工程师国家职业技术认证》。你的努力和才华得到了官方的认可&#xff0c;这是你职业生涯中的一个重要的里程碑。 这个证书不仅代表着你的专业知识和技…...

深入理解ES6模块化:语法、特性与最佳实践

目录 一、前言 二、ES6模块化基础 1. 模块的定义与导出 2. 模块的导入与使用 3. 模块默认导出与命名导出 4. 模块的循环引用与解决方案 三、模块化语法进阶 1. 模块的命名导出与默认导出的混合使用 2. 模块的别名导出与导入 3. 命名空间的使用与作用 4. 动态导入模块…...

Matlab图像处理-HSI模型

HSI模型 HSI模型是从人的视觉系统出发&#xff0c;直接使用颜色三要素色调(Hue)、饱和度(Saturation)和亮度&#xff08;Intensity&#xff09;来描述颜色。 亮度是指人眼感知光线的明暗程度。光的能量越大&#xff0c;亮度就越大。 色调是颜色最重要的属性。 它决定了颜色的…...

【Springboot】Springboot如何优雅停机?K8S中Pod如何优雅停机?

什么是优雅停机&#xff1a; 就是对应用进程发送停止指令之后&#xff0c;执行的一系列保证应用正常关闭的操作。这些操作往往包括等待已有请求执行完成、关闭线程、关闭连接和释放资源等 就是对应用进程发送停止指令之后&#xff0c;能保证正在执行的业务操作不受影响&#x…...

伦敦银一手是多少?

伦敦银是以国际现货白银价格为跟踪对象的电子合约交易&#xff0c;无论投资者通过什么地方的平台进入市场&#xff0c;执行的都是统一国际的标准&#xff0c;一手标准的合约所代表的就是5000盎司的白银&#xff0c;如果以国内投资者比较熟悉的单位计算&#xff0c;那约相当于15…...

Language Adaptive Weight Generation for Multi-task Visual Grounding 论文阅读笔记

Language Adaptive Weight Generation for Multi-task Visual Grounding 论文阅读笔记 一、Abstract二、引言三、相关工作3.1 指代表达式理解3.2 指代表达式分割3.3 动态权重网络 四、方法4.1 总览4.2 语言自适应权重生成语言特征聚合权重生成 4.3 多任务头4.4 训练目标 五、实…...

面试算法4:只出现一次的数字

题目 输入一个整数数组&#xff0c;数组中只有一个数字出现了一次&#xff0c;而其他数字都出现了3次。请找出那个只出现一次的数字。例如&#xff0c;如果输入的数组为[0&#xff0c;1&#xff0c;0&#xff0c;1&#xff0c;0&#xff0c;1&#xff0c;100]&#xff0c;则只…...

#与##的用法

# 作用&#xff1a; 左右加双引号&#xff0c;使其变成字符串 #的作用&#xff1a;是在形参左右各加双引号&#xff0c;使它变成字符串。#define STR(param) #paramchar *pStr STR(hello); // 展开后 char *pStr “hello”; ## 作用&#xff1a;胶水&#xff0c;使…...

Flutter的路由router-页面跳转

文章目录 概念介绍基本路由&#xff08;Basic Routing&#xff09;跳转到某个页面弹出页面 命名路由&#xff08;Named Routing&#xff09;第三方路由管理库&#xff08;Third-Party Routing Libraries&#xff09; Android原生的路由Intent-based Routing&#xff08;基于Int…...

24v转5v稳压芯片-5A大电流输出ic

这款24V转5V5A汽车充电芯片具有以下特性和参数&#xff1a; - 宽输入电压范围&#xff1a;4.5V至36V - 最大输出电流&#xff1a;5.0A - 高达92%的转换效率 - 恒流/恒压模式控制 - 最大占空比100% - 可调输出电压 - 2%的输出电压精度 - 集成40mΩ高侧开关 - 集成18mΩ低侧开关 …...

Layui + Flask | 表单元素(组件篇)(06)

表单元素是输入框、选择框、复选框、开关、单选框等表单项组件,用于对表单域进行输入。layui 的表单元素对原生的表单元素进行了大幅的用着,有好看的 UI 同时又有非常方便操作的 API。 输入框 https://layui.dev/docs/2.8/form/input.html 输入框组件是对文本框 <input ty…...

Kakfa - Producer机制原理与调优

Producer是Kakfa模型中生产者组件&#xff0c;也就是Kafka架构中数据的生产来源&#xff0c;虽然其整体是比较简单的组件&#xff0c;但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么&#xff0c;怎么发送的消息&#xff1f;IO通讯模型是什么&#xff1f;在实…...

企业备案网站服务内容/搜狗搜索引擎优化指南

这些型号均属于意法半导体&#xff08;STMicroelectronics&#xff09;的STM32F103系列微控制器&#xff0c;其特点如下&#xff1a; STM32F103RCT6&#xff1a;512 KB Flash、64 KB RAM&#xff0c;LQFP64封装&#xff1b;STM32F103RE&#xff1a;512 KB Flash、64 KB RAM&am…...

兰陵建设局网站/seo技术培训教程

这是oracle官网所给的jdk7版本下,jdk jre jvm之间的关系 https://docs.oracle.com/javase/7/docs/ 看完上图之后 查看整体结构 最外层为jdk(java开发工具集 Java Development Kit) jdk内部包含了jre(java运行时环境 Java Runtime Environment) 而jre内部包含了jvm(java虚…...

阿里巴巴怎么做不花钱的网站/深圳网站设计小程序

1.SyntaxError 语法错误 这个错误很常见,没什么好说的,根据系统提示好好检查代码 2.类型错误,常见的是字符串和数字直接拼接在一起name 小刘 age 17 print(name "今年" age) TypeError: must be str, not int 字符串只能和字符串拼接 3.索引错误list1[a,b,c] prin…...

网站制作 wordpress/芜湖网络营销公司

科目三考试&#xff1a; 1、报告考官&#xff0c;我是学员***&#xff0c;申请科目三考试。2、进入车内&#xff0c;调整座椅&#xff0c;同时刷身份证。3、下车&#xff0c;关好车门&#xff0c;围车逆时针转一周&#xff0c;注意在车身正前方时稍作停顿&#xff08;车内摄像头…...

建设个商城网站需要多少钱/新型营销方式

Mybatis-Plus提供了多种方式进行多表查询&#xff0c;其中注解方式是其中的一种。以下是几个使用注解方式进行多表查询的例子&#xff1a; 1.一对一查询 假设我们有两张表&#xff1a;user表和address表&#xff0c;每个用户对应一个地址&#xff0c;这是一个典型的一对一关系…...

网站建设模板的/网络营销公司哪家好

先说说我为什么有这种“奇怪”的想法。它基于这样一个场景&#xff1a;我最近闲来无事完善了一个小demo&#xff1a;音乐播放器。在里面有一个功能 —— 点击列表某一项弹出音乐播放弹框。我原先一直是“为每一项单独加一个click事件监听”。这很糟糕&#xff01;<div id&qu…...