【RocketMQ】重试机制及死信消息处理
【RocketMQ】重试机制及死信消息处理
文章目录
- 【RocketMQ】重试机制及死信消息处理
- 1. 重试机制
- 1.1 生产者重试
- 1.2 消费者重试
- 1.2.1 死信队列
参考文档: 官方文档
1. 重试机制
1.1 生产者重试
rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异步发送为0次)。
当然也可以自定义重试次数及机制:
// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
1.2 消费者重试
若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
- 在集群模式下,消费的业务逻辑代码会返回 Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试 16 次,之后该消息会被丢弃。
- 在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。
消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
- 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(5);
在实际生产中,一般重试3-5次,如果还没有消费成功,则可以把消息签收,通知人工介入。
- 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。
consumer.setSuspendCurrentQueueTimeMillis(5000);
示例:
消费者重试示例:
@Test
public void retryConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");//设置最大重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());//业务报错了 返回null 返回RECONSUME_LATER 都会重试if(true){throw new RuntimeException();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。
并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic
,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下:
消费类型 | 重试间隔 | 最大重试次数 |
---|---|---|
顺序消费 | 间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX |
并发消费 | 间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置 | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。(根据延时等级划分,共16次) |
1.2.1 死信队列
上面提到的 特殊Topic
称为死信Topic,对应的队列就是死信队列,其中存储的消息就是死信消息。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。应该通知人工介入处理。
对于死信的处理方案有多种,这里演示两种:
- 编写消费者监听死信队列
- 在消费者达到最大重试次数时立刻处理。
方案一:
@Test
public void deadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");//业务报错了 返回null 返回RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
该方案有一个缺点,就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。
方案二:
针对方案一的缺点,方案二能够比较好的解决。
@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());try {handleDb();// 10/0} catch (Exception e) {if (messageExt.getReconsumeTimes() >= 2) {//不要重试了System.out.println("消息体:" + new String(messageExt.getBody()));System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");} else {//重试System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
}
相关文章:

【RocketMQ】重试机制及死信消息处理
【RocketMQ】重试机制及死信消息处理 文章目录 【RocketMQ】重试机制及死信消息处理1. 重试机制1.1 生产者重试1.2 消费者重试1.2.1 死信队列 参考文档: 官方文档 1. 重试机制 1.1 生产者重试 rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异…...

Mysql DDL执行方式-pt-osc介绍 | 京东云技术团队
1 引言 大家好,接着上次和大家一起学习了《MySQL DDL执行方式-Online DDL介绍》,那么今天接着和大家一起学习另一种MySQL DDL执行方式之pt-soc。 在MySQL使用过程中,根据业务的需求对表结构进行变更是个普遍的运维操作,这些称为…...

C++ stack容器介绍
🤔stack容器介绍: 📖 stack是一种数据结构,也可以被称为堆栈。它是一个容器,只允许在最顶层进行插入和删除,并且只能访问最后一个插入的元素。这个元素称为栈顶。所有新插入的元素都被放置在栈顶上面&#…...

在 Git 中撤消更改的 6 种方法!
目录 1. 修改最近的提交 2. 将分支重置为较旧的提交 硬重置 软重置分支 创建备份分支 3. 交互式变基 删除旧提交 改写提交消息 编辑旧提交 压缩 4. 还原提交 5. 签出文件 6. 使用 Git Reflog 当使用 Git 进行项目代码管理时,难免会出现一些错误操作或需…...

LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置
LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置 1、位置订阅1.1、国标设备编辑1.2、选择设备开启位置订阅1.3、全局开启位置订阅1.4、通过目录订阅获取位置(少数情况) 2、经纬度信息查询2.1、访问接口获取2.1.…...

编程(38)----------计算机的部分原理
本篇主要总结一些计算机的理论部分. 计算机在发展历程中,无论是最早的巨无霸机器,还是现在小到可以拿在手中的掌机.只要其本质上是计算机,在最基础的结构上,都是以冯诺依曼体系所构建的. 冯诺依曼体系大致将计算机分为几个最重要的部分:输入,输出,中央处理器,存储设备.也就是…...

若依框架快速搭建(二)
目录 数据库设计功能模块设计XXX信息管理xxx查询xxx添加xxx删除xxx修改xxx导出 功能模块实现运行数据库自动代码生成在IDEA中找到RuoYi-generator,修改配置运行前后端项目,在网页中找到代码生成模块导入表后点击确定,序号前打勾,再…...

为建筑物的供暖系统实施MPC控制器的小型项目(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

【概率论】中心极限定理(二)
文章目录 主观题主观题 每袋味精的净重为随机变量,平均重量为 100 克,标准差为 10 克。一箱内装 200 袋味精,求一箱味精的净重大于 20500 克的概率? 解: ① E ( X i ) = 100 , D ( X i ) = 1 0 2...

Blender UV展开流程
目录 1. UV1.1 blender默认物体1.2 创建物体1.3 UV参考图1.4 标记缝合边1.5 UV拉伸1.6 孤岛模式 1. UV 1.1 blender默认物体 默认物体已经自动生成UV 在UV编辑工作区,编辑模式,全选物体在左边自动展开UV 在物体数据属性-UV贴图-存在默认的UV贴图&#…...

Flutter 笔记 | Flutter 核心原理(二)关键类和启动流程
Widget、Element、BuildContext 和 RenderObject Widget Widget关键类及其子类继承关系如图所示: 其中,Widget是Widget Tree所有节点的基类。Widget的子类主要分为3类: 第1类是RenderObjectWidget的子类,具体来说又分为SingleCh…...

Android:主题切换
一.概述 正在开发的应用做了一版新UI,原打算将新版UI按项目名做成资源包,再在build.gradle里productFlavors{ }多渠道打包实现 但被告知新旧两个项目共用一个分支,那就做成两个主题(Theme1/Theme2)来适配了 如果只是变更UI,做成…...

terminalworks ASP.NET Core PDF 浏览器-Crack
ASP.NET Core 的 PDF 查看器 terminalworks在 ASP.NET Core 网页或应用程序中添加可靠的 PDF 查看器的简单方法。 我们的 Web PDF 查看器基于经过验证和测试的 Mozilla PdfJS 解决方案,该解决方案在 Firefox 中用作默认 PDF 查看器。我们专门设计了我们的查看器&…...

Rust每日一练(Leetday0020) 最后单词的长度、螺旋矩阵II、排列序列
目录 58. 最后一个单词的长度 Length of Last Word 🌟 59. 螺旋矩阵 II Spiral Matrix II 🌟🌟 60. 排列序列 Permutation Sequence 🌟🌟🌟 🌟 每日一练刷题专栏 🌟 Rust每日…...

短视频矩阵源码如何做应用编程?
短视频矩阵源码, 短视频矩阵系统技术文档: 可以采用电子文档或者纸质文档的形式交付,具体取决于需求方的要求。电子文档可以通过电子邮件、远程指导交付云存储等方式进行传输、 短视频矩阵{seo}源码是指将抖音平台上的视频资源进行筛选、排…...

【运维知识进阶篇】Ansible实现一套完整LNMP架构
前面介绍了PlayBook怎么写服务部署,把服务部署上后,我们来用Ansible来部署项目,实现一套完整的LNMP架构。我们部署wordpress、wecenter、phpshe、phpmyadmin这四个项目。将其所有的剧本都写入lnmp.yml中,相关备份数据都放入root/a…...

Spring Boot 自动配置一篇概览
一、什么是自动配置 bean 自动配置类通过添加 AutoConfiguration 注解实现。 因为 AutoConfiguration 注解本身是以 Configuration 注解的,所以自动配置类可以算是一个标准的基于 Configuration 注解的类。 Conditional 注解可以用于声明自动配置启用条件&#x…...

深入理解设计原则之接口隔离原则(ISP)【软件架构设计】
系列文章目录 C高性能优化编程系列 深入理解软件架构设计系列 深入理解设计模式系列 高级C并发线程编程 LSP:接口隔离原则 系列文章目录1、接口隔离原则的定义和解读2、案例解读3、如何判断一个接口是否符合接口隔离原则?小结 1、接口隔离原则的定义和…...

IMX6ULL裸机篇之I2C实验主控代码说明二
一. I2C实验 I2C实验内容: 学习如何使用 I.MX6U 的 I2C 接口来驱动 AP3216C,读取 AP3216C 的传感器数据。 I2C读写数据时序图: I2C写数据时序图如下: I2C读数据时序图如下: 二. I2C主控读写时序 1. 读数据与写数…...

【计算机组成原理与体系结构】数据的表示与运算
目录 一、进位计数制 二、信息编码 三、定点数数据表示 四、校验码 五、定点数补码加减运算 六、标志位的生成 七、定点数的移位运算 八、定点数的乘除运算 九、浮点数的表示 十、浮点数的运算 一、进位计数制 整数部分: 二进制、八进制、十六进制 --…...

如何入门编程
随着信息技术的快速发展,编程已经成为一个越来越重要的技能。那么,我们该如何入门编程呢?欢迎大家积极讨论 一、自学编程需要注意什么? 对于我个人的理解,其实自学编程最重要的就是兴趣。你得培养编程兴趣。 所以在学…...

SQL中CONVERT转化日期函数的使用方法
SQL中CONVERT转化日期函数的使用方法 SQL中CONVERT函数最常用的是使用convert转化长日期为短日期,如果只要取yyyy-mm-dd格式时间, 就可以用convert(nvarchar(10),field,120) 120 是格式代码, nvarchar(10) 是指取出前10位字符. 例如 SELECT CONVERT(nvarchar(10),…...

SpringBoot2-核心技术(一)
SpringBoot2-核心技术(一) 了解SpringBoot配置文件的使用 文章目录 SpringBoot2-核心技术(一)了解SpringBoot配置文件的使用一、文件类型1. properties2. yaml 二、yaml的基本使用1. 基本语法2. 数据类型2.1 字面量 2.2 对象2.3 …...

mac host学习
参考: SSH中known_hosts文件作用和常见问题及解决方法 https://blog.csdn.net/luduoyuan/article/details/130070120在 Mac 上更改 DNS 设置 https://support.apple.com/zh-cn/guide/mac-help/mh14127/mac mac中有时候你输入的域名,但会跳转到与期望ip不…...

Java之~指定String日期时间,5分钟一截取时间
// 截取5分钟时间Testpublic void timeCutForDay() throws ParseException {String startTime "2023-03-28 09:16:03";String endTime "2023-03-31 23:59:59";SimpleDateFormat dateFormat new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");D…...

【chatGPT4结对编程】chatGPT4教我做图像分类
开始接触深度学习 大语言模型火了之后,我也想过是否要加入深度学习的行业当中来,一开始的想法就是AI大模型肯定会被各大厂垄断,我们作为普通应用型软件工程师直接调用api就完事,另外对自己的学历也自卑(刚刚够线的二本࿰…...

Different romantic
001 他暗恋上我们班上的一个女生。 He has a crush on a girl in our class. crush n. 迷恋 have a crush on (someone) 暗恋(某人) crush 也可以指“暗恋的对象”。例如,“他在大学曾经暗恋过两个人”,英语就是He had two crushe…...

learn C++ NO.7——C/C++内存管理
引言 现在是5月30日的正午,图书馆里空空的,也许是大家都在午休,也许是现在37摄氏度的气温。穿着球衣的我已经汗流浃背,今天热火战胜了凯尔特人,闯入决赛。以下克上的勇气也激励着我,在省内垫底的大学中&am…...

SDUT数据库原理——第十章作业(参考答案)
1. 简述使用检查点方法进行数据恢复的一般步骤。 答: (1)使用检查点方法进行数据恢复,首先从重新开始文件(见P302页图10.3)中找到最后一个检查点记录在日志文件中的地址,由该地址在日志文件中找到最后一个检查点记录。 (2)由该检查点记录得到检查点建立时刻所有正在…...

My Note of Diffusion Models
Diffusion Models Links: https://theaisummer.com/diffusion-models/ Markovian Hierachical VAE rvs: data: x 0 x_{0} x0,representation: x T x_{T} xT ( p ( x 0 , x 1 , ⋯ , x T ) , q ( x 1 , ⋯ , x T ∣ x 0 ) ) (p(x_0,x_1,\cdots,x_T),q(x_1,\cdots,x_{T…...