当前位置: 首页 > news >正文

RabbitMq-发布确认高级(避坑指南版)

在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢?

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 rabbitmq 重启期间生产者投递失败,导致消息丢失,需要手动处理和恢复。因此为了确保rabbitmq 的消息可靠投递,特别是在这样比较极端的情况,rabbitmq 集群不可用的时候,对无法投递的消息进行处理。

废话不说直接开始撸代码!!!在代码中解决实际问题~

一、代码架构分析:

        接触到这里,对于一条完整的“rabbitmq消息”发布链的构成大家已经不陌生了。主要是由:“消息生产者”、“交换机”、“队列”、“消费者”四个方面构成,如图所示:

二、构造“配置类”代码: 

声明交换机“confirm_exchange”、声明队列“confirm_queue”、通过routing-key对交换机和队列进行绑定。

package com.example.rabbitmq_demo.fabuquerengaoji;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @ClaseName: ConfirmConfig$* @Description:配置类 发布确认(高级)* @Author: wuhs* @Date: 2023/8/16$ 14:32$* 快捷键ctrl+shift+u  字母大小写转化*/
@Configuration
public class ConfirmConfig {// 交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";// 队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";// ROUTING-KEYpublic static final String CONFIRM_ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,@Qualifier("confirmQueue") Queue queue) {//一般使用在项目中使用@Qualifier来限定注入的Bean。return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);}
}

三、构建消费者代码:

通过@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)来监听队列,以此“充当”消费者。这一块也没啥好说的,直接上代码!

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @ClaseName: Consumer$* @Description:消费者* @Author: wuhs* @Date: 2023/8/16$ 15:18$】*/
@Slf4j
@Component
public class Consumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void reciverConfirmMessage(Message message) {String msg = new String(message.getBody());log.info("接收到的队列confirm.queue消息:{}", msg);}
}

四、创建“回调”方法

        在最开始我们说到“确保rabbitmq 的消息可靠投递”的概念,那么具体如何确保呢?如果我们在消费者每次消费成功、未消费成功交换机都能进行“回调”确认,是不是就能知道哪些消息消费成功、哪些没有消费成功呢?

        在RabbitTemplate中有一个方法接口(ConfirmCallback),我们只需要实现这个接口并实现“confirm”方法,并将它注入进RabbitTemplate工具中即可创建“回调”。具体代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @ClaseName: MyCallBack$* @Description:* @Author: wuhs* @Date: 2023/8/16$ 16:17$*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注入rabbitTemplate.setConfirmCallback(this);}//交换机确认回调方法  @Overridepublic void confirm(CorrelationData correlationData, boolean ack, String reason) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {//发消息 交换机接收到了消息 回调log.info("交换机已经收到了ID为:{}的消息", id);} else {//发消息 交换机没有接收到了消息 回调log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);}}
}

confirm方法参数介绍:

* 1. correlationData 保存回调消息的ID及相关信息
* 2. 交换机是否收到了消息 ack=true(收到)、ack=false(未收到)
* 3. reason 失败的原因

 六、配置类声明:(application.yml)

        在这里需要注意!!这也是最容易踩得坑,不知道有没有小伙伴遇没遇到,“publisher-confirm-type: correlated”也声明了,但是项目创建启动发布消息之后“没有成功回调”的情况,查看了很多的文章,很多博主只配置了publisher-confirm-type、但是并没有开启“confirm 确认机制”,所以会存在“误导”,导致一直找不到失败的原因~具体正确配置,看代码:

server:port: 8899spring:rabbitmq:host: 124.221.94.214port: 5672username: xgsmpassword: xgsm123# 发送者开启 confirm 确认机制publisher-confirms: truepublisher-confirm-type: correlated

 publisher-confirm-type参数介绍:

publisher-confirm-type这个参数一共有三种配置方法:

# NONE:禁用发布确认,是默认值。

# CORRELATED:发布消息后,交换机会触发回调方法。

# SIMPLE:有两种效果:

1:和CORRELATED一样会触发回调方法

2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,

# 要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

 七、创建Controller层(消息生产者)

        这里演示三种情况。第一种为正常情况下,发送成功后的回调;第二种消息为发送失败、当交换机不存在则发送失败(模拟发送失败),所以将交换机名称修改即可

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @ClaseName: ProducerController$* @Description:消息生产者* @Author: wuhs* @Date: 2023/8/16$ 14:58$*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;// @PathVariable主要作用:映射URL绑定的占位符@RequestMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message) {//正常发送CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);//发送失败-交换机不存在的情况CorrelationData correlationData2 = new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"2", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData2);log.info("发送的消息为:{}", message);}
}

测试结果: 

 如果是routing-key错误,这种情况会触发回调嘛?让我们验证一下;修改routing-key为“错误值”

  CorrelationData correlationData3 = new CorrelationData("3");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"key3", message,correlationData3);

测试结果:

         通过结果可以看出,消息发送成功了,而且也触发了“成功的回调”。但是我们知道的是,由于路由失败,这里消费者并没有对消息进行消费,这是为什么呢?那是因为,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。解决方式为:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。具体操作如下

1、application.yml文件中添加消息回退配置

# 发送者开启 return 确认机制publisher-returns: true

 2、实现RabbitTemplate中的方法接口ReturnCallback,并实现“returnedMessage”方法,最后将类注入到RabbitTemplate的RabbitTemplate中,详细代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @ClaseName: MyCallBack$* @Description:* @Author: wuhs* @Date: 2023/8/16$ 16:17$*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注入rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* 1、发消息 交换机接收到了消息 回调* 1.1 correlationData 保存回调消息的ID及相关信息* 1.2 交换机收到消息 ack=true* 2、发消息 交换机接收失败了 回调* 2.1 correlationData 保存回调消息的ID及相关信息* 2.2 交换机接收到消息 ack=false* 2.3 reason 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String reason) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机已经收到了ID为:{}的消息", id);} else {log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);}}//可以在当消息传递的过长中不可达目的地时将消息返回给生产者// 只有不可待目的地的时候 才进行回退@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息{},被交换机{}退回,退回原因:{},路由key:{}", message, exchange, replyText, routingKey);}
}

测试结果:

2023-08-17 10:36:32.476  INFO 21108 --- [221.94.214:5672] c.e.r.fabuquerengaoji.MyCallBack : 消息(Body:'消息确认发布测试' MessageProperties [headers={spring_returned_message_correlation=3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),被交换机confirm_exchange退回,退回原因:NO_ROUTE,路由key:key1key3

问题解决!~ 

相关文章:

RabbitMq-发布确认高级(避坑指南版)

在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢? 在生产环…...

Blender增强现实3D模型制作指南【AR】

推荐:用 NSDT编辑器 快速搭建可编程3D场景 将静态和动画 3D 内容集成到移动增强现实 (AR) 体验中是增强用户沉浸感和参与度的高效方法。 然而,为 AR 创建 3D 对象可能相当艰巨,尤其是对于那些缺乏 3D 建模经验的人来说。 与添加视频或照片 AR…...

Java查看https证书过期时间(JKS,CERT)

在这里需要使用X.509 证书的抽象类 X509Certificate 。此类提供了一种访问 X.509 证书所有属性的标准方式。 这些证书被广泛使用以支持 Internet 安全系统中的身份验证和其他功能。常见的应用包括增强保密邮件 (PEM)、传输层安全 (SSL)、用于受信任软件发布的代码签名和安全电…...

关于vue,记录一次修饰符.stop和.once的使用,以及猜想。

内置指令 | Vue.js 在vue的api里&#xff0c;关于v-on有stop和once两个事件标签。 .stop - 调用 event.stopPropagation()。.once - 最多触发一次处理函数。 原有主要代码和页面效果 &#xff08;无stop和once&#xff09;: ...<div class"div" click"di…...

解决git reset --soft HEAD^撤销commit时报错

今天在使用git回退功能的时候&#xff0c;遇到以下错误&#xff1a; 解决git reset --soft HEAD^撤销commit时报错 问题&#xff1a; 在进行完commit后&#xff0c;想要撤销该commit&#xff0c;于是使用了git reset --soft HEAD^命令&#xff0c;但是出现如下报错&#xff1…...

【BASH】回顾与知识点梳理(三十四)

【BASH】回顾与知识点梳理 三十四 三十四. 认识系统服务&#xff08;二&#xff09;34.1 systemctl 针对 service 类型的配置文件systemctl 配置文件相关目录简介systemctl 配置文件的设定项目简介[Unit] 部份[Service] 部份[Install] 部份 两个 vsftpd 运作的实例多重的重复设…...

Python可视化在量化交易中的应用(11)_Seaborn折线图

举个栗子&#xff0c;用seaborn绘制折线图。 Seaborn中折线图的绘制方法 在seaborn中&#xff0c;我们一般使用sns作为seaborn模块的别名&#xff0c;因此&#xff0c;在下文中&#xff0c;均以sns指代seaborn模块。 seaborn中绘制折线图使用的是sns.plot()函数&#xff1a; …...

无涯教程-TensorFlow - TensorBoard可视化

TensorFlow包含一个可视化工具&#xff0c;称为TensorBoard&#xff0c;它用于分析数据流图&#xff0c;还用于了解机器学习模型。 TensorBoard的重要功能包括查看有关垂直对齐的任何图形的参数和详细信息的不同类型统计的视图。 深度神经网络包括多达36&#xff0c;000个节点…...

[uni-app] uview封装Popup组件,处理props及v-model的传值问题

文章目录 需求及效果遇到的问题解决的办法偷懒的写法 需求及效果 uView(1.x版本)中, 有Pop弹出层的组件, 现在有个需求是,进行简单封装,有些通用的设置不想每次都写(比如 :mask-custom-style"{background: rgba(0, 0, 0, 0.7)}"这种) 然后内部内容交给插槽去自己随…...

【C++】int a;和int *p=new int;有什么区别?

2023年8月19日&#xff0c;周六早上 int a; 和 int *p new int; 之间有以下区别&#xff1a; 1. 内存分配方式&#xff1a;int a; 是在栈上分配内存&#xff0c;而 int *p new int; 是在堆上动态分配内存。 2. 生命周期&#xff1a;int a; 的生命周期与其所在的作用域相同&…...

redis事务管理

目录 一、redis事务定义 二、事务控制命令——Multi、Exec、discard 三、事务的错误处理 四、事务的冲突问题 悲观锁 乐观锁 WATCH unwatch 五、事务特性 单独的隔离操作 没有隔离级别的概念 不保证原子性 一、redis事务定义 Redis 事务是一个单独的隔离操作&…...

TPS_C++版本及功能支持备注

TPS_C版本及功能支持备注 相关参考链接C23&#xff1a;https://zh.cppreference.com/w/cpp/23 相关参考链接C20&#xff1a;https://zh.cppreference.com/w/cpp/20 相关参考链接C17&#xff1a;https://zh.cppreference.com/w/cpp/17 相关参考链接C14&#xff1a;https://zh.cp…...

同步jenkinsfile流水线(sync-job)

环境 变量&#xff1a;env&#xff08;环境变量&#xff1a;sit/dev/simulation/prod/all&#xff09;&#xff0c;job&#xff08;job-name/all&#xff09;目录&#xff1a;/var/lib/jenkins/jenkinsfile environment.json&#xff1a; [roottest-01 jenkinsfile]# cat env…...

STM32单片机WIFI-APP智能温室大棚系统CO2土壤湿度空气温湿度补光

实践制作DIY- GC0161--智能温室大棚系统 基于STM32单片机设计---智能温室大棚系统 二、功能介绍&#xff1a; 电路组成&#xff1a;STM32F103CXT6最小系统LCD1602显示器DHT11空气温度湿度光敏电阻光强土壤湿度传感器SGP30二氧化碳传感器 1个继电器&#xff08;空气加湿&#x…...

SpringBoot复习:(52)不再需要使用@EnableTransactionManagement的原因

在Spring项目中&#xff0c;要用事务&#xff0c;需要EnableTransactionManagement注解加Transactional注解。而在SpringBoot项目&#xff0c;有事务的自动配置类TransactionAutoConfiguration,代码如下&#xff1a; 可以在其内部类EnableTransactionManagementConfiguratio…...

HackNos 3靶场

配置 进入控制面板配置网卡 第一步&#xff1a;启动靶机时按下 shift 键&#xff0c; 进入以下界面 第二步&#xff1a;选择第二个选项&#xff0c;然后按下 e 键&#xff0c;进入编辑界面 将这里的ro修改为rw single init/bin/bash&#xff0c;然后按ctrlx&#xff0c;进入…...

【办公自动化】使用Python批量生成PPT版荣誉证书

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…...

【C++深入浅出】初识C++中篇(引用、内联函数)

目录 一. 前言 二. 引用 2.1 引用的概念 2.2 引用的使用 2.3 引用的特性 2.4 常引用 2.5 引用的使用场景 2.6 传值、传引用效率比较 2.7 引用和指针的区别 三. 内联函数 3.1 内联函数的概念 3.2 内联函数的特性 一. 前言 上期说道&#xff0c;C是在C的基础之上&…...

前端:VUE2中的父子传值

文章目录 一、背景什么是父子传值二、业务场景子传父1、在父页面中引入子页面2、子传父&#xff1a;父组件标识3、子传父&#xff1a;子组件标识 父传子父组件调用子组件中的方法 总结&#xff1a; 一、背景 最近做项目中需要使用到流工作&#xff0c;在这里流工作需要用到父子…...

【100天精通python】Day40:GUI界面编程_PyQt 从入门到实战(完)_网络编程与打包发布

目录 8 网络编程 8.1 使用PyQt 网络模块进行网络通信 服务器端示例 客户端示例 8.2 处理网络请求和响应 9 打包和发布 9.1 创建可执行文件或安装程序 9.2 解决依赖问题 9.3 发布 PyQt 应用到不同平台 9.3.1 发布到 Windows 9.3.2 发布到 macOS 9.3.3 发布到 Linux 9…...

Redis——set类型详解

概要 Set&#xff08;集合&#xff09;&#xff0c;将一些有关联的数据放到一起&#xff0c;集合中的元素是无序的&#xff0c;并且集合中的元素是不能重复的 之前介绍的list就是有序的&#xff0c;对于列表来说[1, 2, 3] 和 [2, 1, 3]是两个不同的列表&#xff0c;而对于集合…...

redis---》高级用法之慢查询/pipline与事务/发布订阅/bitmap位图/HyperLogLog/GEO地理位置信息/持久化

高级用法之慢查询 # 配置一个时间&#xff0c;如果查询时间超过了我们设置的时间&#xff0c;我们就认为这是一个慢查询 # 配置的慢查询&#xff0c;只在命令执行阶段# 慢查询演示-设置慢查询---》只要超过某个时间的命令---》都会保存起来# 设置记录所有命令CONFIG SET slowl…...

Find My资讯|苹果Vision Pro开发者需将设备配对 AirTag

最近苹果Vision Pro获开发者申请&#xff0c;苹果要求获批的申请者使用 Measure and Fit 应用确认合适的佩戴尺寸&#xff0c;并会根据申请者提交的信息&#xff0c;定制不同的 Vision Pro 开发者套件&#xff0c;以便于契合申请者的面部特征&#xff0c;提供更好的佩戴体验。 …...

Go 语言中排序的 3 种方法

原文链接&#xff1a; Go 语言中排序的 3 种方法 在写代码过程中&#xff0c;排序是经常会遇到的需求&#xff0c;本文会介绍三种常用的方法。 废话不多说&#xff0c;下面正文开始。 使用标准库 根据场景直接使用标准库中的方法&#xff0c;比如&#xff1a; sort.Intsso…...

12----Emoji表情

本节我们主要讲解markdown的Emoji 在 Markdown 里使用 Emoji 表情有两种方法:一种是直接输入 Emoji 表情&#xff0c;另一种是使用 Emoji 表情短码(emoji shartcodes)。 一、打印方式&#xff1a; 直接输入 Emoji 表情&#xff1a;在 Markdown 中&#xff0c;可以直接输入 Em…...

C++四种强制类型转换

一、C强制转换与C强制转换 c语言强制类型转换主要用于基础的数据类型间的转换&#xff0c;语法为&#xff1a; (type-id)expression//转换格式1 type-id(expression)//转换格式2c除了能使用c语言的强制类型转换外&#xff0c;还新增了四种强制类型转换&#xff1a;static_cas…...

git仓库新建上传记录

新建git仓会出现版本分支问题&#xff0c;解决过程&#xff1a; 其他的前期绑定之类的传送&#xff1a;https://blog.csdn.net/qq_37194189/article/details/130767397 大概思路&#xff1a;新建一个分支&#xff0c;上传&#xff0c;合并&#xff0c;删除分支 git branch …...

flutter调用so

lutter是一种基于Dart语言的跨平台开发框架&#xff0c;通常用于开发Android和iOS应用程序。如果您想要在Flutter应用程序中调用一个SO库&#xff0c;您可以按照以下步骤进行操作&#xff1a; 首先&#xff0c;将您的SO库文件复制到Flutter项目的“lib”目录下。 接下来&…...

c#依赖注入

依赖注入(Dependency Injection,简称 DI)是一种设计模式,用于将对象的创建和管理责任从使用它的类中分离出来,从而实现松耦合和易于测试的代码。在 C# 中,依赖注入通常通过以下方式实现: 构造函数注入(Constructor Injection): 这是最常见的依赖注入方式,通过类的构…...

Django框架使用定时器-APScheduler实现定时任务:django实现简单的定时任务

一、系统环境依赖 系统&#xff1a;windows10 python: python3.9.0 djnago3.2.0 APScheduler3.10.1 二、django项目配置 1、创建utils包&#xff0c;在包里面创建schedulers包 utils/schedulers/task.py #1、设置 Django 环境&#xff0c;就可以导入项目的模型类这些了 …...