Spring Boot 整合 RocketMQ 之消息消费手动提交 ACK 实战【案例分享】
前言:
上一篇我们分享了 RocketMQ 消息重试的一些基本原理,本篇我们基于 Spring Boot 整合 RocketMQ 来分享一下 RocketMQ 消息基于手动提交的案例,在分享手动进行消息 ACK 中也会分享消息重试的使用。
RocketMQ 系列文章传送门
RocketMQ 的介绍及核心概念讲解
Spring Boot 整合 RocketMQ 之普通消息
Spring Boot 整合 RocketMQ 之定时/延时消息
Spring Boot 整合 RocketMQ 之顺序消息
Spring Boot 整合 RocketMQ 之事务消息
RocketMQ 之消息重试机制
同步消息手动提交 ACK 案例分享
同步消息 Producer 消息发送代码案例
同步消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下同步消息 Producer 消息发送代码,具体如下:
package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;/*** @ClassName: OneWayMessageProducer* @Author: zhangyong* @Date: 2024/9/27 17:27* @Description: 同步消息发送者*/
@Slf4j
@Component
public class SyncMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;/*** @param message:* @date 2024/10/10 17:47* @description 同步消息发送*/public void sendSyncMessage(String message) {rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());}}
同步消息手动 ACK Consumer 端代码案例分享
RocketMQ 消息手动 ACK 就不能再使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式来实现了,RocketMQListener 的源码如下:
package org.apache.rocketmq.spring.core;public interface RocketMQListener<T> {void onMessage(T var1);
}
我们可以看到 RocketMQListener 中只提供了一个 onMessage 方法,且返回值为 void,不接受返回值,因此没办法进行手动 ACK。
这里我们使用 DefaultMQPushConsumer 来实现消息的手动 ACK,DefaultMQPushConsumer 实现了 MQPushConsumer 接口,具体实现代码如下:
package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @ClassName: ManualCommitSyncCounsumer* @Author: Author* @Date: 2024/10/16 16:23* @Description:*/
@Slf4j
@Component
public class ManualCommitSyncCounsumer {/*** @date 2024/10/16 17:19* @description 同步消息消费成功 手动提交*/@PostConstructpublic void onSyncMessage() throws MQClientException {//消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-group");//设置最大消息重试次数//consumer.setMaxReconsumeTimes(2);//RocketMQ 地址可以 可以用配置文件consumer.setNamesrvAddr("xxx-xxx-rocketmq.xxx.com:19876");//订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息consumer.subscribe("sync-topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {//存储消息id 和消费次数的关系final Map<String, Integer> map = new HashMap<>();@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());//消息处理MessageExt messageExt = list.get(0);String message = new String(messageExt.getBody());String msgId = messageExt.getMsgId();//获取消息消费次数Integer count = map.get(msgId);if (count == null) {count = 0;}//次数+1count = count + 1;//覆盖mapmap.put(msgId, count);if (count > 2) {log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回消费消费成功,消息内容:{}", dateStr, msgId, count, message);//消息消费成功后移除map.remove(msgId);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息内容:{}", dateStr, msgId, count, message);//模拟除 0 异常//int a = 1 / 0;log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息消费成功,消息内容:{}", dateStr, msgId, count, message);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//return null;}});consumer.start();}}
RocketMQ 同步消费端手动 ACK 结果验证:
正常消费返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
2024-10-19 10:23:07.575 INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:23:07.575 INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
没有模拟除 0 异常,正常消费返回,一次就消费成功,结果符合预期。
模拟除 0 异常
2024-10-19 10:19:59.026 INFO 34052 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:19:59,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:09.029 INFO 34052 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:20:09,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:39.032 INFO 34052 --- [MessageThread_4] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:20:39,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了
可以看到消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。
正常消费但是返回 NULL
2024-10-19 10:25:47.338 INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:47.338 INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344 INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344 INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:26:27.347 INFO 27256 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:26:27,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了
可以看到返回 NULL 和模拟除 0 异常是一样的效果,消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。
上面的案例我们没有控制消息消费重试次数,我们可以设置一个消息消费重试次数,代码如下:
//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);
使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式消费消息的时候消费失败也会自动进行重试,因此我们一定要控制好重试次数,可以 @RocketMQMessageListener 注解的 maxReconsumeTimes 来控制重试次数(高版本的 starter 才有该属性)。
不管使用哪种方式进行消费,对于达到重试次数还是消费失败的消息一般有两种处理方式,分別是:
- 直接返回消息消费成功,使用本地库记录消息进行重新推送或者人工介入处理。
- 不进行处理,让消息进入死信队列,然后去监听死信队列进行处理。
顺序消息 Producer 消息发送代码案例
顺序消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下顺序消息 Producer 消息发送代码,具体如下:
package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** @ClassName: OneWayMessageProducer* @Author: Author* @Date: 2024/9/27 17:27* @Description: 顺序消息发送者*/
@Slf4j
@Component
public class OrderlyMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;/*** @date 2024/10/11 15:45* @description 同步顺序消息*/public void syncSendOrderly() {//hashKey 用来计算决定消息发送到哪个队列 一般是订单 ID 等信息 这里我们模拟订单 ID 发送rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 创建").build(), "666666");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 支付").build(), "666666");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 确认收货").build(), "666666");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 创建").build(), "888888");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 支付").build(), "888888");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 确认收货").build(), "888888");}}
顺序消息 Consumer 消息消费代码案例
同步消息手动 ACK 我们是注册了一个 MessageListenerConcurrently 消息监听器,顺序消息的手动 ACK 我们需要注册一个 MessageListenerOrderly 的消息监听器,具体代码如下:
package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @ClassName: ManualCommitOrderlyMessageConsumer* @Author: Author* @Date: 2024/10/10 17:35* @Description: 顺序消息消费*/
@Slf4j
@Component
public class ManualCommitOrderlyMessageConsumer {/*** @date 2024/10/16 17:19* @description 顺序消息消费成功 手动提交*/@PostConstructpublic void onOrderlyMessage() throws MQClientException {//消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-group");//设置最大消息重试次数//consumer.setMaxReconsumeTimes(2);//RocketMQ 地址可以 可以用配置文件consumer.setNamesrvAddr("dev-ztn-rocketmq.eminxing.com:19876");//订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息consumer.subscribe("orderly-topic", "*");consumer.registerMessageListener(new MessageListenerOrderly() {//存储消息id 和消费次数的关系final Map<String, Integer> map = new HashMap<>();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());//消息处理MessageExt messageExt = list.get(0);String message = new String(messageExt.getBody());String msgId = messageExt.getMsgId();//获取消息消费次数Integer count = map.get(msgId);if (count == null) {count = 0;}//次数+1count = count + 1;//覆盖mapmap.put(msgId, count);if (count > 2) {log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回顺序消费消费成功,消息内容:{}", dateStr, msgId, count, message);//消息消费成功后移除map.remove(msgId);return ConsumeOrderlyStatus.SUCCESS;}log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息内容:{}", dateStr, msgId, count, message);//模拟除 0 异常//int a = 1 / 0;log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息消费成功,消息内容:{}", dateStr, msgId, count, message);return ConsumeOrderlyStatus.SUCCESS;//return null;}});consumer.start();}}
RocketMQ 顺序消费端手动 ACK 结果验证:
正常消费返回 ConsumeOrderlyStatus.SUCCESS
2024-10-19 13:46:44.293 INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.294 INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.295 INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295 INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295 INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.295 INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.298 INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.298 INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.300 INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.300 INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.302 INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:46:44.303 INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货
可以看到是按发送顺序消费的,结果符合预期。
模拟除 0 异常
2024-10-19 13:50:45.587 INFO 27604 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:45,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:46.588 INFO 27604 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:46,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592 INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592 INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:48.593 INFO 27604 --- [orderly-group_7] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:48,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595 INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595 INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:50.610 INFO 27604 --- [orderly-group_9] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:50,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611 INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611 INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:52.617 INFO 27604 --- [rderly-group_11] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:52,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618 INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618 INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:54.620 INFO 27604 --- [rderly-group_13] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:54,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627 INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627 INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:56.629 INFO 27604 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:56,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:57.631 INFO 27604 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:57,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货
可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,结果符合预期。
这里需要注意一点的时候上一条消息没有消费成功时,后面一条消息永远不会被消费,这个从我们的日志中也能够体现出来,因此我们在使用顺序消息的时候一定要注意消息的重试次数,消息重试次数我们可以通过自己的业务来判断消费几次,也可以使用 RocketMQ 来实现,代码如下:
//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);
模拟返回 NULL
2024-10-19 14:00:51.484 INFO 22728 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:51,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:52.485 INFO 22728 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:52,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492 INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492 INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:54.492 INFO 22728 --- [rderly-group_17] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:54,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494 INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494 INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:56.495 INFO 22728 --- [rderly-group_19] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:56,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497 INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497 INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:58.510 INFO 22728 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:58,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520 INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520 INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:00.521 INFO 22728 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:00,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523 INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523 INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:02.525 INFO 22728 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:02,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:03.527 INFO 22728 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:03,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货
可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,和模拟除 0异常 一样的结果,结果符合预期。
总结:RocketMQ 消息重试次数可以通过 @RocketMQMessageListener 注解的 maxReconsumeTimes 属性来设置,也可以通过编码来设置,不管采用何种方式来设置,我们都要在业务编码中做好处理,过多重视造成的性能问题,以及没有合理处理消费失败的消息造成的消息丢失问题,对于顺序消息我们更要慎重对待,顺序消息回到导致消息阻塞。
如有不正确的地方欢迎各位指出纠正。
相关文章:
Spring Boot 整合 RocketMQ 之消息消费手动提交 ACK 实战【案例分享】
前言: 上一篇我们分享了 RocketMQ 消息重试的一些基本原理,本篇我们基于 Spring Boot 整合 RocketMQ 来分享一下 RocketMQ 消息基于手动提交的案例,在分享手动进行消息 ACK 中也会分享消息重试的使用。 RocketMQ 系列文章传送门 RocketMQ …...
胃癌数据集(不定期更新)
恶性肿瘤其实就是癌症的一种,它是由一些不正常的细胞组成的,这些细胞会不停地生长和繁殖,而且它们还会侵犯周围的正常组织,甚至通过血液和淋巴系统跑到身体其他部位去。 恶性肿瘤有以下几个特点: 1、无限制生长&…...
zh/FAQ/CentOSStream-CentOS Stream 常见问题
CentOS Stream 是什么? CentOS Stream 是一个为开发者而设的发行版本,目的是要协助社群中的成员、Red Hat 伙伴及其它人在一个较稳定及可预测的 Linux 生态环境下充分利用创新的开源程序。 它的内容正是 Red Hat 有意收录于下个稳定版 RHEL 的软件。任何…...
多台西门子PLC与多台三菱PLC之间实时通讯的方案(PLC内不用编程)
PLC通讯智能网关IGT-DSER模块支持西门子、三菱、欧姆龙、罗克韦尔AB、GE等各种品牌的PLC之间通讯,同时也支持PLC与Modbus协议的变频器、智能仪表等设备通讯。网关有多个网口、串口,也可选择WIFI无线通讯。PLC内无需编程开发,在智能网关上配置…...
C++ [项目] 恶魔轮盘赌
现在才发现C游戏的支持率这么高,那就发几篇吧 零、前情提要 此篇是我与 同学的共创,他负责写人,我负责写机,简称人机, 不过有一点小插曲…… 一、基本介绍 支持Dev-C5.11版本(务必调为英文输入法),基本操作看游戏里的介绍,怎么做的……懒得说,能看懂就看注释,没有…...
微信小程序版本更新管理——实现自动更新
✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…...
Python使用Selenium库实现CSDN自动化发帖
虽然CSDN上有很多优秀的作品,但也不乏很多很水的文章,我也不知道这种有什么意义。不过发这么水的文章多没意思,让浏览器自动化发帖就行了。以下程序能够实现CSDN自动化发帖,同时附自动给关注的人的文章点赞的程序。因为也有很多作…...
StringBulider和StringBuffer的底层源码剖析
要深入了解 StringBuffer 和 StringBuilder 的区别,从底层源码的角度来解析,包括它们的创建、扩容机制等,可以参考 JDK 1.8 的源码。 1. AbstractStringBuilder 类 StringBuffer 和 StringBuilder 都继承自 AbstractStringBuilder。…...
手机空号过滤接口-在线手机空号检测-手机空号过滤API
接口简介:在线检测手机号状态,与运营商平台联动大数据分析判断手机号状态。可划分出实号、空号、停机、流量卡、沉默号。 更新周期:两周 不支持号段:14、16、17、19号段 存在5%的误差,如需实时接口,可购买手…...
ubuntu 用ss-TPROXY实现透明代理,基于TPROXY的透明TCP/UDP代理,在 Linux 2.6.28 后进入官方内核。
TPROXY 是一个 Linux 内核模块,在 Linux 2.6.28 后进入官方内核。 1 安装 tproxy 相关依赖 参考ss-tproxy 的安装依赖。 bash ss-tproxy 使用了 bash 的一些语法特性,比如 shell 数组,因此必须用 bash 解释器执行。大多数发行版已经自带了…...
报错解决:opene3d draw_geometries(): incompatible function arguments.
1. 报错信息 o3d.visualization.draw_geometries(target_pcd) TypeError: draw_geometries(): incompatible function arguments. The following argument types are supported:1. (geometry_list: List[open3d.cpu.pybind.geometry.Geometry], window_name: str Open3D, wid…...
Clickhouse笔记(二) 集群搭建
0.集群规划 操作系统使用ubuntu2204server,8C8G100G。 节点分片部署192.168.50.5分片1副本1clickhouse-server/clickhouse-client/keeper192.168.50.6分片1副本2clickhouse-server/clickhouse-client/keeper192.168.60.7分片2副本1clickhouse-server/clickhouse-c…...
华为云购买弹性云服务器(教程)
配置弹性云服务器 基础配置 实例 操作系统...
Python异常检测- 单类支持向量机(One-Class SVM)
系列文章目录 Python异常检测- Isolation Forest(孤立森林) python异常检测 - 随机离群选择Stochastic Outlier Selection (SOS) python异常检测-局部异常因子(LOF)算法 Python异常检测- DBSCAN 文章目录 系列文章目录前言一、On…...
基于SpringBoot+Vue+uniapp微信小程序的婚庆摄影小程序的详细设计和实现(源码+lw+部署文档+讲解等)
项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念,提供了一套默认的配置,让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…...
NGAC访问控制系列三:低代码产品NGAC运用与算法运用
目录 一、基于NGAC的低代码模型权限管理方案 二、多策略访问控制系统限制内部访问的线性时间算法 1、概述 2、权限控制图遍历算法 一、基于NGAC的低代码模型权限管理方案 NGAC在低代码平台的权限决策模型:功能权限数据权限 案例:对于七巧低代码开发平台&…...
Unity3D 面试题收录
Unity3D 客户端面试题收录(持续更新~) 面试题收录 本文收录一些对于 Unity3D 客户端可能遇到的面试题(持续更新~),答案仅出于个人理解,如有偏差,希望指正。 Unity3D MonoBehaviour 生命周期…...
Jrebel热部署不生效解决
打开setting找到build底下的complier勾选 build project automatically 设置自动构建项目 打开setting找到Advanced Setting勾选Complier中的第一项 Jrebel panel中勾选需要热部署的项目模块 项目配置信息:Edit Configuration中进行编辑,配置如下图…...
软件测试学习笔记丨Selenium键盘鼠标事件ActionChains
本文转自测试人社区,原文链接:https://ceshiren.com/t/topic/22515 本文为霍格沃兹测试开发学社的学习经历分享,写出来分享给大家,希望有志同道合的小伙伴可以一起交流技术,一起进步~ 说明:本篇博客基于sel…...
CI/CD(持续集成与持续交付)流水线
集成 Jenkins、GitLab Webhook、Nexus 和 RabbitMQ 可以形成一个全面的 CI/CD(持续集成与持续交付)流水线,结合消息队列可以创建事件驱动的工作流。以下是配置这四个工具以实现一个基本的 CI/CD 流程的详细步骤。 前置条件 Jenkins、GitLab、…...
分布式光伏发电有什么优劣势
分布式光伏发电是指在离电力消费地点较近的地方,通过安装光伏发电系统来发电。这种系统通常用于屋顶、地面或其他建筑物上,相比于集中式光伏发电系统,它有一些独特的优势和劣势: 优势: 地理适应性: 分布式…...
Spring Boot框架中的IO
1. 文件资源的访问与管理 在 Spring Boot 中,资源文件的访问与管理是常见的操作需求,比如加载配置文件、读取静态文件或从外部文件系统读取文件。Spring 提供了多种方式来处理资源文件访问,包括通过 ResourceLoader、Value 注解以及 Applica…...
DBeaver连接Hive教程
hive shell:通过hive shell来操作hive,但是至多只能存在一个hive shell,启动第二个会被阻塞,也就是说hive shell不支持并发操作。 基于JDBC等协议:启动hiveserver2,通过jdbc协议可以访问hive,hi…...
Vue-Router源码实现详解
1.Hash模式 hash就是url中#后面的部分hash改变时,页面不会从新加载,会触发hashchange事件,去监听hash改变,而且也会被记录到浏览器历史记录中vue-router的hash模式,主要是通过hashchange事件,根据hash值找…...
程序员节日的日期是10月24日程序员日
程序员节日的日期是10月24日。 这一天被称为中国程序员日或1024程序员节,由博客园、CSDN等自发组织设立,旨在纪念程序员对科技世界的贡献。 程序员节日的由来和意义 1024程序员节的由来可以追溯到2010年,最初由网友提出设立一个…...
联邦学习中的数据异构性
在联邦学习(Federated Learning, FL)领域中, 异构数据(Heterogeneous Data) 是指不同客户端所持有的本地数据在特征分布、类别分布、数量等方面存在差异的数据。这种数据的异质性是联邦学习面临的一大挑战,…...
Python小程序 - 替换文件内容
1. 写入文件c:\a.txt 1)共写入10行 2)每行内容 0123456789 # 1 ls 0123456789 ln 10 with open("c:/a.txt", w,encodingUTF-8) as f:for i in range(ln):f.write(ls\n)######################################### 2 ln 10…...
k8s备份恢复(velero)
velero简介 velero官网: https://velero.io/ velero-github: https://github.com/vmware-tanzu/velero velero的特性 备份可以按集群资源的子集,按命名空间、资源类型标签选择器进行过滤,从而为备份和恢复的内容提供高度的灵活…...
LED户外屏:面对复杂环境的七大挑战
户外LED显示屏作为现代城市广告和信息传播的重要媒介,其应用范围越来越广泛。然而,与室内环境相比,户外环境的复杂多变对LED显示屏提出了更高的要求。本文将探讨户外LED显示屏在设计和应用过程中必须考虑的七个关键问题。 1. 高分辨率 户外LE…...
LabVIEW自动化流动返混实验系统
随着工业自动化的不断发展,连续流动反应器在化工、医药等领域中的应用日益广泛。传统的流动返混实验操作复杂,数据记录和处理不便,基于LabVIEW的全自动流动返混实验系统能自动测定多釜反应器、单釜反应器和管式反应器的停留时间分布ÿ…...
网站建设方案书原件/项链seo关键词
《班扎古鲁白玛的沉默》 作者:扎西拉姆多多 你见,或者不见我 我就在那里 不悲不喜 你念,或者不念我 情就在那里 不来不去 你爱,或者不爱我 爱就在那里 不增不减 你跟&…...
大连城乡建设委员会网站/网页设计素材网站
构建代码Jeff Mott , Dan Prince和Sebastian Seitz对本文进行了同行评审。 感谢所有SitePoint的同行评审员使SitePoint内容达到最佳状态! 以功能性方式考虑JavaScript的优势之一是能够使用小型,易于理解的单个功能来构建复杂的功能。 但是有…...
优秀网站设计参考/竞价推广账户托管费用
在上一篇进一步了解String 中,发现了string的不便之处,而string的替代解决方案就是StringBuilder的使用..它的使用也很简单System.Text.StringBuilder sb new System.Text.StringBuilder();这样就初始化了一个StringBuilder ..之后我们可以通过Append()来追加字符串填充到sb中…...
重庆网站设计好的公司/seo技术分享
利用微数据、微格式进行SEO优化——提高搜索引擎收录展示效果 最近,Google、Bing和雅虎宣布正式推出 schema.org,这是由Google、Bing和雅虎共同发起的一个新项目,将为网页上的结构化数据标记建立并提供一套通用模式。 Schema.org旨在成为网站…...
青岛小型网站建设/知乎营销推广
2019独角兽企业重金招聘Python工程师标准>>> 风险提示:刷BIOS有风险,请谨慎 想给我的e40换无线网卡,网卡买了,但是不会拆机、就算安装了也会报1802错误,不能开机。周六在中关村转了一圈,商家都不…...
网站建设外包还是自己做/域名注册
大数据快速发展,对数据实时性要求不断提高。Flink 作为集流式批量于一体的开源大数据计算引擎,凭着在实时 ETL、实时报表、监控预警、在线系统等方面优势备受企业青睐。阿里、腾讯、字节等较早就已经进行了布局,尤其是字节将它作为流处理唯一…...