当前位置: 首页 > news >正文

RocketMQ 延迟队列

什么是延迟队列

指消息发送到某个队列后,在指定多长时间之后才能被消费。

应用场景

RocketMQ 延迟队列

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。

broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m

8m 9m 10m 20m 30m 1h 2h”,18个level。

可以配置自定义messageDelayLevel。需要注意的是 messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:

msg.setDelayLevel(level)

level有以下三种情况:

level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level1,延迟1s

level > maxLevel,则level maxLevel,例如level==20,延迟2h

在 RocketMQ中定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

RocketMQ 延迟队列和RabbitMQ延迟队列相比

RocketMQ直接一步到位,功能类似于RabbitMQ的延迟交换器插件、而RabbitMQ提供了延迟队列,但如果是基于消息设置每个消息设置不同的延迟时间,会产生前面的消息早已过期,但后面的消息还存在消息队列中,故此RabbitMQ提供了延迟交换器插件,而RocketMQ 延迟队列设计就比较好了。

具体示例

生产者

// 实例化生产者,并指定生产组名称DefaultMQProducer producer =newDefaultMQProducer("myproducer_group_topic_name_delay_01");//设置实例名称,一个jvm中有多个生产者可以根据实例名区分//默认defaultproducer.setInstanceName("topic_delay");// 指定nameserver的地址producer.setNamesrvAddr("localhost:9876");//设置同步重试次数producer.setRetryTimesWhenSendFailed(2);//设置异步发送次数//producer.setRetryTimesWhenSendAsyncFailed(2);// 初始化生产者producer.start();for(int i =0; i <20; i++){Message message =newMessage("topic_name_delay",("key="+ i).getBytes("utf-8"));//设置延迟消费时间 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2hmessage.setDelayTimeLevel(i);// 1 同步发送  如果发送失败会根据重试次数重试SendResult send = producer.send(message);SendStatus sendStatus = send.getSendStatus();System.out.println(sendStatus.toString());}

消费者

/*** 推消息消费*/DefaultMQPushConsumer defaultMQPushConsumer =newDefaultMQPushConsumer("consumer_group_delay_01");// 指定nameserver的地址defaultMQPushConsumer.setNamesrvAddr("localhost:9876");defaultMQPushConsumer.subscribe("topic_name_delay","*");// 1 提高消费并行度defaultMQPushConsumer.setConsumeThreadMax(10);defaultMQPushConsumer.setConsumeThreadMin(1);// 2 以批量方式进行 消费// 设置消息批处理的一个批次中消息的最大个数defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//设置重试次数 默认16次defaultMQPushConsumer.setMaxReconsumeTimes(1);// 添加消息监听器,一旦有消息推送过来,就进行消费defaultMQPushConsumer.setMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context){//final MessageQueue messageQueue = context.getMessageQueue();for(MessageExt msg : msgs){System.out.println(msg);try{System.out.println(newString(msg.getBody(),"utf-8"));}catch(UnsupportedEncodingException e){e.printStackTrace();}}// 消息消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 消息消费失败//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});

消费消息,按照0到18级别来,0 表示不延迟,1表示延迟1s,大于等于18表示延迟2h

按照级别一次类推

默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”,18个level。

这里只拷贝0-8 的打印日志,可以自己等待确认。

MessageExt [queueId=0, storeSize=195, queueOffset=19, sysFlag=0, bornTimestamp=1628949643548, bornHost=/192.168.0.103:55518, storeTimestamp=1628949643554, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F0000000000029A8A, commitLogOffset=170634, bodyCRC=858365373, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=20, CONSUME_START_TIME=1628949650433, UNIQ_KEY=C0A8006748D07C53A9EB47ABD51C0000, CLUSTER=DefaultCluster, WAIT=true, DELAY=0}, body=[107, 101, 121, 61, 48], transactionId=‘null’}]
key=0
MessageExt [queueId=1, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643558, bornHost=/192.168.0.103:55518, storeTimestamp=1628949644566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ACF8, commitLogOffset=175352, bodyCRC=1143909675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949650435, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5260001, CLUSTER=DefaultCluster, WAIT=true, DELAY=1, REAL_QID=1}, body=[107, 101, 121, 61, 49], transactionId=‘null’}]
key=1
MessageExt [queueId=2, storeSize=234, queueOffset=18, sysFlag=0, bornTimestamp=1628949643561, bornHost=/192.168.0.103:55518, storeTimestamp=1628949648566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ADE2, commitLogOffset=175586, bodyCRC=1562901649, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=19, CONSUME_START_TIME=1628949650436, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5290002, CLUSTER=DefaultCluster, WAIT=true, DELAY=2, REAL_QID=2}, body=[107, 101, 121, 61, 50], transactionId=‘null’}]
key=2
MessageExt [queueId=3, storeSize=234, queueOffset=17, sysFlag=0, bornTimestamp=1628949643566, bornHost=/192.168.0.103:55518, storeTimestamp=1628949653569, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002AECC, commitLogOffset=175820, bodyCRC=706792455, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=18, CONSUME_START_TIME=1628949653572, UNIQ_KEY=C0A8006748D07C53A9EB47ABD52E0003, CLUSTER=DefaultCluster, WAIT=true, DELAY=3, REAL_QID=3}, body=[107, 101, 121, 61, 51], transactionId=‘null’}]
key=3
MessageExt [queueId=0, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643568, bornHost=/192.168.0.103:55518, storeTimestamp=1628949673574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B0A0, commitLogOffset=176288, bodyCRC=876894628, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949673577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5300004, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, REAL_QID=0}, body=[107, 101, 121, 61, 52], transactionId=‘null’}]
key=4
MessageExt [queueId=1, storeSize=234, queueOffset=23, sysFlag=0, bornTimestamp=1628949643570, bornHost=/192.168.0.103:55518, storeTimestamp=1628949703574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B18A, commitLogOffset=176522, bodyCRC=1128491314, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=24, CONSUME_START_TIME=1628949703577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5320005, CLUSTER=DefaultCluster, WAIT=true, DELAY=5, REAL_QID=1}, body=[107, 101, 121, 61, 53], transactionId=‘null’}]
key=5
MessageExt [queueId=2, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643572, bornHost=/192.168.0.103:55518, storeTimestamp=1628949763575, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B35E, commitLogOffset=176990, bodyCRC=1514813576, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949763580, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5340006, CLUSTER=DefaultCluster, WAIT=true, DELAY=6, REAL_QID=2}, body=[107, 101, 121, 61, 54], transactionId=‘null’}]
key=6
MessageExt [queueId=3, storeSize=234, queueOffset=19, sysFlag=0, bornTimestamp=1628949643574, bornHost=/192.168.0.103:55518, storeTimestamp=1628949823580, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B532, commitLogOffset=177458, bodyCRC=760023070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=20, CONSUME_START_TIME=1628949823583, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5360007, CLUSTER=DefaultCluster, WAIT=true, DELAY=7, REAL_QID=3}, body=[107, 101, 121, 61, 55], transactionId=‘null’}]
key=7
MessageExt [queueId=0, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643576, bornHost=/192.168.0.103:55518, storeTimestamp=1628949883582, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B706, commitLogOffset=177926, bodyCRC=1039275407, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949883586, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5380008, CLUSTER=DefaultCluster, WAIT=true, DELAY=8, REAL_QID=0}, body=[107, 101, 121, 61, 56], transactionId=‘null’}]
key=8

同样我们在控制台可以看到,存放的消息

相关文章:

RocketMQ 延迟队列

什么是延迟队列指消息发送到某个队列后&#xff0c;在指定多长时间之后才能被消费。应用场景RocketMQ 延迟队列定时消息&#xff08;延迟队列&#xff09;是指消息发送到broker后&#xff0c;不会立即被消费&#xff0c;等待特定时间投递给真正的topic。broker有配置项messageD…...

【精准计时】北斗GPS卫星时钟同步改变精准计时年代

【精准计时】北斗GPS卫星时钟同步改变精准计时年代 【精准计时】北斗GPS卫星时钟同步改变精准计时年代 北斗GPS成精确计时先锋   北斗GPS精确时间自动校准技术&#xff0c;是一种简便的获取北斗GPS精确时间信息的专利技术&#xff0c;具有灵敏度高、不受时间及地域限制等特点…...

【C#基础】C# 面向对象编程

序号系列文章5【C#基础】C# 运算符总结6【C#基础】C# 常用语句讲解7【C#基础】C# 常用数据结构文章目录前言面向对象的 C#1&#xff0c;类的概念2&#xff0c;类的定义3&#xff0c;类成员4&#xff0c;对象5&#xff0c;继承6&#xff0c;多态性结语前言 &#x1f60a;大家好&…...

数据结构与算法入门

目录数据结构概述逻辑结构存储结构算法概述如何理解“大O记法”时间复杂度空间复杂度数据结构概述 数据结构可以简单的理解为数据与数据之间所存在的一些关系&#xff0c;数据的结构分为数据的存储结构和数据的逻辑结构。 逻辑结构 集合结构&#xff1a;数据元素同属于一个集…...

【OpenAI】基于 Gym-CarRacing 的自动驾驶练习项目 | 路径训练功能的实现 | GYM-Box2D CarRacing

限时开放&#xff0c;猛戳订阅&#xff01; &#x1f449; 《一起玩蛇》&#x1f40d; &#x1f4ad; 写在前面&#xff1a; 本篇是关于多伦多大学自动驾驶专业项目的博客。GYM-Box2D CarRacing 是一种在 OpenAI Gym 平台上开发和比较强化学习算法的模拟环境。它是流行的 Box2…...

亚马逊、沃尔玛测评自养号测评、退款、撸卡撸货怎么做?

大家好&#xff0c;有很多的测评工作室做亚马逊测评、沃尔玛测评自养号大额退款&#xff0c;撸卡撸货的找到我&#xff0c;问我有什么方式可以解决成本&#xff0c;效率&#xff0c;纯净度&#xff0c;便捷性等问题&#xff0c;测评养号系统从最早的模拟器&#xff0c;虚拟机到…...

Apollo 2.1.0最新版docker 部署多环境 与java spring boot 接入demo (附带一键部署脚本)

最新Apollo 版本发布2.1.0 https://www.apolloconfig.com/#/zh/design/apollo-design 环境说明 ecs 主机一台数据库mysql 8.0docker 环境 apollo 是内网可信应用&#xff0c;最好是部署在内网里面&#xff0c;外网不可使用&#xff0c;避免配置信息泄漏&#xff0c;这里为了方…...

分布式算法 - 一致性Hash算法

一致性Hash算法是个经典算法&#xff0c;Hash环的引入是为解决单调性(Monotonicity) 的问题&#xff1b;虚拟节点的引入是为了解决 平衡性(Balance) 问题。一致性Hash算法引入在分布式集群中&#xff0c;对机器的添加删除&#xff0c;或者机器故障后自动脱离集群这些操作是分布…...

OAuth2.0入门

什么是OAuth2.0 OAuth&#xff08;Open Authorization&#xff09;是一个关于授权&#xff08;authorization&#xff09;的开放网络标准&#xff0c;允许用户授权第三方应用访问他们存储在另外的服务提供者上的信息&#xff0c;而不需要将用户名和密码提供给第三方移动应用或…...

【HTTP——了解HTTP协议及状态码】

一&#xff0c; 什么是通信通信&#xff0c;就是信息的传递和交换。通信三要素&#xff1a;通信的主体&#xff0c;通信的内容&#xff0c;通信的方式现实生活中的通信&#xff1a;我打电话叫小明来我家吃饭【其中通信的主体是&#xff0c;我&#xff0c;小明。通信内容是&…...

骨传导耳机靠谱吗,骨传导耳机的原理是什么

很多人刚开始接触骨传导耳机时都会具有一个疑问&#xff0c;骨传导耳机是不是真的靠谱&#xff0c;是不是真的不伤害听力&#xff1f;骨传导耳机传输声音的原理是什么&#xff1f; 下面就给大家讲解一下骨传导耳机传输声音的原理以及骨传导耳机对听力到底有没有伤害。 骨传导…...

对个人博客系统进行web自动化测试(包含测试代码和测试的详细过程)

目录 一、总述 二、登录页面测试 一些准备工作 验证页面显示是否正确 验证正常登录的情况 该过程中出现的问题 验证登录失败的情况 关于登录界面的总代码 测试视频 三、注册界面的自动化测试 测试代码 过程中出现的bug 测试视频 四、博客列表页测试&#xff08;…...

[ 2204听力 ] 五

[ 第五次课 对话1 ] Narrator Listen to a conversation between a student and her Ecology professor (woman) Hi, professor, did you want to talk about my paper? I didn’t get a grade. (man) Ah, yes, I think you might have done the wrong assignment. assign…...

嵌入式常问问题和知识

12、并发和并行的区别&#xff1f; 最本质的区别就是&#xff1a;并发是轮流处理多个任务&#xff0c;并行是同时处理多个任务。 你吃饭吃到一半&#xff0c;电话来了&#xff0c;你一直到吃完了以后才去接&#xff0c;这就说明你不支持并发也不支持并行。 你吃饭吃到一半&…...

【数据结构】空间复杂度

&#x1f680;write in front&#x1f680; &#x1f4dc;所属专栏&#xff1a;初阶数据结构 &#x1f6f0;️博客主页&#xff1a;睿睿的博客主页 &#x1f6f0;️代码仓库&#xff1a;&#x1f389;VS2022_C语言仓库 &#x1f3a1;您的点赞、关注、收藏、评论&#xff0c;是对…...

湖南中创教育提醒校外培训留意这几点,避免维权

校外教育培训机构是市场经济发展的必然产物&#xff0c;有需求就有市场&#xff0c;这个无可厚非。而校外教育培训机构的火热&#xff0c;正是反映出人民群众对教育发展的需求在不断增强。 培训机构分类中有面对大学生参加公务员招考、教师考编等考证考试的培训机构&#xff1…...

docker 配置私有/本地镜像仓库

docker 配置私有/本地镜像仓库docker pull registry mkdir -p /usr/local/docker/registry-data docker tag registry 192.168.28.132:5000/registry docker run -di -p 5000:5000 --namelocal_registry --restartalways --privilegedtrue --log-drivernone -v /usr/local/d…...

每日学术速递2.23

Subjects: Robotics 1.On discrete symmetries of robotics systems: A group-theoretic and data-driven analysis ​ 标题&#xff1a;关于机器人系统的离散对称性&#xff1a;群论和数据驱动分析 作者&#xff1a;Daniel Ordonez-Apraez, Mario Martin, Antonio Agudo, F…...

LeetCode 232. 用栈实现队列

LeetCode 232. 用栈实现队列 难度&#xff1a;easy\color{Green}{easy}easy 题目描述 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;pushpushpush、poppoppop、peekpeekpeek、emptyemptyempty&#xff09;&#xff1a; 实现 MyQueueM…...

AI算法创新赛-人车目标检测竞赛总结04

队伍&#xff1a;AI000038 小组成员&#xff1a;杨志强&#xff0c;林松 1. 算法介绍 1.1 相关工作 当前流行的目标检测算法主要分为三种&#xff0c;一阶段算法&#xff1a;SSD&#xff0c;FCOS&#xff0c;Scaled&#xff0c;YOLO系列等&#xff1b;二阶段算法&#xff1a…...

【C语言进阶】动态内存管理详解与常见动态内存错误以及柔性数组使用与介绍

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;C语言进阶 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录1.动态内存1.1 概述…...

【C++】string的模拟实现

文章目录1. string的模拟实现1.构造函数使用new开辟空间优化成全缺省的构造函数2. C_str3. operator[]4.拷贝构造浅拷贝深拷贝5. 赋值三种情况6. 迭代器7.比较(ASCII值)大小8. reserve(扩容)9. push_back(尾插字符)10. append(尾插字符串)11. (字符/字符串)12. insert在pos位置…...

前端借助Canvas实现压缩base64图片两种方法

一、具体代码 1、利用canvas压缩图片方法一 // 第一种压缩图片方法&#xff08;图片base64,图片类型,压缩比例,回调函数&#xff09;// 图片类型是指 image/png、image/jpeg、image/webp(仅Chrome支持)// 该方法对以上三种图片类型都适用 压缩结果的图片base64与原类型相同// …...

用ChatGPT生成Excel公式,太方便了

ChatGPT 自去年 11 月 30 日 OpenAI 重磅推出以来&#xff0c;这款 AI 聊天机器人迅速成为 AI 界的「当红炸子鸡」。一经发布&#xff0c;不少网友更是痴迷到通宵熬夜和它对话聊天&#xff0c;就为了探究 ChatGPT 的应用天花板在哪里&#xff0c;经过试探不少人发现&#xff0c…...

【Kubernetes 企业项目实战】09、Rancher 2.6 管理 k8s-v1.23 及以上版本高可用集群

目录 一、Rancher 介绍 1.1Rancher简介 1.2 Rancher 和 k8s 的区别 1.3 Rancher 企业使用案例 二、安装 Rancher 2.1 初始化环境 2.2 安装 Rancher 2.3 登录 Rancher 平台 三、通过 Rancher 管理已存在的 k8s 集群 3.1 配置 rancher 3.2 导入 k8s ​四、通过 Ranc…...

在Excel中按条件筛选数据并存入新的表

案例 老板想要看去年每月领料数量大于1000的数据。手动筛选并复制粘贴出来,需要重复操作12次,实在太麻烦了,还是让Python来做吧。磨刀不误砍柴工,先整理一下思路: 1读取原表,将数量大于1000的数据所对应的行整行提取(如同在excel表中按数字筛选大于1000的) 2将提取的数…...

【面试题】MySQL索引相关知识点

1.什么是索引 索引是存储引擎快速查找记录的一种数据结构&#xff0c;就类似书的目录&#xff0c;通过目录可以快速的查找到想要查找的内容 2.索引的特点 特点&#xff1a;索引是基于数据引擎的&#xff0c;不同的数据引擎实现索引的方式不一定相同 好处&#xff1a;通过索引…...

MySQL索引类型及原理?一文读懂

一、什么是MySQL索引&#xff1f; MySQL索引是一种数据结构&#xff0c;用于提高数据库查询的性能。它类似于一本书的目录&#xff0c;通过在表中存储指向数据行的引用&#xff0c;使得查询数据的速度更快。 在MySQL中&#xff0c;索引通常是在表上定义的&#xff0c;它们可以…...

【C语言】字符分类函数+内存函数

目录 1.字符函数 1.1字符分类函数 1.2.字符转换函数 //统一字符串中的大小写 2.内存处理函数 2.1内存拷贝函数memcpy //模拟实现memcpy 2.2内存移动函数memmove //模拟实现memmove 2.3内存比较函数memcmp 2.4内存设置函数memset 1.字符函数 1.1字符分类函数 头文…...

高通平台开发系列讲解(SIM卡篇)SIM卡基础概念

文章目录 一、SIM卡基本定义二、卡的类型三、SIM卡的作用三、SIM卡基本硬件结构四、SIM卡的内部物理单元五、卡文件系统沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇文章将介绍SIM的相关组件。 一、SIM卡基本定义 SIM卡是一种智能卡(ICC Card/UICC Card) SIM…...