当前位置: 首页 > news >正文

RabbitMQ高级篇(如何保证消息的可靠性、如何确保业务的幂等性、延迟消息的概念、延迟消息的应用)

文章目录

  • 1. 消息丢失的情况
  • 2. 生产者的可靠性
    • 2.1 生产者重连
    • 2.2 生产者确认
    • 2.3 生产者确认机制的代码实现
    • 2.4 如何看待和处理生产者的确认信息
  • 3. 消息代理(RabbitMQ)的可靠性
    • 3.1 数据持久化
    • 3.2 LazyQueue( 3.12 版本后所有队列都是 Lazy Queue 模式)
  • 4. 消费者的可靠性
    • 4.1 消费者确认机制
    • 4.2 失败重试机制
    • 4.3 失败消息的处理策略
    • 4.4 业务幂等性
      • 4.4.1 方案一:为每条消息设置一个唯一的 id
      • 4.4.2 方案二:结合业务判断
    • 4.5 兜底的解决方案
  • 5. 延迟消息
    • 5.1 什么是延迟消息
    • 5.2 死信交换机
    • 5.3 延迟消息插件(推荐使用)
      • 5.3.1 下载并安装延迟插件
      • 5.3.2 安装插件时可能遇到的问题
      • 5.3.3 在 Java 代码中发送延迟消息
      • 5.3.4 延迟消息的原理和缺点
    • 5.4 取消超时订单
    • 5.5 发送延迟检测订单的消息

对 RabbitMQ 不是很了解的同学,可以看一下我的另一篇博文:RabbitMQ快速入门(MQ的概念、安装RabbitMQ、在 SpringBoot 项目中集成 RabbitMQ )

1. 消息丢失的情况

消息丢失的情况主要有以下三种:

  1. 生产者向消息代理传递消息的过程中,消息丢失了
  2. 消息代理( RabbitMQ )把消息弄丢了
  3. 消费者把消息弄丢了

在这里插入图片描述

那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性

2. 生产者的可靠性

2.1 生产者重连

由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制

application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)

spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /blogusername: CaiXuKunpassword: T1rhFXMGXIOYCoyiconnection-timeout: 1s # 连接超时时间template:retry:enabled: true # 开启连接超时重试机制initial-interval: 1000ms # 连接失败后的初始等待时间multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multipliermax-attempts: 3 # 最大重试次数

填写完配置信息后,我们手动停止 RabbitMQ ,模拟生产者连接 RabbitMQ 失败的情况

sudo docker stop rabbitmq

启动测试类

@Test
void testSendMessageToQueue() {String queueName = "simple.queue";String msg = "Hello, SpringAMQP!";rabbitTemplate.convertAndSend(queueName, msg);
}

可以在控制台看到,总共有三次重新连接 RabbitMQ 的记录,三次连接都失败后,就直接抛异常了

在这里插入图片描述

注意事项:

  1. 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
  2. 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也
    可以考虑使用异步线程来执行发送消息的代码

2.2 生产者确认

RabbitMQ 提供了 Publisher ConfirmPublisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:

  1. 消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功
  2. 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功
  3. 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功
  4. 其它情况都会返回 NACK,告知生产者消息投递失败

在这里插入图片描述

2.3 生产者确认机制的代码实现

在 publisher 服务中编写与生产者确认机制有关的配置信息( application.yml 文件)

spring:rabbitmq:publisher-returns: truepublisher-confirm-type: correlated

publisher-confirm-type 有三种模式:

  1. none:关闭 confirm 机制
  2. simple:以同步阻塞等待的方式返回 MQ 的回执消息
  3. correlated:以异步回调方式的方式返回 MQ 的回执消息

每个 RabbitTemplate 只能配置一个 ReturnCallback

在 publisher 模块新增一个名为 RabbitMQConfig 的配置类,并让该类实现 ApplicationContextAware 接口

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回调rabbitTemplate.setReturnsCallback((returnedMessage) -> {System.out.println("收到消息的return callback, " +"exchange = " + returnedMessage.getExchange() + ", " +"routingKey = " + returnedMessage.getRoutingKey() + ", " +"replyCode = " + returnedMessage.getReplyCode() + ", " +"replyText = " + returnedMessage.getReplyText() + ", " +"message = " + returnedMessage.getMessage());});}}

测试前先运行 RabbitMQ

sudo docker start rabbitmq

在 publisher 模块添加一个测试类,测试 ReturnCallback 的效果

@Test
void testConfirmCallback() throws InterruptedException {CorrelationData correlationData = new CorrelationData();correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {if (confirm.isAck()) {// 消息发送成功System.out.println("消息发送成功,收到ack");} else {// 消息发送失败System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason());}if (throwable != null) {// 消息回调失败System.err.println("消息回调失败");}});rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);// 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果Thread.sleep(2000);
}

发送成功后可以看到消息发送成功的回调信息


如果交换机不存在会怎么样呢,我们故意使用一个不存在的交换机,观察控制台的输出结果

在这里插入图片描述


如果 routingKey 不存在会怎么样呢,我们故意使用一个不存在的 routingKey ,观察控制台的输出结果

在这里插入图片描述

可以看到,confirmCallback 和 ReturnCallback 都返回了回调信息(deliveryTag0 表示消息无法路由到队列)

2.4 如何看待和处理生产者的确认信息

  1. 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用
  2. 如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题
  3. 对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息

3. 消息代理(RabbitMQ)的可靠性

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:

  1. 一旦 RabbitMQ 宕机,内存中的消息会丢失
  2. 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)

怎么理解 MQ 阻塞呢,当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息

在这里插入图片描述

在这里插入图片描述


我们来测试一下消息丢失的情况,在 RabbitMQ 的控制台中向 simple.queue 队列发送一条信息,发送后重启 RabbitMQ ,模拟 RabbitMQ 宕机后重启的情况

测试前,记得先把监听 simple.queue 队列的代码注释掉

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消费者收到了simple.queue的消息:【" + message + "】");
}

第一步:先发送一条消息

在这里插入图片描述

第二步:查看消息的情况

在这里插入图片描述

在这里插入图片描述

第三步:重启 RabbitMQ ,模拟 RabbitMQ 宕机后重启的情况

sudo docker restart rabbitmq

第四步:查看消息的情况(可以看到,RabbitMQ 重启后,消息丢失了)

在这里插入图片描述

3.1 数据持久化

RabbitMQ 实现数据持久化包括 3 个方面:

  1. 交换机持久化
  2. 队列持久化
  3. 消息持久化

注意事项:

  1. 利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
  2. 在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

我们来演示一下 RabbitMQ 发生 Paged Out 现象(也就是队列的空间被消息占满了之后,将老旧消息移到磁盘,为新消息腾出空间的情况)

我们编写一个测试类,向 simple.queue 一次性发送一百万条消息

在发送消息之前,先把生产者确认机制关闭,提高消息发送的速度

spring:rabbitmq:publisher-returns: falsepublisher-confirm-type: none

先测试发送非持久化信息

@Test
void testPagedOut() {Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend("simple.queue", message);}
}

测试结果

在这里插入图片描述


再测试发送持久化信息

@Test
void testPagedOut() {Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend("simple.queue", message);}
}

在这里插入图片描述

3.2 LazyQueue( 3.12 版本后所有队列都是 Lazy Queue 模式)

从 RabbitMQ 的 3.6.0 版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:

  1. 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认 2048条 )
  2. 消费者要处理消息时才会从磁盘中读取并加载到内存
  3. 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改

开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执


在 RabbitMQ 的控制台可以看到 RabbitMQ 的版本

在这里插入图片描述

在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

在这里插入图片描述

x-queue-mode

在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

编程式创建

@Bean
public org.springframework.amqp.core.Queue lazeQueue() {return QueueBuilder.durable("lazy.queue1").lazy().build();
}

注解式创建

@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(name = "lazy.queue2",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazeQueue(String message) {System.out.println("消费者收到了 laze.queue2的消息: " + message);
}

4. 消费者的可靠性

4.1 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:

  1. ack:成功处理消息,RabbitMQ 从队列中删除该消息
  2. nack:消息处理失败,RabbitMQ 需要再次投递消息
  3. reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:

  1. none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用
  2. manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活
  3. auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:
    • 如果是业务异常,会自动返回 nack
    • 如果是消息处理或校验异常,自动返回 reject

开启消息确认机制,需要在 application.yml 文件中编写相关的配置

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none

先测试处理模式为 none 的情况,向 simple.queue 队列发送一条消息,同时监听 simple.queue 队列的消息,监听到队列中的消息后手动抛出一个异常

publisher 服务

@Test
void testSendMessageToQueue() {String queueName = "simple.queue";String msg = "Hello, SpringAMQP!";rabbitTemplate.convertAndSend(queueName, msg);
}

consumer 服务

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消费者收到了simple.queue的消息:【" + message + "】");throw new RuntimeException("故意抛出异常");
}

不出意外,程序报错了

在这里插入图片描述

但在 RabbitMQ 的控制台可以看到,消息也丢失了

在这里插入图片描述

再测试处理模式为 none 的情况

可以看到,控制台一直在报错,报错之后一直在尝试重新发送消息

在 RabbitMQ 的控制台可以看到,simple.queue 一直在收发消息,速率达到了 97 次每秒(状态为 running ,消息的状态为 Unacked )

在这里插入图片描述

此时,我们手动关闭 consumer 服务,查看 RabbitMQ 的控制台,可以看到消息恢复到正常的状态了

在这里插入图片描述

再来测试异常类型为 MessageConversionException 的情况

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消费者收到了simple.queue的消息:【" + message + "】");throw new MessageConversionException("故意抛出异常");
}

在控制台可以看到,消息被拒绝了,而且消息也没有重新发送

在这里插入图片描述

查看 RabbitMQ 的控制台,可以发现消息已经从队列中移除了

在这里插入图片描述

4.2 失败重试机制

当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力

我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队

在 application.yml 配置文件中开启失败重试机制

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: true # 开启消息消费失败重试机制initial-interval: 1000ms # 消息消费失败后的初始等待时间multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multipliermax-attempts: 3 # 最大重试次数stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false

我们将抛出的异常类型改回 RuntimeException

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消费者收到了simple.queue的消息:【" + message + "】");throw new RuntimeException("故意抛出异常");
}

在控制台可以看出,消息的重新发送次数已经耗尽了

在这里插入图片描述

查看 RabbitMQ 的控制台,发现消息也丢失了

正常情况下,消息丢失都不是我们想看到的,该怎么解决这个问题呢

4.3 失败消息的处理策略

开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:

  1. RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式
  2. ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队
  3. RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

在这里插入图片描述

我们来演示一下使用 RepublishMessageRecoverer 类的情况

第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true, false, false);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}}

第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列

在这里插入图片描述

在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列

在这里插入图片描述

总结:消费者如何保证消息一定被消费?

  1. 开启消费者确认机制为 auto ,由 Spring 帮我们确认,消息处理成功后返回 ack,异常时返回 nack
  2. 开启消费者失败重试机制,并设置 MessageRecoverer ,多次重试失败后将消息投递到异常交换机,交由人工处理

4.4 业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),绝对值函数具有幂等性

在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的

在这里插入图片描述

那么有什么方法能够确保业务的幂等性呢

4.4.1 方案一:为每条消息设置一个唯一的 id

给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:

  1. 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
  3. 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理

可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId

@Bean
public MessageConverter jacksonMessageConvertor() {Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

发送消息后,在 RabbitMQ 的控制台可以看到,消息的 properties 属性附带了 messageId 信息

在这里插入图片描述

但这种方式对业务有一定的侵入性

4.4.2 方案二:结合业务判断

结合业务逻辑,基于业务本身做判断。以支付业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付,只有未支付订单才需要修改,其它状态的订单不做处理

在这里插入图片描述

总结:如何保证支付服务与交易服务之间的订单状态一致性?

  1. 首先,支付服务会正在用户支付成功以后利用 MQ 发送消息通知交易服务,完成订单状态同步
  2. 其次,为了保证 MQ 消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性,同时也开启了MQ的持久化,避免因服务宕机导致消息丢失
  3. 最后,我们还在交易服务更新订单状态时做了业务幕等判断,避免因消息重复消费导致订单状态异常

4.5 兜底的解决方案

如果交易服务消息处理失败,支付服务和交易服务出现了数据不一致的情况,有没有什么兜底的解决方案?

我们可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性

5. 延迟消息

5.1 什么是延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息

延迟任务:一定时间之后才会执行的任务

在这里插入图片描述

5.2 死信交换机

当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):

  1. 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  2. 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费
  3. 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

在这里插入图片描述

利用死信交换机的特点,可以实现发送延迟消息的功能

5.3 延迟消息插件(推荐使用)

5.3.1 下载并安装延迟插件

RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中

插件的下载地址:rabbitmq-delayed-message-exchange

在这里插入图片描述


下载完插件后,运行以下指令,在输出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安装目录

sudo docker inspect rabbitmq

在这里插入图片描述

然后进入 RabbitMQ 的插件的安装目录,将刚才下载的插件上传到该目录下

一般与 docker 相关的目录只有 root 用户才有权限访问,所以我们需要先打开 docker 目录的部分权限(耗时可能较长)

sudo chmod +rx -R /var/lib/docker

接着打开/var/lib/docker/volumes/rabbitmq-plugins/_data目录的写权限(如果修改权限不生效,请切换到 root 用户执行指令

sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data

将刚才下载的插件上传到/var/lib/docker/volumes/rabbitmq-plugins/_data目录

上传成功后将/var/lib/docker/volumes/rabbitmq-plugins/_data目录的权限复原

sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data

最后进入容器内部,运行指令安装插件,安装完成后退出容器内部

sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit

看到以下信息,说明插件安装成功了

在这里插入图片描述

5.3.2 安装插件时可能遇到的问题

如果你遇到了以下错误,在执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange指令前先执行以下指令

在这里插入图片描述

chmod 400 /var/lib/rabbitmq/.erlang.cookie

5.3.3 在 Java 代码中发送延迟消息

声明延迟交换机

@Bean
public DirectExchange delayExchange() {return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}

声明队列和延迟交换机,并将队列和延迟交换机绑定在一起

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),key = "delay"
))
public void listenDelayQueue(String message) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("消费者收到了 delay.queue的消息: " + message + ",时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

编写测试方法,测试发送延迟消息

@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000); // 毫秒return message;}});SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

发送延迟消息的本质是在消息头属性中添加 x-delay 属性

在这里插入图片描述

5.3.4 延迟消息的原理和缺点

RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒

RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)

定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大

所以说,延迟消息适用于延迟时间较短的场景

5.4 取消超时订单

设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

  1. 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
  2. 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源

在这里插入图片描述

在这里插入图片描述

5.5 发送延迟检测订单的消息

我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class MultipleDelayMessage<T> {private T data;private List<Long> delayMillis;public MultipleDelayMessage() {}public MultipleDelayMessage(T data, Long... delayMillis) {this.data = data;this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));}public MultipleDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));}public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {return new MultipleDelayMessage<>(data, delayMillis);}public boolean hasNextDelay() {return !delayMillis.isEmpty();}public Long removeNextDelay() {return delayMillis.remove(0);}public T getData() {return data;}public void setData(T data) {this.data = data;}public List<Long> getDelayMillis() {return delayMillis;}public void setDelayMillis(List<Long> delayMillis) {this.delayMillis = delayMillis;}@Overridepublic String toString() {return "MultipleDelayMessage{" +"data=" + data +", delayMillis=" + delayMillis +'}';}}

我们再定义一个发送延迟消息的消息处理器,供所有服务使用

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;public class DelayMessagePostProcessor implements MessagePostProcessor {private final Integer delay;public DelayMessagePostProcessor(Integer delay) {this.delay = delay;}@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}}

改造后的发送延迟消息的测试方法

@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

相关文章:

RabbitMQ高级篇(如何保证消息的可靠性、如何确保业务的幂等性、延迟消息的概念、延迟消息的应用)

文章目录 1. 消息丢失的情况2. 生产者的可靠性2.1 生产者重连2.2 生产者确认2.3 生产者确认机制的代码实现2.4 如何看待和处理生产者的确认信息 3. 消息代理&#xff08;RabbitMQ&#xff09;的可靠性3.1 数据持久化3.2 LazyQueue&#xff08; 3.12 版本后所有队列都是 Lazy Qu…...

正点原子imx6ull-mini-Linux驱动之platform设备驱动实验(14)

我们在前面几章编写的设备驱动都非常的简单&#xff0c;都是对IO进行最简单的读写操作像I2C、 SPI、LCD 等这些复杂外设的驱动就不能这么去写了&#xff0c;Linux 系统要考虑到驱动的可重用性&#xff0c;因此提出了驱动的分离与分层这样的软件思路&#xff0c;在这个思路下诞生…...

z3基础学习

z3基础学习 ​ z3是一个微软出品的开源约束求解器&#xff0c;能够解决很多种情况下的给定部分约束条件寻求一组满足条件的解的问题。 安装&#xff1a;pip install z3-solver 1. 简单使用 from z3 import * x Int(x) #创建名为x的int类型变量 y Int(y) solve(x y10,2*x…...

开发助手专业版,有反编译等多种功能

软件介绍 开发助手能够用来快速调试应用以及查看手机软硬件相关信息&#xff0c;包括&#xff1a;快速打开或关闭开发者选项中的选项。 将原来几十秒的操作缩短为一次点击。包括显示布局边界&#xff0c;显示 GPU 过度绘制。显示布局更新。强制 GPU 渲染 显示 GPU 视图更新&a…...

嵌入式初学-C语言-十一

#接嵌入式初学-C语言-十,以及部分例题# 循环结构 break和continue break 功能&#xff1a; 1. 用在switch中&#xff0c;用来跳出switch的case语句&#xff1b;如果case没有break&#xff0c;可能会产生case穿透。 2. 用在循环中&#xff08;while、do..while、for..&#…...

浅谈几个常用OJ的注册方式

众所周知&#xff0c;好的OJ是成功的一半&#xff0c;但是有些英文OJ的注册很让人伤脑筋。 CodeForces 点进官网 戳这里 然后就会进入这个页面 在这一页里面里填写好信息即可 最后&#xff0c;一个邮件就会发到你的邮箱上&#xff0c;点击其中的链接即可激活账号 AtCoder …...

Html实现全国省市区三级联动

目录 前言 1.全国省市区的Json数据 2.找到Json数据文件(在此博文绑定资源)之后&#xff0c;放到resource目录下。 3.通过类加载器加载资源文件&#xff0c;读取Json文件 3.1 创建JsonLoader类 3.2 注入JsonLoader实体&#xff0c;解析Json文件 4.构建前端Html页面 5.通过…...

前端构建工具Webpack 与 Vite 大对比

在现代前端开发领域&#xff0c;构建工具扮演着至关重要的角色。它们不仅可以帮助我们管理项目依赖关系&#xff0c;还可以优化我们的代码&#xff0c;使其在生产环境中运行得更快更高效。其中两个最受欢迎的构建工具就是 Webpack 和 Vite。在这篇文章中&#xff0c;我们将深入…...

Ubuntu-22.04环境搭建

安装wget(一般ubuntu会自带) sudo apt-get install wget 更换国内软件源 先备份原来的/etc/apt/source.list⽂件 sudo cp /etc/apt/sources.list /etc/apt/sources.list.bak 防止修改错误 导致无可挽回 将下列国内镜像源 写入原来的/etc/apt/source.list⽂件&#xff08;注…...

嵌入式学习---DAY17:共用体与位运算

链表剩余的一些内容 一、共用体 union 共用体名 名称首字母大写 { 成员表列&#xff1b; }&#xff1b; union Demo {int i;short s;char c; }; int main(void) {union Demo d;d.i 10;d.s 100;d.c 200;printf("%d\n", sizeof(d)); /…...

蓝牙网关和蓝牙MESH总结

可参考&#xff1a; https://zhuanlan.zhihu.com/p/695144946 蓝牙网关 参考&#xff1a; https://www.bilibili.com/read/cv28872282/ 蓝牙网关是一种特殊的网络设备&#xff0c;它能够实现蓝牙设备与互联网或其他类型网络之间的数据传输和通信。通过蓝牙网关&#xff0c;用户…...

了解关于标准化的知识

1.标准化组织 1.1国家标准化管理委员会(Standardization Administration of the Peoples Republic of China&#xff0c;简称SAC) TC--(Technical Committee) 技术委员会. SAC/TC,就是“国家标准化管理委员会”下属的一个专项或一个行业的“技术委员会或技术小组”&a…...

【云原生】数据库忘记密码怎么办?

相信很多人都会遇到在虚拟机中忘记数据库密码的情况&#xff0c;想必大家都很苦恼&#xff0c;所以今天给大家来讲讲数据库忘记密码了如何修改密码再登录数据库&#xff01;&#xff01;&#xff01; 1、关闭数据库服务 systemctl stop mariadb 2、执行MySQL 服务器在启动时跳…...

Postman 接口测试详解

Postman 接口测试详解 Postman 接口测试详解1. Postman 基础知识1.1 什么是 Postman&#xff1f;1.2 Postman 的主要功能 2. 安装与设置2.1 安装 Postman2.2 创建 Postman 账户 3. Postman 的基本操作3.1 创建和发送请求3.2 解析响应数据3.3 使用环境和变量 4. 进阶功能4.1 编写…...

【JavaEE】线程状态

目录 前言 一.线程状态图 二.线程状态 1.初始状态(NEW) 2.运行状态(RUNNING) 3.等待状态&#xff08;WAITING) 4.超时等待&#xff08;TIMED_WAITING) 5.阻塞状态&#xff08;BLOCKED) 6.终止状态(TERMINATED) 三.线程状态间的转换 四.总结 前言 线程状态及其状态转换…...

C++笔记之编译过程和面向对象

回顾&#xff1a; “abcd”//数据类型 字符串常量 const char *p"abc"; new STU const char *//8 指针的内存空间 int float 指针的内存空间 p 指针指向的内存空间 "abc" 取决于字符串长度 指针变量的内容一级指针 指针变量的地址二级指针 …...

ModuleNotFoundError: No module named ‘tqdm‘

报错信息&#xff1a; tqdm是一个快速、可扩展的Python进度条库&#xff0c;用于展示迭代器的长循环执行进度。 解决&#xff1a;通过以下命令安装 使用conda命令安装 conda install tqdm使用pip安装&#xff1a; pip install tqdm...

东京电影节公布2024年竞赛片评审团成员并对其业绩分别进行评介 没什么含金量

第37届东京国际电影节竞赛单元评审团名单正式公布。 周五&#xff0c;电影节组织者宣布&#xff0c;香港电影制片人杜琪峰、匈牙利电影制片人伊尔迪科恩耶迪、日本女演员桥本爱和法国女演员基娅拉马斯楚安尼将与之前宣布的评审团主席梁朝伟一起担任 2024 年主竞赛评审团成员。 …...

智能景区垃圾识别系统:基于YOLO的深度学习实现

基于深度学习的景区垃圾识别系统&#xff08;UI界面YOLOv8/v7/v6/v5代码训练数据集&#xff09; 1. 引言 景区垃圾识别是环保管理的重要任务之一。传统的人工清理方式效率低、成本高&#xff0c;而借助深度学习技术可以实现自动化的垃圾检测与识别&#xff0c;提高景区的清洁…...

ventoy和微pe可以共存吗?ventoy和pe共存使用教程

Ventoy新一代多系统启动U盘解决方案。国产开源U盘启动制作工具&#xff0c;支持Legacy BIOS和UEFI模式&#xff0c;理论上几乎支持任何ISO镜像文件&#xff0c;支持加载多个不同类型的ISO文件启动&#xff0c;无需反复地格式化U盘&#xff0c;插入U盘安装写入就能制作成可引导的…...

如何获取和安装SSL证书

SSL&#xff08;Secure Sockets Layer&#xff09;证书是用于加密网站服务器和客户端之间通信的一种数字证书。它通过HTTPS协议保护数据传输的安全性&#xff0c;防止数据被窃听或篡改。本文将指导您如何为您的网站获取并安装SSL证书。 步骤1&#xff1a;选择SSL证书提供商 首…...

makefile在IC设计中的使用笔记

1 makefile在IC设计中的地位 关于makefile的详细介绍可以参考第一个连接&#xff0c;里面的内容很多也很详细。但在数字IC设计中&#xff0c;并不会把所有的用法都用到&#xff0c;下面记录一下主要用到的规则。 2 IC设计涉及到的主要用法 2.1 变量的定义和使用 在makefile…...

Suno声称在受版权保护的音乐上训练模型属于“合理使用“

继美国唱片业协会&#xff08;RIAA&#xff09; 最近对音乐生成初创公司 Udio 和 Suno 提起诉讼之后&#xff0c;Suno 在周四提交的一份法庭文件中承认&#xff0c;该公司确实使用了受版权保护的歌曲来训练其人工智能模型。但它声称&#xff0c;根据合理使用原则&#xff0c;这…...

Java | Leetcode Java题解之第316题去除重复字母

题目&#xff1a; 题解&#xff1a; class Solution {public String removeDuplicateLetters(String s) {boolean[] vis new boolean[26];int[] num new int[26];for (int i 0; i < s.length(); i) {num[s.charAt(i) - a];}StringBuffer sb new StringBuffer();for (in…...

Taro学习记录

一、安装taro-cli 二、项目文件 三、项目搭建 1、Eslint配置 在项目生成的 .eslintrc 中进行配置 {"extends": ["taro/react"], //一个配置文件&#xff0c;可以被基础配置中的已启用的规则继承"parser": "babel/eslint-parser…...

Spring Cache框架详解

Spring Cache框架详解 Spring Cache是Spring框架提供的一个强大的缓存抽象层&#xff0c;旨在简化缓存技术的集成和使用。自Spring 3.1版本开始&#xff0c;Spring Cache就被引入以支持在Spring应用程序中添加缓存功能。随着Spring版本的迭代&#xff0c;Spring Cache的功能日…...

解决Html iframe 内嵌video标签导致视频无法全屏展示的问题

原因&#xff1a; 由于浏览器的安全策略所限制的。为了防止恶意网站利用全屏播放功能进行滥用或欺骗用户&#xff0c;浏览器对iframe中的视频播放做了限制。 在iframe标签中播放视频时&#xff0c;浏览器会根据安全策略阻止视频全屏播放。这是因为iframe标签中的内容被认为是第…...

谷粒商城实战笔记-110~114-全文检索-ElasticSearch-查询

文章目录 一&#xff0c;110-全文检索-ElasticSearch-进阶-两种查询方式二&#xff0c;111-全文检索-ElasticSearch-进阶-QueryDSL基本使用&match_all三&#xff0c;112-全文检索-ElasticSearch-进阶-match全文检索四&#xff0c;113-全文检索-ElasticSearch-进阶-match_ph…...

【开源】嵌入式Linux(IMX6U)应用层综合项目(1)--云平台调试APP

目录 1.简介 1.1功能介绍 1.2技术栈介绍 1.3演示视频 1.4硬件介绍 2.软件设计 2.1连接阿里云 2.2云平台调试UI 2.3Ui_main.c界面切换处理文件 2.4.main函数 3.结尾&#xff08;附网盘链接&#xff09; 1.简介 此文章并不是教程&#xff0c;只能当作笔者的学习分享&…...

AI人工智能分析王楚钦球拍被踩事件的真相

在2024年巴黎奥运会乒乓球混双决赛的热烈氛围中&#xff0c;中国队王楚钦与孙颖莎以出色的表现夺得金牌&#xff0c;然而&#xff0c;赛后发生的一起意外事件——王楚钦的球拍被踩坏&#xff0c;引起了广泛关注和热议。为了探寻这一事件的真相&#xff0c;我们可以借助AI人工智…...