当前位置: 首页 > news >正文

Spring RabbitMQ那些事(3-消息可靠传输和订阅)

目录

  • 一、序言
  • 二、生产者确保消息发送成功
    • 1、为什么需要Publisher Confirms
    • 2、哪些消息会被确认处理成功
  • 三、消费者保证消息被处理
  • 四、Spring RabbitMQ支持代码示例
    • 1、 application.yml
    • 2、RabbigtMQ配置
    • 3、可靠生产者配置
    • 4、可靠消费者配置
    • 5、测试用例

一、序言

在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。

消息是有可能丢的,比如生产者在发送消息时broker服务挂了,消息没有来得及落盘,这时消息就彻底丢了。

保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。


二、生产者确保消息发送成功

1、为什么需要Publisher Confirms

在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。

在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。

最直接的解决方案是通过事务,但是通过事务有两个问题:

  • 事务阻塞:发布者必须等待Broker处理完每条消息。
  • 事务很重:每次提交都会要求触发fsync(),强制磁盘,这个过程需要花很长的时间。

备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。

在这里插入图片描述

而通过Publisher Confirm机制,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。

2、哪些消息会被确认处理成功

当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:

  • 无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。
  • 非持久化消息在入队时会被确认。
  • 持久化消息当持久化到磁盘或者被消费者消费时会被确认。

三、消费者保证消息被处理

消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。


四、Spring RabbitMQ支持代码示例

1、 application.yml

server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /publisher-returns: truepublisher-confirm-type: correlatedlistener:type: simplesimple:acknowledge-mode: manualconcurrency: 5max-concurrency: 20prefetch: 5template:mandatory: true

备注:

  • 这里一定要设置spring.rabbitmq.publisher-returnstrue,并且设置spring.rabbitmq.publisher-confirm-typecorrelated,同时设置spring.rabbitmq.template.mandatorytrue
  • 上面我们将消费者的确认模式改为了手动确认

2、RabbigtMQ配置

@Configuration
public class RabbitReliableTransportConfig {/*** RabbitTemplate消息转换器配置,自动将对象转换为json字符串** @return*/@Beanpublic MessageConverter jackson2JsonMessageConverter() {Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());return messageConverter;}@Beanpublic Queue reliableQueue() {return QueueBuilder.durable("reliable-queue").build();}
}

3、可靠生产者配置

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqReliableProducer {private final RabbitTemplate rabbitTemplate;public void sendReliableMsg(String body) {// 发送可靠消息ReliableMsgDTO reliableMsgDTO = ReliableMsgDTO.builder().body(body).build();CorrelationData correlationData = new CorrelationData();rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);// 发送确认逻辑CompletableFuture<Confirm> future = correlationData.getFuture().completable();future.whenComplete((confirm, throwable) -> {if (confirm.isAck()) {log.info("消息已经被成功发送, 消息内容:{}", JSON.toJSONString(reliableMsgDTO));return;}log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);// 5秒后再发送LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);});}
}

4、可靠消费者配置

@Slf4j
@Component
public class RabbitMQReliableConsumer {@RabbitListener(queues = "reliable-queue")public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {channel.basicAck(tag, false);// channel.basicNack(tag, false, false);log.info("Message received from queue, message body: {}", JSON.toJSONString(reliableMsgDTO));}
}

备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。

5、测试用例

测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。

2024-01-20 18:13:11.399  INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer       : 消息已经被成功发送, 消息内容:{"body":"hello"}
2024-01-20 18:13:11.399  INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer       : Message received from queue, message body: {"body":"hello"}

在这里插入图片描述

相关文章:

Spring RabbitMQ那些事(3-消息可靠传输和订阅)

目录 一、序言二、生产者确保消息发送成功1、为什么需要Publisher Confirms2、哪些消息会被确认处理成功 三、消费者保证消息被处理四、Spring RabbitMQ支持代码示例1、 application.yml2、RabbigtMQ配置3、可靠生产者配置4、可靠消费者配置5、测试用例 一、序言 在有些业务场…...

揭秘 Kafka 高性能之谜:一文读懂背后的设计精粹与技术实现

Kafka在性能方面有着显著的优势&#xff0c;这也使得Kafka的应用非常广泛&#xff0c;那kakfa的性能为何如此优异呢&#xff1f;本文将带你探寻kafka高性能之谜。 kafka的高性能概括起来有如下几点&#xff1a;顺序写入磁盘与I/O优化、批量处理、页缓存、零拷贝技术、分区并行处…...

canvas绘制美国国旗(USA Flag)

查看专栏目录 canvas实例应用100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…...

Python中的`__all__`魔法函数使用详解

概要 Python是一门灵活而强大的编程语言&#xff0c;提供了各种机制来控制模块的导入和访问。其中&#xff0c;__all__魔法函数是一种用于限制模块导入的机制&#xff0c;可以明确指定哪些变量、函数或类可以被导入。本文将深入探讨__all__的作用、用法以及示例&#xff0c;以…...

Studio One 6 mac 6.5.2 激活版 数字音乐编曲创作

PreSonus Studio One是PreSonus出品的一款功能强大的音乐创作软件。主要为用户提供音乐创作、录音、编辑、制作等功能。它可以让你创造音乐&#xff0c;无限的轨道&#xff0c;无限的MIDI和乐器轨道&#xff0c;虚拟乐器和效果通道&#xff0c;这些都是强大和完美的。 软件下载…...

GitHub图床TyporaPicGo相关配置

本文作者&#xff1a; slience_me 文章目录 GitHub图床&Typora&PicGo相关配置1. Github配置2. picGo配置3. Typora配置 GitHub图床&Typora&PicGo相关配置 关于Typora旧版的百度网盘下载路径 链接&#xff1a;https://pan.baidu.com/s/12mq-dMqWnRRoreGo4MTbKg?…...

FireAlpaca:轻量级、免费的Mac/Win绘图软件,让你的创意如火燃烧!

FireAlpaca是一款轻量级、免费的绘图软件&#xff0c;适用于Mac和Win系统&#xff0c;让你的创作过程更加快捷、简便。无论是绘制漫画、插图、设计作品还是进行简单的图片编辑&#xff0c;FireAlpaca都能满足你的需求。 首先&#xff0c;FireAlpaca具有直观友好的用户界面&…...

用 Python 制作可视化 GUI 界面,一键实现自动分类管理文件!

经常杂乱无章的文件夹会让我们找不到所想要的文件&#xff0c;因此小编特意制作了一个可视化GUI界面&#xff0c;通过输入路径一键点击实现文件分门别类的归档。 不同的文件后缀归类为不同的类别 我们先罗列一下大致有几类文件&#xff0c;根据文件的后缀来设定&#xff0c;大…...

【STM32】USB程序烧录需要重新上电 软件复位方法

文章目录 一、问题二、解决思路2.1 直接插拔USB2.2 给芯片复位 三、解决方法3.1 别人的解决方法3.2 在下载界面进行设置 一、问题 最近学习STM32的USB功能&#xff0c;主要是想要使用虚拟串口功能&#xff08;VCP&#xff09;&#xff0c;发现每次烧录之后都需要重新上电才可以…...

Java数据结构与算法:图算法之深度优先搜索(DFS)

Java数据结构与算法&#xff1a;图算法之深度优先搜索&#xff08;DFS&#xff09; 大家好&#xff0c;我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编&#xff0c;一个热爱编程的程序猿。今天&#xff0c;让我们一起探索图算法中的深度优先搜索&#xff08;DFS&…...

SpringBoot整合QQ邮箱发送验证码

一、QQ开启SMTP 打开QQ邮箱&#xff0c;点击设置&#xff0c;进入账号&#xff0c;往下滑后&#xff0c;看见服务状态后&#xff0c;点击管理服务 进入管理服务后&#xff0c;打开服务&#xff0c;然后获取授权码 二 、导入依赖 <!-- 邮箱--><dependency>&…...

云虚拟主机怎么修改代码?如何修改部署在虚拟主机的网站代码?

很多站长成功创建网站之后&#xff0c;或多或少都会对网站代码进行适当修改。比如boke112百科使用YIA主题后&#xff0c;也根据自己的需要进行了多个方面的小修改。 那么如果网站是部署在虚拟主机上的&#xff0c;那么应该如何修改这些网站代码呢&#xff1f;其实&#xff0c;…...

电脑加固态硬盘有什么好处

电脑加固态硬盘有很多好处&#xff0c;以下是一些主要的优点&#xff1a; 1. 启动速度更快&#xff1a;固态硬盘&#xff08;SSD&#xff09;的启动速度比传统机械硬盘&#xff08;HDD&#xff09;快得多。这是因为固态硬盘没有旋转部件&#xff0c;而传统硬盘的读写头需要不断…...

LabVIEW电火花线切割放电点位置

介绍了一个电火花线切割放电点位置分布评价系统&#xff0c;特别是在系统组成、硬件选择和LabVIEW软件应用方面。 本系统由两个主要部分组成&#xff1a;硬件和软件。硬件部分包括电流传感器、高速数据采集卡、开关电源、电阻和导线。软件部分则由LabVIEW编程环境构成&#xf…...

信通院发布《全球数字经济白皮书 (2023年)》解析

文章目录 前言一、白皮书目录二、白皮书核心观点(一)主要国家优化政策布局,数字经济政策导向更加明晰、体系更加完善(二) 数字经济加速构筑经济复苏关键支撑(三)全球数字经济多极化趋势进一步深化(四)数字经济重点领域发展成效显著三、白皮书的主要内容前言 当前,世…...

Spring5系列学习文章分享---第三篇(AOP概念+原理+动态代理+术语+Aspect+操作案例(注解与配置方式))

目录 AOP概念AOP底层原理AOP(JDK动态代理)使用 JDK 动态代理&#xff0c;使用 Proxy 类里面的方法创建代理对象**编写** **JDK** 动态代理代码 AOP(术语)AOP操作&#xff08;准备工作&#xff09;**AOP** **操作&#xff08;**AspectJ注解)**AOP** **操作&#xff08;**AspectJ…...

BL0942 内置时钟免校准计量芯片 用于智能家居领域 上海贝岭 低成本 使用指南

BL0939是上海贝岭股份有限公司开发的一款用于智能家居领域进行电能测量的专用芯片&#xff0c;支持两路测量&#xff0c;可同时进行计量和漏电故障检测&#xff0c;漏电检测电流可设&#xff0c;响应时间快&#xff0c;具有体积小&#xff0c;外围电路简单&#xff0c;成本低廉…...

【算法专题】动态规划之路径问题

动态规划2.0 动态规划 - - - 路径问题1. 不同路径2. 不同路径Ⅱ3. 珠宝的最高价值4. 下降路径最小和5. 最小路径和6. 地下城游戏 动态规划 - - - 路径问题 1. 不同路径 题目链接 -> Leetcode -62.不同路径 Leetcode -62.不同路径 题目&#xff1a;一个机器人位于一个 m …...

Python range函数

Python中的range()函数是一个强大的工具&#xff0c;用于生成一系列的整数。它在循环、迭代和序列生成等方面都有广泛的应用。本文将深入探讨range()函数的用法&#xff0c;提供详细的示例代码&#xff0c;并讨论其在Python编程中的实际应用。 什么是range()函数&#xff1f; …...

Unity中实现捏脸系统

前言 目前市面上常见的捏脸一般是基于BlendShapes和控制骨骼点坐标两种方案实现的。后者能够控制的精细程度更高&#xff0c;同时使用BlendShapes来控制表情。 控制骨骼点坐标 比如找到控制鼻子的骨骼节点修改localScale缩放&#xff0c;调节鼻子大小。 BlendShapes控制表…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具

第2章 虚拟机性能监控&#xff0c;故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令&#xff1a;jps [options] [hostid] 功能&#xff1a;本地虚拟机进程显示进程ID&#xff08;与ps相同&#xff09;&#xff0c;可同时显示主类&#x…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...