通过这三个文件彻底搞懂rocketmq的存储原理
前言
RocketMQ是阿里开发的一个高性能的消息队列,支持各种消息类型,而且支持事务消息,可以说是现在的很多系统中的香饽饽了,所以呢,怎么使用大家肯定是要学习的
我们作为一个有梦想的程序员,在学习一门技术的时候,肯定是不能光知其然,这是远远不够的,我们必须要知其所以然,这样才能在面试的时候侃侃而谈,啊呸,不对,这样我们才能在工作中遇到问题的时候,理性的去思考如何解决问题
我们知道RocketMQ的架构是producer、NameServer、broker、Consumer,producer是生产消息的,NameServer是路由中心,负责服务的注册发现以及路由管理这些。
Consumer是属于消费消息的,broker则属于真正的存储消息,以及进行消息的持久化,也就是存储消息的文件和索引消息的文件都在broker上
消息队列的主要作用是解耦异步削峰,也就意味着消息队列中的存储功能是必不可少的,而随着时代的发展,业务量的增加也对消息队列的存储功能的强度的要求越来越高了
也就是说你不能光性能好,你得存储的消息也得足够支撑我的业务量,你只能存储100MB的消息,我这系统每分钟的消息业务量可能500MB了,那肯定不够使啊,那还削个啥的峰啊,峰来了你自己都顶不住
RocketMQ凭借其强大的存储能力和强大的消息索引能力,以及各种类型消息和消息的特性脱颖而出,于是乎,我们这些有梦想的程序员学习RocketMQ的存储原理也变得尤为重要
而要说起这个存储原理,则不得不说的就是RocketMQ的消息存储文件commitLog文件,消费方则是凭借着巧妙的设计Consumerqueue文件来进行高性能并且不混乱的消费,还有RocketMQ的强大的支持消息索引的特性,靠的就是indexfile索引文件
我们这篇文章就从这commitLog、Consumerqueue、indexfile这三个神秘的文件说起,搞懂这三个文件,RocketMQ的核心就被你掏空了
先上个图,写入commitLog文件时commitLog和Consumerqueue、indexfile文件三者的关系
Commitlog文件
大小和命名规则
RocketMQ中的消息存储文件放在${ROCKET_HOME}/store 目录下,当生产者发送消息时,broker会将消息存储到Commit文件夹下,文件夹下面会有一个commitLog文件,但是并不是意味着这个文件叫这个,文件命名是根据消息的偏移量来决定的
文件有自己的生成规则,每个commitLog文件的大小是1G,一般情况下第一个 CommitLog 的起始偏移量为 0,第二个 CommitLog 的起始偏移量为 1073741824 (1G = 1073741824byte)。
也正是因为该文件的文件名字规则,所以也可以更好的知道消息处于哪个文件中,假设物理偏移量是1073741830,则相对的偏移量是6(6 = 1073741830 - 1073741824),于是判断出该消息位于第二个commitLog文件上,下面要说的Consumerqueue文件和indexfile文件都是通过偏移量来计算出消息位于哪个文件,进行更为精准的定位,减少了IO次数
文件存储规则和特点
commitLog文件的最大的一个特点就是消息的顺序写入,随机读写,关于commitLog的文件的落盘有两种,一种是同步刷盘,一种是异步刷盘,可通过 flushDiskType 进行配置
在写入commitLog的时候内部会有一个mappedFile内存映射文件,消息是先写入到这个内存映射文件中,然后根据刷盘策略写到硬盘中,对于producer的角度来说就是,同步就是当消息真正的写到硬盘的时候才会给producer返回成功,而异步就是当消息到达内存的时候就返回成功了,然后异步的去刷盘
跑题了,最大的特点顺序写入,所有的topic的消息都存储到commitLog文件中,顺序写入可以充分的利用磁盘顺序减少了IO争用数据存储的性能,kafka也是通过硬盘顺序存盘的
大家都常说硬盘的速度比内存慢,其实这句话也是有歧义的,当硬盘顺序写入和读取的时候,速度不比内存慢,甚至比内存速度快,这种存储方式就好比数组,我们如果知道数组的下标,则可以直接通过下标计算出位置,找到内存地址,众所周知,数组的读取是很快的,但是数组的缺点在于插入数据比较慢,因为如果在中间插入数据需要将后面的数据往后移动
而对于数组来说,如果我们只会顺序的往后添加,数组的速度也是很快的,因为数组没有后续的数据的移动,这一操作很耗时
回到RocketMQ中的commitLog文件,也是同样的道理,顺序的写入文件也就不需要太多的去考虑写入的位置,直接找到文件往后放就可以了,而取数据的时候,也是和数组一样,我们可以通过文件的大小去精准的定位到哪一个文件,然后再精准的定位到文件的位置
当然,至于这个索引位置就是靠下面的Consumerqueue文件和indexfile文件来找到消息的位置的,也就是索引地址
哦对了,数组的元素大小是一样的,并不意味这commitLog文件的各个消息存储空间一样
简单看下源码
这部分源码在DefaultMessageStore.putMessage
@Overridepublic PutMessageResult putMessage(MessageExtBrokerInner msg) {if (this.shutdown) {log.warn("message store has shutdown, so putMessage is forbidden");return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}// 从节点不允许写入if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("message store is slave mode, so putMessage is forbidden ");}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}// store是否允许写入if (!this.runningFlags.isWriteable()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);} else {this.printTimes.set(0);}// topic过长if (msg.getTopic().length() > Byte.MAX_VALUE) {log.warn("putMessage message topic length too long " + msg.getTopic().length());return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);}// 消息附加属性过长if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);}if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);}long beginTime = this.getSystemClock().now();// 添加消息到commitLogPutMessageResult result = this.commitLog.putMessage(msg);long eclipseTime = this.getSystemClock().now() - beginTime;if (eclipseTime > 500) {log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}return result;}
中间的commitLog.putMessage就是负责实现消息写入commitLog文件,这个太长了,我就不给大家截了
大致流程就是组装消息,放入属性,然后通过MappedFile对象写入文件,紧接着根据刷盘策略刷盘,最后进行主从同步
consumerQueue文件
RocketMQ是分为多个topic,消息所属主题,属于消息类型,每一个topic有多个queue,每个queue放着不同的消息,在同一个消费者组下的消费者,可以同时消费同一个topic下的不同queue队列的消息。
不同消费者下的消费者,可以同时消费同一个topic下的相同的队列的消息。而同一个消费者组下的消费者,不可以同时消费不同topic下的消息
而每个topic下的queue队列都会对应一个Consumerqueue文件,例如Topic中有三个队列,每个队列中的消息索引都会有一个编号,编号从0开始,往上递增。
并由此一个位点offset的概念,有了这个概念,就可以对Consumer端的消费情况进行队列定义。
消息消费完成后,需要将消费进度存储起来,即前面提到的offset。
广播模式下,同消费组的消费者相互独立,消费进度要单独存储;
集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的。
消费进度,也就是由Broker管理每一个消费者消费Topic的进度,包含正常提交消费进度和重置消费进度,消费进度管理的目的是保证消费者在正常运行状态、重启、异常关闭等状态下都能准确续接“上一次”未处理的消息。
在RocketMQ中,实现的消费语义叫“至少投递一次”,也就是所有的消息至少有一次机会消费不用担心会丢消息。用户需要实现消费幂等来避免重复投递对业务实际数据的影响。
幂等是啥应该不用我多说了吧,亲爱的你们肯定知道了
如上图所示,消费者一般在两种情况下“上报”消费进度,消费成功后(包含正常消费成功、重试消费成功)和重置消费进度。
而消费进度的标准就是Consumerqueue文件,这个文件中存储的是投递到某一个messagequeue中的位置信息
比如我们知道消息存储到commitLog文件中,一个消费者A对应着消费messagequeueA这个队列,但是无法确定在commitLog文件中该队列中的消息的位置,于是就有了ConsumerqueueA这个文件,这个文件对应一个messagequeueA,消费者A便可以通过ConsumerqueueA来确定自己的消费进度,获取消息在commitLog文件中的具体的offset和大小
存放位置和结构
consumequeue存放在store文件里面,里面的consumequeue文件里面按照topic排放,然后每个topic默认4个队列,里面存放的consumequeue文件
ConsumeQueue中并不需要存储消息的内容,而存储的是消息在CommitLog中的offset。也就是说ConsumeQueue其实是CommitLog的一个索引文件。
consumequeue是定长结构,每个记录固定大小20个字节,单个consumequeue文件默认包含30w个条目,所以单个文件大小大概6M左右
很显然,Consumer消费消息的时候,要读2次:先读ConsumeQueue得到offset,再通过offset找到CommitLog对应的消息内容。
ConsumeQueue的作用
消费者通过broker保存的offset(offsetTable.offset json文件中保存的ConsumerQueue的下标)可以在ConsumeQueue中获取消息,从而快速的定位到commitLog的消息位置,由于每个消息的大小是不一样的,也可以通过size获取到消息的大小,从而读取完整的消息
过滤tag是也是通过遍历ConsumeQueue来实现的(先比较hash(tag)符合条件的再到具体消息比较tag)
offsetTable.offset
和commitLog的offset不是一回事,这个offset是ConsumeQueue文件的(已经消费的)下标/行数,可以直接定位到ConsumeQueue并找到commitlogOffset从而找到消息体原文。这个offset是消息消费进度的核心,不同的消费模式,保存地址不同
广播模式:DefaultMQPushConsumer的BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在Consumer本地
集群模式:DefaultMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore
简单看下构建过程
在Broker中,构建ComsummerQueue不是存储完CommitLog就马上同步构建的,而是通过一个线程任务异步的去做这个事情。在DefaultMessageStore中有一个ReputMessageService成员,它就是负责构建ComsumerQueue的任务线程。
ReputMessageService继承自ServiceThread,表明其是一个服务线程,它的run方法很简单,如下所示:
public void run() {while (!this.isStopped()) {try {Thread.sleep(1);this.doReput(); // 构建ComsumerQueue} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}}
在run方法里,每休息1毫秒就进行一次构建ComsumerQueue的动作。因为必须先写入CommitLog,然后才能进行ComsumerQueue的构建。那么不排除构建ComsumerQueue的速度太快了,而CommitLog还没写入新的消息。这时就需要sleep下,让出cpu时间片,避免浪费CPU资源。
我们点进去这个doReput()看核心处理逻辑
private void doReput() {for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);// 拿到所有的最新写入CommitLog的数据if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // 一条一条的读消息int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {if (size > 0) {DefaultMessageStore.this.doDispatch(dispatchRequest); // 派发消息,进行处理,其中就包括构建ComsumerQueuethis.reputFromOffset += size;readSize += size;} else if (size == 0) { // this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) { // 获取消息异常if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}}
我在这里省略了一些和构建ComsumerQueue不相干的代码。
其实在doReput里面就做了三件事:
1、获取最新写入到CommitLog中的数据byteBuffer。
2、从byteBuffer中一条条的读取消息,并派发出去处理。
3、更新reputFromOffset位移。
感兴趣的可以打断点走一遍
indexFile文件
RocketMQ还支持通过MessageID或者MessageKey来查询消息,使用ID查询时,因为ID就是用broker+offset生成的(这里msgId指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。
对于用MessageKey来查询消息,MessageStore通过构建一个index来提高读取速度
文件结构
indexfile文件存储在store目录下的index文件里面,里面存放的是消息的hashcode和index内容,文件由一个文件头组成:长40字节。500w个hashslot,每个4字节。2000w个index条目,每个20字节。
所以这里我们可以估算每个indexfile的大小为:40+500w4+2000w20个字节,大约400M左右
文件详细信息
IndexHeader:索引文件头信息由40个字节组成
//8位 该索引文件的第一个消息(Message)的存储时间(落盘时间)
this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());
//8位 该索引文件的最后一个消息(Message)的存储时间(落盘时间)
this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());
//8位 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息)
this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
//8位 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量
this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());
//4位 该索引文件目前的hash slot的个数
this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());
//4位 索引文件目前的索引个数
this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
Slot槽位,默认每个文件配置的slot是500万个,每个slot是4位的整型数据,Slot每个节点保存当前已经拥有多少个index数据了
//slot的数据存放位置 40 + keyHash %(500W)* 4
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//Slot Table
//4字节
//记录该slot当前index,如果hash冲突(即absSlotPos一致)作为下一次该slot新增的前置index
this.mappedByteBuffer.putInt(absSlotPos,this.indexHeader.getIndexCount());
索引消息内容,消息长度固定为20位
//Index Linked list
//topic+message key的hash值
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
//消息在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
//消息的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
//9、记录该slot上一个index
//hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
再论结构
文件结构slot和indexLinkedList可以理解成java中的HashMap
哎,你说HashMap我可不困了啊,你可别蒙我,这个我熟,什么负载因子、默认大小、扩容机制、红黑树,还有多线程下不安全这些
乖,我知道你熟悉,你跟着我一起学习,这些当然了如指掌,只需要你了解HashMap的结构和冲突即可
每放入一个新消息的index进来,首先会取MessageKey的HashCode,然后用Hashcode对slot的总数进行取模,决定该消息key的位置,slot的总数默认是500W个
只要取hash就必然面临着hash冲突的问题,indexfile也是采用链表结构来解决hash冲突,这一点和HashMap一样的,不过这个不存在红黑树转换这一说,个人猜测这个的冲突数量也达不到很高的级别,所以进行这方面的设计也没啥必要,甚至变成了强行增加indexfile的文件结构难度
还有,在indexfile中的slot中放的是最新的index的指针,因为一般查询的时候大概率是优先查询最近的消息
每个slot中放的指针值是索引在indexfile中的偏移量,也就是后面index的位置,而index中存放的就是该消息在commitlog文件中的offset,每个index的大小是20字节,所以根据当前索引是这个文件中的第几个偏移量,也就很容易定位到索引的位置,根据前面的固定大小可以很快把真实坐标算出来,以此类推,形成一个链表的结构
查询流程
由于indexHeader,slot,index都是固定大小,所以:
公式1:第n个slot在indexFile中的起始位置是这样:40+(n-1)*4
公式2:第s个index在indexFile中的起始位置是这样:40+5000000*4+(s-1)*20
查询的传入值除了key外,还包含一个时间起始值以及截止值
为啥还要传时间范围呢?
一个indexFile写完一个会继续写下一个,仅仅一个key无法定位到具体的indexFile,时间范围就为了更精确的定位到具体的indexFile,缩小查找的范围,indexFile文件名是一个时间戳,根据这个日期就可以定位到传入的日期范围对应在哪个或者哪些indexFile中,是不是很棒。
好了,我们接着说查询流程
key–>计算hash值–>hash值对500万取余算出对应的slot序号–>根据40+(n-1)4(公式1)算出该slot在文件中的位置–>读取slot值,也就是index序号–>根据40+50000004+(s-1)*20(公式2)算出该index在文件中的位置–>读取该index–>将key的hash值以及传入的时间范围与index的keyHash值以及timeDiff值进行比对
不满足则根据index中的preIndexNo找到上一个index,继续上一步;满足则根据index中的phyOffset拿到commitLog中的消息
为啥比对时还要带上时间范围呢?
只比key不行吗?答案是不行,因为key可能会重复,producer在消息生产时可以指定消息的key,这个key显然无法保证唯一性,那自动生成的msgId呢?也不能保证唯一,你可以去看看msgId的生成规则
包括当前机器IP+进程号+MessageClientIDSetter.class.getClassLoader()的hashCode值+消息生产时间与broker启动时间的差值+broker启动后从0开始单调自增的int值,前面三项很明显可能重复,后面两项一个是时间差,一个是重启归零,也可能重复
简单看下源码,感兴趣的下载源码去研究
indexfile的添加消息索引的过程
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {//1. 判断该索引文件的索引数小于最大的索引数,如果>=最大索引数,IndexService就会尝试新建一个索引文件if (this.indexHeader.getIndexCount() < this.indexNum) {//2. 计算该message key的hash值int keyHash = indexKeyHashMethod(key);//3. 根据message key的hash值散列到某个hash slot里int slotPos = keyHash % this.hashSlotNum;//4. 计算得到该hash slot的实际文件位置Positionint absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//5. 根据该hash slot的实际文件位置absSlotPos得到slot里的值//这里有两种情况://1). slot=0, 当前message的key是该hash值第一个消息索引//2). slot>0, 该key hash值上一个消息索引的位置int slotValue = this.mappedByteBuffer.getInt(absSlotPos);//6. 数据校验及修正if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff = timeDiff / 1000;if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}//7. 计算当前消息索引具体的存储位置(Append模式)int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;//8. 存入该消息索引this.mappedByteBuffer.putInt(absIndexPos, keyHash);this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//9. 关键之处:在该key hash slot处存入当前消息索引的位置,下次通过该key进行搜索时//会找到该key hash slot -> slot value -> curIndex -> //if(curIndex.prevIndex>0) pre index (一直循环 直至该curIndex.prevIndex==0就停止)this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());if (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}this.indexHeader.incHashSlotCount();this.indexHeader.incIndexCount();this.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);} } else {log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()+ "; index max num = " + this.indexNum);}return false;}
indexfile的索引搜索源码
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,final long begin, final long end, boolean lock) {if (this.mappedFile.hold()) {//1. 计算该key的hashint keyHash = indexKeyHashMethod(key);//2. 计算该hash value 对应的hash slot位置int slotPos = keyHash % this.hashSlotNum;//3. 计算该hash value 对应的hash slot物理文件位置int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;FileLock fileLock = null;try {//4. 取出该hash slot 的值int slotValue = this.mappedByteBuffer.getInt(absSlotPos);//5. 该slot value <= 0 就代表没有该key对应的消息索引,直接结束搜索// 该slot value > maxIndexCount 就代表该key对应的消息索引超过最大限制,数据有误,直接结束搜索if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()|| this.indexHeader.getIndexCount() <= 1) {} else {//6. 从当前slot value 开始搜索for (int nextIndexToRead = slotValue; ; ) {if (phyOffsets.size() >= maxNum) {break;}//7. 找到当前slot value(也就是index count)物理文件位置int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ nextIndexToRead * indexSize;//8. 读取消息索引数据int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);//9. 获取该消息索引的上一个消息索引index(可以看成链表的prev 指向上一个链节点的引用)int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);//10. 数据校验if (timeDiff < 0) {break;}timeDiff *= 1000L;long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;boolean timeMatched = (timeRead >= begin) && (timeRead <= end);//10. 数据校验比对 hash值和落盘时间if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);}//当prevIndex <= 0 或prevIndex > maxIndexCount 或prevIndexRead == nextIndexToRead 或 timeRead < begin 停止搜索if (prevIndexRead <= invalidIndex|| prevIndexRead > this.indexHeader.getIndexCount()|| prevIndexRead == nextIndexToRead || timeRead < begin) {break;}nextIndexToRead = prevIndexRead;}}} catch (Exception e) {log.error("selectPhyOffset exception ", e);} finally {this.mappedFile.release();}}}
结束语
感谢大家能够做我最初的读者和传播者,请大家相信,只要你给我一
份爱,我终究会还你们一页情的。
欢迎大家关注我的公众号【左耳君】,探索技术,分享生活
哦对了,后续所有的文章都会更新到这里
https://github.com/DayuMM2021/Java
相关文章:
通过这三个文件彻底搞懂rocketmq的存储原理
前言 RocketMQ是阿里开发的一个高性能的消息队列,支持各种消息类型,而且支持事务消息,可以说是现在的很多系统中的香饽饽了,所以呢,怎么使用大家肯定是要学习的 我们作为一个有梦想的程序员,在学习一门技…...
Linux安装Nvidia显卡驱动
使用的Linux系统为 Ubuntu 18.04,显卡为GeForce RTX 3060 。 查看ubuntu版本号命令:sudo lsb_release -a 查看显卡型号命令:lspci | grep -i vga (详细查看方法: 查看显卡型号)。 下面是安装显卡驱动步…...
GPT-4 介绍
1 简介 本文根据openAI的2023年3月的《GPT-4 Technical Report 》翻译总结的。 原文地址:https://arxiv.org/pdf/2303.08774.pdf 原文确实没有GPT-4 具体的模型结构,openAI向盈利组织、非公开方向发展了。也没透露硬件、训练成本、训练数据、训练方法等…...
Ubuntu下单机安装Hadoop详细教程(附所需安装包下载)
目录 前言 一、创建Hadoop用户 二、更新apt和安装Vim编辑器 三、安装SSH和配置SSH无密码登录 四、安装Java环境 1. 安装JDK 2. 配置JDK环境 3. 检验安装 五、安装单机Hadoop 1. 下载安装Hadoop 2. 运行示例 总结 前言 本文安装的 Hadoop 及 Java 环境基于林子雨老…...
【嵌入式烧录/刷写文件】-2.1-详解Intel Hex格式文件
目录 1 什么是Intel Hex 2 Intel Hex的格式 2.1 Intel Hex的Record结构 2.1.1 “Record type记录类型”的说明 2.1.2 “Record length记录长度”的说明 2.1.3 如何计算“Checksum校验和” 2.2 Record order记录顺序 2.3 Text line terminators文本行终止符 3 Hex文件的…...
【云原生】初识 Kubernetes — pod 的前世今生
目录标题前言🐳 Kubernetes到底是什么?🐬 K8s 的由来🐬K8s 的工作方式🐬 K8s 主要组件🐋Master 组件🐋Node 组件🐳 pod 是什么?🐬pod 的概念🐬控制…...
【基础篇】Java类加载器详解
类加载过程详解 类的生命周期 类从被加载到虚拟机内存到开始卸载出内存为止,生命周期可以简单概括为7个阶段:加载(Loading)、验证(Verification)、准备(Preparation)、解析ÿ…...
Pytorch动手实现Transformer机器翻译
Pytorch动手实现Transformer机器翻译前言一、环境配置1. torchtextMethod1:Method2:2. Spacy以en包下载为例:手动安装语言包到spacy3. NLTKMethod1:Method2:二、运行结果1. 模型训练(train)2. 翻…...
宝塔面板部署node+vue项目注意事项
宝塔面板部署nodevue项目注意事项 宝塔连接云服务器 如果服务器上没有安装宝塔面板,需要先安装,安装流程如下: 从宝塔官网主页进去,点击下载安装,然后点击在线安装 输入服务器IP和密码在服务器上安装宝塔面板 等待一…...
【LeetCode】剑指 Offer 39. 数组中出现次数超过一半的数字 p205 -- Java Version
题目链接:https://leetcode.cn/problems/shu-zu-zhong-chu-xian-ci-shu-chao-guo-yi-ban-de-shu-zi-lcof/ 1. 题目介绍(39. 数组中出现次数超过一半的数字) 数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字。 你可…...
fisco bcos用caliper0.2.0进行压力测试的安装配置
一、前期环境 1. 硬件 需要外网权限 2. 操作系统 版本要求:Ubuntu > 16.04, CentOS > 7, MacOS > 10.14 3. 基础软件 python 2.7,make,g,gcc,git sudo apt install python2.7 make g gcc git curl git confi…...
正在进行 | 用友企业数智化财务峰会落地广州 高能不断
3月28日,以「智能会计 价值财务」为主题的“2023企业数智化财务创新峰会”登陆广州。 此次用友企业数智化财务创新峰会,邀请了知名院校的专家学者、央国企等大型企业财务数智化领路人以及羊城权威媒体,近千人相约广州越秀国际会议中心,深度聚焦大型企业财务数智化创新应用…...
uniapp - APP云打包、蒲公英平台发布APP的步骤
一、uniapp 云打包 1、注册 dcloud 开发者 首先需要注册一个 dcloud 开发者的账号 dcloud开发者中心:登录 (dcloud.net.cn) 根据流程注册即可。 2、云打包(已安卓为例) 项目创建完成后,查看 dcloud 开发者中心,看是否…...
reposync命令详解--reposync同步aliyunyum库到本地
参考: reposync - 命令 - -桃枝夭夭- - 博客园 0. 简介 reposync 命令简单来说就是可以把指定外网源(repo id)的包同步到本地文件中 1. 安装 reposync 命令 [rootV10SP1-1 ~]# yum install -y dnf-plugins-core2. 常用选项以及参数 选项含义-c [fil…...
OCR之论文笔记TrOCR
文章目录TrOCR: Transformer-based Optical Character Recognition with Pre-trained Models一. 简介二. TrOCR2.1. Encoder2.2 Decoder2.3 Model Initialiaztion2.4 Task Pipeline2.5 Pre-training2.6 Fine-tuning2.7 Data Augmentation三. 实验3.1 Data3.2 Settings3.2 Resul…...
雷电4模拟器安装xposed框架(2022年)
别问我都2202年了为什么还在用雷电4安卓7。我特么哪知道Xposed的相关资料这么难找啊,只能搜到一些老旧的资料,尝试在老旧的平台上实现了。 最初的Xposed框架现在已经停止更新了,只支持到安卓8。如果要在更高版本的安卓系统上使用Xposed得看看…...
微信小程序支付完整流程(前端)
微信小程序中,常见付款给商家的场景,下面列出企业小程序中,从0起步完整微信支付流程。 一,注册微信支付商户号(由上级或法人注册) 接入微信支付 - 微信商户平台 此商户号,需要由主管及更上级领导…...
设置鼠标右键打开方式,添加IDEA的打开方式
一、问题描述 已下载IDEA,但是右键打开之前保存的项目文件,无法显示以IDEA方式打开。 二、解决步骤 1. 打开注册表 winR键输入regedit 2、查找路径为计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Classes\Directory\shell (我找了半天没看到Class…...
LAMP架构之zabbix监控(2):zabbix基础操作
目录 一、zabbix监控节点添加和删除 (1)手动添加 (2)自动添加 (3)按照条件批量添加 (4)使用api工具进行管理 二、针对应用的zabbix监控 一、zabbix监控节点添加和删除 实验说明&a…...
ShareSDK常见问题
QQ-分享报错901111,9001010等 由于QQ现在需要审核后才可以分享(之前分享不需要审核),所以此错误解决方法只需通过腾讯开放平台的审核即可,另外要检查注册好的应用的基本信息,包名、md5签名和Bundle id是不…...
[Spring]一文明白IOC容器和思想
✅作者简介:大家好,我是Philosophy7?让我们一起共同进步吧!🏆 📃个人主页:Philosophy7的csdn博客 🔥系列专栏: 数据结构与算法 👑哲学语录: 承认自己的无知,乃…...
程序人生 | 与足球共舞的火柴人(致敬格拉利什,赋予足球更深的意义)
个人简介 👀个人主页: 前端杂货铺 🙋♂️学习方向: 主攻前端方向,也会涉及到服务端 📃个人状态: 在校大学生一枚,已拿多个前端 offer(秋招) 🚀未…...
MATLAB | R2023a更新了哪些好玩的东西
R2023a来啦!!废话不多说看看新版本有啥有趣的玩意和好玩的特性叭!!把绘图放最前面叭,有图的内容看的人多。。 1 区域填充 可以使用xregion及yregion进行区域填充啦!! x -10:0.25:10; y x.^…...
Python Module — OpenAI ChatGPT API
目录 文章目录目录OpenAI Python SDKopenai.ChatCompletion 模块openai.ChatCompletion.create 函数OpenAI Python SDK 官方文档:https://platform.openai.com/docs/api-reference/introduction OpenAI Python SDK 用于开发与 OpenAI RESTful API 进行交互的客户端…...
Docker学习记录
阅读前请看一下:我是一个热衷于记录的人,每次写博客会反复研读,尽量不断提升博客质量。文章设置为仅粉丝可见,是因为写博客确实花了不少精力。希望互相进步谢谢!! 文章目录阅读前请看一下:我是一…...
Linux-VIM使用
文章目录前言VIM使用1、切换模式2、跳转(1) 跳转到指定行(2) 跳转到首行(3) 跳转到末行3、自动格式化程序4. 大括号对应5. 删除(1)删除一个单词(2)删除光标位置至行尾(3)删除光标位置至行首(4&a…...
Windows安全中心内存完整性无法打开问题的处理方法
Windows11安全中心内存完整性无法打开 今天电脑使用过程中突然看到系统桌面右下角任务栏中 windows安全中心图标出现了警告信息,如下图红框所示: 点击该图标进入windows安全中心的 安全性概览 界面,如下图: 在该界面可以看到出现安…...
在芯片设计行业,从项目的初期到交付,不同的岗位的工程师主要负责什么?
大家都知道在芯片设计行业,项目是至关重要的一环。从项目的初期到交付,不同的岗位的工程师在项目的各环节主要负责什么?他们是怎样配合的?下面看看资深工程师怎么说。 一个项目,从初期到交付的过程是比较漫长的。我们知道最早的时候&#…...
Spring Cloud Alibaba全家桶(七)——Sentinel控制台规则配置
前言 本文小新为大家带来 Sentinel控制台规则配置 相关知识,具体内容包括流控规则(包括:QPS流控规则,并发线程数流控规则),BlockException统一异常处理,流控模式(包括:直…...
mysql-installer安装教程(详细图文)
目录 1.安装 2.配置系统环境变量 3.配置初始化my.ini文件 4.MySQL彻底删除 5.Navicat 安装 1.安装 先去官网下载需要的msi,在这放出官网下载地址下载地址 这里我具体以8.0.28 为安装例子,除了最新版安装界面有些变动以往的都是差不多的。 过去的版本…...
wordpress怎么多用户/北京百度推广官网首页
问题:flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。问题重现: 1、创建flume配置文件flume-env.sh,:flume配置文件如下(根据自身需要修改):因为f…...
中国建设人才服务信息网是正规网站/seo专业培训中心
蜻蜓特派员Windows XP SP3 纯净安装版 终结版,系统纯净无广告、无插件,网卡等驱动和运行库齐全,安全更新补丁全网最新!微软停止了 Windows XP 的支持之后还是偶尔为 WinXP 提供了更加安全的使用环境。看到网上上很多“最终版”&am…...
免费个人网站建设公司/百度推广官方网站
解决的问题 springboot(当然别的也可以)多环境切换需要修改配置文件硬编码,打包时不够方便。 解决: 配置文件能读取pom文件中的配置,根据命令选择不同配置注入springboot的配置文件中 pom配置文件: <…...
网站开发培训/有哪些免费网站可以发布广告
阿里云全云民计算优惠活动上售卖的突发性能实例293元一年,很便宜很实惠,那么性能如何?什么是突发性能实例?阿里云突发性能实例t5怎么样?值得买吗?阿里云百科来说说阿里云新推出的突发性能实例t5吧ÿ…...
wordpress调用自定义文章类型文章/代运营套餐价格表
1。 添加Column Start\Basic Settings\Data Schema\Manually Define a Schema\添加Column,命名用数据库中的字段名。2。 添加Column中文名称 Band and Column Settings\Band [0](手动添加Column后)\Columns\选择列\Header\修改Caption属性3。…...
做百度移动网站优化排/网站竞价推广
.版本2.支持库eAPI.程序集窗口程序集1.程序集变量窗口1.子程序__启动窗口_创建完毕时钟1.时钟周期=500.子程序_时钟1_周期事件.局部变量系统进程列表,进程信息,,"0".局部变量所有窗口....版本 2.支持库 eAPI.程序集 窗口程序集1.程序集变量 窗口1.子程序 …...