当前位置: 首页 > news >正文

RocketMQ5.0.0的Broker主从同步机制

目录

一、主从同步工作原理

1. 主从配置

2. 启动HA

二、主从同步实现机制

1. 从Broker发送连接事件

2. 主Broker接收连接事件

3. 从Broker反馈复制进度

4. ReadSocketService线程读取从Broker复制进度

5. WriteSocketService传输同步消息

6. GroupTransferService线程通知HA结果

        1):待需要HA的消息集合

        2):通知消息发送者线程

三、读写分离机制

四、参考资料


一、主从同步工作原理

        为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费, RocketMQ引入Broker主备机制,即:消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。

        下图所示是Broker的HA交互机制流程图及类图。主从同步模式分为:同步、异步。

  • step1:主服务器启动,并在特定端口上监听从服务器的连接;
  • step2:从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接;
  • step3:从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器;
  • step4:从服务器保存消息并继续发送新的消息同步请求。

1. 主从配置

        参考rocketmq-distribution项目的conf目录下有:2主2从异步HA配置(2m-2s-async)、2主2从同步HA配置(2m-2s-sync)。以下1主1从异步HA配置实例如下。

        主Broker配置:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = truenamesrvAddr=192.168.1.55:9876;172.17.0.3:9876
brokerIP1=192.168.1.55

        从Broker配置:

        注意:brokerName与主机相同;brokerId > 0时,则为从,0时则为主;brokerRole角色为SLAVE(从),刷盘类型为ASYNC_FLUSH(异步刷盘)。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = truenamesrvAddr=192.168.1.55:9876;172.17.0.3:9876

2. 启动HA

        org.apache.rocketmq.store.DefaultMessageStore#start是Broker启动方法,如下图所示是其调用链及相关HA部分代码。 

/*** broker启动时,消息存储线程* BrokerController#startBasicService()* @throws Exception*/
@Override
public void start() throws Exception {// 是否HA主从复制if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {this.haService.init(this);}......if (this.haService != null) {this.haService.start();}......
}

        org.apache.rocketmq.store.ha.DefaultHAService#init是HAService初始化方法,如下代码所示。注意,从Broker的broker.conf配置的brokerRole为SLAVE,才能创建HAClient(从Broker注册到主Broker)

@Override
public void init(final DefaultMessageStore defaultMessageStore) throws IOException {this.defaultMessageStore = defaultMessageStore;this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());this.groupTransferService = new GroupTransferService(this, defaultMessageStore);if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {this.haClient = new DefaultHAClient(this.defaultMessageStore);}this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
}

        org.apache.rocketmq.store.ha.DefaultHAService#start是HAService启动方法。注意:

  • org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService:主Broker接收从Broker的连接事件;
  • org.apache.rocketmq.store.ha.GroupTransferService:负责主Broker向从Broker发送同步数据;
  • org.apache.rocketmq.store.ha.HAClient:从Broker向主Broker发送连接事件;

        Broker启动时,根据配置brokerRole配置(ASYNC_MASTER、SYNC_MASTER、SLAVE)判定Broker是主还是从。若是Slave角色,在broker配置文件中获取haMasterAddress,并更新至masterAddress;但是haMasterAddress配置为空,则启动成功,但是不会执行HA

/*** 启动HAService* step1:{@link AcceptSocketService}接收从Broker的注册事件,方法是{@link AcceptSocketService#beginAccept()}* step2:启动{@link AcceptSocketService}线程,监听从Broker发送心跳* step3:同步数据{@link GroupTransferService}线程启动,主Broker向从Broker发送数据* step4:启动从Broker{@link HAClient}发送心跳到主Broker*/
@Override
public void start() throws Exception {// 主接收从Broker的连接事件,SelectionKey.OP_ACCEPT(连接事件)this.acceptSocketService.beginAccept();// 启动主Broker线程this.acceptSocketService.start();// 主Broker同步数据线程启动this.groupTransferService.start();this.haConnectionStateNotificationService.start();// 启动从Broker{@link HAClient}发送心跳到主Brokerif (haClient != null) {this.haClient.start();}
}

二、主从同步实现机制

1. 从Broker发送连接事件

        org.apache.rocketmq.store.ha.DefaultHAClient是从Broker向主Broker的发送连接事件的核心类,是个线程。其主要属性如下代码所示。

// Socket读缓存区大小,4M
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// 主Broker地址
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// 从Broker向主Broker发起HA的偏移量
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// 网络传输通道
private SocketChannel socketChannel;
// 事件选择器
private Selector selector;
/*** 上次读取主Broker的时间戳* last time that slave reads date from master.*/
private long lastReadTimestamp = System.currentTimeMillis();
/*** 上次写入主Broker的时间戳* last time that slave reports offset to master.*/
private long lastWriteTimestamp = System.currentTimeMillis();// 反馈HA的复制进度(从Broker的Commitlog文件的最大偏移量)
private long currentReportedOffset = 0;
// 本次处理读缓存区的指针
private int dispatchPosition = 0;
// 读缓存区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 读缓存区备份,与byteBufferRead交换
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private DefaultMessageStore defaultMessageStore;
// HA连接状态
private volatile HAConnectionState currentState = HAConnectionState.READY;
// 流监控
private FlowMonitor flowMonitor;

        org.apache.rocketmq.store.ha.DefaultHAClient#run是HAClient启动执行任务,其调用链和代码如下。

  • DefaultHAClient#connectMaster():从Broker连接到主Broker
  • DefaultHAClient#transferFromMaster():向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中。  

/*** 启动HAClient* {@link DefaultHAClient#connectMaster()}:从Broker连接到主Broker* {@link DefaultHAClient#transferFromMaster()}:向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中*/
@Override
public void run() {log.info(this.getServiceName() + " service started");this.flowMonitor.start();while (!this.isStopped()) {try {switch (this.currentState) {case SHUTDOWN:return;case READY:if (!this.connectMaster()) {log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());this.waitForRunning(1000 * 5);}continue;case TRANSFER:// 向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中if (!transferFromMaster()) {// 没有可拉取消息时,设置READY状态closeMasterAndWait();continue;}break;default:this.waitForRunning(1000 * 2);continue;}long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress+ "] expired, " + interval);this.closeMaster();log.warn("AutoRecoverHAClient, master not response some time, so close connection");}} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);this.closeMasterAndWait();}}log.info(this.getServiceName() + " service end");
}

        注意,一旦HAClient线程启动后,在状态READY、TRANSFER来回变化,READY状态下:发送从Broker连接事件到主Broker,开启Socket连接;TRANSFER状态下:主从发送相关数据信息,如:从向主发送HA复制进度(currentReportedOffset,即:从Broker的Commitlog文件的最大偏移量);主向从发送同步消息

        org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster是从Broker连接到主Broker的核心方法,其代码如下。

/*** 从Broker连接到主Broker* 注意:*  a. Broker启动时,若是Slave角色,从broker配置文件中获取haMasterAddress,并更新至masterAddress;*  b. 若是Slave角色,但是haMasterAddress配置为空,则启动成功,但是不会执行HA* @return true连接成功;false连接失败*/
public boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {// 获取主Broker地址String addr = this.masterHaAddress.get();if (addr != null) {// 根据地址创建SocketAddress对象SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);// 获取SocketChannelthis.socketChannel = RemotingUtil.connect(socketAddress);if (this.socketChannel != null) {// SocketChannel注册OP_READ(网络读事件)this.socketChannel.register(this.selector, SelectionKey.OP_READ);log.info("HAClient connect to master {}", addr);this.changeCurrentState(HAConnectionState.TRANSFER);}}// 获取Commitlog最大偏移量(HA同步进度)this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();this.lastReadTimestamp = System.currentTimeMillis();}return this.socketChannel != null;
}

2. 主Broker接收连接事件

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService是主Broker接收从Broker连接事件的实现类,是一个线程。其主要属性如下代码所示。

// 主Broker监听本地的Socket(本地IP + 端口号)
private final SocketAddress socketAddressListen;
// Socket通道,基于NIO
private ServerSocketChannel serverSocketChannel;
// 事件选择器,基于NIO
private Selector selector;

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept方法定义了主Broker监听从Broker的连接事件

/*** 启动监听从broker的连接* Starts listening to slave connections.** @throws Exception If fails.*/
public void beginAccept() throws Exception {this.serverSocketChannel = ServerSocketChannel.open();this.selector = RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true); // TCP可重复使用this.serverSocketChannel.socket().bind(this.socketAddressListen); // 绑定监听端口if (0 == messageStoreConfig.getHaListenPort()) {messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());log.info("OS picked up {} to listen for HA", messageStoreConfig.getHaListenPort());}this.serverSocketChannel.configureBlocking(false); // 非阻塞模式this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // 注册OP_ACCEPT(连接事件)
}

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run监听到从Broker连接事件的任务处理,为每个连接事件创建org.apache.rocketmq.store.ha.HAConnection对象并启动(负责M-S的数据同步逻辑)

/*** 标准的NIO连接处理方式* step1:选择器每1s处理一次连接就绪事件* step2:是否连接事件,若是,创建{@link SocketChannel}* step3:每一个连接创建{@link HAConnection}(负责M-S的数据同步逻辑)*/
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 选择器每1s处理一次连接就绪事件this.selector.select(1000);Set<SelectionKey> selected = this.selector.selectedKeys();if (selected != null) {for (SelectionKey k : selected) {// 是否连接事件if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {// 若是连接事件时,创建SocketChannelSocketChannel sc = ((ServerSocketChannel) k.channel()).accept();if (sc != null) {DefaultHAService.log.info("HAService receive new connection, "+ sc.socket().getRemoteSocketAddress());try {// 每一个连接创建HAConnection并启动(负责M-S的数据同步逻辑)HAConnection conn = createConnection(sc);conn.start();DefaultHAService.this.addConnection(conn);} catch (Exception e) {log.error("new HAConnection exception", e);sc.close();}}} else {log.warn("Unexpected ops in select " + k.readyOps());}}selected.clear();}} catch (Exception e) {log.error(this.getServiceName() + " service has exception.", e);}}log.info(this.getServiceName() + " service end");
}

        org.apache.rocketmq.store.ha.DefaultHAConnection创建并启动时,启动读、写线程服务。其关键属性如下代码所示。

  • private WriteSocketService writeSocketService:主Broker向从Broker写数据服务类
  • private ReadSocketService readSocketService:主Broker读取从Broker数据服务类
private final DefaultHAService haService;
private final SocketChannel socketChannel;
// HA客户端连接地址
private final String clientAddress;
// 主Broker向从Broker写数据服务类
private WriteSocketService writeSocketService;
// 主Broker读取从Broker数据服务类
private ReadSocketService readSocketService;
private volatile HAConnectionState currentState = HAConnectionState.TRANSFER;
// 从Broker请求拉取的偏移量
private volatile long slaveRequestOffset = -1;
// 从Broker反馈已完成的偏移量
private volatile long slaveAckOffset = -1;
private FlowMonitor flowMonitor;

3. 从Broker反馈复制进度

        org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster是从Broker与主Broker传输数据的核心方法,代码如下所示,该方法有两大功能:

  • 从Broker向主Broker:反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量),方法org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset执行。
  • 从Broker接收主Broker:HA同步消息内容,方法org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent执行。
/*** 向主反馈HA复制进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中* {@link DefaultHAClient#reportSlaveMaxOffset(long)}:向主反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量)* {@link DefaultHAClient#processReadEvent()}:处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中*/
private boolean transferFromMaster() throws IOException {boolean result;// 判断是否需要向主Broker反馈当前待拉取偏移量if (this.isTimeToReportOffset()) {log.info("Slave report current offset {}", this.currentReportedOffset);// 向主Broker反馈拉取偏移量result = this.reportSlaveMaxOffset(this.currentReportedOffset);if (!result) {return false;}}this.selector.select(1000);// 处理主Broker发送过来的消息数据result = this.processReadEvent();if (!result) {return false;}return reportSlaveMaxOffsetPlus();
}

        org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset向主Broker反馈HA复制进度,代码如下。

/*** 向主Broker反馈拉取偏移量* 注意:*      a. 向主Broker反馈拉取偏移量maxOffset: 对于Slave端:发送下次待拉取消息偏移量*                                        对于Master端:本次请求拉取的偏移量,也可以理解为同步ACK*      b. 手动切换ByteBuffer的写模式/读模式;*      c. 通过{@link Buffer#hasRemaining()}判断缓存内容是否完全写入SocketChannel(基于NIO模式的写范例)* @param maxOffset HA待拉取偏移量* @return ByteBuffer缓存的内容是否写完*/
private boolean reportSlaveMaxOffset(final long maxOffset) {// 偏移量写入ByteBufferthis.reportOffset.position(0); // 写缓存位置this.reportOffset.limit(8); // 写缓存字节长度this.reportOffset.putLong(maxOffset); // 偏移量写入ByteBuffer// 将ByteBuffer的写模式 转为 读模式this.reportOffset.position(0);this.reportOffset.limit(8);// 循环,并判定ByteBuffer是否完全写入SocketChannelfor (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {try {this.socketChannel.write(this.reportOffset);} catch (IOException e) {log.error(this.getServiceName()+ "reportSlaveMaxOffset this.socketChannel.write exception", e);return false;}}lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();return !this.reportOffset.hasRemaining();
}

4. ReadSocketService线程读取从Broker复制进度

        org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent是主Broker读取从Broker拉取消息的请求,获取内容是HA复制进度。其代码如下,看出主Broker获取从Broker的HA复制进度后,赋值给DefaultHAConnection#slaveRequestOffset属性,后立即唤醒GroupTransferService线程,执行消息同步

/*** 主Broker读取从Broker拉取消息的请求* step1:判定byteBufferRead是否有剩余空间,没有则{@link Buffer#flip()}* step2:用剩余空间,从SocketChannel读数据到缓存中;读取到的内容是从Broker拉取消息的偏移量* step3:通知等待同步HA复制结果的发送消息线程* @return*/
private boolean processReadEvent() {int readSizeZeroTimes = 0;/*byteBufferRead没有剩余空间时,则:position == limit == capacity调用flip()方法后,则:position == 0, limit == capacity,加上processPosition = 0,说明从头开始处理*/if (!this.byteBufferRead.hasRemaining()) {this.byteBufferRead.flip(); // ByteBuffer重置处理this.processPosition = 0;}// ByteBuffer有剩余空间,循环至byteBufferRead没有剩余空间while (this.byteBufferRead.hasRemaining()) {try {// 从SocketChannel读数据到缓存中int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {readSizeZeroTimes = 0; // 重置this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();// 读取内容长度 >= 8,说明收到从Broker的拉取请求(内容是offset)if ((this.byteBufferRead.position() - this.processPosition) >= 8) {int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);long readOffset = this.byteBufferRead.getLong(pos - 8);this.processPosition = pos;// 从Broker反馈已完成的偏移量DefaultHAConnection.this.slaveAckOffset = readOffset;// 更新从Broker请求拉取的偏移量if (DefaultHAConnection.this.slaveRequestOffset < 0) {DefaultHAConnection.this.slaveRequestOffset = readOffset;log.info("slave[" + DefaultHAConnection.this.clientAddress + "] request offset " + readOffset);}// 通知等待同步HA复制结果的发送消息线程DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);}} else if (readSize == 0) {// 连续读取字节数0,则终止本次读取处理if (++readSizeZeroTimes >= 3) {break;}} else {log.error("read socket[" + DefaultHAConnection.this.clientAddress + "] < 0");return false;}} catch (IOException e) {log.error("processReadEvent exception", e);return false;}}return true;
}

5. WriteSocketService传输同步消息

        ReadSocketService线程读取从Broker发送的HA复制进度,由org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService根据DefaultHAConnection#slaveRequestOffset获取主Broker还没有同步的所有消息进行HA同步。其如下代码所示WriteSocketService#run方法,是同步消息核心逻辑。

/*** 传输消息内容到HA客户端* step1:slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件* step2:nextTransferFromWhere为-1时,说明初次传输,计算nextTransferFromWhere(待传输offset)* step3:判断上次是否传输完*        上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,*                  发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)*        上次传输没有完成:继续传输,忽略本次写事件* step4:根据从Broker待拉取消息offset查找之后的所有可读消息* step5:待同步消息总大小 > 一次传输的大小,默认32KB,则截取,此时一次传输不是完整的消息* step6:传输消息内容*/
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.selector.select(1000);// slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件if (-1 == DefaultHAConnection.this.slaveRequestOffset) {Thread.sleep(10);continue;}/*nextTransferFromWhere为-1时,说明初次传输初次传输时,计算nextTransferFromWhere(待传输offset)*/// 初次传输if (-1 == this.nextTransferFromWhere) {// =0 时,从Commitlog文件的最大偏移量传输if (0 == DefaultHAConnection.this.slaveRequestOffset) {long masterOffset = DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();masterOffset =masterOffset- (masterOffset % DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());if (masterOffset < 0) {masterOffset = 0;}this.nextTransferFromWhere = masterOffset;}// !=0 时,从Broker请求的偏移量else {this.nextTransferFromWhere = DefaultHAConnection.this.slaveRequestOffset;}log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + DefaultHAConnection.this.clientAddress+ "], and slave request " + DefaultHAConnection.this.slaveRequestOffset);}/*判断上次是否传输完上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)上次传输没有完成:继续传输,忽略本次写事件*/// lastWriteOver为true,则上次传输完if (this.lastWriteOver) {// 当前时间 与 上次传输时间的间隔long interval =DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;// 时间间隔 > 发送心跳包时间间隔if (interval > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {// Build Header 发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(this.nextTransferFromWhere);this.byteBufferHeader.putInt(0);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}}// lastWriteOver为false,则上次传输没有完成,则继续传输else {// 继续传输上次拉取请求,还未完成,则忽略本次写事件this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}// 根据从Broker待拉取消息offset查找之后的所有可读消息SelectMappedBufferResult selectResult =DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if (selectResult != null) {int size = selectResult.getSize();// 待同步消息总大小 > 一次传输的大小,默认32KBif (size > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {size = DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();}int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();if (size > canTransferMaxBytes) {if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {log.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));lastPrintTimestamp = System.currentTimeMillis();}size = canTransferMaxBytes;}long thisOffset = this.nextTransferFromWhere;this.nextTransferFromWhere += size; // 下一次写入的offsetselectResult.getByteBuffer().limit(size);this.selectMappedBufferResult = selectResult;// Build Header 传输size大小的消息内容(不一定是完整的消息)this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver = this.transferData();} else {DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);}} catch (Exception e) {DefaultHAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();if (this.selectMappedBufferResult != null) {this.selectMappedBufferResult.release();}changeCurrentState(HAConnectionState.SHUTDOWN);this.makeStop();readSocketService.makeStop();haService.removeConnection(DefaultHAConnection.this);SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();} catch (IOException e) {DefaultHAConnection.log.error("", e);}DefaultHAConnection.log.info(this.getServiceName() + " service end");
}

6. GroupTransferService线程通知HA结果

        org.apache.rocketmq.store.ha.GroupTransferService该类负责将主从同步复制结束后,通知阻塞的消息发送者线程。同步主从Broker模式,即:消息刷磁盘后,继续等待新消息被传输到从Broker,等待传输结果,并通知消息发送线程

        1):待需要HA的消息集合

        org.apache.rocketmq.store.CommitLog#asyncPutMessage是消息生产者发送消息到Broker时执行存储消息,参考《RocketMQ5.0.0消息存储<二>_消息存储流程》,该方法会根据同步或异步模式(默认)来执行org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA方法(完成刷盘和HA复制),方法调用链如下。

        生产者把消息发送到Broker,完成commit操作(消息提交到文件内存映射中) ,随后根据同步/异步模式完成刷盘和HA。HA操作时,把消息提交请求添加到org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite是主Broker待需要HA的的集合。以下是org.apache.rocketmq.store.CommitLog#handleHA的代码。  

private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,int needAckNums) {if (needAckNums >= 0 && needAckNums <= 1) {return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}HAService haService = this.defaultMessageStore.getHaService();long nextOffset = result.getWroteOffset() + result.getWroteBytes();// Wait enough acks from different slavesGroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);haService.putRequest(request);haService.getWaitNotifyObject().wakeupAll();return request.future();
}

        2):通知消息发送者线程

        HAService启动时,会启动GroupTransferService线程。GroupTransferService#run执行任务,如下代码所示。

@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 间隔10sthis.waitForRunning(10);// 主从同步复制结束后,通知阻塞的消息发送者线程this.doWaitTransfer();} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info(this.getServiceName() + " service end");
}

        其中执行waitForRunning()方法时,会去执行org.apache.rocketmq.store.ha.GroupTransferService#swapRequests方法,使得requestsWrite与requestsRead两个集合对调

  • private volatile List<CommitLog.GroupCommitRequest> requestsWrite:主Broker待需要HA的消息集合
  • private volatile List<CommitLog.GroupCommitRequest> requestsRead:主Broker正在执行的HA集合
private void swapRequests() {List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;
}

        org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer方法是主从同步复制结束后,通知阻塞的消息发送者线程,如下代码所示。

/*** 主从同步复制结束后,通知阻塞的消息发送者线程* step1:遍历消息提交请求(内存提交到Commitlog文件的内存映射)* step2:判断主从同步成功:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量*/
private void doWaitTransfer() {// 加锁synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {// commit请求,即:内存提交到Commitlog文件的内存映射for (CommitLog.GroupCommitRequest req : this.requestsRead) {boolean transferOK = false;long deadLine = req.getDeadLine(); // 是否超时final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) { // 是否超时if (i > 0) {// 等待1sthis.notifyTransferObject.waitForRunning(1000);}if (!allAckInSyncStateSet && req.getAckNums() <= 1) {// 主从同步成功判断:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();continue;}if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {// In this mode, we must wait for all replicas that in InSyncStateSet.final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();if (syncStateSet.size() <= 1) {// Only mastertransferOK = true;break;}// Include masterint ackNums = 1;for (HAConnection conn : haService.getConnectionList()) {final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {ackNums++;}if (ackNums >= syncStateSet.size()) {transferOK = true;break;}}} else {// Include masterint ackNums = 1;for (HAConnection conn : haService.getConnectionList()) {// TODO: We must ensure every HAConnection represents a different slave// Solution: Consider assign a unique and fixed IP:ADDR for each different slaveif (conn.getSlaveAckOffset() >= req.getNextOffset()) {ackNums++;}if (ackNums >= req.getAckNums()) {transferOK = true;break;}}}}if (!transferOK) {log.warn("transfer message to slave timeout, offset : {}, request acks: {}",req.getNextOffset(), req.getAckNums());}// 从完成复制后,唤醒消息发送者线程req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}this.requestsRead.clear();}}
}

三、读写分离机制

        RocketMQ读写分离与其他中间件的实现方式完全不同,RocketMQ是消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取

        RocketMQ根据MessageQueu查找Broker地址的唯一依据是brokerName。Broker组织中根据brokerName获取一组Broker服务器(M-S),它们的brokerName相同但brokerId不同,主服务器的brokerId为0,从服务器的brokerId大于0。 其方法是org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe

        详细消费拉取消息时,实现读写分离机制见后续章节,参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》。

四、参考资料

【RocketMQ】学习RocketMQ必须要知道的主从同步原理_午睡的猫…的博客-CSDN博客_rocketmq主从同步原理

【RocketMQ】主从同步实现原理 - shanml - 博客园

RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储<三>_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客 

RocketMQ5.0.0消息存储<四>_刷盘机制_爱我所爱0505的博客-CSDN博客

相关文章:

RocketMQ5.0.0的Broker主从同步机制

目录 一、主从同步工作原理 1. 主从配置 2. 启动HA 二、主从同步实现机制 1. 从Broker发送连接事件 2. 主Broker接收连接事件 3. 从Broker反馈复制进度 4. ReadSocketService线程读取从Broker复制进度 5. WriteSocketService传输同步消息 6. GroupTransferService线程…...

深度学习论文: EdgeYOLO: An Edge-Real-Time Object Detector及其PyTorch实现

深度学习论文: EdgeYOLO: An Edge-Real-Time Object Detector及其PyTorch实现 EdgeYOLO: An Edge-Real-Time Object Detector PDF: https://arxiv.org/pdf/2302.07483.pdf PyTorch代码: https://github.com/shanglianlm0525/CvPytorch PyTorch代码: https://github.com/shangli…...

如何做好APP性能测试?

随着智能化生活的推进&#xff0c;我们生活中不可避免的要用到很多程序app。有的APP性能使用感很好&#xff0c;用户都愿意下载使用&#xff0c;而有的APP总是出现卡顿或网络延迟的情况&#xff0c;那必然就降低了用户的好感。所以APP性能测试对于软件开发方来说至关重要&#…...

Hive窗口函数

概述 窗口函数&#xff08;window functions&#xff09;也叫开窗函数、OLAP函数。 如果函数具有over子句&#xff0c;则它是窗口函数 窗口函数可以简单地解释为类似于聚合函数的计算函数&#xff0c;但是通过group by 子句组合的 常规聚合会隐藏正在聚合的各个…...

C++学习笔记(1):在默认构造函数内部使用带参数的构造函数

题目以下代码的输出是不是0&#xff1a;#include <unordered_map> #include <iostream>using namespace std;struct CLS{int i;CLS(int i_) :i(i_){}CLS(){CLS(0);} };int main(){CLS obj;std::cout << obj.i << endl;return 0; }结果-858993460为什么…...

Android面试题_安卓面经(23/30)设计模式源码案例

系列专栏: 《150道安卓常见面试题全解析》 安卓专栏目录见帖子 : 安卓面经_anroid面经_150道安卓基础面试题全解析 安卓系统Framework面经专栏:《Android系统Framework面试题解析大全》 安卓系统Framework面经目录详情:Android系统面经_Framework开发面经_150道面试题答案解…...

Dubbo性能调优参数以及原理

Dubbo作为一个服务治理框架&#xff0c;功能相对来说比较完善&#xff0c;性能也挺不错。但很多同学在使用dubbo的时候&#xff0c;只是简单的参考官方说明进行配置和应用&#xff0c;并没有过多的去思考一些关键参数的意义&#xff0c;最终做出来的效果总是差强人意,接下来我们…...

vue3全家桶之vuex和pinia持久化存储基础(二)

一.vuex数据持久化存储 这里使用的是vuex4.1.0版本,和之前的vuex3一样,数据持久化存储方案也使用 vuex-persistedstate,版本是最新的安装版本,当前可下载依赖包版本4.1.0&#xff0c;接下来在vue3项中安装和使用&#xff1a; 安装vuex-persistedstate npm i vuex-persisteds…...

LAMP架构与搭建论坛

目录 1、LAMP架构简述 2、各组件作用 3、构建LAMP平台 1.编译安装Apache httpd服务 2.编译安装mysql 3.编译安装php 4.搭建一个论坛 1、LAMP架构简述 LAMP架构是目前成熟的企业网站应用模式之一&#xff0c;指的是协同工作的一整台系统和相关软件&#xff0c;能够提供动…...

代码随想录 || 回溯算法93 78 90

Day2493.复原IP地址力扣题目链接给定一个只包含数字的字符串&#xff0c;复原它并返回所有可能的 IP 地址格式。有效的 IP 地址 正好由四个整数&#xff08;每个整数位于 0 到 255 之间组成&#xff0c;且不能含有前导 0&#xff09;&#xff0c;整数之间用 . 分隔。例如&#…...

界面组件Kendo UI for Angular——让网格数据信息显示更全面

Kendo UI致力于新的开发&#xff0c;来满足不断变化的需求&#xff0c;通过React框架的Kendo UI JavaScript封装来支持React Javascript框架。Kendo UI for Angular是专用于Angular开发的专业级Angular组件&#xff0c;telerik致力于提供纯粹的高性能Angular UI组件&#xff0c…...

【Linux】进程状态|优先级|进程切换|环境变量

文章目录1. 运行队列和运行状态2. 进程状态3. 两种特殊的进程僵尸进程孤儿进程4. 进程优先级5. 进程切换进程特性进程切换6. 环境变量的基本概念7. PATH环境变量8. 设置和获取环境变量9. 命令行参数1. 运行队列和运行状态 &#x1f495; 运行队列&#xff1a; 进程是如何在CP…...

合宙Air780E|FTP|内网穿透|命令测试|LuatOS-SOC接口|官方demo|学习(18):FTP命令及应用

1、FTP服务器准备 本机为win11系统&#xff0c;利用IIS搭建FTP服务器。 搭建方式可参考博文&#xff1a;windows系统搭建FTP服务器教程 windows系统搭建FTP服务器教程_程序员路遥的博客-CSDN博客_windows服务器安装ftp 设置完成后&#xff0c;测试FTP&#xff08;已正常访问…...

大规模 IoT 边缘容器集群管理的几种架构-4-Kubeedge

前文回顾 大规模 IoT 边缘容器集群管理的几种架构-0-边缘容器及架构简介大规模 IoT 边缘容器集群管理的几种架构-1-RancherK3s大规模 IoT 边缘容器集群管理的几种架构-2-HashiCorp 解决方案 Nomad大规模 IoT 边缘容器集群管理的几种架构-3-Portainer &#x1f4da;️Reference…...

Spring底层核心原理解析

Spring简介 ClassPathXmlApplicationContext context new classPathXmlApplicationContext("spring.xml"); UserService userService (UserService) context.getBean("userService"); userService.test();上面一段代码是我们开始学习spring时看到的&…...

OpenStack手动分布式部署Glance【Queens版】

目录 Glance简介 1、登录数据库配置&#xff08;在controller执行&#xff09; 1.1登录数据库 1.2数据库里创建glance 1.3授权对glance数据库的正确访问 1.4退出数据库 1.5创建glance用户密码为000000 1.6增加admin角色 1.7创建glance服务 1.8创建镜像服务API端点 2、安装gla…...

谈一谈你对View的认识和View的工作流程

都2023年了&#xff0c;不会还有人不知道什么是View吧&#xff0c;不会吧&#xff0c;不会吧。按我以往的面试经验来看&#xff0c;View被问到的概率不比Activity低多少哦&#xff0c;个人感觉View在Android中的重要性也和Activity不相上下&#xff0c;所以这篇文章将介绍下Vie…...

Redis集群的脑裂问题

集群脑裂导致数据丢失怎么办&#xff1f; 什么是脑裂&#xff1f; 先来理解集群的脑裂现象&#xff0c;这就好比一个人有两个大脑&#xff0c;那么到底受谁控制呢&#xff1f; 那么在 Redis 中&#xff0c;集群脑裂产生数据丢失的现象是怎样的呢&#xff1f; 在 Redis 主从架…...

互斥信号+任务临界创建+任务锁

普通信号量 1、信号量概念 2、创建信号量函数 3、互斥信号量 创建互斥信号量函数 等待信号量函数 释放互斥信号量 4、创建任务临界区 5、任务锁 任务上锁函数 ​编辑 任务结束函数 效果 普通信号量 1、信号量概念 信号量像是一种上锁机制&#xff0c;代码必须获…...

Elasticsearch7.8.0版本进阶——文档搜索

目录一、文档搜索的概述二、倒排索引不可变的优点三、倒排索引不可变的优点一、文档搜索的概述 早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦新的索引就绪&#xff0c;旧的就会被其替换&#xff0c;这样最近的变化便可以被检索到。倒排索引被…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现

目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

TDengine 快速体验(Docker 镜像方式)

简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能&#xff0c;本节首先介绍如何通过 Docker 快速体验 TDengine&#xff0c;然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker&#xff0c;请使用 安装包的方式快…...

css实现圆环展示百分比,根据值动态展示所占比例

代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...

Java编程之桥接模式

定义 桥接模式&#xff08;Bridge Pattern&#xff09;属于结构型设计模式&#xff0c;它的核心意图是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式通过组合关系来替代继承关系&#xff0c;从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...