西安易码建站/流量点击推广平台
梳理一些比较完整,比较复杂的业务线
消息持久化设计
RocketMQ的持久化文件结构
消息持久化也就是将内存中的消息写入到本地磁盘的过程。而磁盘IO操作通常是一个很耗性能,很慢的操作,所以,对消息持久化机制的设计,是一个MQ产品提升性能的关键,甚至可以说是最为重要的核心也不为过。接下来梳理RocketMQ是如何在本地磁盘中保存消息的
RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。
存储文件主要分为三个部分:
-
CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
-
ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
-
IndexFile:为消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
另外,还有几个辅助的存储文件,主要记录一些描述消息的元数据:
-
checkpoint:数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。
-
config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。
-
abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。
整体的消息存储结构,官方做了个图进行描述:
Producer发过来的所有消息,不管是属于哪个Topic,Broker都统一存在CommitLog文件当中,然后分别构建ConsumeQueue文件和IndexFile两个索引文件,用来辅助消费者进行消息检索。这种设计最直接的好处是可以较少查找目标文件的时间,让消息以最快的速度落盘。对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入。当Topic比较多时,这样的Partition寻址就会浪费非常多的时间。所以Kafka不太适合多Topic的场景。而RocketMQ的这种快速落盘的方式,在多Topic的场景下,优势就比较明显了。
在文件形式上:CommitLog文件的大小是固定的。文件名就是当前CommitLog文件当中存储的第一条消息的Offset。
ConsumeQueue文件主要是加速消费者进行消息索引。每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ConsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件中的消费进度,会保存在config/consumerOffset.json文件当中。
IndexFile文件主要是辅助消费者进行消息索引。消费者进行消息消费时,通过ConsumeQueue文件就足够完成消息检索了,但是如果消费者指定时间戳进行消费,或者要按照MessageId或者MessageKey来检索文件,比如RocketMQ管理控制台的消息轨迹功能,ConsumeQueue文件就不够用了。IndexFile文件就是用来辅助这类消息检索的。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。
这是对RocketMQ存盘文件最基础的了解,但是只有这样的设计,是不足以支撑RocketMQ的三高性能的。RocketMQ如何保证ConsumeQueue、IndexFile两个索引文件与CommitLog中的消息对齐?如何保证消息断电不丢失?如何保证文件高效的写入磁盘?等等。如果你想要去抓住RocketMQ这些三高问题的核心设计,那么还是需要到源码当中去深究。
commitLog写入
消息存储的入口在: DefaultMessageStore.asyncPutMessage方法
CommitLog的asyncPutMessage方法中会给写入线程加锁,保证一次只会允许一个线程写入。写入消息的过程是串行的,一次只会允许一个线程写入。
最终进入CommitLog中的DefaultAppendMessageCallback#doAppend方法,这里就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。而是在随后调用ComitLog#submitFlushRequest方法,提交刷盘申请。刷盘完成之后,内存中的文件才真正写入到磁盘当中。
在提交刷盘申请之后,就会立即调用CommitLog#submitReplicaRequest方法,发起主从同步申请。
文件同步刷盘与异步刷盘
入口:CommitLog.submitFlushRequest
这里涉及到了对于同步刷盘与异步刷盘的不同处理机制。这里有很多极致提高性能的设计,对于我们理解和设计高并发应用场景有非常大的借鉴意义。
同步刷盘和异步刷盘是通过不同的FlushCommitLogService的子服务实现的。
//org.apache.rocketmq.store.CommitLog的构造方法
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();
} else {this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
同步刷盘采用的是GroupCommitService子线程。虽然是叫做同步刷盘,但是从源码中能看到,他实际上并不是来一条消息就刷一次盘。而是这个子线程每10毫秒执行一次doCommit方法,扫描文件的缓存。只要缓存当中有消息,就执行一次Flush操作。
而异步刷盘采用的是FlushRealTimeService子线程。这个子线程最终也是执行Flush操作,只不过他的执行时机会根据配置进行灵活调整。所以可以看到,这里异步刷盘和同步刷盘的最本质区别,实际上是进行Flush操作的频率不同。
我们经常说使用RocketMQ的同步刷盘,可以保证Broker断电时,消息不会丢失。但是可以看到,RocketMQ并不可能真正来一条消息就进行一次刷盘,这样在海量数据下,操作系统是承受不了的。而只要不是来一次消息刷一次盘,那么在Broker直接断电的情况接下,就总是会有内存中的消息没有刷入磁盘的情况,这就会造成消息丢失。所以,对于消息安全性的设计,其实是重在取舍,无法做到绝对。
同步刷盘和异步刷盘最终落地到FileChannel的force方法。这个force方法就会最终调用一次操作系统的fsync系统调用,完成文件写入。
//org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {if (this.isAbleToFlush(flushLeastPages)) {if (this.hold()) {int value = getReadPosition();
try {//We only append data to fileChannel or mappedByteBuffer, never both.if (writeBuffer != null || this.fileChannel.position() != 0) {this.fileChannel.force(false);} else {this.mappedByteBuffer.force();}} catch (Throwable e) {log.error("Error occurred when force data to disk.", e);}
this.flushedPosition.set(value);this.release();} else {log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}return this.getFlushedPosition();
}
另外一个CommitRealTimeService这个子线程则是用来写入堆外内存的。应用可以通过配置TransientStorePoolEnable参数开启堆外内存,如果开启了堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存中。而CommitRealTimeService处理消息的方式则只是调用mappedFileQueue的commit方法。这个方法只是往操作系统的PagedCache里写入消息,并不主动进行刷盘操作。会由操作系统通过Dirty Page机制,在某一个时刻进行统一刷盘。例如我们在正常关闭操作系统时,经常会等待很长时间。这里面大部分的时间其实就是在做PageCache的刷盘。
//org.apache.rocketmq.store.MappedFileQueue
public boolean commit(final int commitLeastPages) {boolean result = true;MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);if (mappedFile != null) {int offset = mappedFile.commit(commitLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.committedWhere;this.committedWhere = where;}
return result;
}
在梳理同步刷盘与异步刷盘的具体实现时,可以看到一个小点,RocketMQ是如何让两个刷盘服务间隔执行的?RocketMQ提供了一个自己实现的CountDownLatch2工具类来提供线程阻塞功能,使用CAS驱动CountDownLatch2的countDown操作。每来一个消息就启动一次CAS,成功后,调用一次countDown。而这个CountDonwLatch2在Java.util.concurrent.CountDownLatch的基础上,实现了reset功能,这样可以进行对象重用。
CommigLog主从复制
入口:CommitLog.submitReplicaRequest
主从同步时,也体现到了RocketMQ对于性能的极致追求。最为明显的,RocketMQ整体是基于Netty实现的网络请求,而在主从复制这一块,却放弃了Netty框架,转而使用更轻量级的Java的NIO来构建。
在主要的HAService中,会在启动过程中启动三个守护进程。
//HAService#start
public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();
}
这其中与Master相关的是acceptSocketService和groupTransferService。其中acceptSocketService主要负责维护Master与Slave之间的TCP连接。groupTransferService主要与主从同步复制有关。而slave相关的则是haClient。
至于其中关于主从的同步复制与异步复制的实现流程,还是比较复杂的,有兴趣的同学可以深入去研究一下。
推荐一篇可供参考的博客 RocketMQ源码分析之主从数据复制-CSDN博客
分发ConsumeQueue和IndexFile
当CommitLog写入一条消息后,在DefaultMessageStore的start方法中,会启动一个后台线程reputMessageService。源码就定义在DefaultMessageStore中。这个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息。如果发现CommitLog中有新的消息写入,就会触发一次doDispatch。
//org.apache.rocketmq.store.DefaultMessageStore中的ReputMessageService线程类
public void doDispatch(DispatchRequest req) {for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}
dispatchList中包含两个关键的实现类CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex。源码就定义在DefaultMessageStore中。他们分别用来构建ConsumeQueue索引和IndexFile索引。
并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。
过期文件删除机制
入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()
在这个方法中会启动两个线程,cleanCommitLogService用来删除过期的CommitLog文件,cleanConsumeQueueService用来删除过期的ConsumeQueue和IndexFile文件。
在删除CommitLog文件时,Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。
触发过期文件删除时,有两个检查的纬度,一个是,是否到了触发删除的时间,也就是broker.conf里配置的deleteWhen属性。另外还会检查磁盘利用率,达到阈值也会触发过期文件删除。这个阈值默认是72%,可以在broker.conf文件当中定制。但是最大值为95,最小值为10。
然后在删除ConsumeQueue和IndexFile文件时,会去检查CommitLog当前的最小Offset,然后在删除时进行对齐。
需要注意的是,RocketMQ在删除过期CommitLog文件时,并不检查消息是否被消费过。 所以如果有消息长期没有被消费,是有可能直接被删除掉,造成消息丢失的。
RocketMQ整个文件管理的核心入口在DefaultMessageStore的start方法中,整体流程总结如下:
文件索引结构
了解了大部分的文件写入机制之后,最后我们来理解一下RocketMQ的索引构建方式。
1、CommitLog文件的大小是固定的,但是其中存储的每个消息单元长度是不固定的,具体格式可以参考org.apache.rocketmq.store.CommitLog中计算消息长度的方法
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE+ 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ bornhostLength //BORNHOST+ 8 //STORETIMESTAMP+ storehostAddressLength //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}
因为消息的记录大小不固定,所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,就重新创建一个CommitLog文件。文件名为当前消息的偏移量。
2、ConsumeQueue文件主要是加速消费者的消息索引。他的每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件当中。
文件结构: 每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成,数据块的内容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。
msgTag是和消息索引放在一起的,所以消费者根据Tag过滤消息的性能是非常高的。
在ConsumeQueue.java当中有一个常量CQ_STORE_UNIT_SIZE=20,这个常量就表示一个数据块的大小。
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);return true;}
this.byteBufferIndex.flip();//在ConsumeQueue.java当中构建一条ConsumeQueue索引的方法中,记录一个单元块的数据this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;... ...
}
3、IndexFile文件主要是辅助消息检索。他的作用主要是用来支持根据key和timestamp检索消息。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。
文件结构: 他的文件结构由 indexHeader(固定40byte)+ slot(固定500W个,每个固定20byte) + index(最多500W*4个,每个固定20byte) 三个部分组成。
indexFile的详细结构有大厂之前面试过,参考博文: RocketMQ之底层IndexFile存储协议_rocketmq index_roykingw的博客-CSDN博客
延迟消息机制
关注重点
延迟消息是RocketMQ非常有特色的一个功能,其他MQ产品中,往往需要开发者使用一些特殊方法来变相实现延迟消息功能。而RocketMQ直接在产品中实现了这个功能,开发者只需要设定一个属性就可以快速实现。
延迟消息的核心使用方法就是在Message中设定一个MessageDelayLevel参数,对应18个延迟级别。然后Broker中会创建一个默认的Schedule_Topic主题,这个主题下有18个队列,对应18个延迟级别。消息发过来之后,会先把消息存入Schedule_Topic主题中对应的队列。然后等延迟时间到了,再转发到目标队列,推送给消费者进行消费。
源码重点
延迟消息的处理入口在scheduleMessageService这个组件中。 会在broker启动时也一起加载。
1、消息写入到系统内置的Topic中
代码见CommitLog.putMessage方法。
在CommitLog写入消息时,会判断消息的延迟级别,然后修改Message的Topic和Queue,将消息转储到系统内部的Topic中,这样消息就对消费者不可见了。而原始的目标信息,会作为消息的属性,保存到消息当中。
//should be consistent with the old version
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topicif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 修改消息的Topic和Queue,转储到系统的Topic中msg.setTopic(topic);msg.setQueueId(queueId);}
}
2、消息转储到目标Topic
接下来就是需要过一点时间,再将消息转回到Producer提交的Topic和Queue中,这样就可以正常往消费者推送了。
这个转储的核心服务是scheduleMessageService,他也是Broker启动过程中的一个功能组件。随DefaultMessageStore组件一起构建。这个服务只在master节点上启动,而在slave节点上会主动关闭这个服务。
//org.apache.rocketmq.store.DefaultMessageStore
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {if (this.scheduleMessageService != null) {if (brokerRole == BrokerRole.SLAVE) {this.scheduleMessageService.shutdown();} else {this.scheduleMessageService.start();}}
}
由于RocketMQ的主从节点支持切换,所以就需要考虑这个服务的幂等性。在节点切换为slave时就要关闭服务,切换为master时就要启动服务。并且,即便节点多次切换为master,服务也只启动一次。所以在ScheduleMessageService的start方法中,就通过一个CAS操作来保证服务的启动状态。
if (started.compareAndSet(false, true)) {
这个CAS操作还保证了在后面,同一时间只有一个DeliverDelayedMessageTimerTask执行。这种方式,给整个延迟消息服务提供了一个基础保证。
ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。
在executeOnTimeup方法中,就会去扫描SCHEDULE_TOPIC_XXXX这个Topic下的所有messageQueue,然后扫描这些MessageQueue对应的ConsumeQueue文件,找到没有处理过的消息,计算他们的延迟时间。如果延迟时间没有到,就等下一秒再重新扫描。如果延迟时间到了,就进行消息转储。将消息转回到原来的目标Topic下。
整个延迟消息的实现方式:
ScheduleMessageService中扫描延迟消息的主要逻辑:
//ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
public void executeOnTimeup() {//找到延迟队列对应的ConsumeQueue文件ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));
if (cq == null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}//通过计算,找到这一次扫描需要处理的的ConsumeQueue文件SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);... ...long nextOffset = this.offset;try {int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//循环过滤ConsumeQueue文件当中的每一条消息索引for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//解析每一条ConsumeQueue记录long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();
... ...//计算延迟时间long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//延迟时间没到就等下一次扫描long countdown = deliverTimestamp - now;if (countdown > 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}
... ...//时间到了就进行转储boolean deliverSuc;if (ScheduleMessageService.this.enableAsyncDeliver) {deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);} else {deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);}
if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}//计算下一次扫描时的Offset起点nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);} finally {bufferCQ.release();}//部署下一次扫描任务this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
如果清楚了ConsumeQueue文件的结构,就可以很清晰的感受到RocketMQ其实就是在Broker端,像一个普通消费者一样去进行消费,然后扩展出了延迟消息的整个扩展功能。而这,其实也是很多互联网大厂对RocketMQ进行自定义功能扩展的很好的参考。
长轮询机制
功能回顾
RocketMQ对消息消费者提供了Push推模式和Pull拉模式两种消费模式。但是这两种消费模式的本质其实都是Pull拉模式,Push模式可以认为是一种定时的Pull机制。但是这时有一个问题,当使用Push模式时,如果RocketMQ中没有对应的数据,那难道一直进行空轮询吗?如果是这样的话,那显然会极大的浪费网络带宽以及服务器的性能,并且,当有新的消息进来时,RocketMQ也没有办法尽快通知客户端,而只能等客户端下一次来拉取消息了。针对这个问题,RocketMQ实现了一种长轮询机制 long polling。
长轮询机制简单来说,就是当Broker接收到Consumer的Pull请求时,判断如果没有对应的消息,不用直接给Consumer响应(给响应也是个空的,没意义),而是就将这个Pull请求给缓存起来。当Producer发送消息过来时,增加一个步骤去检查是否有对应的已缓存的Pull请求,如果有,就及时将请求从缓存中拉取出来,并将消息通知给Consumer。
源码重点
Consumer请求缓存,代码入口PullMessageProcessor#processRequest方法
PullRequestHoldService服务会随着BrokerController一起启动。
生产者线:从DefaultMessageStore.doReput进入
整个流程以及源码重点:
关于零拷贝与顺序写
刷盘机制保证消息不丢失
在操作系统层面,当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位,缓存文件的具体内容。这些写入到PageCache中的文件,在应用程序看来,是已经完全落盘保存好了的,可以正常修改、复制等等。但是,本质上,PageCache依然是内存状态,所以一断电就会丢失。因此,需要将内存状态的数据写入到磁盘当中,这样数据才能真正完成持久化,断电也不会丢失。这个过程就称为刷盘。
Java当中使用FileOutputStream类或者BufferedWriter类,进行write操作,就是写入的Pagecache。
RocketMQ中通过fileChannel.commit方法写入消息,也是写入到Pagecache。
PageCache是源源不断产生的,而Linux操作系统显然不可能时时刻刻往硬盘写文件。所以,操作系统只会在某些特定的时刻将PageCache写入到磁盘。例如当我们正常关机时,就会完成PageCache刷盘。另外,在Linux中,对于有数据修改的PageCache,会标记为Dirty(脏页)状态。当Dirty Page的比例达到一定的阈值时,就会触发一次刷盘操作。例如在Linux操作系统中,可以通过/proc/meminfo文件查看到Page Cache的状态。
[root@192-168-65-174 ~]# cat /proc/meminfo
MemTotal: 16266172 kB
.....
Cached: 923724 kB
.....
Dirty: 32 kB
Writeback: 0 kB
.....
Mapped: 133032 kB
.....
但是,只要操作系统的刷盘操作不是时时刻刻执行的,那么对于用户态的应用程序来说,那就避免不了非正常宕机时的数据丢失问题。因此,操作系统也提供了一个系统调用,应用程序可以自行调用这个系统调用,完成PageCache的强制刷盘。在Linux中是fsync,同样我们可以用man 2 fsync 指令查看。
RocketMQ对于何时进行刷盘,也设计了两种刷盘机制,同步刷盘和异步刷盘。只需要在broker.conf中进行配置就行。
零拷贝加速文件读写
零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制,非常多的开源软件都在大量使用零拷贝,来提升IO操作的性能。对于Java应用层,对应着mmap和sendFile两种方式。
理解CPU拷贝和DMA拷贝
操作系统对于内存空间,是分为用户态和内核态的。用户态的应用程序无法直接操作硬件,需要通过内核空间进行操作转换,才能真正操作硬件。这其实是为了保护操作系统的安全。正因为如此,应用程序需要与网卡、磁盘等硬件进行数据交互时,就需要在用户态和内核态之间来回的复制数据。而这些操作,原本都是需要由CPU来进行任务的分配、调度等管理步骤的,早先这些IO接口都是由CPU独立负责,所以当发生大规模的数据读写操作时,CPU的占用率会非常高。
之后,操作系统为了避免CPU完全被各种IO调用给占用,引入了DMA(直接存储器存储)。由DMA来负责这些频繁的IO操作。DMA是一套独立的指令集,不会占用CPU的计算资源。这样,CPU就不需要参与具体的数据复制的工作,只需要管理DMA的权限即可。
DMA拷贝极大的释放了CPU的性能,因此他的拷贝速度会比CPU拷贝要快很多。但是,其实DMA拷贝本身,也在不断优化。
引入DMA拷贝之后,在读写请求的过程中,CPU不再需要参与具体的工作,DMA可以独立完成数据在系统内部的复制。但是,数据复制过程中,依然需要借助数据总进线。当系统内的IO操作过多时,还是会占用过多的数据总线,造成总线冲突,最终还是会影响数据读写性能。
为了避免DMA总线冲突对性能的影响,后来又引入了Channel通道的方式。Channel,是一个完全独立的处理器,专门负责IO操作。既然是处理器,Channel就有自己的IO指令,与CPU无关,他也更适合大型的IO操作,性能更高。
这也解释了,为什么Java应用层与零拷贝相关的操作都是通过Channel的子类实现的。这其实是借鉴了操作系统中的概念。
而所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝。
再来理解下mmap文件映射机制是怎么回事
mmap机制的具体实现参见配套示例代码。主要是通过java.nio.channels.FileChannel的map方法完成映射。
以一次文件的读写操作为例,应用程序对磁盘文件的读与写,都需要经过内核态与用户态之间的状态切换,每次状态切换的过程中,就需要有大量的数据复制。
在这个过程中,总共需要进行四次数据拷贝。而磁盘与内核态之间的数据拷贝,在操作系统层面已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态之间的拷贝依然是CPU拷贝。所以,在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的这两次拷贝。
而mmap文件映射的方式,就是在用户态不再保存文件的内容,而只保存文件的映射,包括文件的内存起始地址,文件大小等。真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制。
这个拷贝过程都是在操作系统的系统调用层面完成的,在Java应用层,其实是无法直接观测到的,但是我们可以去JDK源码当中进行间接验证。在JDK的NIO包中,java.nio.HeapByteBuffer映射的就是JVM的一块堆内内存,在HeapByteBuffer中,会由一个byte数组来缓存数据内容,所有的读写操作也是先操作这个byte数组。这其实就是没有使用零拷贝的普通文件读写机制。
HeapByteBuffer(int cap, int lim) { // package-privatesuper(-1, 0, lim, cap, new byte[cap], 0);/*hb = new byte[cap];offset = 0;*/
}
而NIO把包中的另一个实现类java.nio.DirectByteBuffer则映射的是一块堆外内存。在DirectByteBuffer中,并没有一个数据结构来保存数据内容,只保存了一个内存地址。所有对数据的读写操作,都通过unsafe魔法类直接交由内核完成,这其实就是mmap的读写机制。
最后,这种mmap的映射机制由于还是需要用户态保存文件的映射信息,数据复制的过程也需要用户态的参与,这其中的变数还是非常多的。所以,mmap机制适合操作小文件,如果文件太大,映射信息也会过大,容易造成很多问题。通常mmap机制建议的映射文件大小不要超过2G 。而RocketMQ的CommitLog文件保持在1G固定大小,也是为了方便文件映射。
梳理下sendFile机制是怎么运行的
sendFile机制的具体实现参见配套示例代码。主要是通过java.nio.channels.FileChannel的transferTo方法完成
sourceReadChannel.transferTo(0,sourceFile.length(),targetWriteChannel);
早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝。但是,在后期的不断改进过程中,sendfile优化了实现机制,在拷贝过程中,并不直接拷贝文件的内容,而是只拷贝一个带有文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据。而真实的数据内容,会交由DMA控制器,从页缓存中打包异步发送到socket中。
最后,sendfile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种机制的传输效率是非常稳定的。sendfile机制非常适合大数据的复制转移。
顺序写加速文件写入磁盘
通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。
Kafka官方详细分析过顺序写的性能提升问题。Kafka官方曾说明,顺序写的性能基本能够达到内存级别。而如果配备固态硬盘,顺序写的性能甚至有可能超过写内存。而RocketMQ很大程度上借鉴了Kafka的这种思想。
例如可以看下org.apache.rocketmq.store.CommitLog#DefaultAppendMessageCallback中的doAppend方法。在这个方法中,会以追加的方式将消息先写入到一个堆外内存byteBuffer中,然后再通过fileChannel写入到磁盘。
相关文章:

RocketMQ-源码架构二
梳理一些比较完整,比较复杂的业务线 消息持久化设计 RocketMQ的持久化文件结构 消息持久化也就是将内存中的消息写入到本地磁盘的过程。而磁盘IO操作通常是一个很耗性能,很慢的操作,所以,对消息持久化机制的设计,是…...

Unity_ET框架项目-斗地主_启动运行流程
unity_ET框架项目-斗地主_启动运行流程 项目源码地址: Viagi/LandlordsCore: ET斗地主Demohttps://github.com/Viagi/LandlordsCore下载项目到本地。 启动运行步骤: 下载目录如下: 1. VS(我用是2022版VisualStudio)…...

自动化测试框架 —— pytest框架入门篇
今天就给大家说一说pytest框架。 今天这篇文章呢,会从以下几个方面来介绍: 01、pytest框架介绍 pytest 是 python 的第三方单元测试框架,比自带 unittest 更简洁和高效,支持非常丰富的插件,同时兼容 unittest 框架。…...

String类详解
String类详解 大家好,我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 解密String类:探秘Java中的字符串魔法 在Java的世界里,String类犹如一位魔法…...

Linux高级管理--安装MySQL数据库系统
MySQL服务基础 MySQL.是一个真正的多线程、多用户的SQL数据库服务,凭借其高性能、高可靠和易于使 用的特性,成为服务器领域中最受欢迎的开源数据库系统。在2008年以前,MySOL项目由MySQL AB公司进行开发,发布和支持,之后…...

团建策划信息展示服务预约小程序效果如何
团建是中大型企业商家每年举办的员工活动,其形式多样化、具备全部参与的娱乐性。但在实际策划流程及内容时,部分公司便会难以入手,术业有专攻,这个时候团建策划公司便会发挥效果。 如拓展训练、露营、运动会、体育竞技等往往更具…...

一个Redis实例最多能存放多少keys
程序员的公众号:源1024,获取更多资料,无加密无套路! 最近整理了一份大厂面试资料《史上最全大厂面试题》,Springboot、微服务、算法、数据结构、Zookeeper、Mybatis、Dubbo、linux、Kafka、Elasticsearch、数据库等等 …...

K8S(四)—pod详解
目录 pod介绍Pod的概念:Pod的特性:Pod的配置:Pod的控制:示例 YAML 文件: pod启动流程问题 两种方式启动镜像的升级和回滚更新 Deployment:回滚检查 Deployment 历史版本回滚到之前的修订版本缩放 Deploymen…...

shiro Filter加载和执行 源码解析
一、背景 在使用若依框架(前后端不分离包含shiro安全框架)时,发现作者添加了验证码、登录帐号控制等自定义过滤器,于是对自定的过滤器加载和执行流程产生疑问。下面以验证码过滤器为例,对源码解析。注意类之间的继承关…...

IDEA上传jar包到Maven
mvn install:install-file //固定格式,maven的语法 -Dfilealibaba-sdk-1.0.0.jar //这里填写包的路径,因为我们是在当前目录所以只需要输入包名即可 -DgroupIdcom.qiehua.csdn //这里填写包的groupId,之后作为pom.xml中引用的gr…...

JavaScript——基本语法
1.定义变量: 变量类型 变量名 变量值 var关键字声明变量 es6版本以上 var 可写可不写 <script>// 定义变量:变量类型 变量名 变量值 var关键字声明变量 es6版本以上 var 可写可不写var num 2;</script>2.条件控制 <script>var …...

一款最近很火的开源低代码平台
低代码平台近年来获得大量融资的原因是多方面的。首先,低代码平台代表了软件开发领域的一个重要趋势,即通过简化编程过程来降低技术门槛,使非专业开发者也能构建应用程序。这为那些希望加速数字化转型的企业提供了新的可能性,因此…...

vue之代理配置devServer(vue.config.js)片段
关于vue.config.js的部分配置解析:首先看下面一段配置 devServer: { open: process.platform darwin,//true or false (true则启动项目自动打开系统自带浏览器) host: 0.0.0.0, // 配置devServer服务监听的地址 比如:想让局域网…...

CTD测试流程
连接 连接17Plus,用usb转232线,db9公针2、3分别接Data I/O的2、3。DB9的5接Data I/O的1。尼龙塞子打开状态。不用闭合。 软件连接 打开SeaTermAF V2,注意打开前先把串口插上,否则软件读不到串口。如果读不到,就在插…...

面试经典150题(15-19)
leetcode 150道题 计划花两个月时候刷完,今天(第七天)完成了5道(15-19)150: 今天这些都是我之前做过的,还有就是今天的全都是模拟过程。。所以做的还算快。 15(13. 罗马数字转整数) 题目描述&a…...

Linux下的网络服务
一般来说,各种操作系统在网络方面的性能比较是这样的顺序BSD>Linux>Win NT>Win 9X, 由此说来,Linux的网络功能仅次于UNIX,而强于Win NT和其它的视窗系列产品,对于Win2000我还不能评价太多,因为不是很熟。 Lin…...

制造业对于IT软硬件监控和摄像头故障监控的需求
制造业对于生产线的自动化和智能化需求较高,IT监控运维管理软件在制造业的应用也日益普及。监控易为制造业提供了一系列定制化的解决方案,助力企业实现生产线的智能化和高效化。 随着制造业的数字化转型和智能化升级,IT运维管理软件的需求也在…...

idea一些报错
java: 非法字符: \ufeff 使用IDEA修改文件编码 在IDEA右下角,将编码改为GBK,再转为UTF-8,重新启动项目。具体步骤如下: 在IDEA右下角找到UTF-8字样的编码格式设计项,点击选择第一项GBK,然后Convert…...

【Java系列】详解多线程(二)——Thread类及常见方法(上篇)
个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习Java的一点学习心得,欢迎大家在评论区交流讨论💌 目录 一…...

Android Dialog 弹出时,隐藏 navigation bar
1、概述 一些场合,要求界面是全屏的,然而在全屏界面下,弹出dialog 又会导致虚拟按键栏重新弹出来,也是比较难受的。(而且查了非常多方法都是不能完美的解决这个问题,要么是压根不能用,要么是会闪一下虚拟栏…...

LeetCode(Hot100)——1:两数之和
方法1:暴力求解 利用两次for循环来处理, 外循环确定一个数字, 利用内循环不断求和来判断是否两数之和为target,来进行求解。 public class LeetCode1 {Test//测试方法public void test() {int [] nums{2,7,11,15};int target9;/…...

【Qt】报错error:undefined reference to `vtable for Consumer‘的解决方法
1. 问题原因 在创建完程序后,点击构建,显示编译错误。 错误问题如下: error: undefined reference to vtable在编译输出中查看显示如下: error:undefined reference to vtable for custom2. 原因分析 这个错误通常是因为 C 的虚函数表&am…...

【linux系统】用户功能与权限详细总结
前言 菜某的笔记总结,有错误还请指正。 linux用户的概念与root用户 这么理解:一台电脑有多个操作者,每个操作者只能无限制操作自己文件夹中的东西,其他地方的操作需要给与相应权限才能操作。 root用户:就是最高级的…...

ELK简单介绍二
学习目标 能够部署kibana并连接elasticsearch集群能够通过kibana查看elasticsearch索引信息知道用filebeat收集日志相对于logstash的优点能够安装filebeat能够使用filebeat收集日志并传输给logstash kibana kibana介绍 Kibana是一个开源的可视化平台,可以为ElasticSearch集群…...

video 标签 各种属性及所有事件监听
网页中的video 属性和事件,用于计算观看视频的时长,其他用法备存。 <!-- video 不支持 IE8及以下版本浏览器,支持三种视频格式:MP4,WebM 和 Ogg --><video src"test.mp4" controls width"400…...

TS中断言、转换的应用
1.TS 类型断言定义 把两种能有重叠关系的数据类型进行相互转换的一种 TS 语法,把其中的一种数据类型转换成另外一种数据类型。类型断言和类型转换产生的效果一样,但语法格式不同。 2.TS 类型断言语法格式 A 数据类型的变量 as B 数据类型 。 A 数据类…...

【代码随想录算法训练营-第四天】【链表】24,19, 面试题 02.07,142
24. 两两交换链表中的节点 第一遍-递归-小看了一下题解 思路: 读了两遍题目才理解…相邻节点的交换,这个操作很容易实现,但需要一个tmpNode因为是链表的题目,没开始思考之前先加了dummyNode,还真管用把dummyNode作为…...

代理设计模式
1. 代理模式 1.1 代理模式的原理分析 代理设计模式(Proxy Design Pattern)是一种结构型设计模式,它为其他对象提供一个代理对象,以控制对这个对象的访问。代理模式可以用于实现懒加载、安全访问控制、日志记录等功能。 代理模式…...

ubuntu安装docker及docker常用命令
docker里有三个部分 daemon 镜像 和 容器 我们需要了解的概念 容器 镜像 数据卷 文章目录 docker命令docker镜像相关命令docker容器相关命令数据卷ubuntu安装docker docker命令 #启动,停止,重启docker systemctl start docker systemctl stop docker s…...

STM32-TIM定时器输出比较
目录 一、输出比较简介 二、PWM简介 三、输出比较通道(通用) 四、输出比较通道(高级) 五、输出比较模式 六、PWM基本结构 七、PWM参数计算 八、外设介绍 8.1 舵机 8.2 直流电机及驱动 九、开发步骤 十、输出比较库函数…...