当前位置: 首页 > news >正文

2023最新谷粒商城笔记之MQ消息队列篇(全文总共13万字,超详细)

MQ消息队列

其实队列JDK中本身就有,不过这种队列也只能单体服务可能会使用,一旦项目使用的分布式架构,那么一定还是需要用到一个消息中间件的。我们引入消息队列的原因就是对我们的页面相应速度再优化,让用户的体验更好,原来下订单可能需要1s等待时间,引入队列之后可能只需要50ms。

消息中间件的好处

  1. 异步处理

    最开始我们执行任务时都是同步的,比如下图的第一种模式。我们必须等各个操作的做完才能返回响应,这样效率就会很慢。例如:发送邮件、发送短信,但它们能不能收到其实并不是侧重点。因此,可以启动两个线程来执行,也就是第二种模式(异步执行),但是使用消息中间件mq可以让效率更上一层楼,我们可以把要处理的任务放进mq中,然后直接返回结果,至于任务则可以慢慢在后面进行处理。

image-20221028192254906
  1. 应用解耦

    最开始我们如果需要调用不同服务直接的代码时需要在我们的代码中加上调用其他服务方法的逻辑,如果方法需要修改比如逻辑要修改,参数要修改,我们就要修改源代码,有了mq之后我们可以把这些调用都交给mq进行处理。即使下单时库存系统不能正常使用,也不影响正常下单。因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现订单系统库存系统的应用解耦

    image-20221028192912831
  2. 流量控制

    服务器接收用户的请求后,先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

    比如对于大并发量的情况(秒杀),我们可以先把请求放进mq中,不需要立刻处理,让服务根据能力处理mq中的请求就可以了,达到流量削峰的目的。

    image-20221028193053095

MQ的相关概念

消息中间件概述

1.大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力

2.消息服务中两个重要概念:

  • 消息代理(message broker就是运行消息中间件的服务器,这个服务器替我们接收、发送消息)和目的地(destination)
  • 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

3.消息队列主要有两种形式的目的地

  • 队列(queue): 点对点消息通信(point-to-point)
  • 主题(topic): 发布(publish)/订阅(subscribe)消息通信

4.点对点式

  • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获 取消息内容,消息读取后被移出队列
  • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者,即点对点可以有很多的消息的接收者,但消息的接受者只能有一个,谁能拿到消息需要靠抢

5.发布订阅式:

  • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个 主题,那么就会在消息到达时同时收到消息

6.JMS(Java Message Service)JAVA消息服务:

  • 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

7.AMQP(Advanced Message Queuing Protocol)

  • 高级消息队列协议,也是一个消息代理的规范,兼容JMS
  • RabbitMQ是AMQP的实现

8.Spring支持

  • spring-jms提供了对JMS的支持
  • spring-rabbit提供了对AMQP的支持
  • 需要ConnectionFactory的实现来连接消息代理
  • 提供JmsTemplate、RabbitTemplate来发送消息
  • @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
  • @EnableJms@EnableRabbit开启支持

9.Spring Boot自动配置

  • JmsAutoConfiguration
  • RabbitAutoConfiguration

10.市面上的MQ产品

ActiveMQRabbitMQRocketMQKafka

消息代理规范

  • JMS(Java Message Service)JAVA消息服务
    基于JVM消息代理的规范。ActiveMQ、HornetMQ是 JMS 实现
  • AMQP(Advanced Message Queuing Protocol)
    高级消息队列协议,也是一个消息代理的规范,兼容JMS
    RabbitMQ 是 AMQP 的实现

下面我们来看一看JMS和AMQP两个规范(协议):

在这里插入图片描述

RabbitMQ是基于AMQP协议实现的并且兼容JMS,ActiveMQ是基于JMS实现的。

JMS和AMQP的区别在于:JMS面向纯java平台不不支持跨平台而AMQP是可以跨平台,假如后台服务有用PHP编写则可以兼容。

JMS和AMQP的简单对比 :

①.AMQP的消息模型中direct exchange是类比JMS中P2P(Queue),AMQP的其它四种消息模型则是类比于JMS的Topic

②.JMS支持的各种消息类型,AMQP只支持byte[]但也无妨最后都可以json序列化后传输

RabbitMQ概念

RabbitMQ是一个由erlang开发的遵循AMQP(Advanved Message Queue Protocol)协议的开源消息队列实现。

核心概念

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Exchange有4种类型:direct(默认)fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。

Exchange 和Queue的绑定可以是多对多的关系。

Connection

网络连接,比如一个TCP连接。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhostAMQP 概念的基础,必须在连接时 指定,RabbitMQ 默认的 vhost 是 / 。

Broker

表示消息队列服务器实体。

在这里插入图片描述

工作流程: 首先,生产者客户端会向消息中间件发送Message,Message由消息头和消息体组成,消息头中有一个route-key属性用于标识存储的队列位置,消息中间件接收到消息之后会由相应的交换机将消息存储到指定的消息队列中,交换机和队列具有绑定关系,无论生产者还是消费者客户端想发送或者接收消息都需要使用connnection去创建一个长连接,长连接类似于高速公路,信道类似于高速公路中的每个车道。RabbitMQ还有一个虚拟主机即类似于Docker中的容器彼此互不干扰,不需要创建多个RabbitMQ只需要创建多个虚拟机即可实现向java后台、PHP后台发送消息(也可以用虚拟主机实现生产和开发环境,其提供一个隔离的RabbitMQ环境)。

长连接的好处是当客户端宕机之后,RabbitMQ将不会向消费者客户端发送消息而是将消息持久化保证消息不会丢失。

image-20221028195049347

docker安装RabbitMQ

这里不需要下载镜像,直接安装。默认会帮你下载

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

修改RabbitMQ配置为只要启动docker自动重启rabbitMQ

docker update rabbitmq --restart=always

下面为RabbitMQ中涉及的端口号:

对应端口号解释:

  • 4369, 25672 (Erlang发现&集群端口)
  • 5672, 5671 (AMQP端口)
  • 15672 (web管理后台端口)
  • 61613, 61614 (STOMP协议端口)
  • 1883, 8883 (MQTT协议端口)

可访问的可视化RabbitMQ端口号 : 15672 访问RabbitMQ控制页面

下面关于在RabbitMQ可视化界面的操作就不记笔记了,因为大多需要粘贴图片,排版出来会很难看,相信小伙伴们看视频也是轻松get。

AMQP中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别, AMQP 中增加了 Exchange 和 Binding 的角色 :生产者把消息发布 到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交 换器的消息应该发送到那个队列。

在这里插入图片描述

因此这里我们简要说明一下RabbitMQ中的自动创建出的四种消息模型的交换机即Exchange的类型

  1. Direct是点对点模式,一个消息只能发送给一个队列,且被一个消息接收者接收。只有消息中的路由键(routing key)和 Binding 中的 binding key 完全一致时, 交换器才会将消息发到对应的队列中,即路由键与队列名完全匹配,如果一个队列绑定到交换机要求其路由键为“dog”,那么交换机只会给这个队列转发 routing key 为“dog”的消息,不会转发 “dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式。

    在这里插入图片描述

  2. headers这种也是点对点的,同direct。但headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。

  3. Fanout广播式,只要跟这个交换机绑定的队列都会收到发送到这个交换机的消息。Fanout 交换器不处理路由键,只是简单的将队列 绑定到交换器上,每个发送到交换器的 消息都会被转发到与该交换器绑定的所 有队列上。它很像子网广播,每台子网内 的主机都获得了一份复制的消息。Fanout 类型转发消息是最快的。

    在这里插入图片描述

  4. topic这种也是广播式,不过它会根据路由键进行匹配(可以通配符模糊匹配),只有匹配成功的队列才会接收到消息。将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
    识别通配符: #匹配 0 个或多个单词, *匹配一个单词

在这里插入图片描述

SpringBoot整合RabbitMQ

  1. 导入amqp依赖
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

我们引入这个依赖之后,RabbitAutoConfiguration这个类也就自然而然引入了我们的项目中,这个配置类也就会自动注册进容器,将其配置信息生效,比如这个类就给容器中自动配置了RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate等类

@Configuration
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties({RabbitProperties.class})
@Import({RabbitAnnotationDrivenConfiguration.class})
public class RabbitAutoConfiguration {
  1. 添加RabbitMQ的配置信息(看到上面配置类中加载了一个配置文件,点进去就会发现这个类绑定了我们spring默认的属性配置文件

    @ConfigurationProperties(prefix = "spring.rabbitmq"
    )
    public class RabbitProperties {
    

由此我们就可以去application.properties文件中配置我们rabbitMQ的信息了

spring.rabbitmq.host=192.168.10.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
#其他信息就不需要我们配置了,大部分都在RabbitProperties中配置了默认值
  1. 主启动类添加@EnableRabbit注解()
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {public static void main(String[] args) {SpringApplication.run(GulimallOrderApplication.class, args);}
}

RabbitMQ在Java中的使用

CachingConnectionFactory

RabbitTemplate使用CachingConnectionFactory作为连接工厂

配置类上标有这样的注解:@EnableConfigurationProperties(RabbitProperties.class)

向容器中注入CachingConnectionFactory的代码中是从配置文件中加载配置信息的。

spring.rabbitmq为配置的前缀,可以指定一些端口号,ip地址等信息。

AmqpAdmin

AmqpAdminorg.springframework.amqp.core下的类,这个类主要是用来进行一些资源的创建的,如创建交换机,队列,绑定信息等。通过此类,可以用代码的方式创建Exchange、Queue还有Binding:

@Autowired
AmqpAdmin amqpAdmin;/*** 创建绑定*/
@Test
public void createBinding() {// String destination 目的地// DestinationType destinationType 绑定类型:队列/交换机// String exchange 交换机名称// String routingKey 路由键//、Map<String, Object> arguments 参数Binding binding = new Binding("hello.queue" , Binding.DestinationType.QUEUE, "hello", "hello.queue",null);amqpAdmin.declareBinding(binding);
}/*** 创建队列*/
@Test
public void createMQ() {/*** @param name 队列的名称* @param durable 是否持久化队列* @param exclusive 是否声明为一个独占队列* @param autoDelete 如果服务不在使用时是否自动删除队列*/Queue queue = new Queue("hello.queue", true, false, false);String s = amqpAdmin.declareQueue(queue);log.info("创建queue成功... {}", queue);
}/*** 创建交换机* TopicExchange* FanoutExchange* DirectExchange*/
@Test
public void createExchange() {// String name 交换机名称// boolean durable 是否持久化// boolean autoDelete 是否自动删除Exchange exchange = new DirectExchange("hello", true, false);amqpAdmin.declareExchange(exchange);log.info("创建exchange成功...");
}

RabbitTemplate

这个类就是用来控制消息的收发了。通过RabbitTemplate类中的方法,可以像使用Rabbit客户端一样向队列发送消息以及更多其他的操作,并且多个重载的”send“(发送消息)方法。

@Autowired
RabbitTemplate rabbitTemplate;/**
* convertAndSend(String exchange, String routingKey, Object object)
* String exchange, 交换器
* String routingKey,   路由值
* Object object    消息,如果发送的消息是对象,我们会使用序列化机制将对象写出去。*/
@Test
public void test() {// 发送消息rabbitTemplate.convertAndSend("hello", "hello.queue"  ,"msg");
}
  1. 发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
  2. 或者我们想要发送的对象序列化为JSON格式

通过指定不同的MessageConverter来实现,可以向容器中注入我们想要的MessageConverter从而使用。

在这里插入图片描述

配置MyRabbitConfig,让发送的对象类型的消息可以是一个json

添加“com.atguigu.gulimall.order.config.MyRabbitConfig”类,代码如下:

@Configuration
public class MyRabbitConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

注意:

配置MyRabbitConfig配置类,向容器中添加一个Json的消息转换器,让发送的对象类型的消息可以是一个json(如果我们不添加,RabbitMQ就会用它配置类中自带的一个对象序列化的转换器,因为对象无法直接在网络中传输,需要转换成字符串。如果使用RabbitMQ自带的不要忘了在传输对象的类上实现Serialize序列化接口)。这里我们添加Json消息转换器,代码如下:

@RabbitListener和@RabbitHandler注解

监听消息:使用@RabbitListener和@RabbitHandler,主启动类必须有@EnableRabbit。

  • @RabbitListener: 类+方法上(监听哪些队列即可)
  • @RabbitHandler: 标在方法上(重载区分不同的消息)

@RabbitListener注解和@RabbitHandler都可以接受消息队列中的消息,并进行处理。

@RabbitListener注解:

使用@RabbitListener时主启动类必须有@EnableRabbit,其可以标记方法或类上进行使用

自定义方法的参数可以为以下类型:

1、Message message:原生消息详细信息。头 + 体

2、T <发送的消息的类型> 可以是我们自定义的对象

3、Channel channel :当前传输数据的信道。

@RabbitListener(queues = {"hello.queue"})
public String receiveMessage(Message message, OrderEntity content) {//消息体信息byte[] body = message.getBody();// 消息头信息MessageProperties messageProperties = message.getMessageProperties();log.info("收到的消息: {}", content);return "ok";
}

同时要注意:Queue可以由很多方法来监听,只要收到消息,队列就删除消息,并且只能有一个方法收到消息。并且一个方法接收消息是一个线性的操作,只有处理完一个消息之后才能接收下条消息。

@RabbitHandler注解:

@RabbitHandler标在方法上用于接受不同类型的消息对象。

@RabbitHandler标记的方法结合@RabbitListener,@RabbitHandler使用可以变得更加灵活:采用在类上加 @RabbitListener 注解,标识监听哪些消息队列。在方法上添加@RabbitHandler注解,重载区分不同的消息。

比如说,当两个方法对一个消息队列进行监听时,用于监听的两个方法用于接收消息内容的参数不同,根据消息的内容可以自动的确定使用那个方法。

@RestController
public class RabbitController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/sendMq")public String sendMq(@RequestParam(value = "num",defaultValue = "10") Integer num){for (int i = 0; i < num; i++){//向一个队列中发送两种不同类型的消息if (i%2==0){OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();orderReturnApplyEntity.setId(1L);orderReturnApplyEntity.setCreateTime(new Date());orderReturnApplyEntity.setReturnName("哈哈哈");//配置MyRabbitConfig,让发送的对象类型的消息,可以是一个jsonrabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnApplyEntity, new 				 CorrelationData(UUID.randomUUID().toString()));}else {OrderEntity entity = new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",entity, new 								CorrelationData(UUID.randomUUID().toString()));}}return "OK";}
}

修改“com.atguigu.gulimall.order.service.impl.OrderItemServiceImpl”类,代码如下:

@RabbitListener(queues = {"hello-java-queue"})//queues:声明需要监听的所有队列
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@Overridepublic PageUtils queryPage(Map<String, Object> params) {IPage<OrderItemEntity> page = this.page(new Query<OrderItemEntity>().getPage(params),new QueryWrapper<OrderItemEntity>());return new PageUtils(page);}/*** 以下参数是我们自定义的,spring会自动帮我们解析* 参数1、Message message:原生消息详细信息。头+体* 参数2、T<发送的消息类型> OrderReturnApplyEntity content* 参数3、Channel channel 当前传输数据的通道* Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息* 场景:*       1)、订单服务启动多个;同一个消息,只能有一个客户端收到*       2)、只有一个消息完全处理完,方法运行结束,才可以接收到下一个消息*///@RabbitListener(queues = {"hello-java-queue"})@RabbitHandlerpublic void receiverMessage(Message message,OrderReturnApplyEntity content,Channel channel) throws InterruptedException {//消息体byte[] body = message.getBody();//消息头属性信息MessageProperties properties = message.getMessageProperties();System.out.println("接收到消息...内容:" + content);//Thread.sleep(3000);System.out.println("消息处理完成=》"+content.getReturnName());}@RabbitHandlerpublic void receiverMessage(OrderEntity orderEntity){System.out.println("接收到消息...内容:" + orderEntity);}
}

可靠投递-发送端确认

image-20221028221058895
  • 服务器收到消息 p->b:ConfirmCallback
    1. pring.rabbitmq.publisher-confirms=true
    2. 设置确认回调 ConfirmCallback
  • 消息抵达队列就回调 e->q:ReturnCallback
    1. spring.rabbitmq.publisher-returns: true
    2. spring.rabbitmq.template.mandatory: true
    3. 设置确认回调 ReturnCallback

ConfirmCallback

image-20221028221623629

ConfirmCallbackRetruhnCallback一样都是RabbitTemplate内部的接口。

消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。

也就是说当消息到达RabbitMQ的服务器就会执行回调方法。但是被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里,所以需要用到接下来的 returnCallback。

首先需要修改配置文件:

#开启发送端确认
spring.rabbitmq.publisher-confirms=true

然后准备一个发送消息使用的接口和两个用来监听消息队列并接收消息的方法

发送消息接口:

@RestController
public class SendMsgController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/sendMsg")public String sendMsg() {for (int i = 0; i < 10; i++) {if (i % 2 == 0) {OrderEntity orderEntity = new OrderEntity();orderEntity.setId(1L);orderEntity.setMemberUsername("Tom");orderEntity.setReceiveTime(new Date());rabbitTemplate.convertAndSend("hello-java-exchange", "hello.news", orderEntity, new CorrelationData(UUID.randomUUID().toString()));} else {OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();orderReturnReasonEntity.setCreateTime(new Date());orderReturnReasonEntity.setId(2L);orderReturnReasonEntity.setName("test");orderReturnReasonEntity.setSort(1);rabbitTemplate.convertAndSend("hello-java-exchange", "hello.news", orderReturnReasonEntity, new CorrelationData(UUID.randomUUID().toString()));}}return "ok";}
}

监听消息队列并接收消息的方法:

@RabbitListener(queues = {"hello.news"})
@Slf4j
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@RabbitHandlerpublic void receiveMessage1(Message message, OrderReturnReasonEntity content, Channel channel) {//消息体信息byte[] body = message.getBody();// 消息头信息MessageProperties messageProperties = message.getMessageProperties();System.out.println("receiveMessage1 接收消息: " + content);}@RabbitHandlerpublic void receiveMessage2(Message message, OrderEntity content, Channel channel) {//消息体信息byte[] body = message.getBody();// 消息头信息MessageProperties messageProperties = message.getMessageProperties();System.out.println("receiveMessage2 接收消息: " + content);}
}

第三步,在配置类中定制RedisTemplate:

@Configuration
public class MyRabbitConfig {@AutowiredRabbitTemplate rabbitTemplate;@PostConstruct // 该注解表示在初始化构造器之后就调用,初始化定制 RabbitTemplatepublic void initRabbitTemplate() {// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 当前消息的唯一相关数据 (这个是消息的唯一id)* @param ack 消息是否成功收到* @param cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback... correlationData: [" + correlationData + "] ==> ack: [" + ack + "] ==> cause: [" + cause + "]");}});}
}

那么一旦消息抵达消息队列服务器就会调用我们自己定义的配置类中实现的回调方法(打印输出消息信息)。

ReturnCallback

首先修改application.properties,配置上回调确认

#开启发送端抵达队列确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发送优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory=true

被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

如果在交换机将消息投递到queue的过程中,发生了某些问题,最终导致消息投递失败,就会触发这个方法。

为定制的RabbitTemplate添加这个方法:

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** @param message 投递失败的消息的详细信息* @param replyCode 回复的状态码* @param replyText 回复的文本内容* @param exchange 但是这个消息发给哪个交换机* @param routingKey 当时这个消息使用哪个路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("FailMessage: [" + message + "] ==> replyCode: [" + replyText + "] ==> exchange: [" + exchange + "] ==> routingKey: [" + routingKey + "]");}
});

我们在发送消息的一端故意写错路由键,致使exchange投递消息失败。最后会看到回调方法ReturnCallback中打印的内容:

FailMessage: [(Body:'{"id":2,"name":"test","sort":1,"status":null,"createTime":1641608721639}' MessageProperties [headers={spring_returned_message_correlation=b6b21f2d-73ad-473d-9639-feec76953c7b, __TypeId__=com.atguigu.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])] ==> replyCode: [NO_ROUTE] ==> exchange: [hello-java-exchange] ==> routingKey: [hello.news1]

补充:在发送消息的时候还可以指定一个CorrelationData类型的参数(可以回顾上文的发送消息的方法),这个CorrelationData类的构造器参数可以填一个UUID,代表消息的唯一id,在重写ConfirmCallback中的方法的第一个参数就是这个,通过这个参数就可以获取消息的唯一id。

注意:监听方法返回值必须为void,否则控制台会不断打印报错信息。(血的教训)

可靠投递-消费端确认

image-20221029100951566

ACK(Acknowledge)消息确认机制

消费者获取到消息,成功处理,可以回复Ack给Broker

  • basic.ack用于肯定确认;broker可以删除此消息
  • basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
  • basic.reject用于否定确认;同上,但不能批量

在默认状况下,ACK消息确认机制是当消息一旦抵达消费方,客户端会自动确认,服务端就会删除这个消息(出队),但是如果在消息消费过程中服务器宕机了,这些消息也会被删除,这就造成了消息丢失的问题。
问题:
我们收到很多消息,客户端会自动回复给服务器ack进行确认,但如果只有一个消息处理成功,然后客户端宕机了。就会发生消息丢失
这时我们改为手动确认模式。只要我们没有明确告诉MQ,消息被签收了,也就是没有ACK,消息就一直unacked状态,
即使Consumer宕机。消息也不会丢失,状态会重新变为Ready,下一次有新的Consumer连接进来就发给他

通过配置可以开启消息需要经过手动确认,才能从队列中删除消息

#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

修改我们的RabbitMQ的配置类MyRabbitConfig,代码如下:

@RabbitHandler
public void receiveMessage2(Message message, OrderEntity content, Channel channel) {//消息体信息byte[] body = message.getBody();// 消息头信息MessageProperties messageProperties = message.getMessageProperties();long deliveryTag = messageProperties.getDeliveryTag();//手动接收消息//long deliveryTag相当当前消息派发的标签,从messageProperties中获取,并且在Channel中自增的//boolean multiple 是否批量确认try {channel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();}System.out.println("receiveMessage2 接收消息: " + content);
}

我们在上方的代码打上断点并观察RabbitMQ客户端的状况:

在这里插入图片描述

对中总共有5条消息,并且进入了Unacked,即未被确认的状态。

但是这里使用debug模式启动然后关掉服务模拟服务器宕机会发生一个问题,就是在关闭服务之前,idea会将未执行完的方法先执行完再关闭服务。

所以可以在cmd杀掉进程模拟宕机。

这时,由于打了断点,没有走到消息确认的那一行代码,随机,服务器宕机,所有没有确认的消息都会从Unacked的状态回调Ready的状态。

有接收消息的方法就有拒绝消息的方法:basicNackbasicReject

//long deliveryTag 当前消息派发的标签
//boolean multiple 是否批量处理
//boolean requeue 拒绝后是否将消息重新入队
channel.basicNack(deliveryTag, false, true);
channel.basicReject(deliveryTag, true);

basicNackbasicReject都可以用来拒绝消息,但是basicNackbasicReject多了一个参数boolean multiple(是否批量处理)

如果将requeue设置为true,被拒绝的消息就会重新入队等待消费,false则拒绝消息就相当于丢弃此消息。

修改配置文件application.properties

#手动确认收货(ack)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitHandler
public void receiverMessage(Message message,OrderReturnApplyEntity content,Channel channel) throws InterruptedException {//消息体byte[] body = message.getBody();//消息头属性信息MessageProperties properties = message.getMessageProperties();System.out.println("接收到消息...内容:" + content);//Thread.sleep(3000);System.out.println("消息处理完成=》"+content.getReturnName());//channel内按顺序自增的long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("deliveryTag:"+deliveryTag);//签收货物,非批量模式try{if (deliveryTag % 2 == 0){//收货channel.basicAck(deliveryTag,false);System.out.println("签收了货物。。。"+deliveryTag);}else {//退货第二个参数是multiple批量处理,第三个参数是丢弃的消息是否重新入队requeue=false 丢弃  requeue=true发回服务器,服务器重新将消息入队。channel.basicNack(deliveryTag,false,true);System.out.println("没有签收货物..."+deliveryTag);}}catch (Exception e){//网络中断}
}

如何签收信息:
业务成功就应该签收:channel.basicAck(deliveryTag,false);
业务处理失败就应该拒签,让别人处理:channel.basicNack(deliveryTag,false,true);


感谢耐心看到这里的同学,觉得文章对您有帮助的话希望同学们不要吝啬您手中的赞,动动您智慧的小手,您的认可就是我创作的动力!
之后还会勤更自己的学习笔记,感兴趣的朋友点点关注哦。

相关文章:

2023最新谷粒商城笔记之MQ消息队列篇(全文总共13万字,超详细)

MQ消息队列 其实队列JDK中本身就有&#xff0c;不过这种队列也只能单体服务可能会使用&#xff0c;一旦项目使用的分布式架构&#xff0c;那么一定还是需要用到一个消息中间件的。我们引入消息队列的原因就是对我们的页面相应速度再优化&#xff0c;让用户的体验更好&#xff…...

多变量线性回归模型

多变量线性回归模型 模型参数为n1维向量&#xff0c;此时模型公式为 hθ(x)θ0x0θ1x1θ2x2...θnxnh_{\theta}(x)\theta_{0}x_{0}\theta_{1}x_{1}\theta_{2}x_{2}...\theta_{n}x_{n} hθ​(x)θ0​x0​θ1​x1​θ2​x2​...θn​xn​ 可以简化为 hθ(x)θTXh_{\theta}(x)\th…...

php 基于ICMP协议实现一个ping命令

php 基于ICMP协议实现一个ping命令 网络协议是什么ICMP 协议什么是ICMP?ICMP 的主要功能ICMP 在 IPv4 和 IPv6 的封装Wireshark抓包ICMP 请求包分析PHP构建 ICMP 数据包php中的 pack & unpack 函数字节和字符packunpackICMP计算校验和步骤总结网络协议是什么 网络协议&…...

Java基本数据类型

1.概述 佛说&#xff0c;大千世界&#xff0c;无奇不有。在这个世界里&#xff0c;物种的多样性&#xff0c;遍地开花&#xff0c;同样&#xff0c;在Java的世界里&#xff0c;也有着异曲同工之妙&#xff0c;Java秉承面向对象的特性&#xff0c;必然少不了区分对象的类型&…...

English Learning - L2 语音作业打卡 Day2 2023.2.22 周三

English Learning - L2 语音作业打卡 Day2 2023.2.22 周三&#x1f48c; 发音小贴士&#xff1a;&#x1f48c; 当日目标音发音规则/技巧&#xff1a;&#x1f36d; Part 1【热身练习】&#x1f36d; Part2【练习内容】&#x1f36d;【练习感受】&#x1f353;元音[ ɑː ]&…...

45. 跳跃游戏 II

题目&#xff1a; 45. 跳跃游戏 II难度中等1974收藏分享切换为英文接收动态反馈给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 num…...

应届生Java面试50题线程篇(含解析)

什么是线程&#xff1f; 答&#xff1a;线程是操作系统能够进行运算调度的最小单位&#xff0c;是程序执行流的最小单元。在Java中&#xff0c;可以通过实现Runnable接口或继承Thread类来创建线程。 创建线程的方式有哪些?各自的优缺点是什么&#xff1f; 继承 Thread 类&…...

【数据库】第七章 数据库设计

第七章数据库设计 数据库设计概述 数据库设计的基本步骤 需求分析概念结构设计逻辑结构设计物理结构设计数据库实施数据库运行和维护 需求分析 收集需求&#xff0c;理解需求 收集各个角色的需求 概念数据库设计 建立概念模型 &#xff0c;E-R图/IDEF1x图 消除冲突&…...

Burp Suite 常用模块简介

Burp Suite 常用模块分为 目标站点(target)模块 代理(proxy)模块 攻击(Intruder)模块 重放(Repeater) 模块 Target模块是对站点资源的收集&#xff0c;与站点各资源包发出和相应包的记录 Proxy模块是核心模块&#xff0c;可以拦截数据包发送往浏览器&#xff0c;进行修改后再…...

QML Item和Rectangle详解

1.Item和Rectangle Item类型是Qt Quick中所有可视项的基本类型。 Qt Quick中的所有可视项都继承Item。尽管Item对象没有视觉外观&#xff0c;但它定义了视觉项中常见的所有属性&#xff0c;例如x和y位置、宽度和高度、锚定和键处理支持。 Rectangle继承自Item&#xff0c;多…...

常见前端基础面试题(HTML,CSS,JS)(六)

GET 和 POST 的区别 从 http 协议的角度来说&#xff0c;GET 和 POST 它们都只是请求行中的第一个单词&#xff0c;除了语义不同&#xff0c;其实没有本质的区别。 之所以在实际开发中会产生各种区别&#xff0c;主要是因为浏览器的默认行为造成的。 受浏览器的影响&#xf…...

深度学习 李沐报错

3.6. softmax回归的从零开始实现 — 动手学深度学习 2.0.0 documentation softmax从0开始实现 函数执行需要加main指定 改成这样 if __name__"__main__":print(evaluate_accuracy(net, test_iter)) 不然会这样出错 RuntimeError: An attempt has been m…...

【JAVA程序设计】(C00104)基于Springboot的家庭理财管理系统——有文档

基于Springboot的家庭理财管理系统项目简介项目获取开发环境项目技术运行截图运行视频项目简介 基于Springboot开发的家庭理财管理系统设计与实现共分为三个角色&#xff1a;系统管理员、家庭管理员、家庭用户 管理员角色包含以下功能&#xff1a; 用户管理、修改密码、角色管…...

【第五章 AOP概述,底层原理,AOP术语,切入点表达式,AOP操作(基于注解方式,基于xml配置文件)】

第五章 AOP概述&#xff0c;底层原理&#xff0c;AOP术语&#xff0c;切入点表达式&#xff0c;AOP操作&#xff08;基于注解方式&#xff0c;基于xml配置文件&#xff09; 1.AOP概述&#xff1a; &#xff08;1&#xff09;什么是AOP&#xff1a; ①面向切面编程&#xff08;…...

面试官: 你知道 JWT、JWE、JWS 、JWK嘛?

想起了 之前做过的 很多 登录授权 的项目 它相比原先的session、cookie来说&#xff0c;更快更安全&#xff0c;跨域也不再是问题&#xff0c;更关键的是更加优雅 &#xff0c;所以今天总结了一篇文章来介绍他 JWT 指JSON Web Token&#xff0c;如果在项目中通过 jjwt 来支持 J…...

基于企业微信应用消息的每日早安推送

基于企业微信应用消息的每日早安推送 第一步&#xff1a;注册企业微信 企业微信注册地址&#xff1a;https://work.weixin.qq.com/wework_admin/register_wx 按照正常流程填写信息即可&#xff0c;个人也可以注册企业微信&#xff0c;不需要公司 注册完成后&#xff0c;登录…...

【数字IC基础】黑盒验证、白盒验证、 灰盒验证

文章目录 一、黑盒验证二、白盒验证三、灰盒验证一、黑盒验证 1、黑盒验证:大多数基于仿真的验证环境都是黑盒验证;2、不需要知道设计的内部结构和特性,只需要在输入端口打激励,观察输出即可;3、验证工程师学习设计的规格,然后编写验证环境中的 drivers, monitors, check…...

管理的本质是达成目标

“没有目标&#xff0c;其实就没有管理学存在的意义。要有效地使用管理学的智慧&#xff0c;首先要建立清晰的目标。” - 《宁向东的管理学课》 起源 最近开始刷很久之前就在得到上买了的已经起灰了的课程&#xff0c;看到这句话觉得很有道理。 思考 这里面有一个很重要的词…...

【数字IC基础】IC(Integrated Circuit,集成电路)常用缩写

文章目录 1、集成电路:2、数字IC设计相关步骤:3、数字设计相关概念:4、验证相关:5、语言类:6、IC设计相关工具:7、存储器相关:8、总线协议类:9、文件格式类:10、标准和规范:11、其它:1、集成电路: 缩写全称中文翻译LSILarge-scale intergrated circuit大规模集成电…...

JavaScript 高级1 :面向对象

JavaScript 高级1 &#xff1a;面向对象 Date: January 16, 2023 Text: 面向对象、ES6中类和对象、类的继承、面向对象案例 目标&#xff1a; 能够说出什么是面向对象 能够说出类和对象的关系 能够使用 class 创建自定义类型 能够说出什么是继承 面向对象编程介绍 面向过…...

C语言结构体对齐

1. 结构体对齐 要点 变量只能存储在他的长度的整数倍地址上结构体整体对齐跟他的最长的字段整数倍对齐 栗子1 struct Example1 {char a; //1个字节int c; //4个字节short b; //2个字节 };std::cout << sizeof(Example1 ) << std::endl; // 12 std::cout &…...

Bootstrap系列之导航

Bootstrap导航 可以在 ul 元素上添加 .nav类&#xff0c;在每个 li 选项上添加 .nav-item 类&#xff0c;在每个链接上添加 .nav-link 类: 基本的导航 <div class"container mt-3"><h2>导航</h2><p>简单的水平导航:</p><ul class&…...

Java EE|TCP/IP协议栈之应用层协议DNS详解

文章目录一、对DNS的感性认识简介特点一些常见疑问二、DNSDNS域名结构域名的分级三、域名服务器四、域名解析过程参考一、对DNS的感性认识 简介 DNS&#xff0c;即Domain Name System,是域名系统的简称。它是Internet上解决网上机器命名的一种系统。 TCP/IP中的IP地址是由四…...

【MyBatis】作用域生命周期(四)

&#x1f697;MyBatis学习第四站~ &#x1f6a9;起始站&#xff1a;MyBatis概述&环境搭建(一) &#x1f6a9;本文已收录至专栏&#xff1a;数据库学习之旅 &#x1f44d;希望您能有所收获 一.引入 为了使用方便&#xff0c;我们经常能看到各种教程都将MyBatis抽离为工具类…...

腾讯一面—Android 系统启动流程详解

正文AMS 是 Android 中最核心的服务之一&#xff0c;主要负责系统中四大组件的启动、切换、调度及应用进程的管理和调度等工作&#xff0c;其职责与操作系统中的进程管理和调度模块相类似&#xff0c;它本身也是一个 Binder 的实现类&#xff0c;应用进程能通过 Binder 机制调用…...

【Python知识点桂电版】02组合数据类型

一、序列序列简介序列是指一种包含多项数据的数据结构&#xff0c;分为不可变序列和可变序列。可变序列可修改序列内的元素如列表&#xff0c;二不可变序列一旦建立就不能修改其中的元素&#xff0c;字符串和元组属于不可变序列。列表和元组的创建列表&#xff1a;列表名 [元素…...

LeetCode100_100. 相同的树

LeetCode100_100. 相同的树 一、描述 给你两棵二叉树的根节点 p 和 q &#xff0c;编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 示例 1&#xff1a; 输入&#xff1a;p [1,2,3], q […...

javaEE 初阶 — 网络层中 IP 协议 的报文结构

文章目录IP 协议报文4位版本号4位首部长度8位服务类型16位总长度&#xff08;字节数&#xff09;8位生存时间&#xff08;TTL&#xff09;与 8位协议16位首部校验和32位源 IP 地址与32位目标 IP 地址动态分配的 IP 地址NAT 网络地址转换IPv6IP 协议报文 4位版本号 这里的 IP 协…...

iOS swift UICollectionView

文章目录1.纯代码自定义UICollectionViewCell2.禁止滑动&#xff08;弹簧效果&#xff09;3.UICollectionView的长按拖动2.在一个控制器中放两个UICollectionView或者UITableView,代理方法要怎么写1.纯代码自定义UICollectionViewCell import UIKitclass NewDeviceBottomColle…...

计算机三级数据库 填空题汇总

计算机三级 数据库 IDEF0需求建模方法由箭头和&#xff08;活动/方框/矩形&#xff09;两种元素构成。、从安全性角度考虑&#xff0c;防火墙技术是用来保证数据库应用系统的&#xff08;网络&#xff09;环境安全的。在UML的状态机图中&#xff0c;状态之间的转移是由&#x…...

可以玩h5的网站/品牌营销策划怎么写

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2021年低压电工考试及低压电工免费试题&#xff0c;包含低压电工考试答案和解析及低压电工免费试题练习。由安全生产模拟考试一点通公众号结合国家低压电工考试最新大纲及低压电工考试真题汇总&#xff0c;有助于低压…...

怎样做模具钢网站/seo范畴有哪些

注意 presto中都是用单引号: select dt from tb1 where dt date 2021-02-01;...

南京网站优化网站建设公司/如何引流客源最快的方法

第一种&#xff1a;使用正则 复制代码 代码如下:<?php echo preg_replace(# #, , ab ab); //输出 "abab" ?>第二种&#xff1a;使用str_replace()函数 复制代码 代码如下:<?php echo str_replace( , , ab ab); //输出 "abab ?>第三种&…...

建设路84号 网站备案/网站死链检测工具

在javascript中数据类型的转换分为两种&#xff0c;一种是隐式类型转换&#xff0c;还有一种是强制转换&#xff0c;下面来简单介绍下这两种数据类型的转换。 - 隐式类型转换 在没有特意指定类型来进行数据转换的都是隐式类型转换&#xff0c;以下的三种情况都是属于隐式类…...

建设投资基金管理有限公司网站/广州最新疫情最新消息

找到laravel安装目录执行php artisan nova:user 转载于:https://www.cnblogs.com/F4natasy/p/10700370.html...

家装设计师怎么学/文章优化关键词排名

Matlab强大的函数库涉及各个领域 matlab各个领域的函数库&#xff1a;https://ww2.mathworks.cn/matlabcentral/fileexchange/ 发现向量arrayA中最小的Num个数 function [ minFlags ] findMinFlag( arrayA,num ) %FINDMINFLAG Summary of this function goes here % Deta…...