RocketMQ实现延迟队列精确到秒级实现
前言篇:
为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,
其实对于大部分的功能是够用了的,但是以前的项目,全部都是使用了阿里云的rocketmq,原因是不同的供应商的订单的延时时间是不同的
(部分供应商的订单未支付30分钟取消,有些1个半小时取消,各种时间都有),
所以使用了大量的延时队列,但是开源版本不支持任意时间延时(希望官方支持这个功能)
为了实现这个功能,网上查询了不少资料,查询到不少相关的文章,主要实现都是基于时间轮来实现的,
但是比较少开源的代码实现(也许大家都没有这个需求吧)
debug实践篇:
1. 撸起袖子加油干,首先,下载源代码 https://github.com/apache/rocketmq.git,导入ide
运行mvn package 生成jar包,如果成功的话,会生成到distribution目录下面

2. 查看文档,发现要运行namesvr 和 broker
找到 src\main\java\org\apache\rocketmq\namesrv\NamesrvStartup.java ,开心的执行main方法,
哦哦哦哦哦,果然报错了,提示 rocketmq.home.dir 目录不存在,查看源码, 原来是从system.propeties读取的,
为了调试,我毫不犹豫的加上了配置文件,

再次运行,不报错了,控制台显示,成功啦( 生活是多么美好,空气是多么清晰! )

3.运行 broker ,打开 src\main\java\org\apache\rocketmq\broker\BrokerStartup.java,执行main方法,
添加 配置文件 ( D:\\mq\\rocketmq-rocketmq-all-4.9.2是我本地的路径,你要修改成自己的 )
1 System.setProperty("rocketmq.home.dir", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb");2 System.setProperty("user.home", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb\\home\\");

运行一下,成功了,开心的发一条消息,试试,哦哦哦哦哦。 发不出去哦( 人生最痛苦的事情是,快要成功了,却没有成功 ) 。
原来还要配置namesvr地址,在启动命令,添加 -n localhost:9876 ( 上面的namesvr 启动的ip和端口)

4.漫长的改造之路 (我们是勇敢的斯巴达勇士,一直勇往直前)
用了阿里云的延时队列,发现它的message 可以传一个时间过来(任意的延时时间)
来来来,我们复制一下( 不要告诉别人,我们一直是复制,粘贴的,没有原创, 嘘 ...... )
1/** 2 * 该类预定义一些系统键. 3 */4 static public class SystemPropKey {5 public static final String TAG = "__TAG";6 public static final String KEY = "__KEY";7 public static final String MSGID = "__MSGID";8 public static final String SHARDINGKEY = "__SHARDINGKEY";9 public static final String RECONSUMETIMES = "__RECONSUMETIMES";10 public static final String BORNTIMESTAMP = "__BORNTIMESTAMP";11 public static final String BORNHOST = "__BORNHOST";12/**13 * 设置消息的定时投递时间(绝对时间). <p>例1: 延迟投递, 延迟3s投递, 设置为: System.currentTimeMillis() + 3000; <p>例2: 定时投递,14 * 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-0115 * 11:30:00").getTime()16 */17 public static final String STARTDELIVERTIME = "__STARTDELIVERTIME";18 }
/** * <p> 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. </p> <ol> <li>延迟投递: 延迟3s投递, 设置为: System.currentTimeMillis() + 3000;</li> * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01 * 11:30:00").getTime()</li> </ol> */ public void setStartDeliverTime(final long value) { putSystemProperties(SystemPropKey.STARTDELIVERTIME, String.valueOf(value)); }
5.既然要改造rocketmq,在哪里改呢,debug,debug,debug(一直到天荒地老),功夫不负有心人,找到啦,
找到 \src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java, 发现
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.consumerSendMsgBack(ctx, request);default: SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response;if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } this.executeSendMessageHookAfter(response, mqtraceContext);return response; } }
继续debug,发现 sendMessage 就是处理发送消息的,
如果我们在这里判断是否延时消息就写入文件,然后返回成功到客户端,等到了时间就发送延迟消息,不就搞定了吗?
oh,yes,就是这么干的
//处理延迟消息 delay message String startTime = msgInner.getProperty(Message.SystemPropKey.STARTDELIVERTIME); boolean isDelayMsg = false; long nextStartTime = 0;if (startTime != null && msgInner.getDelayTimeLevel() <= 0) { nextStartTime = Long.parseLong(startTime);if (nextStartTime >= System.currentTimeMillis()) { isDelayMsg = true; } }if (isDelayMsg) {return delayProcessor.handlePutMessageResultFuture(response, request, msgInner, ctx, queueIdInt, nextStartTime); } else {if (traFlag != null && Boolean.parseBoolean(traFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); }return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); } }
其中 delayProcessor.handlePutMessageResultFuture 是我们用来处理延迟消息的地方
我们按照每个时间一个文件夹来保存延时消息,等延时消息到达后,定时的写入延时队列里面。
详细原理,请查考 rocketmq 原理实现篇 https://www.cnblogs.com/tomj2ee/p/15815186.html
<em> </em>
package org.apache.rocketmq.broker.delay;import io.netty.channel.ChannelHandlerContext;import org.apache.commons.lang3.time.DateFormatUtils;import org.apache.rocketmq.broker.BrokerController;import org.apache.rocketmq.common.protocol.ResponseCode;import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;import org.apache.rocketmq.logging.InternalLogger;import org.apache.rocketmq.logging.InternalLoggerFactory;import org.apache.rocketmq.remoting.protocol.RemotingCommand;import org.apache.rocketmq.store.MessageExtBrokerInner;import java.io.*;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ThreadLocalRandom;public class DelayProcessor implements Runnable { protected static final InternalLogger log = InternalLoggerFactory.getLogger(DelayProcessor.class.getCanonicalName()); protected final BrokerController brokerController; protected final SocketAddress storeHost; private ExecutorService jobTaskExecute = Executors.newFixedThreadPool(16); public DelayProcessor(final BrokerController brokerController) { this.brokerController = brokerController; this.storeHost = new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController .getNettyServerConfig().getListenPort()); Thread thread = new Thread(this); thread.setName("delayProcessor-run---thread"); thread.setDaemon(true);new File(getDelayPath()).mkdirs(); thread.start(); Thread missCallThread = new Thread(() -> { try {for(;;) { Thread.sleep(10 * 1000); sendMissCallMsg(); } } catch (InterruptedException e) { e.printStackTrace(); } }); missCallThread.setName("delayProcessor-callback-thread"); missCallThread.start(); System.out.println("init delay success " +getDelayPath()); } public RemotingCommand handlePutMessageResultFuture(RemotingCommand response, RemotingCommand request, MessageExtBrokerInner msgInner, ChannelHandlerContext ctx,int queueIdInt, long nextStartTime) {return handlePutMessageResult(response, request, msgInner, ctx, queueIdInt, nextStartTime); } private RemotingCommand handlePutMessageResult(RemotingCommand response, RemotingCommand request, MessageExtBrokerInner msg, ChannelHandlerContext ctx,int queueIdInt, long nextStartTime) { boolean svOk = saveMsgFile(nextStartTime, msg); SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader(); sendMessageResponseHeader.setQueueId(1); sendMessageResponseHeader.setMsgId("0"); sendMessageResponseHeader.setQueueOffset(0l); sendMessageResponseHeader.setTransactionId(""); RemotingCommand newCommand = RemotingCommand.createRequestCommand(ResponseCode.SUCCESS, sendMessageResponseHeader);if (svOk) { newCommand.setCode(ResponseCode.SUCCESS); } else { newCommand.setCode(ResponseCode.SYSTEM_ERROR); newCommand.setRemark("发送消息延迟失败!"); } newCommand.setExtFields(request.getExtFields()); newCommand.setVersion(response.getVersion()); newCommand.setOpaque(response.getOpaque()); newCommand.setLanguage(response.getLanguage()); newCommand.setBody(request.getBody());if (!request.isOnewayRPC()) { try { ctx.writeAndFlush(newCommand); } catch (Throwable e) { log.error("DelayProcessor process request over, but response failed", e); log.error(request.toString()); log.error(response.toString()); } }return newCommand; } public void putMessage(MessageExtBrokerInner msgInner) { this.brokerController.getMessageStore().putMessage(msgInner); } @Override public void run() {for (; ; ) { long curTime = System.currentTimeMillis() / 1000; jobTaskExecute.submit(() -> sendMsg(curTime)); try { Thread.sleep(1000); } catch (InterruptedException e) { } } } private String getDelayPath() { String delayPath = "./delay-store"+ File.separator + "delay";return delayPath; } private boolean saveMsgFile(long startTime, MessageExtBrokerInner msgInner) { ObjectOutputStream objectOutputStream = null; try { String msgId =(startTime/1000 )+"-"+ System.currentTimeMillis() + "-" + ThreadLocalRandom.current().nextInt(99999999); System.out.println( getCurrentTime()+"写入延迟消息 >>" + msgId); String parentDir = getDelayPath() + File.separator + startTime / 1000; File parentFile = new File(parentDir);if (!parentFile.exists()) { parentFile.mkdirs(); } String fileName = parentDir + File.separator + msgId; FileOutputStream fos = new FileOutputStream(fileName); BufferedOutputStream bos = new BufferedOutputStream(fos); objectOutputStream = new ObjectOutputStream(bos); objectOutputStream.writeObject(msgInner);returntrue; } catch (Exception ex) { log.error("saveMsgFile ex:", ex);returnfalse; } finally { try {if (objectOutputStream != null) { objectOutputStream.close(); } } catch (Exception ex) { log.error("saveMsgFile ex:", ex); } } } private MessageExtBrokerInner readFile(File f) { ObjectInputStream ois = null; try { ois = new ObjectInputStream(new FileInputStream(f));return (MessageExtBrokerInner) ois.readObject(); } catch (Exception ex) {return null; } finally {if (ois != null) { try { ois.close(); } catch (IOException e) { e.printStackTrace(); } } } } private void sendMissCallMsg() { File lst = new File(getDelayPath()); File[] files = lst.listFiles(); long startTime = System.currentTimeMillis() / 1000 - 10 * 1000;for (File f : files) { String name = f.getName();if (f.isDirectory() && !name.equals(".") && !name.equals("..")) { try { Long fileTime = Long.parseLong(name);if (fileTime <= startTime) { sendMsg(fileTime); } } catch (Exception ex) { } } } } private String getCurrentTime(){return Thread.currentThread().getName()+ ">>["+DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")+"] "; } private void sendMsg(long startTime) { File lst = new File(getDelayPath() + File.separator + startTime); File[] files = lst.listFiles();if (files != null) {for (File f : files) { System.out.println( getCurrentTime()+"时间到发送>> "+ startTime+" to commitLog " + f.getName()); MessageExtBrokerInner msgInner = readFile(f);if (msgInner != null) { putMessage(msgInner); System.out.println( getCurrentTime()+"写入log >> "+ startTime+" to commitLog " + f.getName()+" success"); f.delete(); } } lst.delete(); } }}
总结:rocketmq延迟队列实现主要是通过时间轮和文件来保存延时消息,等到了时间后,再写入延时队列,来达到延时的目的。
总共有4种方式来实现延时队列,可以参考延时队列的实现原理篇
https://www.cnblogs.com/tomj2ee/p/15815157.html
开源rocketmq延迟队列实现:
https://gitee.com/venus-suite/rocketmq-with-delivery-time.git
相关文章:

RocketMQ实现延迟队列精确到秒级实现
前言篇:为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,其实对于大部分的功能是够用了的,但是以前的项目,全部都是使用了…...

线性数据结构:数组 Array
一、前言数组是数据结构还是数据类型?数组只是个名称,它可以描述一组操作,也可以命名这组操作。数组的数据操作,是通过 idx->val 的方式来处理。它不是具体要求内存上要存储着连续的数据才叫数组,而是说,…...

大数据开发-Hive
1、hive简介 hive是基于Hadoop的一个数据仓库工具,用于分析数据的。可以将结构化数据文件映射为一张数据库表,并提供类SQL查询功能 注:hive-SQL or HQL or类SQL 和标准SQL还是有一点点区别的 本质是SQL转换为MapReduce程序 用途࿱…...

《程序员新声》-Tech Lead 如何带领团队
收听本期播客 谢谢收听程序员新声,这是一款来自思特沃克(Thoughtworks)的播客节目。在这里,我们不仅讨论软件和技术领域的现状和未来,更关注程序员的成长世界。如何学习,如何晋升,如何带领团队…...

每日算法面试题
🧝♂️算法题 实现 pow(x, n) ,即计算 x 的 n 次幂函数(即,xn)。 示例 1:输入:x = 2.00000, n = 10 输出:1024.00000示例 2:输入:x = 2.10000, n = 3 输出:9.26100示例 3:输入:x...

高质量前端之自动化测试
前端自动化测试:Testing Library 篇 引言 前端测试 静态测试 eslint、TypeScript 单元测试 jest、mocha 集成测试 enzyme、react-testing-library、mock 爬虫 前后端解耦 为什么要引入自动化测试 测试可以让开发者站在用户的角度考虑问题,通过测试的手…...

2023不伤人脉的全新商城分销,一劳永逸的消费分红
2023不伤人脉的全新商城分销,一劳永逸的消费分红 2023-02-24 11:52梦龙 2023不伤人脉的全新商城分销,一劳永逸的消费分红 如今是流量为王的时代,但是如何将流量转化为忠实客户是个难题。不再是单向的买卖关系,而是从对产品的关注…...

【代码随想录训练营】【Day21】第六章|二叉树|530.二叉搜索树的最小绝对差|501.二叉搜索树中的众数|236. 二叉树的最近公共祖先
二叉搜索树的最小绝对差 题目详细:LeetCode.530 这道题使我第一次了解到二叉树的双指针遍历法,详细可以先查看卡哥的讲解视频:《代码随想录 — 二叉搜索树中的众数》 利用二叉搜索树的特点: 中序遍历二叉搜索树得到一个有序序…...

leaflet 导出图片,打印图片(A4横版或竖版)
第093个 点击查看专栏目录 本示例的目的是介绍如何在vue+leaflet中打印图片导出图片。一个简单的leaflet插件示例,添加了一个图标来打印或导出地图。 直接复制下面的 vue+openlayers源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共85行)安装插…...

Java面向对象:继承特性的学习
本文介绍了面向对象的继承特性: 什么是继承 继承的概念 Java中继承的语法 在继承下父类成员的访问 super和this关键字 父类和子类构造方法 在继承下类中出现初始化代码的执行顺序 父类成员的访问权限对子类的可见性 Java的继承关系 final关键字 认识继承和组合关系 继承特性的学…...

问答系统(QA)调研
引言 智能问答系统广泛用于回答人们以自然语言形式提出的问题,经典应用场景包括:智能语音交互、在线客服、知识获取、情感类聊天等。根据QA任务,可以将QA大致分为5大类,分别为: 文本问答(text-based QA&am…...

商务租车的三大优势吸引企业以租代购
随着社会机经济的高速发展,租车模式的日益盛行,租车不仅仅是受个体户的青睐,而作为环保经济的出行方式也让越来越多的企业开始选择以租代买,据调查统计,最早开始商务租车的群体是外企。而近几年,国内的很多…...

蓝桥杯的比赛流程和必考点
蓝桥杯的比赛流程和必考点 距省赛仅1个多月!蓝桥杯的比赛流程和必考点,你还不清楚? “巷子里的猫很自由,却没有归宿;围墙里的狗有归宿,终身都得低头。人生这道选择题,怎么选都会有遗憾。” 但不…...

【数据结构】红黑树
红黑树一、红黑树的概念二、红黑树的接口2.1 插入三、验证四、源码一、红黑树的概念 红黑树也是一个二叉搜索树,他是通过对任何一条从根到叶子的路径上各个结点着色方式的限制,最长路径长度不超过最短路径长度的 2 倍保持近似平衡。他在每个节点添加了一…...

从C++的角度理解C#的Event
由于技术背景是C起家的,所以对于C的概念很清楚,遇到C#的EVENT时候,总感觉这个概念比较抽象,不容易理解,但是当使用函数指针和回调函数来理解EVENT的时候,这个概念就清晰了。 首先对于EVENT来讲,…...

商城进货记录交易-课后程序(JAVA基础案例教程-黑马程序员编著-第七章-课后作业)
【实验7-2】商城进货记录交易 【任务介绍】 1.任务描述 每个商城都需要进货,而这些进货记录整理起来很不方便,本案例要求编写一个商城进货记录交易的程序,使用字节流将商场的进货信息记录在本地的csv文件中。程序具体要求如下: …...

【正点原子FPGA连载】第十七章双核AMP实验 摘自【正点原子】DFZU2EG_4EV MPSoC之嵌入式Vitis开发指南
1)实验平台:正点原子MPSoC开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id692450874670 3)全套实验源码手册视频下载地址: http://www.openedv.com/thread-340252-1-1.html 第十七章双核AMP…...

内存管理框架---页(一)
文章目录物理内存的模型非一致内存访问--NUMA一致内存访问模型--UMA内存管理架构页页框管理页描述符页描述符字段flags字段详解gfp_mask 标志获得页alloc_pages__get_free_pages获得填充为0的页释放页kmallocvmalloc参考资料你用心写的每一篇文章,可能会带别人和自己…...

华为OD机试真题Python实现【流水线】真题+解题思路+代码(20222023)
流水线 题目 一个工厂有m条流水线 来并行完成n个独立的作业 该工厂设置了一个调度系统 在安排作业时,总是优先执行处理时间最短的作业 现给定流水线个数m 需要完成的作业数n 每个作业的处理时间分别为 t1,t2...tn 请你编程计算处理完所有作业的耗时为多少 当n > m时 首先…...

「JVM 编译优化」Graal 编译器
文章目录1. 历史背景2. 构建编译调试环境3. JVMCI 编译器接口4. 代码中间表示5. 代码优化与生成1. 历史背景 Graal 编译器在 JDK 9 以 Jaotc 提前编译工具的形式首次加入到官方的 JDK 中,JDK 10 开始提供替换(得益于 HotSpot 编译器接口,Jav…...

蓝牙标签操作指南
一、APP安装指南 1.APP权限问题 电子标签APP安装之后,会提示一些权限的申请,点击允许。否则某些会影响APP的正常运行。安装后,搜索不到蓝牙标签,可以关闭App,重新打开。 2.手机功能 运行APP时候,需要打开…...

嵌入式 Linux Shell编程
目录 1、shell脚本 2、执行shell脚本 3、shell脚本编写 3.1 shell变量 3.2 标准变量或环境变量 3.4 变量赋值有五种格式 3.5 运算符和表达式 关系运算符 布尔运算符 3.6 Test命令用法 1、判断表达式 2、判断字符串 3.判断整数 4、判断文件 3.7 数组 1、数组定义…...

Web前端学习:一
编辑器的基础使用 编辑器推荐使用: HBuilderx(免费中文)(建议使用) Sublime(免费英文) Sublime中文设置方法,下载语言插件: 1、进入Sublime后,ShiftCtrlP…...

SpringBoot集成Redis实现分布式会话
在单体应用的时代,Session 会话直接保存在服务器中,实现非常简单,但是随着微服务的流行,现代应用架构基本都是分布式架构,请求随机的分配到后端的多个应用中,此时session就需要共享,而存储在red…...

2023年关于身份安全的4 个预测
如果您身处技术领域,就会知道现在是时候盘点过去的一年,展望未来 365 天将影响业务、创新以及我们工作方式的因素的季节。这不是一门精确的科学,我们也不总是对的。但是推测很有趣,当我们看到其中一些趋势成为现实时会更有趣。本文…...

Linux期末考试应急
Linux期末考试应急 虚拟机添加硬盘、分区、格式化、挂载、卸载 fdisk -l#查看系统现有分区fdisk <指定磁盘>#指定磁盘分区sudo mkfs.ext3 <指定分区>#格式化磁盘###挂载磁盘1.新建一个目录sudo mkdir /mnt/test2.将指定分区挂载到对应目录sudo mount /dev/sdb10 /…...

mars3d对geojson图层分属性设置样式
开发中可能会遇到如下需求,在全省的数据中按某个属性⾼亮展示某市区。此时就需要使⽤分属性样式的api了。⽂档如下。GeoJsonLayer - Mars3D API文档属性是根据⽮量数据的属性进⾏匹配。可以通过 layer.graphics[0]?.attr ⽅式获取。 指导有哪些属性之后先设置…...

三、锁相关知识
文章目录锁的分类可重入锁、不可重入锁乐观锁、悲观锁公平锁、非公平锁互斥锁、共享锁深入synchronized类锁、对象锁synchronized的优化synchronized实现原理synchronized的锁升级重量锁底层ObjectMonitor深入ReentrantLockReentrantLock和synchronized的区别AQS概述加锁流程源…...

C语言数据类型
C 数据类型 在 C 语言中,数据类型指的是用于声明不同类型的变量或函数的一个广泛的系统。变量的类型决定了变量存储占用的空间,以及如何解释存储的位模式。 C 中的类型可分为以下几种: 1 基本类型: 它们是算术类型,…...

华为OD机试真题Python实现【水仙花数】真题+解题思路+代码(20222023)
水仙花数 题目 所谓的水仙花数是指一个n位的正整数其各位数字的n次方的和等于该数本身, 例如153 = 1^3 + 5^3 + 3^3,153是一个三位数 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD机试(Python)真题目录汇总 输入 第一行输入一个整数N, 表示 N 位的正整数 N 在3…...