当前位置: 首页 > news >正文

MQ高级:RabbitMQ小细节

在之前的学习中,我们只介绍了消息的发送,但是没有考虑到异常的情况,今天我们就介绍一些异常情况,和细节的部分。

目录

生产者可靠性

生产者重连

生产者确认

MQ可靠性

持久化

Lazy Queue

消费者可靠性

消费者确认机制

失败重试机制

业务幂等性

延迟消息

死信交换机

延迟消息插件


生产者可靠性

生产者重连

有时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长=initial-interval * multipliermax-attempts: 3 # 最大重试次数

在停止mq服务之后,运行代码,会发现测试失败,因为连接失败。最大重试次数设置的是3,此处就重试了3次再停止。

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMOP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,
会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认

在开启了生产者确认机制后,在MQ成功收到消息后会返回确认消息ACK给生产者,如果有异常,会返回NACK。

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

spring:rabbitmq:publisher-confirm-type: correlated# 开启publisher confirm机制,并设置confirm类型为correlatedpublisher-returns: true# 开启publisher return机制

 这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MO的回执消息
  • correlated:MO异步回调方式返回回执消息

但是!生产者确认需要额外的网络和系统资源开销,尽量不要使用。
如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题。
对于nack消息可以有限次数重试,依然失败则记录异常消息。

MQ可靠性

解决了生产者可靠性,还需要解决MQ的可靠性。

通过生产者发送的消息被MQ存放到内存中,经过某些特殊情况或者MQ重启后,这部分数据会丢失。并且内存空间是有限的,当消费者故障或者处理太慢时,会导致消息积压,导致MQ阻塞。

持久化

为了解决这个问题,MQ引入了数据持久化,包括交换机持久化、队列持久化、消息持久化。

前两者只需要在创建的时候设置成Durable(默认)即可。

消息持久化默认是不开启的,要手动开启。

当不开启磁盘持久化,消息会全部存放在内存中。但是发送消息过多,会占满内存。之后多出来的消息会存放到Paged out中,也就是磁盘中。等待内存中的消息被处理完后,会再把磁盘中的消息加载到内存中,再继续处理。

当开启了磁盘持久化,接收到的消息会在内存和磁盘中都存一份。此时处理的消息是从内存中处理。内存会在将要满的时候清理一次,再继续完成消息处理。磁盘则会把所有消息都保存下来。

Lazy Queue

Lazy Queue惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

并且惰性队列的性能很高,比之前的几种性能都会好一些。

消费者可靠性

消费者确认机制

当不开启消费者确认机制,生产者投递了一条消息,不管消费者是否处理完了,会马上被RabbitMQ删除,当做已经处理完了。但是如果消费者出现网络波动或者其他异常情况,会导致没有接收到这条消息,生产者这边还会认为消费者已经接收到消息了。告知RabbitMQ自己消息处理状态。处理消息结束后,应该向RabbitMQ发送一个回执,

回执有三种可选值:

  • ack:成功处理消息,RabbitMO从队列中删除该消息
  • nack:消息处理失败,RabbitMO需要再次投递消息
  • reiect:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理消息确认。消息投递给消费者后立刻ack,消息会立刻从MQ删除。这种方式非常不安全,不建议使用。

  • manual:手动模式。需要在业务代码中手动调用API发送ack或reject。虽然存在业务侵入,但提供了更大的灵活性。

  • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强。当业务正常执行时,自动返回ack。当业务出现异常时,根据异常类型自动返回不同的结果:
    1.如果是业务异常,自动返回nack。
    2.如果是消息处理或校验异常,自动返回reject。

spring:rabbitmq:listener:simple:acknowledge-mode: auto  # 可以设置为 none, manual, auto

失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,形成无限循环。这会导致MQ的消息处理量飙升,给系统带来不必要的压力。

我们可以利用Spring的retry机制,在消费者出现异常时进行本地重试,而不是无限制地requeue到MQ队列。并且指定最大重试次数。

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true  # 开启消费者失败重试initial-interval: 1000ms  # 初始的失败等待时长为1秒multiplier: 1  # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3  # 最大重试次数stateless: true  # true无状态; false有状态。如果业务中包含事务,这里改为false

  

在开启重试模式后,如果重试次数耗尽且消息依然失败,则需要有MessageRecoverer接口来处理。MessageRecoverer包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

当通过最后一种方式重试耗尽后,我们可以额外设置一个队列,比如error.queue,当发送失败的消息进入这个队列后,再通过邮件强提醒这样的机制推送给工作人员,可以有效解决消息发送失败的极端情况。

业务幂等性

在程序开发中,业务幂等性指的是同一个业务,执行一次或者执行多次对业务的状态是没有影响的。

唯一消息id
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

业务判断

方案二,是结合业务逻辑,基于业务本身做判断。以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。

如何保证支付服务与交易服务之间的订单状态一致性?

为确保支付服务与交易服务之间的订单状态一致性,我们采取了以下措施:

  1. 消息通知

    • 支付服务在用户支付成功后,通过MQ(消息队列)发送消息通知交易服务,以完成订单状态的同步。
  2. 消息可靠性策略

    • 采用生产者确认机制、消费者确认和消费者失败重试等策略,确保消息的可靠投递和处理。
    • 开启MQ的持久化功能,避免因服务宕机导致消息丢失。
  3. 业务幂等性

    • 在交易服务更新订单状态时进行业务幂等性判断,防止因消息重复消费导致订单状态异常。

如果交易服务消息处理失败,有什么兜底方案?

  • 定时任务
    • 在交易服务中设置定时任务,定期查询订单的支付状态。
    • 即使MQ通知失败,定时任务也可以作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

延迟消息是消息队列中的一种重要功能,它允许消息在被发送到消息队列后并不会立即被消费者消费,而是在经过特定的时间延迟后才能被消费者获取和处理。这种特性在很多业务场景中都非常有用,比如订单处理超时、定时提醒等。

比如,用户A下单某商品的最后一件,订单确认后,迟迟不支付,但是又占用着这个名额。等到很久以后取消这个订单,此时想买的人没买到,商家没有卖掉,而这个人又没有买。这种情况用延迟消息就能很好的解决这个问题。当用户下单商品后,会设置一个延迟消息,假设30分钟内没有下单,这个延迟消息就会被发送到MQ,提醒数据库这个人订单超时了,强制让这个人取消订单。

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

死信消息,经过死信交换机可以变成延迟消息。

当publisher发布一个消息后,通过交换机进入队列。通过手动设置一个过期时间,让消息变成死信消息,此时消息会自动进入通过dead-letter-exchang设置的交换机dlx.direct,再一步步的进入到consumer。

延迟消息插件


RabbitMO的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机
当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

@RabbitListener(bindings=@QueueBinding(
value=@Queue(name="delay.queue", durable="true"),
exchange=@Exchange(name="delay.direct",delayed="true"),
key="delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}",msg);
}
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct")
.delayed()//设置delay的属性为true
.durable(true)//持久化
.build();
}

发送消息时需要通过消息头x-delay来设置过期时间:

@Test
void testPublisherDelayMessage() {//1.创建消息String message = "hello, delayed message";//2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}

相关文章:

MQ高级:RabbitMQ小细节

在之前的学习中,我们只介绍了消息的发送,但是没有考虑到异常的情况,今天我们就介绍一些异常情况,和细节的部分。 目录 生产者可靠性 生产者重连 生产者确认 MQ可靠性 持久化 Lazy Queue 消费者可靠性 消费者确认机制 失…...

期权卖方怎么选择权利金高的品种,期货VIX高低对行情有什么影响

VIX指数——全称为芝加哥期权交易所市场波动率指数,俗称恐慌指数。 是衡量波动性的重要指标。VIX指数上升,预期未来市场波动性会增加。VIX指数下降,预期未来市场波动性会降低。 期货VIX指数最新价格排序 期权卖方尽量选择期货VIX指数在25以…...

内存对齐的原理和使用

1. 什么是内存对齐? 内存对齐是指将数据存储在内存中时,按照数据类型的大小,将数据放在特定的内存边界上。例如,4 字节的 int 通常放在能够被 4 整除的地址上,8 字节的 double 则放在能被 8 整除的地址上。 2. 为什么…...

搭建企业级私有仓库harbor

华子目录 harbor简介实验环境准备下载软件包安装docker-cehosts解析 实验步骤配置https加密传输解压进入解压目录,修改文件配置启动harbor 测试客户端配置harbor本地加速器注意 通过docker compose管理harbor harbor简介 harbor是由wmware公司开源的企业级docker r…...

互联网前后端分离的开发场景,一般会员和数据权限的判断是放在前端还是后端?

推荐学习文档 golang应用级os框架,欢迎stargolang应用级os框架使用案例,欢迎star案例:基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识,这里有免费的golang学习笔…...

李宏毅机器学习2022-HW8-Anomaly Detection

文章目录 TaskBaselineReportQuestion2 Code Link Task 异常检测Anomaly Detection 将data经过Encoder,在经过Decoder,根据输入和输出的差距来判断异常图像。training data是100000张人脸照片,testing data有大约10000张跟training data相同…...

用户体验分享 | YashanDB V23.2.3安装部署

近期崖山新版体验过程中,总能看到用户提问:openssl版本问题、monit命令找不到问题、yashan用户权限问题、数据库重装问题 今日整理了多位用户的安装经验,希望能够帮助到大家~ 1.Lucifer三思而后行 :YashanDB 个人版数据库安装部…...

【漏洞复现】泛微OA E-Office /E-mobile/App/init.php 任意文件上传漏洞

免责声明: 本文旨在提供有关特定漏洞的信息,以帮助用户了解潜在风险。发布此信息旨在促进网络安全意识和技术进步,并非出于恶意。读者应理解,利用本文提到的漏洞或进行相关测试可能违反法律或服务协议。未经授权访问系统、网络或应用程序可能导致法律责任或严重后果…...

SpringCloudEureka实战:搭建EurekaServer

1、依赖引入 <dependencies><!-- 注册中心 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency> </dependencies> <de…...

DataLight(V1.4.5) 版本更新,新增 Ranger、Solr

DataLight&#xff08;V1.4.5&#xff09; 版本更新&#xff0c;新增 Ranger、Solr DataLight 迎来了重大的版本更新&#xff0c;现已发布 V1.4.5 版本。本次更新对平台进行了较多的功能拓展和优化&#xff0c;新增了对 Ranger 和 Solr 服务组件的支持&#xff0c;同时对多项已…...

深度解析:Python蓝桥杯青少组精英赛道与高端题型概览

目录 一、蓝桥杯青少组简介二、赛项组别与年龄范围三、比赛内容与题型1. 基础知识范围2. 题型设置2.1 选择题2.2 编程题 3. 考试时长 四、奖项设置与激励措施五、总结 一、蓝桥杯青少组简介 蓝桥杯全国软件和信息技术专业人才大赛&#xff08;简称“蓝桥杯”&#xff09;是由工…...

如何使用SCCMSecrets识别SCCM策略中潜在的安全问题

关于SCCMSecrets SCCMSecrets是一款针对SCCM策略的安全扫描与检测工具&#xff0c;该工具旨在提供一种有关 SCCM 策略的全面安全检测方法。 该工具可以从各种权限级别执行&#xff0c;并将尝试发现与策略分发相关的潜在错误配置。除了分发点上托管的包脚本外&#xff0c;它还将…...

Qt 信号重载问题--使用lambda表达式--解决方法

在connect()中&#xff0c;使用lambda表达式时遇到信号重载&#xff0c;无法识别使用哪个参数时&#xff0c;可通过以下方法处理&#xff1a; 1. 使用QOverload: Qt5.7才有 connect(comboBox,QOverload<int>::of(&QComboBox::currentIndexChanged), [](int index)…...

并行编程实战——TBB框架的应用之一Supra的基础

一、TBB的应用 在前面分析了TBB框架的各种基本知识和相关的基础应用。这些基础的应用很容易通过学习文档或相关的代码来较为轻松的掌握。为了能够更好的理解TBB框架的优势&#xff0c;这里从一个开源的应用程序来分析一下TBB在其中的更高一层的抽象应用&#xff0c;以方便开发…...

std::vector

std::vector是C标准库中一个非常强大的容器类&#xff0c;它提供了动态数组的功能。std::vector可以自动调整大小&#xff0c;提供了随机访问的能力&#xff0c;同时还支持在序列的尾部高效地添加和删除元素。 当创建一个空的std::vector对象时&#xff0c;它不分配任何内存&a…...

Java Web 之 Cookie 详解

在 JavaWeb 开发中&#xff0c;Cookie 就像网站给浏览器贴的小纸条&#xff0c;用于记录一些用户信息或状态&#xff0c;方便下次访问时识别用户身份或进行个性化服务。 也可以这么理解&#xff1a; 场景一&#xff1a;想象一下&#xff0c;你去一家咖啡店&#xff0c;店员认…...

linux系统下让.py文件开机自启动

一 创建服务文件 1、打开终端 2、切换到root用户 sudo su3、创建一个新的systemd服务文件 nano /etc/systemd/system/total_test0619.service 4、在服务文件中添加以下内容 [Unit] DescriptionRun total_test0619.py at startup[Service] Typesimple ExecStart/usr/bin/n…...

linux远程桌面:xrdp 安装失败

window 如何远程 Linux 桌面 安装xrdp yum install xrdpsystemctl start xrdp 如果找不到软件包&#xff0c;就安装epel源&#xff0c;最好改成国内镜像的 在 /etc/yum.repos.d/ 下创建epel.repo,内容如下 [epel] nameExtra Packages for Enterprise Linux 7 - $basearch …...

9.30Python基础-元组(补充)、字典、集合

Python元组&#xff08;tuple&#xff09;补充 1、元组的不可变性 元组&#xff08;tuple&#xff09;是Python中的一种内置数据类型&#xff0c;用于存储不可变的序列。虽然元组本身不可变&#xff0c;但元组内的元素如果是可变对象&#xff08;如列表&#xff09;&#xff…...

桥接模式和NET模式的区别

桥接模式和NET模式的区别 NAT模式&#xff1a; NAT&#xff1a;网络地址转换&#xff08;模式&#xff09;&#xff1a;借助宿主机来上网&#xff0c;没桥接那么麻烦&#xff0c;只用配置DNS即可。 缺点&#xff1a;扎根于宿主机&#xff0c;不能和局域网内其它真实的主机进行…...

Pigar:Python 项目的依赖管理利器

&#x1f31f; 引言 在Python项目开发过程中&#xff0c;依赖管理是一个不可忽视的环节。一个精确且易于维护的requirements.txt文件对于项目的部署和协作至关重要。今天&#xff0c;我们将介绍一款名为Pigar的自动生成requirements.txt文件的依赖管理工具&#xff0c;它通过一…...

泰勒图 ——基于相关性与标准差的多模型评价指标可视化比较-XGBoost、sklearn

1、基于相关性与标准差的多模型评价指标可视化比较 # 数据读取并分割 import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split plt.rcParams[font.family] = Times New Roman plt.rcParams[axes.unic…...

记录|Modbus-TCP产品使用记录【摩通传动】

目录 前言一、摩通传动实验图1.1 配置软件 IO_Studio1.2 测试软件Modbus Poll1.2.1 读写设置测试1.2.2 AI信号的读取 1.3 对应的C#连接Modbus的测试代码如下【自制&#xff0c;仅供参考】1.4 最终实验图 更新时间 前言 参考文章&#xff1a; 自己需要了解和对比某些产品的Modbu…...

工业交换机的RMON

工业交换机在现代网络中扮演着至关重要的角色&#xff0c;它不仅负责数据的高效传输&#xff0c;还具备强大的监控和管理能力。其中&#xff0c;RMON&#xff08;远程监控&#xff09;功能使得交换机的性能得以进一步提升&#xff0c;成为网络管理的重要工具。RMON提供了一种先…...

生态遥感数据下载分享

中国土壤湿度/土壤水分数据集&#xff08;2000-2020&#xff09; 下载网站&#xff1a;https://poles.tpdc.ac.cn/zh-hans/data/49b22de9-5d85-44f2-a7d5-a1ccd17086d2/#:~:text%E6%88%91%E4%BB%AC%E6%8F%90%E4%BE%9B%E4%BA%86%E4%B8%AD%E5%9B%BD%E8%8C%83 note: The data can …...

ECharts 快速使用

最终效果 使用介绍 echarts图表的绘制&#xff0c;大体分为三步&#xff1a; 根据 DOM实例&#xff0c;通过 echarts.init方法&#xff0c;生成 echarts实例构建 options配置对象&#xff0c;整个echarts的样式&#xff0c;皆有该对象决定最后通过实例.setOption方法&#xf…...

进程--消息队列和共享内存

目录 消息队列 创建消息队列 删除消息队列 发送消息和接收 消息队列 消息队列就是一个消息的列表&#xff0c;进程可以在消息队列中添加消息和的读取消息 消息队列具有FIFO的特性&#xff0c;具有无名管道与有名管道各自的优势&#xff0c;可以支持任意两个进程的进程间通讯…...

useCallback()

官网直达&#xff1a;https://zh-hans.react.dev/reference/react/useCallback 点击按钮&#xff0c;子组件会重新渲染 import { memo, useState, useCallback } from react;const Child (props) > {console.log(我是子组件&#xff01;我在渲染呢&#xff01;&#xff0…...

Python面试题精选及解析--第二篇

在Python的面试中&#xff0c;除了基础语法和常用库的知识外&#xff0c;面试官往往还会通过一系列的问题来考察应聘者的逻辑思维、问题解决能力以及项目经验。以下是一些精心挑选的Python面试题及其详细答案&#xff0c;旨在帮助求职者更好地准备面试。 面试题一&#xff1a;…...

Linux操作常用问题

目录 Ubuntu操作问题vi编辑方向键键盘乱码回退键不能使用的问题解决问题的方法 Ubuntu操作问题 vi编辑方向键键盘乱码回退键不能使用的问题 编辑/etc/systemd/resolved.conf文件来修改DNS&#xff0c;结果编辑时键盘乱码&#xff0c;按下方向键会出现ABCD&#xff0c;且回退键…...

门户网站直接登录系统/如何网页优化

案例介绍本章节主要用java实现&#xff1b;方法调用指令、返回指令、解析方法符号引用、参数传递等。实现新的指令后我们的虚拟机就可以执行稍微复杂的运算并输出结果。从调用的角度来看&#xff0c;方法可以分为两类&#xff1a;静态方法(或者类方法)和实例方法。静态方法通过…...

微网站开发微网站建设/武汉关键词排名工具

每个行业&#xff0c;都有业内“行话”&#xff0c;不了解这些行话的人&#xff0c;很难融入到行业中&#xff0c;也永远装不了逼。 从Curry到Closes&#xff0c;有很多JavaScript行话&#xff08;该领域中使用的特殊词汇&#xff09;知道这些行话不仅能帮助你增加词汇量&…...

wordpress 文学付费/手机端网站优化

Linux提供了大量的命令&#xff0c;利用它可以有效地完成大量的工作&#xff0c;如磁盘操作、文件存取、目录操作、进程管理、文件权限设定等。所以&#xff0c;在Linux系统上工作离不开使用系统提供的命令。要想真正理解Linux系统&#xff0c;就必须从Linux命令学起&#xff0…...

查企业去哪个网站/网络推广员是干嘛的

我的上篇博客【我心目中的Asp.net核心对象】 讲述了一些我认为在Asp.net中比较重要的核心对象&#xff0c;以及演示了直接使用它们也能实现一个简单的服务响应。今天&#xff0c;我将继续把我认为Asp.net的另一些重要的内容拿出来与大家一起分享&#xff0c; 同时将使用本次所讲…...

万网空间/江门网站优化公司

1. 直接插入排序 插入排序的代码实现虽然没有冒泡排序和选择排序那么简单粗暴&#xff0c;但它的原理应该是最容易理解的了&#xff0c;因为只要打过扑克牌的人都应该能够秒懂。插入排序是一种最简单直观的排序算法&#xff0c;它的工作原理是通过构建有序序列&#xff0c;对于…...

wordpress 插件大全/企业文化设计

windows环境下MySQL重启的命令行说明 windowsR 弹出运行框 在运行框中输入cmd 回车 进入系统的dos窗口 .启动mysql&#xff1a;输入 net start mysql; .停止mysql&#xff1a;输入 net stop mysql; windows下不能直接重启(restart)&#xff0c;只能先停止&#xff0c;再启…...