RocketMQ5.0.0的Broker主从同步机制
目录
一、主从同步工作原理
1. 主从配置
2. 启动HA
二、主从同步实现机制
1. 从Broker发送连接事件
2. 主Broker接收连接事件
3. 从Broker反馈复制进度
4. ReadSocketService线程读取从Broker复制进度
5. WriteSocketService传输同步消息
6. GroupTransferService线程通知HA结果
1):待需要HA的消息集合
2):通知消息发送者线程
三、读写分离机制
四、参考资料
一、主从同步工作原理
为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费, RocketMQ引入Broker主备机制,即:消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。
下图所示是Broker的HA交互机制流程图及类图。主从同步模式分为:同步、异步。
- step1:主服务器启动,并在特定端口上监听从服务器的连接;
- step2:从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接;
- step3:从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器;
- step4:从服务器保存消息并继续发送新的消息同步请求。
1. 主从配置
参考rocketmq-distribution项目的conf目录下有:2主2从异步HA配置(2m-2s-async)、2主2从同步HA配置(2m-2s-sync)。以下1主1从异步HA配置实例如下。
主Broker配置:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = truenamesrvAddr=192.168.1.55:9876;172.17.0.3:9876
brokerIP1=192.168.1.55
从Broker配置:
注意:brokerName与主机相同;brokerId > 0时,则为从,0时则为主;brokerRole角色为SLAVE(从),刷盘类型为ASYNC_FLUSH(异步刷盘)。
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = truenamesrvAddr=192.168.1.55:9876;172.17.0.3:9876
2. 启动HA
org.apache.rocketmq.store.DefaultMessageStore#start是Broker启动方法,如下图所示是其调用链及相关HA部分代码。
/*** broker启动时,消息存储线程* BrokerController#startBasicService()* @throws Exception*/
@Override
public void start() throws Exception {// 是否HA主从复制if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {this.haService.init(this);}......if (this.haService != null) {this.haService.start();}......
}
org.apache.rocketmq.store.ha.DefaultHAService#init是HAService初始化方法,如下代码所示。注意,从Broker的broker.conf配置的brokerRole为SLAVE,才能创建HAClient(从Broker注册到主Broker)。
@Override
public void init(final DefaultMessageStore defaultMessageStore) throws IOException {this.defaultMessageStore = defaultMessageStore;this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());this.groupTransferService = new GroupTransferService(this, defaultMessageStore);if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {this.haClient = new DefaultHAClient(this.defaultMessageStore);}this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
}
org.apache.rocketmq.store.ha.DefaultHAService#start是HAService启动方法。注意:
- org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService:主Broker接收从Broker的连接事件;
- org.apache.rocketmq.store.ha.GroupTransferService:负责主Broker向从Broker发送同步数据;
- org.apache.rocketmq.store.ha.HAClient:从Broker向主Broker发送连接事件;
Broker启动时,根据配置brokerRole配置(ASYNC_MASTER、SYNC_MASTER、SLAVE)判定Broker是主还是从。若是Slave角色,在broker配置文件中获取haMasterAddress,并更新至masterAddress;但是haMasterAddress配置为空,则启动成功,但是不会执行HA。
/*** 启动HAService* step1:{@link AcceptSocketService}接收从Broker的注册事件,方法是{@link AcceptSocketService#beginAccept()}* step2:启动{@link AcceptSocketService}线程,监听从Broker发送心跳* step3:同步数据{@link GroupTransferService}线程启动,主Broker向从Broker发送数据* step4:启动从Broker{@link HAClient}发送心跳到主Broker*/
@Override
public void start() throws Exception {// 主接收从Broker的连接事件,SelectionKey.OP_ACCEPT(连接事件)this.acceptSocketService.beginAccept();// 启动主Broker线程this.acceptSocketService.start();// 主Broker同步数据线程启动this.groupTransferService.start();this.haConnectionStateNotificationService.start();// 启动从Broker{@link HAClient}发送心跳到主Brokerif (haClient != null) {this.haClient.start();}
}
二、主从同步实现机制
1. 从Broker发送连接事件
org.apache.rocketmq.store.ha.DefaultHAClient是从Broker向主Broker的发送连接事件的核心类,是个线程。其主要属性如下代码所示。
// Socket读缓存区大小,4M
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// 主Broker地址
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// 从Broker向主Broker发起HA的偏移量
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// 网络传输通道
private SocketChannel socketChannel;
// 事件选择器
private Selector selector;
/*** 上次读取主Broker的时间戳* last time that slave reads date from master.*/
private long lastReadTimestamp = System.currentTimeMillis();
/*** 上次写入主Broker的时间戳* last time that slave reports offset to master.*/
private long lastWriteTimestamp = System.currentTimeMillis();// 反馈HA的复制进度(从Broker的Commitlog文件的最大偏移量)
private long currentReportedOffset = 0;
// 本次处理读缓存区的指针
private int dispatchPosition = 0;
// 读缓存区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 读缓存区备份,与byteBufferRead交换
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private DefaultMessageStore defaultMessageStore;
// HA连接状态
private volatile HAConnectionState currentState = HAConnectionState.READY;
// 流监控
private FlowMonitor flowMonitor;
org.apache.rocketmq.store.ha.DefaultHAClient#run是HAClient启动执行任务,其调用链和代码如下。
- DefaultHAClient#connectMaster():从Broker连接到主Broker。
- DefaultHAClient#transferFromMaster():向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中。
/*** 启动HAClient* {@link DefaultHAClient#connectMaster()}:从Broker连接到主Broker* {@link DefaultHAClient#transferFromMaster()}:向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中*/
@Override
public void run() {log.info(this.getServiceName() + " service started");this.flowMonitor.start();while (!this.isStopped()) {try {switch (this.currentState) {case SHUTDOWN:return;case READY:if (!this.connectMaster()) {log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());this.waitForRunning(1000 * 5);}continue;case TRANSFER:// 向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中if (!transferFromMaster()) {// 没有可拉取消息时,设置READY状态closeMasterAndWait();continue;}break;default:this.waitForRunning(1000 * 2);continue;}long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress+ "] expired, " + interval);this.closeMaster();log.warn("AutoRecoverHAClient, master not response some time, so close connection");}} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);this.closeMasterAndWait();}}log.info(this.getServiceName() + " service end");
}
注意,一旦HAClient线程启动后,在状态READY、TRANSFER来回变化,READY状态下:发送从Broker连接事件到主Broker,开启Socket连接;TRANSFER状态下:主从发送相关数据信息,如:从向主发送HA复制进度(currentReportedOffset,即:从Broker的Commitlog文件的最大偏移量);主向从发送同步消息。
org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster是从Broker连接到主Broker的核心方法,其代码如下。
/*** 从Broker连接到主Broker* 注意:* a. Broker启动时,若是Slave角色,从broker配置文件中获取haMasterAddress,并更新至masterAddress;* b. 若是Slave角色,但是haMasterAddress配置为空,则启动成功,但是不会执行HA* @return true连接成功;false连接失败*/
public boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {// 获取主Broker地址String addr = this.masterHaAddress.get();if (addr != null) {// 根据地址创建SocketAddress对象SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);// 获取SocketChannelthis.socketChannel = RemotingUtil.connect(socketAddress);if (this.socketChannel != null) {// SocketChannel注册OP_READ(网络读事件)this.socketChannel.register(this.selector, SelectionKey.OP_READ);log.info("HAClient connect to master {}", addr);this.changeCurrentState(HAConnectionState.TRANSFER);}}// 获取Commitlog最大偏移量(HA同步进度)this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();this.lastReadTimestamp = System.currentTimeMillis();}return this.socketChannel != null;
}
2. 主Broker接收连接事件
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService是主Broker接收从Broker连接事件的实现类,是一个线程。其主要属性如下代码所示。
// 主Broker监听本地的Socket(本地IP + 端口号)
private final SocketAddress socketAddressListen;
// Socket通道,基于NIO
private ServerSocketChannel serverSocketChannel;
// 事件选择器,基于NIO
private Selector selector;
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept方法定义了主Broker监听从Broker的连接事件。
/*** 启动监听从broker的连接* Starts listening to slave connections.** @throws Exception If fails.*/
public void beginAccept() throws Exception {this.serverSocketChannel = ServerSocketChannel.open();this.selector = RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true); // TCP可重复使用this.serverSocketChannel.socket().bind(this.socketAddressListen); // 绑定监听端口if (0 == messageStoreConfig.getHaListenPort()) {messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());log.info("OS picked up {} to listen for HA", messageStoreConfig.getHaListenPort());}this.serverSocketChannel.configureBlocking(false); // 非阻塞模式this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // 注册OP_ACCEPT(连接事件)
}
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run监听到从Broker连接事件的任务处理,为每个连接事件创建org.apache.rocketmq.store.ha.HAConnection对象并启动(负责M-S的数据同步逻辑)。
/*** 标准的NIO连接处理方式* step1:选择器每1s处理一次连接就绪事件* step2:是否连接事件,若是,创建{@link SocketChannel}* step3:每一个连接创建{@link HAConnection}(负责M-S的数据同步逻辑)*/
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 选择器每1s处理一次连接就绪事件this.selector.select(1000);Set<SelectionKey> selected = this.selector.selectedKeys();if (selected != null) {for (SelectionKey k : selected) {// 是否连接事件if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {// 若是连接事件时,创建SocketChannelSocketChannel sc = ((ServerSocketChannel) k.channel()).accept();if (sc != null) {DefaultHAService.log.info("HAService receive new connection, "+ sc.socket().getRemoteSocketAddress());try {// 每一个连接创建HAConnection并启动(负责M-S的数据同步逻辑)HAConnection conn = createConnection(sc);conn.start();DefaultHAService.this.addConnection(conn);} catch (Exception e) {log.error("new HAConnection exception", e);sc.close();}}} else {log.warn("Unexpected ops in select " + k.readyOps());}}selected.clear();}} catch (Exception e) {log.error(this.getServiceName() + " service has exception.", e);}}log.info(this.getServiceName() + " service end");
}
org.apache.rocketmq.store.ha.DefaultHAConnection创建并启动时,启动读、写线程服务。其关键属性如下代码所示。
- private WriteSocketService writeSocketService:主Broker向从Broker写数据服务类
- private ReadSocketService readSocketService:主Broker读取从Broker数据服务类
private final DefaultHAService haService;
private final SocketChannel socketChannel;
// HA客户端连接地址
private final String clientAddress;
// 主Broker向从Broker写数据服务类
private WriteSocketService writeSocketService;
// 主Broker读取从Broker数据服务类
private ReadSocketService readSocketService;
private volatile HAConnectionState currentState = HAConnectionState.TRANSFER;
// 从Broker请求拉取的偏移量
private volatile long slaveRequestOffset = -1;
// 从Broker反馈已完成的偏移量
private volatile long slaveAckOffset = -1;
private FlowMonitor flowMonitor;
3. 从Broker反馈复制进度
org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster是从Broker与主Broker传输数据的核心方法,代码如下所示,该方法有两大功能:
- 从Broker向主Broker:反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量),方法org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset执行。
- 从Broker接收主Broker:HA同步消息内容,方法org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent执行。
/*** 向主反馈HA复制进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中* {@link DefaultHAClient#reportSlaveMaxOffset(long)}:向主反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量)* {@link DefaultHAClient#processReadEvent()}:处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中*/
private boolean transferFromMaster() throws IOException {boolean result;// 判断是否需要向主Broker反馈当前待拉取偏移量if (this.isTimeToReportOffset()) {log.info("Slave report current offset {}", this.currentReportedOffset);// 向主Broker反馈拉取偏移量result = this.reportSlaveMaxOffset(this.currentReportedOffset);if (!result) {return false;}}this.selector.select(1000);// 处理主Broker发送过来的消息数据result = this.processReadEvent();if (!result) {return false;}return reportSlaveMaxOffsetPlus();
}
org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset向主Broker反馈HA复制进度,代码如下。
/*** 向主Broker反馈拉取偏移量* 注意:* a. 向主Broker反馈拉取偏移量maxOffset: 对于Slave端:发送下次待拉取消息偏移量* 对于Master端:本次请求拉取的偏移量,也可以理解为同步ACK* b. 手动切换ByteBuffer的写模式/读模式;* c. 通过{@link Buffer#hasRemaining()}判断缓存内容是否完全写入SocketChannel(基于NIO模式的写范例)* @param maxOffset HA待拉取偏移量* @return ByteBuffer缓存的内容是否写完*/
private boolean reportSlaveMaxOffset(final long maxOffset) {// 偏移量写入ByteBufferthis.reportOffset.position(0); // 写缓存位置this.reportOffset.limit(8); // 写缓存字节长度this.reportOffset.putLong(maxOffset); // 偏移量写入ByteBuffer// 将ByteBuffer的写模式 转为 读模式this.reportOffset.position(0);this.reportOffset.limit(8);// 循环,并判定ByteBuffer是否完全写入SocketChannelfor (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {try {this.socketChannel.write(this.reportOffset);} catch (IOException e) {log.error(this.getServiceName()+ "reportSlaveMaxOffset this.socketChannel.write exception", e);return false;}}lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();return !this.reportOffset.hasRemaining();
}
4. ReadSocketService线程读取从Broker复制进度
org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent是主Broker读取从Broker拉取消息的请求,获取内容是HA复制进度。其代码如下,看出主Broker获取从Broker的HA复制进度后,赋值给DefaultHAConnection#slaveRequestOffset属性,后立即唤醒GroupTransferService线程,执行消息同步。
/*** 主Broker读取从Broker拉取消息的请求* step1:判定byteBufferRead是否有剩余空间,没有则{@link Buffer#flip()}* step2:用剩余空间,从SocketChannel读数据到缓存中;读取到的内容是从Broker拉取消息的偏移量* step3:通知等待同步HA复制结果的发送消息线程* @return*/
private boolean processReadEvent() {int readSizeZeroTimes = 0;/*byteBufferRead没有剩余空间时,则:position == limit == capacity调用flip()方法后,则:position == 0, limit == capacity,加上processPosition = 0,说明从头开始处理*/if (!this.byteBufferRead.hasRemaining()) {this.byteBufferRead.flip(); // ByteBuffer重置处理this.processPosition = 0;}// ByteBuffer有剩余空间,循环至byteBufferRead没有剩余空间while (this.byteBufferRead.hasRemaining()) {try {// 从SocketChannel读数据到缓存中int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {readSizeZeroTimes = 0; // 重置this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();// 读取内容长度 >= 8,说明收到从Broker的拉取请求(内容是offset)if ((this.byteBufferRead.position() - this.processPosition) >= 8) {int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);long readOffset = this.byteBufferRead.getLong(pos - 8);this.processPosition = pos;// 从Broker反馈已完成的偏移量DefaultHAConnection.this.slaveAckOffset = readOffset;// 更新从Broker请求拉取的偏移量if (DefaultHAConnection.this.slaveRequestOffset < 0) {DefaultHAConnection.this.slaveRequestOffset = readOffset;log.info("slave[" + DefaultHAConnection.this.clientAddress + "] request offset " + readOffset);}// 通知等待同步HA复制结果的发送消息线程DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);}} else if (readSize == 0) {// 连续读取字节数0,则终止本次读取处理if (++readSizeZeroTimes >= 3) {break;}} else {log.error("read socket[" + DefaultHAConnection.this.clientAddress + "] < 0");return false;}} catch (IOException e) {log.error("processReadEvent exception", e);return false;}}return true;
}
5. WriteSocketService传输同步消息
ReadSocketService线程读取从Broker发送的HA复制进度,由org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService根据DefaultHAConnection#slaveRequestOffset获取主Broker还没有同步的所有消息进行HA同步。其如下代码所示WriteSocketService#run方法,是同步消息核心逻辑。
/*** 传输消息内容到HA客户端* step1:slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件* step2:nextTransferFromWhere为-1时,说明初次传输,计算nextTransferFromWhere(待传输offset)* step3:判断上次是否传输完* 上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,* 发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)* 上次传输没有完成:继续传输,忽略本次写事件* step4:根据从Broker待拉取消息offset查找之后的所有可读消息* step5:待同步消息总大小 > 一次传输的大小,默认32KB,则截取,此时一次传输不是完整的消息* step6:传输消息内容*/
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.selector.select(1000);// slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件if (-1 == DefaultHAConnection.this.slaveRequestOffset) {Thread.sleep(10);continue;}/*nextTransferFromWhere为-1时,说明初次传输初次传输时,计算nextTransferFromWhere(待传输offset)*/// 初次传输if (-1 == this.nextTransferFromWhere) {// =0 时,从Commitlog文件的最大偏移量传输if (0 == DefaultHAConnection.this.slaveRequestOffset) {long masterOffset = DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();masterOffset =masterOffset- (masterOffset % DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());if (masterOffset < 0) {masterOffset = 0;}this.nextTransferFromWhere = masterOffset;}// !=0 时,从Broker请求的偏移量else {this.nextTransferFromWhere = DefaultHAConnection.this.slaveRequestOffset;}log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + DefaultHAConnection.this.clientAddress+ "], and slave request " + DefaultHAConnection.this.slaveRequestOffset);}/*判断上次是否传输完上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)上次传输没有完成:继续传输,忽略本次写事件*/// lastWriteOver为true,则上次传输完if (this.lastWriteOver) {// 当前时间 与 上次传输时间的间隔long interval =DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;// 时间间隔 > 发送心跳包时间间隔if (interval > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {// Build Header 发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(this.nextTransferFromWhere);this.byteBufferHeader.putInt(0);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}}// lastWriteOver为false,则上次传输没有完成,则继续传输else {// 继续传输上次拉取请求,还未完成,则忽略本次写事件this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}// 根据从Broker待拉取消息offset查找之后的所有可读消息SelectMappedBufferResult selectResult =DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if (selectResult != null) {int size = selectResult.getSize();// 待同步消息总大小 > 一次传输的大小,默认32KBif (size > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {size = DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();}int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();if (size > canTransferMaxBytes) {if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {log.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));lastPrintTimestamp = System.currentTimeMillis();}size = canTransferMaxBytes;}long thisOffset = this.nextTransferFromWhere;this.nextTransferFromWhere += size; // 下一次写入的offsetselectResult.getByteBuffer().limit(size);this.selectMappedBufferResult = selectResult;// Build Header 传输size大小的消息内容(不一定是完整的消息)this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver = this.transferData();} else {DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);}} catch (Exception e) {DefaultHAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();if (this.selectMappedBufferResult != null) {this.selectMappedBufferResult.release();}changeCurrentState(HAConnectionState.SHUTDOWN);this.makeStop();readSocketService.makeStop();haService.removeConnection(DefaultHAConnection.this);SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();} catch (IOException e) {DefaultHAConnection.log.error("", e);}DefaultHAConnection.log.info(this.getServiceName() + " service end");
}
6. GroupTransferService线程通知HA结果
org.apache.rocketmq.store.ha.GroupTransferService该类负责将主从同步复制结束后,通知阻塞的消息发送者线程。同步主从Broker模式,即:消息刷磁盘后,继续等待新消息被传输到从Broker,等待传输结果,并通知消息发送线程。
1):待需要HA的消息集合
org.apache.rocketmq.store.CommitLog#asyncPutMessage是消息生产者发送消息到Broker时执行存储消息,参考《RocketMQ5.0.0消息存储<二>_消息存储流程》,该方法会根据同步或异步模式(默认)来执行org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA方法(完成刷盘和HA复制),方法调用链如下。
生产者把消息发送到Broker,完成commit操作(消息提交到文件内存映射中) ,随后根据同步/异步模式完成刷盘和HA。HA操作时,把消息提交请求添加到org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite是主Broker待需要HA的的集合。以下是org.apache.rocketmq.store.CommitLog#handleHA的代码。
private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,int needAckNums) {if (needAckNums >= 0 && needAckNums <= 1) {return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}HAService haService = this.defaultMessageStore.getHaService();long nextOffset = result.getWroteOffset() + result.getWroteBytes();// Wait enough acks from different slavesGroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);haService.putRequest(request);haService.getWaitNotifyObject().wakeupAll();return request.future();
}
2):通知消息发送者线程
HAService启动时,会启动GroupTransferService线程。GroupTransferService#run执行任务,如下代码所示。
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 间隔10sthis.waitForRunning(10);// 主从同步复制结束后,通知阻塞的消息发送者线程this.doWaitTransfer();} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info(this.getServiceName() + " service end");
}
其中执行waitForRunning()方法时,会去执行org.apache.rocketmq.store.ha.GroupTransferService#swapRequests方法,使得requestsWrite与requestsRead两个集合对调:
- private volatile List<CommitLog.GroupCommitRequest> requestsWrite:主Broker待需要HA的消息集合
- private volatile List<CommitLog.GroupCommitRequest> requestsRead:主Broker正在执行的HA集合
private void swapRequests() {List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;
}
org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer方法是主从同步复制结束后,通知阻塞的消息发送者线程,如下代码所示。
/*** 主从同步复制结束后,通知阻塞的消息发送者线程* step1:遍历消息提交请求(内存提交到Commitlog文件的内存映射)* step2:判断主从同步成功:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量*/
private void doWaitTransfer() {// 加锁synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {// commit请求,即:内存提交到Commitlog文件的内存映射for (CommitLog.GroupCommitRequest req : this.requestsRead) {boolean transferOK = false;long deadLine = req.getDeadLine(); // 是否超时final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) { // 是否超时if (i > 0) {// 等待1sthis.notifyTransferObject.waitForRunning(1000);}if (!allAckInSyncStateSet && req.getAckNums() <= 1) {// 主从同步成功判断:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();continue;}if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {// In this mode, we must wait for all replicas that in InSyncStateSet.final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();if (syncStateSet.size() <= 1) {// Only mastertransferOK = true;break;}// Include masterint ackNums = 1;for (HAConnection conn : haService.getConnectionList()) {final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {ackNums++;}if (ackNums >= syncStateSet.size()) {transferOK = true;break;}}} else {// Include masterint ackNums = 1;for (HAConnection conn : haService.getConnectionList()) {// TODO: We must ensure every HAConnection represents a different slave// Solution: Consider assign a unique and fixed IP:ADDR for each different slaveif (conn.getSlaveAckOffset() >= req.getNextOffset()) {ackNums++;}if (ackNums >= req.getAckNums()) {transferOK = true;break;}}}}if (!transferOK) {log.warn("transfer message to slave timeout, offset : {}, request acks: {}",req.getNextOffset(), req.getAckNums());}// 从完成复制后,唤醒消息发送者线程req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}this.requestsRead.clear();}}
}
三、读写分离机制
RocketMQ读写分离与其他中间件的实现方式完全不同,RocketMQ是消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。
RocketMQ根据MessageQueu查找Broker地址的唯一依据是brokerName。Broker组织中根据brokerName获取一组Broker服务器(M-S),它们的brokerName相同但brokerId不同,主服务器的brokerId为0,从服务器的brokerId大于0。 其方法是org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe
详细消费拉取消息时,实现读写分离机制见后续章节,参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》。
四、参考资料
【RocketMQ】学习RocketMQ必须要知道的主从同步原理_午睡的猫…的博客-CSDN博客_rocketmq主从同步原理
【RocketMQ】主从同步实现原理 - shanml - 博客园
RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储<三>_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客
RocketMQ5.0.0消息存储<四>_刷盘机制_爱我所爱0505的博客-CSDN博客
相关文章:

RocketMQ5.0.0的Broker主从同步机制
目录 一、主从同步工作原理 1. 主从配置 2. 启动HA 二、主从同步实现机制 1. 从Broker发送连接事件 2. 主Broker接收连接事件 3. 从Broker反馈复制进度 4. ReadSocketService线程读取从Broker复制进度 5. WriteSocketService传输同步消息 6. GroupTransferService线程…...

深度学习论文: EdgeYOLO: An Edge-Real-Time Object Detector及其PyTorch实现
深度学习论文: EdgeYOLO: An Edge-Real-Time Object Detector及其PyTorch实现 EdgeYOLO: An Edge-Real-Time Object Detector PDF: https://arxiv.org/pdf/2302.07483.pdf PyTorch代码: https://github.com/shanglianlm0525/CvPytorch PyTorch代码: https://github.com/shangli…...

如何做好APP性能测试?
随着智能化生活的推进,我们生活中不可避免的要用到很多程序app。有的APP性能使用感很好,用户都愿意下载使用,而有的APP总是出现卡顿或网络延迟的情况,那必然就降低了用户的好感。所以APP性能测试对于软件开发方来说至关重要&#…...

Hive窗口函数
概述 窗口函数(window functions)也叫开窗函数、OLAP函数。 如果函数具有over子句,则它是窗口函数 窗口函数可以简单地解释为类似于聚合函数的计算函数,但是通过group by 子句组合的 常规聚合会隐藏正在聚合的各个…...

C++学习笔记(1):在默认构造函数内部使用带参数的构造函数
题目以下代码的输出是不是0:#include <unordered_map> #include <iostream>using namespace std;struct CLS{int i;CLS(int i_) :i(i_){}CLS(){CLS(0);} };int main(){CLS obj;std::cout << obj.i << endl;return 0; }结果-858993460为什么…...

Android面试题_安卓面经(23/30)设计模式源码案例
系列专栏: 《150道安卓常见面试题全解析》 安卓专栏目录见帖子 : 安卓面经_anroid面经_150道安卓基础面试题全解析 安卓系统Framework面经专栏:《Android系统Framework面试题解析大全》 安卓系统Framework面经目录详情:Android系统面经_Framework开发面经_150道面试题答案解…...

Dubbo性能调优参数以及原理
Dubbo作为一个服务治理框架,功能相对来说比较完善,性能也挺不错。但很多同学在使用dubbo的时候,只是简单的参考官方说明进行配置和应用,并没有过多的去思考一些关键参数的意义,最终做出来的效果总是差强人意,接下来我们…...

vue3全家桶之vuex和pinia持久化存储基础(二)
一.vuex数据持久化存储 这里使用的是vuex4.1.0版本,和之前的vuex3一样,数据持久化存储方案也使用 vuex-persistedstate,版本是最新的安装版本,当前可下载依赖包版本4.1.0,接下来在vue3项中安装和使用: 安装vuex-persistedstate npm i vuex-persisteds…...

LAMP架构与搭建论坛
目录 1、LAMP架构简述 2、各组件作用 3、构建LAMP平台 1.编译安装Apache httpd服务 2.编译安装mysql 3.编译安装php 4.搭建一个论坛 1、LAMP架构简述 LAMP架构是目前成熟的企业网站应用模式之一,指的是协同工作的一整台系统和相关软件,能够提供动…...

代码随想录 || 回溯算法93 78 90
Day2493.复原IP地址力扣题目链接给定一个只包含数字的字符串,复原它并返回所有可能的 IP 地址格式。有效的 IP 地址 正好由四个整数(每个整数位于 0 到 255 之间组成,且不能含有前导 0),整数之间用 . 分隔。例如&#…...

界面组件Kendo UI for Angular——让网格数据信息显示更全面
Kendo UI致力于新的开发,来满足不断变化的需求,通过React框架的Kendo UI JavaScript封装来支持React Javascript框架。Kendo UI for Angular是专用于Angular开发的专业级Angular组件,telerik致力于提供纯粹的高性能Angular UI组件,…...

【Linux】进程状态|优先级|进程切换|环境变量
文章目录1. 运行队列和运行状态2. 进程状态3. 两种特殊的进程僵尸进程孤儿进程4. 进程优先级5. 进程切换进程特性进程切换6. 环境变量的基本概念7. PATH环境变量8. 设置和获取环境变量9. 命令行参数1. 运行队列和运行状态 💕 运行队列: 进程是如何在CP…...

合宙Air780E|FTP|内网穿透|命令测试|LuatOS-SOC接口|官方demo|学习(18):FTP命令及应用
1、FTP服务器准备 本机为win11系统,利用IIS搭建FTP服务器。 搭建方式可参考博文:windows系统搭建FTP服务器教程 windows系统搭建FTP服务器教程_程序员路遥的博客-CSDN博客_windows服务器安装ftp 设置完成后,测试FTP(已正常访问…...

大规模 IoT 边缘容器集群管理的几种架构-4-Kubeedge
前文回顾 大规模 IoT 边缘容器集群管理的几种架构-0-边缘容器及架构简介大规模 IoT 边缘容器集群管理的几种架构-1-RancherK3s大规模 IoT 边缘容器集群管理的几种架构-2-HashiCorp 解决方案 Nomad大规模 IoT 边缘容器集群管理的几种架构-3-Portainer 📚️Reference…...

Spring底层核心原理解析
Spring简介 ClassPathXmlApplicationContext context new classPathXmlApplicationContext("spring.xml"); UserService userService (UserService) context.getBean("userService"); userService.test();上面一段代码是我们开始学习spring时看到的&…...

OpenStack手动分布式部署Glance【Queens版】
目录 Glance简介 1、登录数据库配置(在controller执行) 1.1登录数据库 1.2数据库里创建glance 1.3授权对glance数据库的正确访问 1.4退出数据库 1.5创建glance用户密码为000000 1.6增加admin角色 1.7创建glance服务 1.8创建镜像服务API端点 2、安装gla…...

谈一谈你对View的认识和View的工作流程
都2023年了,不会还有人不知道什么是View吧,不会吧,不会吧。按我以往的面试经验来看,View被问到的概率不比Activity低多少哦,个人感觉View在Android中的重要性也和Activity不相上下,所以这篇文章将介绍下Vie…...

Redis集群的脑裂问题
集群脑裂导致数据丢失怎么办? 什么是脑裂? 先来理解集群的脑裂现象,这就好比一个人有两个大脑,那么到底受谁控制呢? 那么在 Redis 中,集群脑裂产生数据丢失的现象是怎样的呢? 在 Redis 主从架…...

互斥信号+任务临界创建+任务锁
普通信号量 1、信号量概念 2、创建信号量函数 3、互斥信号量 创建互斥信号量函数 等待信号量函数 释放互斥信号量 4、创建任务临界区 5、任务锁 任务上锁函数 编辑 任务结束函数 效果 普通信号量 1、信号量概念 信号量像是一种上锁机制,代码必须获…...

Elasticsearch7.8.0版本进阶——文档搜索
目录一、文档搜索的概述二、倒排索引不可变的优点三、倒排索引不可变的优点一、文档搜索的概述 早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦新的索引就绪,旧的就会被其替换,这样最近的变化便可以被检索到。倒排索引被…...

spring security权限问题
org.springframework.boot spring-boot-starter-security 引入jar extends WebSecurityConfigurerAdapter 用来配置登陆和权限 configure(HttpSecurity http) 覆盖这个方法 //配置授权相关的 .authorizeRequests () //任何请求 .anyRequest() //要求授权后可以访问 .authen…...

mysql 8.0.22安装
mysql8.0.22安装1. 配置my.ini2. 添加环境变量3. 安装mysql3.1 mysql初始化3.2 安装mysql服务3.3 启动mysql服务4. 连接数据库修改连接数据库的密码前提:已经从官网下载mysql8.0.22安装包并解压(下载地址:https://dev.mysql.com/downloads/in…...

Mysql系列:Mysql5.7编译安装
系统环境:Centos7 1:下载mysql源码包 https://dev.mysql.com/downloads/mysql/5.7.html downloads 选择MySQL Community Server>source_code>Generic Linux (Architecture Independent), Compressed TAR Archive -> 选择需要的mysql版本&…...

设备树(配合LED驱动说明)
目录 一、起源 二、基本组成 三、基本语法 四、特殊节点 4.1 根节点 4.2 /memory 4.3 /chosen 4.4 /cpus 多核CPU支持 五、常用属性 5.1 phandle 5.2 地址 --------------- 重要 5.3 compatible --------------- 重要 5.4 中断 --------------- 重要 5.5 …...

(二十六)大白话如何从底层原理解决生产的Too many connections故障?
今天我们继续讲解昨天的那个案例背景,其实就是经典的Too many connections故障,他的核心就是linux的文件句柄限制,导致了MySQL的最大连接数被限制,那么今天来讲讲怎么解决这个问题。 其实核心就是一行命令: ulimit -H…...

ASEMI高压MOS管60R380参数,60R380特征,60R380应用
编辑-Z ASEMI高压MOS管60R380参数: 型号:60R380 漏极-源极电压(VDS):600V 栅源电压(VGS):20V 漏极电流(ID):11A 功耗(PD&#x…...

Python期末试卷
《Python程序设计基础》期末试题 班级 学号 姓名 一.选择题(须知:答案写到下方的表格中,其它一律无效.每题2分,共40分) 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 1.在Python交互…...

Linux | 网络通信 | http协议介绍 | cookie策略讲解
文章目录url统一资源定位符http协议介绍GET vs POSThttp状态码http常见headercookie session上篇博客定制了一个协议,该协议用来进行简单的计算,其包含了数据的序列化和反序列化,编码和解码的定制,并且该协议基于TCP通信…...

招投标系统简介 招投标系统源码 java招投标系统 招投标系统功能设计
项目说明 随着公司的快速发展,企业人员和经营规模不断壮大,公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境,最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范,以及…...

winapi获取和修改camera raw界面元素数据
camera raw 界面如下: 需求就是根据 windows api 来操作界面右边的色温、色调、曝光等属性,进而对图片进行调色。根据 spy 捕获的窗口信息,理论上是可以拿到并修改值的。 根据 class 可以先拿到窗口句柄: #define CAMERA_RAW_CLA…...