当前位置: 首页 > news >正文

RocketMQ源码(24)—DefaultMQPushConsumer延迟消息源码

基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer延迟消息源码。

文章目录

  • 1 load加载延迟消息数据
    • 1.1 parseDelayLevel解析延迟等级
  • 2 start启动调度消息服务
  • 3 DeliverDelayedMessageTimerTask投递延迟消息任务
    • 3.1 executeOnTimeup执行延迟消息投递
    • 3.2 scheduleNextTimerTask下一个调度任务
    • 3.3 correctDeliverTimestamp校验投递时间
    • 3.4 messageTimeup恢复正常消息
    • 3.5 syncDeliver同步投递消息
  • 4 延迟消息的总结

并发消息消费失败引发消费重试时,默认情况下重试16次,从延迟等级level3(10s)开始,每次延迟时间递增,时间到了又会发送到重试topic去消费,这其中就涉及到RocketMQ的延迟消息,可以说RocketMQ并发消息消费失败引发消费重试就是基于topic替换和延迟消息这两个技术实现的。

此前我们学习了RocketMQ的消费重试,我们知道在判断消息为延迟消息的时候,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。

实际上普通的延迟消息也会进行topic替换,那么,发送到SCHEDULE_TOPIC_XXXX对应的消息队列里面的延迟消息,到底是做到能够在给定的延迟时间之后取出来重新投递的呢?下面我们来看看RocketMQ延迟消息的源码。

实际上RocketMQ通过ScheduleMessageService调度消息服务实现延迟(定时)消息。ScheduleMessageService继承了ConfigManager,在DefaultMessageStore实例化的时候被实例化。

1 load加载延迟消息数据

在broker启动执行DefaultMessageStore#load方法加载Commit Log、Consume Queue、index file等文件,将数据加载到内存中,并完成数据的恢复的时候,同样会执行ScheduleMessageService#load方法,加载延迟消息数据,初始化delayLevelTable和offsetTable。

首先调用父类的ConfigManager#load方法(在broker启动部分就讲过源码了),将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中,delayOffset.json中保存着延迟topic每个队列的消费进度(消费偏移量)。

/*** ScheduleMessageService的方法* <p>* 加载延迟消息数据,初始化delayLevelTable和offsetTable*/
@Override
public boolean load() {//调用父类ConfigManager#load方法,将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中//delayOffset.json中保存着延迟topic每个队列的消费进度(消费偏移量)boolean result = super.load();//解析延迟级别到delayLevelTable集合中result = result && this.parseDelayLevel();//矫正每个延迟队列的偏移量result = result && this.correctDelayOffset();return result;
}
/*** ScheduleMessageService的方法* <p>* 获取延迟消息文件路径${user.home}/store/config/delayOffset.json*/
@Override
public String configFilePath() {//${user.home}/store/config/delayOffset.jsonreturn StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
}/*** ScheduleMessageService的方法* <p>* json字符串转换为offsetTable对象*/
@Override
public void decode(String jsonString) {if (jsonString != null) {DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);if (delayOffsetSerializeWrapper != null) {this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());}}
}

延迟消息文件${user.home}/store/config/delayOffset.json的内容,它保存着延迟队列对应的消费偏移量。

{"offsetTable":{3:2,4:1}
}

1.1 parseDelayLevel解析延迟等级

延迟等级字符串存储在MessageStoreConfig的messageDelayLevel属性中,默认值为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",即18个等级,因此也是可以配置的,但是单位仅支持s、m、h、d,分别表示秒、分、时、天。

该方法解析延迟等级以及对应的延迟时间到delayLevelTable中,单位统一转换为毫秒,注意延迟等级从1开始。

/*** ScheduleMessageService的方法* 解析延迟等级到delayLevelTable中* @return*/
public boolean parseDelayLevel() {//时间单位表HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();timeUnitTable.put("s", 1000L);timeUnitTable.put("m", 1000L * 60);timeUnitTable.put("h", 1000L * 60 * 60);timeUnitTable.put("d", 1000L * 60 * 60 * 24);//从MessageStoreConfig中获取延迟等级字符串messageDelayLevelString levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {//通过空格拆分String[] levelArray = levelString.split(" ");for (int i = 0; i < levelArray.length; i++) {//获取每个等级的延迟时间String value = levelArray[i];//获取延迟单位String ch = value.substring(value.length() - 1);//获取对应的延迟单位的时间毫秒Long tu = timeUnitTable.get(ch);//延迟等级,从1开始int level = i + 1;//如果当前等级已经大于最大等级,则赋值为最大等级if (level > this.maxDelayLevel) {this.maxDelayLevel = level;}//延迟时间long num = Long.parseLong(value.substring(0, value.length() - 1));//计算该等级的延迟时间毫秒long delayTimeMillis = tu * num;//存入delayLevelTable中this.delayLevelTable.put(level, delayTimeMillis);if (this.enableAsyncDeliver) {this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());}}} catch (Exception e) {log.error("parseDelayLevel exception", e);log.info("levelString String = {}", levelString);return false;}return true;
}

2 start启动调度消息服务

ScheduleMessageService依靠内部的定时任务实现延迟消息,ScheduleMessageService通过start方法完成启动。

在broker的启动过程中,会执行DefaultMessageStore的start方法中,该方法内部通过handleScheduleMessageService方法执行ScheduleMessageService的start方法。

该方法的大概逻辑为:

  1. 初始化延迟消息投递线程池deliverExecutorService,该线程池是一个调度任务线程池ScheduledThreadPoolExecutor,核心线程数就是最大的延迟等级,默认18。
  2. 遍历所有的延迟等级,为每一个延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务放到deliverExecutorService中,默认延迟1000ms后执行。
  3. 构建一个延迟队列消费偏移量持久化的定时调度任务,首次延迟1000ms之后执行,后续每次执行间隔flushDelayOffsetInterval时间,默认10s。
/*** ScheduleMessageService的方法* <p>* 启动调度消息服务*/
public void start() {//将启动标志CAS的从false改为true,该服务只能启动一次if (started.compareAndSet(false, true)) {//调用父类的load方法,将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中//fix(dledger): reload the delay offset when master changed (#2518)super.load();/** 1 初始化延迟消息投递线程池,核心线程数就是最大的延迟等级,默认18*/this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));//异步投递,默认不支持if (this.enableAsyncDeliver) {this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));}/** 2 对所有的延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务,默认延迟1000ms后执行*/for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {//延迟等级Integer level = entry.getKey();//延迟时间,毫秒Long timeDelay = entry.getValue();//根据延迟等级获取对应的延迟队列的消费偏移量,如果没有则设置为0Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}//延迟时间不为null,那么为该等级的延迟队列构建一个DeliverDelayedMessageTimerTask调度任务,默认延迟1000ms后执行if (timeDelay != null) {if (this.enableAsyncDeliver) {this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}//DeliverDelayedMessageTimerTask构造参数包括对应的延迟等级,以及最新消费偏移量this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}}/** 3 构建一个延迟队列消费偏移量持久化的定时调度任务,首次延迟1000ms之后执行,后续每次执行间隔flushDelayOffsetInterval时间,默认10s*/this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {if (started.get()) {ScheduleMessageService.this.persist();}} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);}
}

3 DeliverDelayedMessageTimerTask投递延迟消息任务

在start方法中,ScheduleMessageService会为每一个延迟等级创建一个DeliverDelayedMessageTimerTask投递延迟消息任务,不同延迟等级的消息放到不同的延迟队列里面,被不同的Task处理。

采用不同的队列处理同一个延迟等级的消息的方式,不再需要进行消息排序,避免了消息排序的复杂逻辑,能比较简单的实现有限等级的延迟消息,RocketMQ的开源版本不支持任意时间的延迟消息,这也是它的一个限制吧!

DeliverDelayedMessageTimerTask是一个线程任务,下面来看看它的run方法,主要是调用executeOnTimeup执行消息投递。

@Override
public void run() {try {//如果服务已启动,那么继续执行if (isStarted()) {//执行消息投递this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error("ScheduleMessageService, executeOnTimeup exception", e);//抛出异常,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,10000ms后执行,本次任务结束this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);}
}

3.1 executeOnTimeup执行延迟消息投递

延迟消息的核心逻辑实现,执行延迟消息消息投递。

  1. 调用findConsumeQueue方法,根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue,如果没找到就新建,即ConsumeQueue文件是延迟创建的。该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了。
  2. 如果没找到对应的消息队列,调用scheduleNextTimerTask方法,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束。
  3. 调用getIndexBuffer方法,根据逻辑offset定位到物理偏移量,然后截取该偏移量之后的一段Buffer,其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据,即这里截取的Buffer可能包含多条索引数据。该方法的源码我们在broker处理拉取消息请求部分已经讲过了。
  4. 遍历缓存buffer中的消息,根据tagsCode投递时间判断消息是否到期,如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中。
    1. 获取该条目对应的消息的tagsCode,对于延迟消息,tagsCode被替换为延迟消息的发送时间(在CommitLog#checkMessageAndReturnSize方法中,源码此前讲过了)。
    2. 如果投递时间小于当前时间,那么可以投递该延迟消息。如果投递时间大于当前时间,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束。
    3. 根据消息物理偏移量从commitLog中找到该条消息,调用messageTimeup方法构建内部消息对象,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId。即恢复为正常消息。
    4. 最后调用syncDeliver方法投递该消息,消息将会被投递到原始topic和队列中,这样就可以被消费了。
  5. 遍历结束,更新下一个offset,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束。保证线程任务的活性。
/*** DeliverDelayedMessageTimerTask的方法* <p>* 执行延迟消息消息投递*/
public void executeOnTimeup() {/** 1 根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue,如果没找到就新建,即ConsumeQueue文件是延迟创建的。* 该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了*/ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));/** 2 如果没找到对应的消息队列,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束*/if (cq == null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}/** 3 根据逻辑offset定位到物理偏移量,然后截取该偏移量之后的一段Buffer,其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据。* 这里截取的Buffer可能包含多条索引数据,因为需要批量拉取多条消息,以及进行消息过滤。* 该方法的源码我们在broker处理拉取消息请求部分已经讲过了*/SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);//没获取到缓存bufferif (bufferCQ == null) {long resetOffset;//如果当前消息队列的最小偏移量 大于 当前偏移量,那么当前偏移量无效,设置新的offset为最小偏移量if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",this.offset, resetOffset, cq.getQueueId());}//如果当前消息队列的最大偏移量 小于 当前偏移量,那么当前偏移量无效,设置新的offset为最大偏移量else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",this.offset, resetOffset, cq.getQueueId());} else {resetOffset = this.offset;}//新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);return;}/** 3 遍历缓存buffer中的消息,根据tagsCode投递时间判断消息是否到期,如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中*///下一个消费的offsetlong nextOffset = this.offset;try {//i表示consumeQueue消息索引大小int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//遍历截取的Buffer中的consumeQueue消息索引,固定长度20bfor (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//获取该条目对应的消息在commitlog文件中的物理偏移量long offsetPy = bufferCQ.getByteBuffer().getLong();//获取该条目对应的消息在commitlog文件中的总长度int sizePy = bufferCQ.getByteBuffer().getInt();//获取该条目对应的消息的tagsCode,对于延迟消息,tagsCode被替换为延迟消息的发送时间(CommitLog#checkMessageAndReturnSize方法中)long tagsCode = bufferCQ.getByteBuffer().getLong();//如果tagsCode是扩展文件地址if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}//当前时间戳long now = System.currentTimeMillis();//校验投递时间,必须小于等于当前时间 + 延迟时间long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);//计算下一个offsetnextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//如果投递时间大于当前时间,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束long countdown = deliverTimestamp - now;if (countdown > 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}//根据消息物理偏移量从commitLog中找到该条消息。MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt == null) {continue;}//构建内部消息对象,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId。即恢复为正常消息MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}boolean deliverSuc;/** 消息投递*/if (ScheduleMessageService.this.enableAsyncDeliver) {//异步投递,默认不支持deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);} else {//默认同步投递deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);}//如果投递失败,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}//遍历结束,更新下一个offsetnextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);} finally {//释放内存bufferCQ.release();}/** 4 新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束** 保证线程任务的活性*/this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

3.2 scheduleNextTimerTask下一个调度任务

如果没找到对应的消息队列,或者没找到缓存buffer,或者没有过期的消息,获取投递失败等原因,将会新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束。

/*** DeliverDelayedMessageTimerTask的方法* <p>* 新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束** @param offset 消费偏移量* @param delay  延迟时间*/
public void scheduleNextTimerTask(long offset, long delay) {ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
}

3.3 correctDeliverTimestamp校验投递时间

校验投递时间,要求投递时间最晚不大于保证投递时间小于等于当前时间 + 延迟时间。

/*** DeliverDelayedMessageTimerTask的方法* <p>* 校验投递时间*/
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {//投递时间戳long result = deliverTimestamp;//当前时间 + 延迟时间long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);//保证投递时间小于等于当前时间 + 延迟时间if (deliverTimestamp > maxTimestamp) {result = now;}return result;
}

3.4 messageTimeup恢复正常消息

构建一个MessageExtBrokerInner,恢复为正常消息。设置topic的值为REAL_TOPIC属性值,这是原始topic,可能是重试topic或者真实topic。设置queueId的值为REAL_QID属性值,这是原始queueId,可能是重试queueId或者真实queueId。

/*** DeliverDelayedMessageTimerTask的方法* <p>* 还原原始消息,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId。即恢复为正常消息** @param msgExt 延迟消息* @return 真实消息*/
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {//构建MessageExtBrokerInner对象MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());//延迟消息的tagsCode为投递时间,现在来计算真正的tagsCodeValuelong tagsCodeValue =MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());msgInner.setTagsCode(tagsCodeValue);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());//不需要等待存储完成后才返回msgInner.setWaitStoreMsgOK(false);MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);//设置topic的值为REAL_TOPIC属性值,这是原始topic,可能是重试topic或者真实topicmsgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));//设置queueId的值为REAL_QID属性值,这是原始queueId,可能是重试queueId或者真实queueIdString queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);int queueId = Integer.parseInt(queueIdStr);msgInner.setQueueId(queueId);return msgInner;
}

3.5 syncDeliver同步投递消息

syncDeliver内部调用asyncPutMessage方法同步投递消息,投递完毕之后,更新offsetTable中的对应延迟队列的消费偏移量。

这里有个bug,第三个参数应该使用当前偏移量,而不是最开始的偏移量,在4.9.4版本已经修复。

/*** DeliverDelayedMessageTimerTask的方法* 同步投递* 这里有个bug,第三个参数应该使用当前偏移量,而不是最开始的偏移量,在4.9.4版本已经修复** @param msgInner 内部消息对象* @param msgId    消息id* @param offset   当前消费偏移量* @param offsetPy 消息物理偏移量* @param sizePy   消息大小* @return*/
private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,int sizePy) {//投递消息,内部调用asyncPutMessage方法PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);//投递结果PutMessageResult result = resultProcess.get();boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;if (sendStatus) {//如果发送成功,那么更新offsetTable中的消费偏移量ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());}return sendStatus;
}

4 延迟消息的总结

投递的消息在broker处理过成功,会判断如果是延迟消息,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。

最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。

而在RocketMQ端,通过ScheduleMessageService调度消息服务处理投递到SCHEDULE_TOPIC_XXXX的延迟消息。

从源码中可以看到,每个延迟级别都有一个线程专门处理该级别的延迟消息,这样避免了消息的排序,具体的处理逻辑其被封装为一个DeliverDelayedMessageTimerTask线程任务。

不同延迟级别的处理线程将会从各自对应的延迟队列中获取延迟消息,然后和当前时间比较看消息是否过期,如果消息过期,那么构造一个新的消息,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId,即恢复为正常消息然后再次进行投递。

另外,延迟消息的consumeQueue条目中的tagsCode并不是tag的hashCOde,而是该条消息的到期时间。

RocketMQ在内部通过内部topic替换实现延迟消息,非常巧妙,而且并发消息重试也是使用了延迟消息。

相关文章:

RocketMQ源码(24)—DefaultMQPushConsumer延迟消息源码

基于RocketMQ release-4.9.3&#xff0c;深入的介绍了DefaultMQPushConsumer延迟消息源码。 文章目录1 load加载延迟消息数据1.1 parseDelayLevel解析延迟等级2 start启动调度消息服务3 DeliverDelayedMessageTimerTask投递延迟消息任务3.1 executeOnTimeup执行延迟消息投递3.2…...

计算机视觉知识点(一)——交并比(IoU)及其若干改进

交并比&#xff08;IoU&#xff09;前言IoU公式及示意图IoU Loss缺点GIoU Loss公式及示意图缺点DIoU公式及示意图CIoU前言 目标检测是一个常见的计算机视觉任务&#xff0c;在目标检测任务中&#xff0c;交并比作为评判检测框的标准具有很重要的意义&#xff0c;在实际的应用中…...

一篇文章教你从零到一搭建自动化测试框架(附视频教程+源码)

目录 前言 1. 什么是自动化测试框架&#xff1f; 2. 没有万能的测试框架&#xff0c;适合自己项目的&#xff0c;能提高工作效率的就是好框架。 3. 设计框架的思路&#xff1a; 4.如何开展自动化测试 前言 关于测试框架的好处&#xff0c;比如快速回归提高测试效率&#x…...

【备战蓝桥杯】----01背包问题(动态规划)

&#x1f339;作者:云小逸 &#x1f4dd;个人主页:云小逸的主页 &#x1f4dd;Github:云小逸的Github &#x1f91f;motto:要敢于一个人默默的面对自己&#xff0c;强大自己才是核心。不要等到什么都没有了&#xff0c;才下定决心去做。种一颗树&#xff0c;最好的时间是十年前…...

Golang1.18新特性介绍——泛型

社区长期高呼的泛型特性在Golang 1.18中终于正式发布&#xff0c;Go泛型实现与传统的C有较大差异&#xff0c;更像Rust的泛型实现。本文详细介绍Golang泛型及其特性&#xff0c;包括泛型语法、类型参数、类型约束、类型近似以及constraints包提供内置类型等。 最近写Dao代码&am…...

【SpringBoot17】SpringBoot中使用Quartz管理定时任务

定时任务在系统中用到的地方很多&#xff0c;例如每晚凌晨的数据备份&#xff0c;每小时获取第三方平台的 Token 信息等等&#xff0c;之前我们都是在项目中规定这个定时任务什么时候启动&#xff0c;到时间了便会自己启动&#xff0c;那么我们想要停止这个定时任务的时候&…...

杨辉三角形 (蓝桥杯) JAVA

目录题目描述&#xff1a;暴力破解&#xff08;四成&#xff09;&#xff1a;二分法破解&#xff08;满分&#xff09;&#xff1a;题目描述&#xff1a; 下面的图形是著名的杨辉三角形&#xff1a; 如果我们按从上到下、从左到右的顺序把所有数排成一列&#xff0c;可以得到如…...

AI制药 - AlphaFold Multimer 的 MSA Pairing 源码

目前最新版本是v2.3.1&#xff0c;2023.1.12 AlphaFold multimer v1 于 2021 年 7 月发布&#xff0c;同时发表了一篇描述其方法和结果的论文。AlphaFold multimer v1 使用了与 AlphaFold 单体相同的模型结构和训练方法&#xff0c;但增加了一些特征和损失函数来处理多条链。Al…...

TitanIDE:云原生开发到底强在哪里?

原文作者&#xff1a;行云创新技术总监 邓冰寒 引言 是一种新的软件开发方法&#xff0c;旨在构建更可靠、高效、弹性、安全和可扩展的应用程序。与传统的应用程序开发方式不同&#xff0c;云原生是将开发环境完全搬到云端&#xff0c;构建一站式的云原生开发环境。云原生的开…...

单片机常用完整性校验算法

一、前言 单片机在开发过程中经常会遇到大文件传输&#xff0c;或者大量数据传输&#xff0c;在一些工业环境下&#xff0c;数据传输并不是很稳定&#xff0c;如何检验数据的完整性就是个问题&#xff0c;这里简单介绍一下单片机常用的几种数据完整性校验方法。 二、CheckSum校…...

Anaconda 的安装配置及依赖项的内外网配置

在分享anaconda 的安装配置及使用前&#xff0c;我们必须先明白anaconda是什么&#xff1b;Anaconda是一个开源的Python发行版本。两者区别在于前者是一门编程语言&#xff0c;后者相当于编程语言中的工具包。 由于python自身缺少numpy、matplotlib、scipy、scikit-learn等一系…...

p84 CTF夺旗-PHP弱类型异或取反序列化RCE

数据来源 文章参考 本课重点&#xff1a; 案例1&#xff1a;PHP-相关总结知识点-后期复现案例2&#xff1a;PHP-弱类型对比绕过测试-常考点案例3&#xff1a;PHP-正则preg_match绕过-常考点案例4&#xff1a;PHP-命令执行RCE变异绕过-常考点案例5&#xff1a;PHP-反序列化考题…...

2022财报逆转,有赞穿透迷雾实现突破

2022年&#xff0c;商家经营面临困难。但在一些第三方服务商的帮助下&#xff0c;也有商家取得了逆势增长。 2023年3月23日&#xff0c;有赞发布2022年业绩报告&#xff0c;它帮助许多商家稳住了一整年的经营。2022年&#xff0c;有赞门店SaaS业务的GMV达到425亿元&#xff0c…...

蓝桥杯 - 求组合数【C(a,b)】+ 卡特兰数

文章目录&#x1f4ac;前言885. 求组合数 I C(m,n) 【dp】886 求组合数 II 【数据大小10万级别】 【费马小定理快速幂逆元】887. 求组合数 III 【le18级别】 【卢卡斯定理 逆元 快速幂 】888.求组合数 IV 【没有%p -- 高精度算出准确结果】 【分解质因数 高精度乘法 --只用一…...

膳食真菌在癌症免疫治疗中的作用: 从肠道微生物群的角度

谷禾健康 癌症是一种恶性肿瘤&#xff0c;它可以发生在人体的任何部位&#xff0c;包括肺、乳房、结肠、胃、肝、宫颈等。根据世界卫生组织的数据&#xff0c;全球每年有超过1800万人被诊断出患有癌症&#xff0c;其中约有1000万人死于癌症。癌症已成为全球范围内的主要健康问题…...

怎么将模糊的照片变清晰

怎么将模糊的照片变清晰?珍贵的照片每个人都会有&#xff0c;而遇到珍贵的照片变模糊了&#xff0c;相信会让人很苦恼的。那么有没有办法可以解决呢?答案是有的&#xff0c;我们可以用工具让模糊的照片变得清晰。下面就来分享一些让模糊的照片变清晰的方法&#xff0c;有兴趣…...

【软件测试】基础知识第一篇

文章目录一. 什么是软件测试二. 测试和调试的区别三. 什么是测试用例四. 软件的生命周期五. 软件测试的生命周期一. 什么是软件测试 软件测试就是验证软件产品特性是否满足用户的需求。 那需求又是什么呢&#xff1f;在多数软件公司&#xff0c;会有两种需求&#xff0c;一种…...

【百面成神】java web基础7问,你能坚持到第几问

前 言 &#x1f349; 作者简介&#xff1a;半旧518&#xff0c;长跑型选手&#xff0c;立志坚持写10年博客&#xff0c;专注于java后端 ☕专栏简介&#xff1a;纯手打总结面试题&#xff0c;自用备用 &#x1f330; 文章简介&#xff1a;java web最基础、重要的8道面试题 文章目…...

Centos7安装、各种环境配置和常见bug解决方案,保姆级教程(更新中)

文章目录前言一、Centos7安装二、各种环境配置与安装2.1 安装net-tools&#xff08;建议&#xff09;2.2 配置静态网络&#xff08;建议&#xff09;2.1 修改Centos7的时间&#xff08;建议&#xff09;2.2 Centos7系统编码问题2.3 vim安装&#xff08;建议&#xff09;2.4 解决…...

【C++进阶】智能指针

文章目录为什么需要智能指针&#xff1f;内存泄漏什么是内存泄漏&#xff0c;内存泄漏的危害内存泄漏分类&#xff08;了解&#xff09;如何避免内存泄漏智能指针的使用及原理smart_ptrauto_ptrunique_ptrshared_ptr线程安全的解决循环引用weak_ptr删除器为什么需要智能指针&am…...

软件测试面试题 —— 整理与解析(3)

&#x1f60f;作者简介&#xff1a;博主是一位测试管理者&#xff0c;同时也是一名对外企业兼职讲师。 &#x1f4e1;主页地址&#xff1a;&#x1f30e;【Austin_zhai】&#x1f30f; &#x1f646;目的与景愿&#xff1a;旨在于能帮助更多的测试行业人员提升软硬技能&#xf…...

springboot常用的20个注解

Spring Boot方式的项目开发已经逐步成为Java应用开发领域的主流框架&#xff0c;它不仅可以方便地创建生产级的Spring应用程序&#xff0c;还能轻松地通过一些注解配置与目前比较火热的微服务框架SpringCloud集成&#xff0c; 而Spring Boot 之所以能够轻松地实现应的创建及与…...

USB组合设备——带鼠标功能的键盘

文章目录带鼠标功能的键盘一个接口实现报告描述符示例多个接口实现复合设备和组合设备配置描述符集合的实现报告的返回附 STM32 枚举日志复合设备&#xff1a;Compound Device 内嵌 Hub 和多个 Function&#xff0c;每个 Function 都相当于一个独立的 USB 外设&#xff0c;有自…...

数据结构与算法基础-学习-18-哈夫曼编码

一、个人理解在远程通讯中&#xff0c;需要把字符转成二进制的字符串进行传输&#xff0c;例如我们需要传输ABCD&#xff0c;我们可以用定长的字符串进行表示&#xff0c;例如:A:00B:01C:02D:03这样可能就造成空间的浪费&#xff0c;我们多存储了一个0号位。那用变长呢&#xf…...

ZMC408CE | 实现“8通道独立PSO”应用场景

一、ZMC408SCAN产品亮点 1.高性能处理器&#xff0c;提升运算速度、响应时间和扫描周期等&#xff1b; 2.一维/二维/三维、多通道视觉飞拍&#xff0c;高速高精&#xff1b; 3.位置同步输出PSO&#xff0c;连续轨迹加工中对精密点胶胶量控制和激光能量控制等&#xff1b; 4…...

QuickJS中JS_SetClassProto方法把JavaScript对象指定为某个类的原型对象

在 QuickJS 中&#xff0c;JS_SetClassProto 方法用于设置一个类的原型对象。这个方法的作用是将一个 JavaScript 对象指定为该类的原型对象&#xff0c;从而定义该类的属性和方法。 具体来说&#xff0c;JS_SetClassProto 方法的第一个参数是指向 QuickJS 引擎执行上下文的指…...

泰克信号发生器特点

泰克信号发生器是一种用于产生各种类型的电子信号的仪器&#xff0c;可以广泛应用于电子、通信、自动化、医疗等领域。泰克信号发生器具有以下特点&#xff1a;多种信号类型&#xff1a;泰克信号发生器可以产生多种类型的电子信号&#xff0c;包括正弦波、方波、三角波、脉冲等…...

贯穿设计模式第四话--里氏替换原则

&#x1f973;&#x1f973;&#x1f973; 茫茫人海千千万万&#xff0c;感谢这一刻你看到了我的文章&#xff0c;感谢观赏&#xff0c;大家好呀&#xff0c;我是最爱吃鱼罐头&#xff0c;大家可以叫鱼罐头呦~&#x1f973;&#x1f973;&#x1f973; 从今天开始&#xff0c;将…...

6501: 鸡兔同笼

描述 一个笼子里面关了鸡和免子(鸡有两只脚,兔子有4只脚,没有例外)。已经知道了笼子里面脚的总数a,问笼子里面至少有多少只动物&#xff0c;至多有多少只动物。 输入 一个正整数a(a<32768)。 输出 包含两个正整数&#xff0c;第一个是最少的动物数&#xff0c;第二个是最多的…...

Linux项目自动化构建工具-make/makefile 介绍及使用

使用背景 在工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义一系列 规则来指定什么文件需要先编译&#xff0c;什么文件需要后编译&#xff0c;哪些文件需要重新编译&#xff0c;或者更复杂 的功能操作 makefile带来的好处…...

vuejs 可做网站吗/seo流量工具

在Java中&#xff0c;如果方法重写只是一种名字空间的编写&#xff0c;那么它最多是让人感到有趣&#xff0c;但没有实际价值&#xff0c;但情况并非如此。方法重写构造成了Java最大的一个概念基础&#xff1a;动态方法调度&#xff08;dynamic method dispatch&#xff09;。动…...

国外免费服务器地址/黑帽seo是什么意思

1) SE最重要的工作是通过技术交流实现用户对公司的认同。一个销售员的第一步应该是推 销自己的公司&#xff0c;其次推销公司的产品&#xff0c;最后是推销个人魅力。但对于SE来说&#xff0c;SE首先要推 销个人魅力&#xff0c;认同用户并让用户认同自己提供的技术产品&#x…...

怎么用二维动画做网站首页步骤/爱站网怎么用

在MySQL服务器出现短暂(5~30秒)的性能波动的时候&#xff0c;一般的性能监控工具都很难抓住故障现场&#xff0c;也就很难收集对应较细粒度的诊断信息。另外&#xff0c;如果这种波动出现的频率很低&#xff0c;例如几天才一次&#xff0c;我们也很难人为的抓住现场&#xff0c…...

如何做建筑一体化的网站/电商运营主要工作内容

2019独角兽企业重金招聘Python工程师标准>>> [ 地址映射](图&#xff1a;左中) linux内核使用页式内存管理&#xff0c;应用程序给出的内存地址是虚拟地址&#xff0c;它需要经过若干级页表一级一级的变换&#xff0c;才变成真正的物理地址。 想一下&#xff0c;地址…...

有域名有服务器如何做网站/环球军事新闻最新消息

本文原创首发于公众号&#xff1a;ReactNative开发圈 Realm是一款专为移动​端开发的高性能数据库。支持React-Naitve&#xff0c;支持 iOS 和 Android。官网文档地址&#xff1a;https://realm.io/docs/javascr...。前提 React Native的版本要大于等于0.31.0安装 npm install …...

生活馆网站开发背景/百度答主中心入口

来源于问题&#xff1a;OpenStack ironic组件如何管理物理机&#xff1f;Ironic是OpenStack裸机管理服务&#xff08;baremetal as service&#xff09;&#xff0c;裸机即没有安装任何操作系统的物理服务器。虽然ironic支持standalone部署模式&#xff0c;但通常会协同OpenSta…...