当前位置: 首页 > news >正文

【RocketMQ】消息的刷盘机制

刷盘策略

CommitLogasyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushRequest方法执行刷盘策略:

public class CommitLog {public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// ...try {// 获取上一次写入的文件MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// ...// 写入消息result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);// ...} finally {beginTimeInLock = 0;putMessageLock.unlock();}// ...// 执行刷盘CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// ...}
}

刷盘有两种策略:

  • 同步刷盘,表示消息写入到内存之后需要立刻刷到磁盘文件中。

    同步刷盘会构建GroupCommitRequest组提交请求并设置本次刷盘后的位置偏移量的值(写入位置偏移量+写入数据字节数),然后将请求添加到flushDiskWatcherGroupCommitService中进行刷盘。

  • 异步刷盘,表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险。

public class CommitLog {// 监控刷盘private final FlushDiskWatcher flushDiskWatcher;public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 是否是同步刷盘if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// 获取GroupCommitServicefinal GroupCommitService service = (GroupCommitService) this.flushCommitLogService;// 是否等待if (messageExt.isWaitStoreMsgOK()) {// 构建组提交请求,传入本次刷盘后位置的偏移量:写入位置偏移量+写入数据字节数GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());// 添加到wather中flushDiskWatcher.add(request);// 添加到serviceservice.putRequest(request);// 返回return request.future();} else {service.wakeup();return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}// 如果是异步刷盘else {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else  {commitLogService.wakeup();}return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}
}

同步刷盘

如果使用的是同步刷盘,首先获取了GroupCommitService,然后构建GroupCommitRequest组提交请求,将请求添加到flushDiskWatcherGroupCommitService中,其中flushDiskWatcher用于监控刷盘是否超时,GroupCommitService用于提交刷盘数据。

构建GroupCommitRequest提交请求

GroupCommitRequestCommitLog的内部类:

  • nextOffset:写入位置偏移量+写入数据字节数,也就是本次刷盘成功后应该对应的flush偏移量
  • flushOKFuture:刷盘结果
  • deadLine:刷盘的限定时间,值为当前时间 + 传入的超时时间,超过限定时间还未刷盘完毕会被认为超时
public class CommitLog {public static class GroupCommitRequest {private final long nextOffset;// 刷盘状态private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();private final long deadLine;// 刷盘的限定时间,超过限定时间还未刷盘完毕会被认为超时public GroupCommitRequest(long nextOffset, long timeoutMillis) {this.nextOffset = nextOffset;// 设置限定时间:当前时间 + 超时时间this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);}public void wakeupCustomer(final PutMessageStatus putMessageStatus) {// 结束刷盘,设置刷盘状态this.flushOKFuture.complete(putMessageStatus);}public CompletableFuture<PutMessageStatus> future() {// 返回刷盘状态return flushOKFuture;}}
}

GroupCommitService处理刷盘

GroupCommitServiceCommitLog的内部类,从继承关系中可知它实现了Runnable接口,在run方法调用waitForRunning等待刷盘请求的提交,然后处理刷盘,不过这个线程是在什么时候启动的呢?

public class CommitLog {/*** GroupCommit Service*/class GroupCommitService extends FlushCommitLogService {// ...// run方法public void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 等待刷盘请求的到来this.waitForRunning(10);// 处理刷盘this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// ...}}
}

刷盘线程的启动

在BrokerController的启动方法中,可以看到调用了messageStore的start方法,前面可知使用的是DefaultMessageStore,进入到DefaultMessageStore的start方法,它又调用了commitLog的start方法,在CommitLogstart方法中,启动了刷盘的线程和监控刷盘的线程:

public class BrokerController {public void start() throws Exception {if (this.messageStore != null) {// 启动this.messageStore.start();}// ...}
}public class DefaultMessageStore implements MessageStore {/*** @throws Exception*/public void start() throws Exception {// ...this.flushConsumeQueueService.start();// 调用CommitLog的启动方法this.commitLog.start();this.storeStatsService.start();// ...}
}public class CommitLog {private final FlushCommitLogService flushCommitLogService; // 刷盘private final FlushDiskWatcher flushDiskWatcher; // 监控刷盘private final FlushCommitLogService commitLogService; // commitLogServicepublic void start() {// 启动刷盘的线程this.flushCommitLogService.start();flushDiskWatcher.setDaemon(true);// 启动监控刷盘的线程flushDiskWatcher.start();if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {this.commitLogService.start();}}
}

刷盘请求的处理

既然知道了线程在何时启动的,接下来详细看一下GroupCommitService是如何处理刷盘提交请求的。

前面知道在GroupCommitService的run方法中,调用了waitForRunning方法等待刷盘请求,waitForRunningGroupCommitService父类ServiceThread中实现。ServiceThread是一个抽象类,实现了Runnable接口,里面使用了CountDownLatch进行线程间的通信,大小设为1。

waitForRunning方法在进入的时候先判断hasNotified是否为true(已通知),并尝试将其更新为false(未通知),由于hasNotified的初始化值为false,所以首次进入的时候条件不成立,不会进入到这个处理逻辑,会继续执行后面的代码。

接着调用 waitPoint的reset方法将其重置为1,并调用waitPoint的await方法进行等待:

// ServiceThread
public abstract class ServiceThread implements Runnable {// 是否通知,初始化为falseprotected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);// CountDownLatch用于线程间的通信protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);// 等待运行protected void waitForRunning(long interval) {// 判断hasNotified是否为true,并尝试将其更新为falseif (hasNotified.compareAndSet(true, false)) {// 调用onWaitEndthis.onWaitEnd();return;}// 重置waitPoint的值,也就是值为1waitPoint.reset();try {// 会一直等待waitPoint值降为0waitPoint.await(interval, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("Interrupted", e);} finally {// 是否被通知设置为falsehasNotified.set(false);this.onWaitEnd();}}
}

一、添加刷盘请求,唤醒刷盘线程

上面可知需要刷盘的时候调用了GroupCommitServiceputRequest方法添加刷盘请求,在putRequest方法中,将刷盘请求GroupCommitRequest添加到了requestsWrite组提交写请求链表中,然后调用wakeup方法唤醒刷盘线程,wakeup方法在它的父类ServiceThread中实现。

wakeup方法中可以看到,首先将hasNotified更改为了true表示处于已通知状态,然后调用了countDown方法,此时waitPoint值变成0,就会唤醒之前waitForRunning方法中一直在等待的线程。

public class CommitLog {/*** 组提交Service*/class GroupCommitService extends FlushCommitLogService {// 组提交写请求链表private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();// ...// 添加提交请求public synchronized void putRequest(final GroupCommitRequest request) {// 加锁lock.lock();try {// 加入到写请求链表this.requestsWrite.add(request);} finally {lock.unlock();}// 唤醒线程执行提交任务this.wakeup();}   // ...}}// ServiceThread
public abstract class ServiceThread implements Runnable {// CountDownLatch用于线程间的通信protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);// 唤醒刷盘线程public void wakeup() {// 更改状态为已通知状态if (hasNotified.compareAndSet(false, true)) {// waitPoint的值减1,由于大小设置为1,减1之后变为0,会唤醒等待的线程waitPoint.countDown(); }}// ...
}

二、线程被唤醒,执行刷盘前的操作

waitForRunning方法中的await方法一直在等待countdown的值变为0,当上一步调用了wakeup后,就会唤醒该线程,然后开始往下执行,在finally中可以看到将是否被通知hasNotified又设置为了false,然后调用了onWaitEnd方法,GroupCommitService方法中重写了该方法,里面又调用了swapRequests方法将读写请求列表的数据进行了交换,putRequest方法中将提交的刷盘请求放在了写链表中,经过交换,数据会被放在读链表中,后续进行刷盘时会从读链表中获取请求进行处理

// ServiceThread
public abstract class ServiceThread implements Runnable {// CountDownLatchprotected final CountDownLatch2 waitPoint = new CountDownLatch2(1);// 等待运行protected void waitForRunning(long interval) {if (hasNotified.compareAndSet(true, false)) {// 交换this.onWaitEnd();return;}// 重置waitPoint.reset();try {// 会一直等待countdown为0waitPoint.await(interval, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("Interrupted", e);} finally {// 是否被通知设置为falsehasNotified.set(false);this.onWaitEnd();}}
}public class CommitLog {/*** 组提交Service*/class GroupCommitService extends FlushCommitLogService {// 组提交写请求链表private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();// 组提交读请求链表private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();@Overrideprotected void onWaitEnd() {// 交换读写请求列表的数据请求this.swapRequests();}private void swapRequests() {// 加锁lock.lock();try {// 将读写请求链表的数据进行交换LinkedList<GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;} finally {lock.unlock();}}// ...}
}

这里使用读写链表进行交换应该是为了提升性能,如果只使用一个链表,在提交请求的时候需要往链表中添加请求,此时需要加锁,而刷盘线程在处理完请求之后是需要从链表中移除请求的,假设添加请求时加的锁还未释放,刷盘线程就要一直等待,而添加和处理完全可以同时进行,所以使用了两个链表,在添加请求的时候使用写链表,处理请求的时候对读写链表的数据进行交换使用读链表,这样只需在交换数据的时候加锁,以此来提升性能。

三、执行刷盘

waitForRunning执行完毕后,会回到GroupCommitService中的run方法开始继续往后执行代码,从代码中可以看到接下来会调用doCommit方法执行刷盘。

doCommit方法中对读链表中的数据进行了判空,如果不为空,进行遍历处理每一个提交请求,处理逻辑如下:

  1. 获取CommitLog映射文件记录的刷盘位置偏移量flushedWhere,判断是否大于请求设定的刷盘位置偏移量nextOffset,正常情况下flush的位置应该小于本次刷入数据后的偏移量,所以如果flush位置大于等于本次请求设置的flush偏移量,本次将不能进行刷盘

  1. 开启一个循环,调用mappedFileQueueflush方法执行刷盘(具体的实现在异步刷盘的时候再看),由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行。

  2. 请求处理之后会清空读链表。

public class CommitLog {/*** 组提交Service*/class GroupCommitService extends FlushCommitLogService {  // 运行public void run() {CommitLog.log.info(this.getServiceName() + " service started");// 如果没有停止while (!this.isStopped()) {try {// 等待唤醒刷盘线程this.waitForRunning(10);// 进行提交this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// 睡眠10毫秒try {Thread.sleep(10);} catch (InterruptedException e) {CommitLog.log.warn(this.getServiceName() + " Exception, ", e);}synchronized (this) {this.swapRequests();}// 停止之前提交一次this.doCommit();CommitLog.log.info(this.getServiceName() + " service end");}// 提交刷盘private void doCommit() {// 如果不为空if (!this.requestsRead.isEmpty()) {// 遍历刷盘请求for (GroupCommitRequest req : this.requestsRead) {// 获取映射文件的flush位置,判断是否大于请求设定的刷盘位置boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();for (int i = 0; i < 2 && !flushOK; i++) {// 进行刷盘CommitLog.this.mappedFileQueue.flush(0);// 由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();}// 设置刷盘结果req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);}long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}// 请求处理完之后清空链表this.requestsRead = new LinkedList<>();} else {// Because of individual messages is set to not sync flush, it// will come to this processCommitLog.this.mappedFileQueue.flush(0);}}}}

刷盘超时监控

FlushDiskWatcher用于监控刷盘请求的耗时,它也继承了ServiceThread,在Broker启动时开启了该线程,在run方法中,使用while循环,只要服务未停止,会一直从阻塞队列中获取提交的刷盘请求,开启while循环隔一段时间判断一下刷盘是否完成,如果未完成,会做如下判断:

  1. 使用当前时间减去请求设置的刷盘截止时间,如果已经超过截止时间,说明刷盘时间已经超时,调用wakeupCustomer方法设置刷盘结果为已超时
  2. 如果未超时,为了避免当前线程频繁的进行判断,将当前线程睡眠一会儿,睡眠的计算方式是使用刷盘请求设置的截止时间 - 当前时间,表示剩余的时间,然后除以1000000化为毫秒,得到距离刷盘截止时间的毫秒数sleepTime:
    • sleepTime如果为0,只能是当前时间等于截止时间,也就是到了截止时间,此时同样调用wakeupCustomer方法设置刷盘结果为已超时
    • sleepTime不为0,在10毫秒和sleepTime的值之间取较小的那个作为睡眠的毫秒数将当前线程睡眠,等待刷盘任务执行
public class FlushDiskWatcher extends ServiceThread {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// 阻塞队列,存放提交请求private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();@Overridepublic String getServiceName() {return FlushDiskWatcher.class.getSimpleName();}@Overridepublic void run() {// 如果未停止while (!isStopped()) {GroupCommitRequest request = null;try {// 从阻塞队列中获取提交请求request = commitRequests.take();} catch (InterruptedException e) {log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");continue;}// 如果还未完成while (!request.future().isDone()) {long now = System.nanoTime();// 如果已经超时if (now - request.getDeadLine() >= 0) {// 设置刷盘结果为超时request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);break;}// 避免频繁的判断,使用(截止时间 - 当前时间)/1000000 计算一个毫秒数long sleepTime = (request.getDeadLine() - now) / 1_000_000;// 在计算的毫秒数与10之间取最小的sleepTime = Math.min(10, sleepTime);// 如果sleepTime为0表示已经到了截止时间if (sleepTime == 0) {// 设置刷盘结果为超时request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);break;}try {// 睡眠等待刷盘任务的执行Thread.sleep(sleepTime);} catch (InterruptedException e) {log.warn("An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");break;}}}}
}

异步刷盘

上面讲解了同步刷盘,接下来去看下异步刷盘,首先会判断是否使用了暂存池,如果未开启调用flushCommitLogServicewakeup唤醒刷盘线程,否则使用commitLogService先将数据写入到FileChannel,然后统一进行刷盘:

 public class CommitLog {private final FlushDiskWatcher flushDiskWatcher;public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 是否是同步刷盘if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// ...}// 如果是异步刷盘else {// 如果未使用暂存池if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// 唤醒刷盘线程进行刷盘flushCommitLogService.wakeup();} else  {// 如果使用暂存池,使用commitLogService,先将数据写入到FILECHANNEL,然后统一进行刷盘commitLogService.wakeup();}// 返回结果return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}
}

CommitLog的构造函数中可以看到,commitLogService使用的是CommitRealTimeService进行实例化的,flushCommitLogService需要根据设置决定使用哪种类型进行实例化:

  • 如果是同步刷盘,使用GroupCommitService,由前面的同步刷盘可知,使用的就是GroupCommitService进行刷盘的。
  • 如果是异步刷盘,使用FlushRealTimeService

所以接下来需要关注CommitRealTimeServiceFlushRealTimeService

public class CommitLog {    private final FlushCommitLogService flushCommitLogService;// 刷盘Serviceprivate final FlushCommitLogService commitLogService;public CommitLog(final DefaultMessageStore defaultMessageStore) {// 如果设置的同步刷盘if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// 使用GroupCommitServicethis.flushCommitLogService = new GroupCommitService();} else {// 使用FlushRealTimeServicethis.flushCommitLogService = new FlushRealTimeService();}// commitLogServicethis.commitLogService = new CommitRealTimeService();}
}

CommitRealTimeService

在开启暂存池时,会使用CommitRealTimeService,它继承了FlushCommitLogService,所以会实现run方法,处理逻辑如下:

  1. 从配置信息中获取提交间隔每次提交的最少页数两次提交的最大间隔时间
  2. 如果当前时间大于上次提交时间+两次提交的最大间隔时间,意味着已经有比较长的一段时间没有进行提交了,需要尽快刷盘,此时将每次提交的最少页数设置为0不限制提交页数
  3. 调用mappedFileQueuecommit方法进行提交,并返回提交的结果:
    • 如果结果为true表示未提交任何数据
    • 如果结果为false表示进行了数据提交,需要等待刷盘
  4. 判断提交返回结果是否返回false,如果是调用flushCommitLogService的wakeup方法唤醒刷盘线程,进行刷盘
  5. 调用waitForRunning等待下一次提交处理
class CommitRealTimeService extends FlushCommitLogService {// 上次提交时间戳private long lastCommitTimestamp = 0;@Overridepublic void run() {CommitLog.log.info(this.getServiceName() + " service started");// 如果未停止while (!this.isStopped()) {// 获取提交间隔int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();// 一次提交的最少页数int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();// 两次提交的最大间隔时间int commitDataThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();// 开始时间long begin = System.currentTimeMillis();// 如果当前时间大于上次提交时间+提交的最大间隔时间if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {this.lastCommitTimestamp = begin; // 提交时间commitDataLeastPages = 0;// 最少提交页数设为0,表示不限制提交页数}try {// 提交boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);// 提交结束时间long end = System.currentTimeMillis();// 如果返回false表示提交了一部分数据但是还未进行刷盘if (!result) {// 再次更新提交时间戳this.lastCommitTimestamp = end;// 唤醒flush线程进行刷盘flushCommitLogService.wakeup();}if (end - begin > 500) {log.info("Commit data to file costs {} ms", end - begin);}// 等待下一次提交this.waitForRunning(interval);} catch (Throwable e) {CommitLog.log.error(this.getServiceName() + " service has exception. ", e);}}boolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {result = CommitLog.this.mappedFileQueue.commit(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}CommitLog.log.info(this.getServiceName() + " service end");}}

提交

提交的方法在MappedFileQueuecommit方法中实现,处理逻辑如下:

  1. 根据记录的CommitLog文件提交位置的偏移量获取映射文件,如果获取不为空,调用MappedFile的commit方法进行提交,然后返回本次提交数据的偏移量
  2. 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量
  3. 判断本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何数据,返回结果置为true,否则表示提交了数据,等待刷盘,返回结果为false
  4. 更新上一次提交偏移量committedWhere的值为本次的提交偏移量的值
public class MappedFileQueue {protected long flushedWhere = 0; // flush的位置偏移量private long committedWhere = 0; // 提交的位置偏移量public boolean commit(final int commitLeastPages) {boolean result = true;// 根据提交位置的偏移量获取映射文件MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);if (mappedFile != null) {// 调用mappedFile的commit方法进行提交,返回提交数据的偏移量int offset = mappedFile.commit(commitLeastPages);// 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量long where = mappedFile.getFileFromOffset() + offset;// 设置返回结果,如果本次提交偏移量等于上一次的提交偏移量为true,表示什么也没干,否则表示提交了数据,等待刷盘result = where == this.committedWhere;// 更新上一次提交偏移量的值为本次的this.committedWhere = where;}return result;}
}

MappedFile

MappedFile中记录CommitLog的写入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,调用了isAbleToCommit判断是否可以提交数据,判断的流程如下:

  1. 获取提交数据的位置偏移量和写入数据的位置偏移量

  2. 如果最少提交页数大于0,计算本次写入的页数是否大于或等于最少提交页数

    本次写入数据的页数计算方法:写入位置/页大小 - flush位置/页大小

  3. 如果以上条件都满足,判断写入位置是否大于flush位置,如果大于表示有一部数据未flush可以进行提交

满足提交条件后,就会调用commit0方法提交数据,将数据写入到fileChannel中:

public class MappedFile extends ReferenceResource {// 数据写入位置protected final AtomicInteger wrotePosition = new AtomicInteger(0);// 数据提交位置protected final AtomicInteger committedPosition = new AtomicInteger(0);// 数据flush位置private final AtomicInteger flushedPosition = new AtomicInteger(0);// 提交数据public int commit(final int commitLeastPages) {// 如果writeBuffer为空if (writeBuffer == null) {// 不需要提交任何数据到,返回之前记录的写入位置return this.wrotePosition.get();}// 如果可以提交数据if (this.isAbleToCommit(commitLeastPages)) {if (this.hold()) {// 提交数据commit0();this.release();} else {log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());}}// All dirty data has been committed to FileChannel.if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {this.transientStorePool.returnBuffer(writeBuffer);this.writeBuffer = null;}// 返回提交位置return this.committedPosition.get();}// 是否可以提交数据protected boolean isAbleToCommit(final int commitLeastPages) {// 获取提交数据的位置偏移量int flush = this.committedPosition.get();// 获取写入数据的位置偏移量int write = this.wrotePosition.get();if (this.isFull()) {return true;}// 如果最少提交页数大于0if (commitLeastPages > 0) {// 写入位置/页大小 - flush位置/页大小 是否大于至少提交的页数return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;}// 判断是否需要flush数据return write > flush;}protected void commit0() {// 获取写入位置int writePos = this.wrotePosition.get();// 获取上次提交的位置int lastCommittedPosition = this.committedPosition.get();if (writePos - lastCommittedPosition > 0) {try {// 创建共享缓冲区ByteBuffer byteBuffer = writeBuffer.slice();// 设置上一次提交位置byteBuffer.position(lastCommittedPosition);byteBuffer.limit(writePos);this.fileChannel.position(lastCommittedPosition);// 数据写入fileChannelthis.fileChannel.write(byteBuffer);// 更新写入的位置this.committedPosition.set(writePos);} catch (Throwable e) {log.error("Error occurred when commit data to FileChannel.", e);}}}
}

FlushRealTimeService

如果未开启暂存池,会直接使用FlushRealTimeService进行刷盘,当然如果开启暂存池,写入一批数据后,同样会使用FlushRealTimeService进行刷盘FlushRealTimeService同样继承了FlushCommitLogService,是用于执行刷盘的线程,处理逻辑与提交刷盘数据逻辑相似,只不过不是提交数据,而是调用flush方法将提交的数据刷入磁盘:

  1. 从配置信息中获取flush间隔每次flush的最少页数两次flush的最大间隔时间
  2. 如果当前时间大于上次flush时间+两次flush的最大间隔时间,意味着已经有比较长的一段时间没有进行flush,此时将每次flush的最少页数设置为0不限制flush页数
  3. 调用waitForRunning等待被唤醒
  4. 如果被唤醒,调用mappedFileQueueflush方法进行刷盘
class FlushRealTimeService extends FlushCommitLogService {private long lastFlushTimestamp = 0; // 上一次flush的时间private long printTimes = 0;public void run() {CommitLog.log.info(this.getServiceName() + " service started");// 如果未停止while (!this.isStopped()) {// boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();// 获取flush间隔int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();// flush至少包含的页数int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();// 两次flush的时间间隔int flushPhysicQueueThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();boolean printFlushProgress = false;long currentTimeMillis = System.currentTimeMillis();// 如果当前毫秒数 大于上次flush时间 + 两次flush之间的间隔if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {this.lastFlushTimestamp = currentTimeMillis; // 更新flush时间flushPhysicQueueLeastPages = 0; // flush至少包含的页数置为0printFlushProgress = (printTimes++ % 10) == 0;}try {// if (flushCommitLogTimed) {// 睡眠Thread.sleep(interval);} else {// 等待flush被唤醒this.waitForRunning(interval);}if (printFlushProgress) {// 打印刷盘进程this.printFlushProgress();}long begin = System.currentTimeMillis();// 进行刷盘CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}long past = System.currentTimeMillis() - begin;if (past > 500) {log.info("Flush data to disk costs {} ms", past);}} catch (Throwable e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);this.printFlushProgress();}}// 如果服务停止,确保数据被刷盘boolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {// 进行刷盘result = CommitLog.this.mappedFileQueue.flush(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}this.printFlushProgress();CommitLog.log.info(this.getServiceName() + " service end");}

刷盘

刷盘的方法在MappedFileQueueflush方法中实现,处理逻辑如下:

  1. 根据 flush的位置偏移量获取映射文件
  2. 调用mappedFile的flush方法进行刷盘,并返回刷盘后的位置偏移量
  3. 计算最新的flush偏移量
  4. 更新flushedWhere的值为最新的flush偏移量
public class MappedFileQueue {protected long flushedWhere = 0; // flush的位置偏移量private long committedWhere = 0; // 提交的位置偏移量// flush刷盘public boolean flush(final int flushLeastPages) {boolean result = true;// 获取flush的位置偏移量映射文件MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {// 获取时间戳long tmpTimeStamp = mappedFile.getStoreTimestamp();// 调用MappedFile的flush方法进行刷盘,返回刷盘后的偏移量int offset = mappedFile.flush(flushLeastPages);// 计算最新的flush偏移量long where = mappedFile.getFileFromOffset() + offset;result = where == this.flushedWhere;// 更新flush偏移量this.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}// 返回flush的偏移量return result;}
}

flush的逻辑也与commit方法的逻辑类似:

  1. 调用isAbleToFlush判断是否满足刷盘条件,获取上次flush位置偏移量和当前写入位置偏移量进行如下校验:

    • 文件是否已写满,即文件大小是否与写入数据位置相等,如果相等说明文件已经写满需要执行刷盘,满足刷盘条件

    • 如果最少flush页数大于0,计算本次flush的页数是否大于或等于最少flush页数,如果满足可以进行刷盘

      本次flush数据的页数计算方法:写入位置/页大小 - flush位置/页大小

    • 如果写入位置偏移量是否大于flush位置偏移量,如果大于表示有数据未进行刷盘,满足刷盘条件

  2. 调用fileChannel的force或者mappedByteBuffer的force方法进行刷盘

  3. 记录本次flush的位置,并作为结果返回

public class MappedFile extends ReferenceResource {protected final AtomicInteger wrotePosition = new AtomicInteger(0);protected final AtomicInteger committedPosition = new AtomicInteger(0);private final AtomicInteger flushedPosition = new AtomicInteger(0);/*** 进行刷盘并返回flush后的偏移量*/public int flush(final int flushLeastPages) {// 是否可以刷盘if (this.isAbleToFlush(flushLeastPages)) {if (this.hold()) {int value = getReadPosition();try {// 如果writeBuffer不为空if (writeBuffer != null || this.fileChannel.position() != 0) {// 将数据刷到硬盘this.fileChannel.force(false);} else {this.mappedByteBuffer.force();}} catch (Throwable e) {log.error("Error occurred when force data to disk.", e);}// 记录flush位置this.flushedPosition.set(value);this.release();} else {log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}// 返回flush位置return this.getFlushedPosition();}// 是否可以刷盘private boolean isAbleToFlush(final int flushLeastPages) {// 获取上次flush位置int flush = this.flushedPosition.get();// 写入位置偏移量int write = getReadPosition();if (this.isFull()) {return true;}// 如果flush的页数大于0,校验本次flush的页数是否满足条件if (flushLeastPages > 0) {// 本次flush的页数:写入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPagesreturn ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;} // 写入位置偏移量是否大于flush位置偏移量return write > flush;}// 文件是否已写满public boolean isFull() {// 文件大小是否与写入数据位置相等return this.fileSize == this.wrotePosition.get();}/*** 返回当前有效数据的位置*/public int getReadPosition() {// 如果writeBuffer为空使用写入位置,否则使用提交位置return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();}
}

总结

参考
丁威、周继锋《RocketMQ技术内幕》

RocketMQ版本:4.9.3

相关文章:

【RocketMQ】消息的刷盘机制

刷盘策略 CommitLog的asyncPutMessage方法中可以看到在写入消息之后&#xff0c;调用了submitFlushRequest方法执行刷盘策略&#xff1a; public class CommitLog {public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// …...

AMBA-AXI(一)burst 传输-INCR/WRAP/Fixed

&#x1f4a1;Note&#xff1a;本文是根据AXI协议IHI0022F_b_amba_axi_protocol_spec.pdf&#xff08;issue F&#xff09;整理的。主要是分享AXI3.0和4.0部分。如果内容有问题请大家在评论区中指出&#xff0c;有补充或者疑问也可以发在评论区&#xff0c;互相学习&#x1f64…...

Java知识复习(八)Spring基础

1、什么是Spring框架&#xff1f; Spring &#xff1a;是一款开源的轻量级 Java 开发框架&#xff0c;旨在提高开发人员的开发效率以及系统的可维护性 2、Spring、SpringMVC和SpringBoot的区别 Spring主要指Spring Framework&#xff0c;就是指如上图所示的各项功能模块Spr…...

WuThreat身份安全云-TVD每日漏洞情报-2023-02-27

漏洞名称:OTFCC 缓冲区错误漏洞 漏洞级别:中危 漏洞编号:CVE-2022-35060,CNVD-2023-11996,CNNVD-202209-1527 相关涉及:OTFCC OTFCC 漏洞状态:EXP 参考链接:https://tvd.wuthreat.com/#/listDetail?TVD_IDTVD-2022-23648 漏洞名称:MuYucms 存在任意代码执行漏洞 漏洞级别:高危…...

上海交大陈海波教授、夏虞斌教授领衔巨作上市:《操作系统:原理与实现》

❤️作者主页&#xff1a;小虚竹 ❤️作者简介&#xff1a;大家好,我是小虚竹。2022年度博客之星评选TOP 10&#x1f3c6;&#xff0c;Java领域优质创作者&#x1f3c6;&#xff0c;CSDN博客专家&#x1f3c6;&#xff0c;华为云享专家&#x1f3c6;&#xff0c;掘金年度人气作…...

dpi数据接入shell脚

原文&#xff1a;dpi数据接入shell脚本_weixin_34416754的博客-CSDN博客 ##############从ftp服务器拿数据文件 #!/bin/bash #获取感知优良率DPI数据 #DCN服务器信息 uSichuan pS988188# ip137.192.5.53 #获取日期&#xff0c;根据日期抓取文件 Tdate -d "3 days ago&…...

Easyrecovery数据恢复软件工作原理及使用介绍教程

Easyrecovery是一款强大的数据恢复软件&#xff0c;它专门解决磁盘数据恢复问题。在计算机世界里&#xff0c;数据丢失经常是一件令人头疼的事情&#xff0c;但是有了Easyrecovery&#xff0c;您可以放心大胆地享受数据备份和恢复的乐趣。EasyRecovery使用Ontrack公司复杂的模式…...

【面试题】社招中级前端笔试面试题总结

大厂面试题分享 面试题库后端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★地址&#xff1a;前端面试题库typeof null 的结果是什么&#xff0c;为什么&#xff1f;typeof null 的结果是Object。在 JavaScript 第一个版本中&#xff0c;所有值都存储在…...

设备运行状况不能远程手机查看。难道就妥协吗?为何不试试这个办法

一、背景 随着国家经济结构逐步调整&#xff0c;纺织行业自动化、智能化水平逐步提高&#xff0c;业内竞争程度也将加大&#xff1b;整个市场变化快&#xff0c;并呈现出智能化、通用化、网络化、复杂化的新发展趋势。客户订单小批量、个性化、快速交货的特点越来越明显&#…...

重新认识 Java 中的内存映射(mmap)

mmap 基础概念 mmap 是一种内存映射文件的方法&#xff0c;即将一个文件映射到进程的地址空间&#xff0c;实现文件磁盘地址和一段进程虚拟地址的映射。实现这样的映射关系后&#xff0c;进程就可以采用指针的方式读写操作这一段内存&#xff0c;而系统会自动回写脏页到对应的文…...

224. 基本计算器

224. 基本计算器给你一个字符串表达式 s &#xff0c;请你实现一个基本计算器来计算并返回它的值。注意:不允许使用任何将字符串作为数学表达式计算的内置函数&#xff0c;比如 eval() 。 示例 1&#xff1a;输入&#xff1a;s "1 1"输出&#xff1a;2示例 2&#…...

微信小程序通过 node 连接 mysql——方法,简要原理,及一些常见问题

前言 博主自己在22年夏天根据课程要求做了一个小程序连接阿里云服务器的案例&#xff0c;在最近又碰到了相应的需求。 原参考文章&#xff1a;微信小程序 Node连接本地MYSQL_微信小程序nodejs连接数据库_JJJenny0607的博客-CSDN博客 ,还请多多支持原作者&#xff01; 第二次…...

uni-app项目搭建和代码托管

文章目录一、项目搭建步骤一、HBuilder X 创建uniapp项目步骤二、开启微信小程序服务端口步骤三、把项目运行到微信小程序步骤四、解决警告二、使用Git管理项目2-1、本地管理2-2、托管到码云一、项目搭建 步骤一、HBuilder X 创建uniapp项目 步骤二、开启微信小程序服务端口 步…...

win10+python3.6+cuda9+pytorch1.1.0安装

为了让torch可以使用显卡GPU加速&#xff0c;需要安装对应版本的cudatoolkit和pytorch。这里我的nvidia显卡驱动是9.1版本&#xff0c;只能安装cudatoolkit9。 一般支持gpu加速的显卡大部分都是英伟达nvidia系列&#xff0c;都自带了nvidia驱动&#xff0c;所以不需要安装nvidi…...

【2023】某python语言程序设计跟学第二周内容

本文说明&#xff1a; 案例内容为北理工python语言程序设计课程&#xff0c;如有不妥请联系&#xff01; 目录蟒蛇绘制案例&#xff1a;执行结果&#xff1a;代码分析&#xff1a;举一反三&#xff1a;绘制一个五角星图案执行结果&#xff1a;turtle库根据案例简单说明&#xf…...

spring源码篇——BeanDefinition的注册

spring-framework 版本&#xff1a;v5.3.19 文章目录注解方式&#xff08;AnnotationConfigApplicationContext&#xff09;AnnotationConfigApplicationContext#registerAnnotatedBeanDefinitionReader#doRegisterBeanBeanDefinitionRegistry#registerBeanDefinitionAnnotatio…...

virtualbox7虚拟机中安装苹果macOS big sur系统详细教程

第1步&#xff0c;在 Windows 10/11 PC 上启用虚拟化。 现在的电脑一般都默认开启虚拟化技术了。 如果你遇到一些报错&#xff0c;比如收到错误消息“无法在虚拟机上打开会话”&#xff0c;可以查看 如果没有遇到问题&#xff0c;可以直接进入到第二步。 第2步&#xff0c;在…...

用spectralayers 简单去一下人声做个伴奏

最近有个同事说有个工作要一个歌的伴奏不会下载问我能不能给下一个。问题是我五音不全&#xff0c;也不咋关注伴奏这方面的事儿&#xff0c;然后巧了&#xff0c;当天晚上就有个网上的大哥在群里聊天的时候说有个去人声比较给力的软件&#xff0c;我马上给要来了。 软件叫啥sp…...

高峰对话|深度探讨「多云与边缘」

2022 年 12 月&#xff0c;分析师 Zeus Kerravala 与 VMware 通信运营商和边缘事业部高级副总裁兼总经理 Sanjay Uppal 进行非常有启发性的谈话&#xff0c;分享了科技行业领导者的见解。 二位主要围绕以下主题进行探讨&#xff1a; &#x1f4cd; 如何定义多云&#xff0c;以…...

开发手册——一、编程规约_2.常量定义

这篇文章主要梳理了在java的实际开发过程中的编程规范问题。本篇文章主要借鉴于《阿里巴巴java开发手册终极版》 下面我们一起来看一下吧。 1. 【强制】不允许任何魔法值&#xff08;即未经定义的常量&#xff09;直接出现在代码中。 反例&#xff1a;String key "Id#…...

Sandstorm 建设者亮点——2023 年 2 月

隆重推出 Sandstorm 建设者亮点——2023 年 2 月版&#xff0c;这是由最厉害的 Sandstorm 社区制作的独一无二的 NFT 系列。 从突破性的兔子机器人到神奇的蒸汽朋克海盗船&#xff0c;Sandstorm 建设者亮点 NFT 系列展示了一系列独一无二的创作。 19 项新资产将添加至 Sandstor…...

MyBatis快速入门

创建表&#xff08;自行完成&#xff09;创建模块&#xff0c;引入坐标&#xff08;1&#xff09;.进入mybatis官网&#xff1a;MyBatis中文网按步骤进行添加坐标先添加mybatis依赖然后手动添加mysql驱动junit单元测试坐标&#xff1a;logback坐标&#xff1a;用的时候直接复制…...

Mysql的一些提权方式(mysql提权、UDF)

目录 bash命令提权 必要条件 实验 UDF提权 什么是UDF 必要条件 实验 手动测试...

【2023】DevOps、SRE、运维开发面试宝典之Docker相关面试题

文章目录 1、docker的工作原理是什么2、docker的组成包含哪几大部分3、讲一下镜像的分层结构以及为什么要使用镜像的分层结构?4、简单描述一下Dockerfile的整个构建镜像过程?5、Docker的四种网络类型?6、Docker跨宿主机通讯的方式1、docker的工作原理是什么 docker是一个Cl…...

圣杯布局的实现方式

1.什么是圣杯布局&#xff1f; 左右盒子固定&#xff0c;中间盒子自适应 2.实现方式 &#xff08;1&#xff09;flex布局 思路&#xff1a;左右盒子给固定的宽高&#xff0c;中间盒子flex:1 <!DOCTYPE html> <html lang"en"> <head> <met…...

RecastDemo用法

这里写自定义目录标题recastnavigation介绍recastnavigation的内容RecastDemo安装RecastDemo介绍可配置参数合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个…...

IIC总线式驱动开发(mpu6050)(二)

目录 六、I2C总线二级外设驱动开发方法 七、I2C总线二级外设驱动开发之名称匹配 1. i2c_register_board_info 2. i2c_new_device&#xff1a;明确二级外设地址的情况下可用 3. i2c_new_probed_device 八、I2C总线二级外设驱动开发之设备树匹配 六、I2C总线二级外设驱动开…...

盘点一下那些远程办公的神仙公司

其实远程办公已经有50多年的历史了&#xff0c;这几年&#xff0c;这种工作方式越来越受到大家的喜欢&#xff0c;对于员工来说&#xff0c;工作效率可以大幅提高&#xff0c;节省下来的通勤时间和成本&#xff0c;有更多的时间花在工作上。可以更好的平衡工作与生活。对于公司…...

Spring Cloud Alibaba全家桶(四)——微服务调用组件Feign

前言 本文小新为大家带来 微服务调用组件Feign 的相关知识&#xff0c;具体内容包含什么是Feign&#xff0c;Spring Cloud Alibaba快速整合OpenFeign&#xff0c;Spring Cloud Feign的自定义配置及使用&#xff08;包括&#xff1a;日志配置、契约配置、自定义拦截器实现认证逻…...

安装pytorch

一、在anaconda中创建虚拟环境 打开Anaconda Prompt创建一个虚拟环境。比如要创建一个名字为pytorch的虚拟环境&#xff0c;可以如下输入。其中python3.7指定该虚拟环境的python版本号。 conda create -n pytorch python3.7 二、进入新创建的虚拟环境。 创建好虚拟环境后&a…...

visio网站开发流程图/企业培训课程种类

chromedriver版本支持的Chrome版本v2.37v64-66v2.36v63-65v2.35v62-64v2.34v61-63v2.33v60-62https://blog.csdn.net/huilan_same/article/details/51896672 淘宝镜像http://npm.taobao.org/ selenium主要是用来做自动化测试&#xff0c;支持多种浏览器&#xff0c;爬虫中主要用…...

宁波住房和城乡建设局网站首页/淘宝关键词排名优化技巧

最近&#xff0c;一封被称为“80后最牛的辞职信”在网上流传。这封辞职信的作者模仿诸葛亮的《出师表》&#xff0c;用文言文写出了一篇颇见古文功底的《辞职表》。 他在《辞职表》里讲述了自己的辛酸经历——满怀理想去某公司工作近一年&#xff0c;结果却发现自己无处施展…...

天津网站设计/如何对seo进行优化

昨日&#xff0c;第30届全国青少年信息学奥林匹克竞赛在电子科技大学开幕&#xff0c;300多名来自全国的&#xff0c;在信息学上痴迷、且在所在省成绩优异的中学生将在未来5天&#xff0c;角逐今年全国青少年信息学的“编程高手”称号。 如果成绩能进入前50名&#xff08;获得…...

如何做seo和网站/seo的优缺点

聊聊高并发&#xff08;十一&#xff09;实现几种自旋锁&#xff08;五&#xff09; 给出了限时有界队列锁的lock和unlock实现。这篇给出tryLock的实现 tryLock比lock略微复杂一点。要处理超时的情况。超时有几种情况&#xff1a; 1. 第一步在等待队列还没有获得节点的时候超时…...

重庆网站建设培训班/网络营销是学什么的

偶尔需要将jupyter notebook的文件转换为html或pdf&#xff0c;但觉得jupyter自带的转换功能又有点丑&#xff0c;于是自己写了一个脚本。 jupyter notebook的打印预览长这样 自己脚本转换出来html的长下面这样 或者长这样 你可以在脚本中更改配置得到自己需要的效果 # 样式…...

wordpress刷留言板/今日头条新闻最新疫情

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 安全生产模拟考试一点通&#xff1a;安全员-C证报名考试是安全生产模拟考试一点通总题库中生成的一套安全员-C证考试报名&#xff0c;安全生产模拟考试一点通上安全员-C证作业手机同步练习。2021年安全员-C证报名考试…...