当前位置: 首页 > news >正文

【Rabbitmq篇】RabbitMQ⾼级特性----消息确认

目录

前言:

一.消息确认机制 

• ⾃动确认

• ⼿动确认

 手动确认方法又分为三种:

二. 代码实现(spring环境)

配置相关信息:

1). AcknowledgeMode.NONE 

2 )AcknowledgeMode.AUTO

3)AcknowledgeMode.MANUAL

总结:


前言:

前期讲了RabbitMQ的概念和应⽤,RabbitMQ实现了AMQP0-9-1规范的许多扩展,在RabbitMQ官⽹上,也给⼤家介绍了RabbitMQ的⼀些特性,我们挑⼀些重要的且常⽤的给⼤家讲⼀下

Rabbitmq官网


一.消息确认机制 

⽣产者发送消息之后,到达消费端之后,可能会有以下情况:
a. 消息处理成功
b. 消息处理异常

RabbitMQ向消费者发送消息之后,就会把这条消息删掉,那么第两种情况,就会造成消息丢失.
那么如何确保消费端已经成功接收了,并正确处理了呢
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制(messageacknowledgement)。

消费者在订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种: 

• ⾃动确认

当autoAck等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,⽽不管消费者是否真正地消费到了这些消息.⾃动确认模式适合对于消息可靠性要求不⾼的场景.


• ⼿动确认

当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息.这种模式适合对消息可靠性要求⽐较⾼的场景. 

 手动确认方法又分为三种:

  • 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)                  RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了.
  • 否定确认: Channel.basicReject(long deliveryTag, boolean requeue) 
    消费者客⼾端可以调⽤channel.basicReject⽅法来告诉RabbitMQ拒绝这个消息.
  • 否定批量确认: Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)
    Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令.消费者客⼾端可以调⽤channel.basicNack⽅法来实现.

参数说明:

1)deliveryTag :
消息的唯⼀标识,它是⼀个单调递增的64位的⻓整型值. deliveryTag 是每个通道
(Channel)独⽴维护的,所以在每个通道上都是唯⼀的.当消费者确认(ack)⼀条消息时,必须使⽤对应的通道上进⾏确认.

2)multiple 

是否批量确认.在某些情况下,为了减少⽹络流量,可以对⼀系列连续的 deliveryTag 进
⾏批量确认.值为true则会⼀次性ack所有⼩于或等于指定deliveryTag的消息.值为false,则只确认当前指定deliveryTag的消息.

 

 3)requeue

表⽰拒绝后,这条消息如何处理.如果requeue参数设置为true,则RabbitMQ会重新将这条
消息存⼊队列,以便可以发送给下⼀个订阅的消费者.如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,⽽不会把它发送给新的消费者.


二. 代码实现(spring环境)

1.可以直接使用RabbitMQ Java Client 库

2.使用spring集成的amqp

 主要介绍第二种,在spring环境下实现

Spring-AMQP 对消息确认机制提供了三种策略.

public enum AcknowledgeMode { NONE //确认,MANUAL//手动 ,AUTO //默认;
}

配置相关信息:

基本信息以及确认机制

队列,交换机,以及它们之间的绑定关系 

package com.bite.extensions.config;import com.bite.extensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}@Bean("ackBinding")public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("ack");}
}

生产者:

主要解释消费者在不同确认机制的状态

package com.bite.extensions.controller;import com.bite.extensions.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");return "消息发送成功!";}
}

1)AcknowledgeMode.NONE 

这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会⾃动确认
消息,从RabbitMQ队列中移除消息.如果消费者处理消息失败,消息可能会丢失.

 

1)消费者 正常消费情况下

package com.bite.extensions.listener;import com.bite.extensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");System.out.println("业务逻辑完成!");}
}

消费者正确处理,MQ删除相应信息

2)消费者 异常消费情况下

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");int num = 3/0; //异常System.out.println("业务逻辑完成!");}
}

 可以看到,消费者处理失败,但是消息已经从RabbitMQ中移除.

2 )AcknowledgeMode.AUTO

这种模式下,消费者在消息处理成功时会⾃动确认消息,但如果处理过程中抛出了异常,则不会确认消息. 

    listener:simple:acknowledge-mode: auto  #消息接收确认

1)消费者 正常消费情况下 

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");//int num = 3/0;System.out.println("业务逻辑完成!");}
}

 

2)消费者 异常消费情况下 

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");int num = 3/0;System.out.println("业务逻辑完成!");}
}
..........
接收到信息: consumer ack mode test..., deliveryTag: 88
业务逻辑处理!
2024-11-17T15:19:11.420+08:00  WARN 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
接收到信息: consumer ack mode test..., deliveryTag: 89
业务逻辑处理!
2024-11-17T15:19:11.477+08:00  INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2024-11-17T15:19:11.477+08:00  INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

消费者处理异常,会一直重试发送,所有仍然保留在mq中

3)AcknowledgeMode.MANUAL

⼿动确认模式下,消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息.如果消
息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可⽤时重新投递该消息,这
种模式提⾼了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,⽽是可以被重新处理.

    listener:simple:acknowledge-mode: manual#消息接收确认

1)消费者 正常消费情况下 

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws Exception {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");//int  num = 3/0;System.out.println("业务逻辑完成!");//肯定确认channel.basicAck(deliverTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliverTag,false,true);}}
}

如果不进行确认 又会发送什么?

 当我们使用手动确认(manual)的时候,一定要手动添加上肯定确认,不然即使消费者处理成功,也不会进行确认!

 2)消费者 异常消费情况下 

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws Exception {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");int  num = 3/0;System.out.println("业务逻辑完成!");//肯定确认channel.basicAck(deliverTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliverTag,false,true);}}
}

 否定确认完,又会进行重新入队,会变成Ready状态

此时修改为false,不让它入队,会发生什么? 

消费者处理异常,会不停的重试 

使用manual,一定要进行手动确认


总结:

模式确认方式可靠性性能使用场景
None无确认低,可能丢失消息不关心消息是否成功消费,丢失消息可容忍的场景
Auto自动确认较低,可能丢失消息较高对丢失消息容忍度较高的场景
Manual手动确认高,消息只有成功处理才会确认较低需要确保每条消息被成功消费的场景
  • None 适用于性能要求高,但对消息丢失不敏感的场景。
  • Auto 适合那些不需要太高消息可靠性的应用,但仍然需要自动化确认机制。
  • Manual 最适合那些对消息处理的可靠性要求较高,尤其是在出现异常时需要精细控制消息是否重新入队或丢弃的场景。

选择哪种模式取决于你的具体需求,尤其是对于消息可靠性的要求以及系统的性能考虑。 


结语: 写博客不仅仅是为了分享学习经历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,接受大家的批评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!  

相关文章:

【Rabbitmq篇】RabbitMQ⾼级特性----消息确认

目录 前言: 一.消息确认机制 • ⾃动确认 • ⼿动确认 手动确认方法又分为三种: 二. 代码实现(spring环境) 配置相关信息: 1). AcknowledgeMode.NONE 2 )AcknowledgeMode.AUTO 3&…...

开源TTS语音克隆神器GPT-SoVITS_V2版本地整合包部署与远程使用生成音频

文章目录 前言1.GPT-SoVITS V2下载2.本地运行GPT-SoVITS V23.简单使用演示4.安装内网穿透工具4.1 创建远程连接公网地址 5. 固定远程访问公网地址 前言 本文主要介绍如何在Windows系统电脑使用整合包一键部署开源TTS语音克隆神器GPT-SoVITS,并结合cpolar内网穿透工…...

【idea】更换快捷键

因为个人习惯问题需要把快捷键替换一下。我喜欢用CTRLD删除一下,用CTRLY复制一样。恰好这两个快捷键需要互换一下。 打开file——>setting——>Keymap——>Edit Actions 找到CTRLY并且把它删除 找到CTRLD 并且把它删除 鼠标右键添加CTRLY 同样操作在Delet…...

最小的子数组(leetcode 209)

给定一个正整数数组,找到大于等于s的连续的最小长度的区间。 解法一:暴力解法 两层for循环,一个区间终止位置,一个区间起始位置,找到大于等于s的最小区间长度(超时了) 解法二:双指…...

IDEA-Plugins无法下载插件(网络连接问题-HTTP Proxy Settings)

IDEA-Plugins无法下载插件(网络连接问题) 改成如下配置: 勾选 添这个url即可:https://plugins.jetbrains.com/ 重启插件中心,问题解决。...

AWTK-WIDGET-WEB-VIEW 发布

awtk-widget-web-view 是通过 webview 提供的接口,实现的 AWTK 自定义控件,使得 AWTK 可以方便的显示 web 页面。 项目网址: https://gitee.com/zlgopen/awtk-widget-web-view webview 提供了一个跨平台的 webview 接口,是一个非…...

Mysql每日一题(if函数)

两种写法if()和case if()函数 select *,if(T.xT.y>T.z and T.xT.z>T.y and T.yT.z>T.x,Yes,No) as triangle from Triangle as T; case方法 select *, case when T.xT.y>T.z and T.xT.z>T.y and T.yT.z>T.x then Yes else No end as triangle from Trian…...

Spring Cloud Alibaba [Gateway]网关。

1 简介 网关作为流量的入口,常用功能包括路由转发、权限校验、限流控制等。而springcloudgateway 作为SpringCloud 官方推出的第二代网关框架,取代了Zuul网关。 1.1 SpringCloudGateway特点: (1)基于Spring5,支持响应…...

【动手学深度学习Pytorch】2. Softmax回归代码

零实现 导入所需要的包: import torch from IPython import display from d2l import torch as d2l定义数据集参数、模型参数: batch_size 256 # 每次随机读取256张图片 train_iter, test_iter d2l.load_data_fashion_mnist(batch_size) # 将展平每个…...

技术周总结 11.11~11.17 周日(Js JVM XML)

文章目录 一、11.11 周一1.1)问题01:js中的prompt弹窗区分出来用户点击的是 确认还是取消进一步示例 1.2)问题02:在 prompt弹窗弹出时默认给弹窗中写入一些内容 二、11.12 周二2.1) 问题02: 详解JVM中的本地方法栈本地方法栈的主要…...

MATLAB 使用教程 —— 矩阵和数组

矩阵和数组MATLAB 中矩阵和数组长什么样?MATLAB 怎么用矩阵计算?创建和操作矩阵矩阵运算示例串联 访问矩阵的元素 矩阵和数组 MATLAB 是“matrix laboratory”的缩写形式。MATLAB 主要用于处理 整个的矩阵和数组,而其他编程语言大多逐个处理…...

React教程第二节之虚拟DOM与Diffing算法理解

1、什么是虚拟DOM 虚拟DOM 是javascript的一个对象,是内存中的一种数据结构,以树的形式存储UI的状态,树中的每个节点都代表着真实的DOM,用来描述我们希望在页面看到的 HTML结构; 现在的MVVM 框架,大多使用…...

C++——类和对象(part2)

前言 本篇博客继续为大家介绍类与对象的知识,承接part1的内容,本篇内容是类与对象的核心内容,稍微有些复杂,如果你对其感兴趣,请继续阅读,下面进入正文部分。 1. 类的默认成员函数 默认成员函数就是用户…...

【FFmpeg系列】:音频处理

前言 在多媒体处理领域,FFmpeg无疑是一个不可或缺的利器。它功能强大且高度灵活,能够轻松应对各种音频和视频处理任务,无论是简单的格式转换,还是复杂的音频编辑,都不在话下。然而,要想真正发挥FFmpeg的潜…...

Python绘制雪花

文章目录 系列目录写在前面技术需求完整代码代码分析1. 代码初始化部分分析2. 雪花绘制核心逻辑分析3. 窗口保持部分分析4. 美学与几何特点总结 写在后面 系列目录 序号直达链接爱心系列1Python制作一个无法拒绝的表白界面2Python满屏飘字表白代码3Python无限弹窗满屏表白代码4…...

vue3 如何调用第三方npm包内部的 pinia 状态管理库方法

抛砖引玉: 如果在开发vue3项目是, 引用了npm第三方包 ,而且这个包内使用了Pinia 状态管理库,那我们如何去调用 npm内部的 Pinia 状态管理库呢? 实际遇到的问题: 今天在制作npm包时遇到的问题,之前Vue2版本的时候状态管理库用的Vuex ,当时调用npm包内的状态管理库很简单,直接引…...

uni-app快速入门(七)--组件路由跳转和API路由跳转及参数传递

uni-app有两种页面路由跳转模式,即使用navigator组件跳转和调用API跳转,API调转不要理解为调用后台接口的API,而是指脚本函数中使用跳转函数。 一、组件路由跳转 1.1 打开新页面 打开新页面使用组件的open-type"navigate",见下面…...

Flink升级程序和版本

Flink DataStream程序通常设计为长时间运行,如几周、几个月甚至几年。与所有长时间运行的服务一样,Flink streaming应用程序也需要维护,包括修复错误、实现改进或将应用程序迁移到更高版本的Flink集群。 这里就来描述下如何更新Flink streaming应用程序,以及如何将正在运行…...

从0安装mysql server

安装 MySQL Server 首先,你需要在 Ubuntu 上安装 MySQL 服务器。运行以下命令来安装:sudo apt update sudo apt install mysql-server安装完成后,MySQL 服务会自动启动。你可以通过以下命令检查 MySQL 服务是否正在运行: sudo systemctl status mysql如果 MySQL 正在运行,…...

web安全测试渗透案例知识点总结(上)——小白入狱

目录 一、Web安全渗透测试概念详解1. Web安全与渗透测试2. Web安全的主要攻击面与漏洞类型3. 渗透测试的基本流程 二、知识点详细总结1. 常见Web漏洞分析2. 渗透测试常用工具及其功能 三、具体案例教程案例1:SQL注入漏洞利用教程案例2:跨站脚本&#xff…...

PHP访问NetSuite REST Web Services

“同等看待欢乐和痛苦、得到和失去、胜利和失败、投入战斗。以此方式履行职责,你就不会招致任何罪恶。” -Bhagavad Gita 为了帮助PHP开发者快速起步,以REST Web Services方式打通与NetSuite的接口,我们答应给一个样例。但是我是不懂PHP的&a…...

【编译】多图解释 什么是短语、直接短语、句柄、素短语、可归约串

一、什么是短语二、什么是“直接”短语?三、什么是句柄?四、什么是素短语?五、什么是最左素短语可归约串就是“最左素短语” 首先,这些概念 都是相对于【句型】的,都是相对于【句型】的,都是相对于【句型】…...

React中事件绑定和Vue有什么区别?

1. 绑定方式 React:使用jsx语法,通过属性绑定事件。Vue:使用指令(如v-on)在模板中直接绑定事件。 2. 事件处理 React:通过合成事件系统封装原生事件,提供统一的API。Vue:直接使用…...

【DBA攻坚指南:左右Oracle,右手MySQL-学习总结】

处理log file sync等待事件 首先明确什么是log file sync等待事件 从用户提交会话开始,LGWR进程将redo缓存中的信息写入redo日志文件后,LGWR进程通知用户写操作完成,到用户会话接受到LGWR进程通知为止,这整个过程就是可能出现lo…...

C++中的内联函数

在C中,内联函数是一种特殊的函数。 定义 内联函数是在函数定义前加上关键字“inline”的函数。编译器在处理对内联函数的调用时,会尝试将函数体的代码直接插入到函数调用处,而不是像普通函数调用那样,进行跳转指令执行函数体代码…...

ssh.service could not be found“

如果你收到 “ssh.service could not be found” 错误,说明目标主机上没有安装 SSH 服务,或者安装的 SSH 服务的名称不为 ssh。这里有一些解决步骤: 1. 检查 SSH 服务是否已安装 在目标主机上执行以下命令来检查是否安装了 SSH 服务&#x…...

tensorflow有哪些具体影响,和chatgpt有什么关系

### TensorFlow的影响 **1. 深度学习框架的领军者** - **广泛使用**: TensorFlow是由Google开发的开源深度学习框架,广泛应用于各种机器学习任务,包括图像识别、自然语言处理、语音识别等。它是深度学习领域中最受欢迎的框架之一。 - **大规模生产环境*…...

Android OpenGL ES详解——几何着色器

目录 一、概念 1、图元 2、几何着色器 1、输入类型 2、输出类型 3、输出顶点数量最大值限制 二、使用几何着色器 三、应用举例——造几个房子 四、应用举例——爆破物体 1、获取法向量 2、显示法线 五、应用举例——细分三角形 六、应用举例——广告牌技术 一、概…...

Java学生管理系统(GUI和数据库)

Java学生管理系统(GUI和数据库) 本文简介 本资源演示了一个用Java实现的学生管理系统,结合了图形用户界面(GUI)和数据库操作。系统实现了学生、课程和账号三张表的管理功能,包括增删改查等操作。通过本资…...

035_Progress_Dialog_in_Matlab中的进度条对话框

进度条 概念 在使用Matlab开发界面时,有一个很好用的工具就是进度条。在计算过程中,为用户提供计算进度的反馈是改善用户体验的重要手段。 一项进行的计算任务,如果其总体进度是比较容易量化,则可以按照0%~100%的方式&#xff0…...