【RocketMQ】源码详解:Broker启动流程
Broker启动
入口: org.apache.rocketmq.broker.BrokerStartup#main
broker的启动主要分为两部分:1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是,这里的BrokerController相当于Broker的一个中央控制器类,并不是编写http接口的类
创建brokerController:设置一些属性参数,读取外部配置文件,实例化brokerController并调用其初始化方法。
- 实例化brokerController:主要是会实例化很多的配置类和线程池的阻塞队列。
- 初始化方法 会加载如消费者消费进度等的文件,会实例化消息存储的服务类,并调用其加载方法,加载储存消息的相关文件到内存中,并进行数据恢复。还会创建各类线程池并注册netty服务的消息处理器,之后创建各种定时任务,如:持久化消费者消费进度的任务、定时打印各种日志的任务等。
启动brokerController: 内部会启动消息储存服务、netty服务、长轮询拉取消息挂起服务、向nameserver心跳定时任务等
public static void main(String[] args) {// broker启动start(createBrokerController(args));
}
创建brokerController
创建brokerController:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {// 设置属性rocketmq.remoting.version,即当前rocketmq版本System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {NettySystemConfig.socketSndbufSize = 131072;}if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {NettySystemConfig.socketRcvbufSize = 131072;}try {//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());if (null == commandLine) {System.exit(-1);}// 创建Broker的配置类,包含Broker的各种配置,比如 ROCKETMQ_HOMEfinal BrokerConfig brokerConfig = new BrokerConfig();// NettyServer的配置类,Broker接收来自客户端的消息的时候作为服务端final NettyServerConfig nettyServerConfig = new NettyServerConfig();// NettyClient的配置类,Broker连接NameServer的时候还会作为客户端final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));// 设置作为NettyServer时的监听端口为10911nettyServerConfig.setListenPort(10911);// Broker的消息存储配置,例如各种文件大小等final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}/** 判断命令行启动是否包含 -c 命令,用于指定配置文件* 如果包含,则解析指定的配置文件* 启动Broker的时候使用命令参数 -c /xxx/rocketmq/config/conf/broker.conf*/if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);properties2SystemEnv(properties);MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 获取namesrv地址String namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {// 可以指定多个地址,以 ; 隔开,这里进行拆分String[] addrArray = namesrvAddr.split(";");for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}// 设置、校验brokerId,BrokerId为0表示Master,非0表示Slaveswitch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}// 是否开启DLeger,即多副本(主从切换集群)if (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}// 设置高可用通信监听端口,为监听端口+1,默认就是10912// 该端口主要用于 如 主从同步之类的高可用操作messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");// 判断命令行中是否包含字符'p'(printConfigItem)和'm',如果存在则打印配置信息并结束jvm运行,没有的话就不用管if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption('m')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}// 打印当前broker的配置日志log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);// 实例化 BrokerController,内部主要初始化了一些配置类、manager类、处理器、线程池等final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discard// 将所有的-c的外部配置信息保存到BrokerController中的Configuration对象属性的allConfigs属性中controller.getConfiguration().registerConfig(properties);// 初始化BrokerControllerboolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 添加关闭钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}
实例化brokerController
实例化brokerController: org.apache.rocketmq.broker.BrokerController#BrokerController
public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig
) {// broker的配置this.brokerConfig = brokerConfig;// 作为netty服务端与客户端交互的配置this.nettyServerConfig = nettyServerConfig;// 作为netty客户端与服务端交互的配置this.nettyClientConfig = nettyClientConfig;// 消息存储的配置this.messageStoreConfig = messageStoreConfig;// 消费者偏移量管理器,维护offset进度信息this.consumerOffsetManager = new ConsumerOffsetManager(this);//topic配置管理器,管理broker中存储的所有topic的配置this.topicConfigManager = new TopicConfigManager(this);// 处理消费者拉取消息请求的处理器this.pullMessageProcessor = new PullMessageProcessor(this);//拉取请求挂起服务,处理无消息时push长轮询消费者的挂起等待机制this.pullRequestHoldService = new PullRequestHoldService(this);//消息送达的监听器,生产者消息到达时通过该监听器触发pullRequestHoldService通知pullRequestHoldServicethis.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);//消费者id变化监听器this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);//消费者管理类,维护消费者组的注册实例信息以及topic的订阅信息,并对消费者id变化进行监听this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);//消费者过滤管理器,配置文件为:xx/config/consumerFilter.jsonthis.consumerFilterManager = new ConsumerFilterManager(this);//生产者管理器,包含生产者的注册信息,通过groupName分组this.producerManager = new ProducerManager();//客户端连接心跳服务,用于定时扫描生产者和消费者客户端,并将不活跃的客户端通道及相关信息移除this.clientHousekeepingService = new ClientHousekeepingService(this);//处理某些broker到客户端的请求,例如检查生产者的事务状态,重置offsetthis.broker2Client = new Broker2Client(this);//订阅分组关系管理器,维护消费者组的一些附加运维信息this.subscriptionGroupManager = new SubscriptionGroupManager(this);//broker对方访问的API,处理broker对外的发起请求,比如向nameServer注册,向master、slave发起的请求this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);//过滤服务管理器,拉取消息过滤this.filterServerManager = new FilterServerManager(this);//用于从节点,定时向主节点发起请求同步数据,例如topic配置、消费位移等this.slaveSynchronize = new SlaveSynchronize(this);/*初始化各种阻塞队列。将会被设置到对应的处理不同客户端请求的线程池执行器中*///处理来自生产者的发送消息的请求的队列this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());//处理reply消息的请求的队列,RocketMQ4.7.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。//即生产者发送了消息之后,可以同步或者异步的收到消费了这条消息的消费者的响应this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());//处理查询请求的队列this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());//客户端管理器的队列this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());//消费者管理器的队列,目前没用到this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());//心跳处理的队列this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());//事务消息相关处理的队列this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());//broker状态管理器,保存Broker运行时状态this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());//目前没用到this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));//broker快速失败服务this.brokerFastFailure = new BrokerFastFailure(this);//配置类this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);
}
初始化brokerController
初始化brokerController: org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//topic配置文件加载,路径为 {user.home}/store/config/topics.jsonboolean result = this.topicConfigManager.load();// 消费者消费偏移量配置文件加载,路径为 {user.home}/store/config/consumerOffset.jsonresult = result && this.consumerOffsetManager.load();// 订阅分组配置文件加载,路径为 {user.home}/store/config/subscriptionGroup.jsonresult = result && this.subscriptionGroupManager.load();// 消费者过滤配置文件加载,路径为 {user.home}/store/config/consumerFilter.jsonresult = result && this.consumerFilterManager.load();if (result) {// 实例化和初始化消息存储服务相关类 DefaultMessageStoretry {// 实例化消息存储类DefaultMessageStorethis.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);/*** enableDLegerCommitLog 为 true(默认为false),则创建DLedgerRoleChangeHandler。* 在启用enableDLegerCommitLog情况下,broker通过raft协议选主,可以实现主从角色自动切换*/if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}/** 通过消息存储服务 加载 储存消息的相关文件到内存中,并进行数据恢复(此步骤是broker启动的核心步骤)* 文件:commitLog文件(储存消息的)、consumequeue文件(消费者依据此文件消费,储存指向commitLog中消息的偏移量)、indexFile文件* 数据恢复:broker可能会异常关闭,导致消息在文件中储存不完整 或 已储存到commitLog但未存储到consumequeue和indexFile*/result = result && this.messageStore.load();if (result) {// 创建netty远程服务,remotingServer和fastRemotingServerthis.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);/********************* 创建各种执行器线程池*********************/// 处理发送消息的请求的线程池this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));//处理拉取消息的请求的线程池this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));// 处理reply消息的请求的线程池(Reply模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程)this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl("ProcessReplyMessageThread_"));// 处理查询请求的线程池this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));// broker 管理线程池,作为默认处理器的线程池this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));// 客户端管理器的线程池this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));// 心跳处理的线程池this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_", true));// 事务消息相关处理的线程池this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl("EndTransactionThread_"));//消费者管理的线程池this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));// 注册netty服务的消息处理器this.registerProcessor();/********************** 启动各种定时任务 **********************/final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;// 每隔24h打印昨天生产和消费的消息数量this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);//每隔5s將消费者offset进行持久化,存入consumerOffset.json文件中this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);//每隔10s將消费过滤信息进行持久化,存入consumerFilter.json文件中this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);/*** 每3分钟检查消费进度,若消费进度落后超过 consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16\* 且disableConsumeIfConsumerReadSlowly=true(默认false)则剔除掉该订阅组,该消费者组停止消费消息来保护broker* 因为存储消息的commitLog一个文件大小才为1024L * 1024 * 1024。* ps:不清楚原因*/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);//每隔1s將打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小以及队列头部元素存在时间this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);//每隔1min將打印已存储在commitlog提交日志中但尚未分派到consumequeue消费队列的字节数。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {// 更新NamesrvAddrthis.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {/*** 如果未指定nameSvr地址,且开启从地址服务器获取* 则每2min从nameSvr地址服务器获取最新的地址并更新* 若希望动态更新nameSvr地址,则需要指定地址服务器url和fetchNamesrvAddrByAddressServer设置为true*/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}//如果没有开启DLeger服务,DLeger开启后表示支持高可用的主从自动切换if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {// 如果是从节点,更新master地址if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {//如果是主节点,每隔60s將打印主从节点的差异this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}// ....省略// 初始化事务消息相关服务initialTransaction();// 初始化权限相关服务initialAcl();// 初始化RPC调用的钩子函数initialRpcHooks();}return result;
}
启动brokerController
入口: org.apache.rocketmq.broker.BrokerStartup#start
public static BrokerController start(BrokerController controller) {try {// BrokerController启动controller.start();String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();if (null != controller.getBrokerConfig().getNamesrvAddr()) {tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}
public void start() throws Exception {//启动消息存储服务if (this.messageStore != null) {this.messageStore.start();}//启动netty远程服务if (this.remotingServer != null) {this.remotingServer.start();}//启动netty远程服务VIP通道if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}if (this.fileWatchService != null) {this.fileWatchService.start();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}//长轮询拉取消息挂起服务启动if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}//客户端连接心跳服务启动if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}// 如果没有开启DLegerif (!messageStoreConfig.isEnableDLegerCommitLog()) {//如果不是SLAVE,那么启动事务消息检查服务startProcessorByHa(messageStoreConfig.getBrokerRole());//如果是SLAVE,则启动主从同步服务, 定时任务每隔10s与master机器同步数据,采用slave主动拉取的方法//同步的内容包括topic配置,消费者消费位移、延迟消息偏移量、订阅组信息等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}// 定时任务,默认每30s向namesvr发起一次注册,即心跳包。时间间隔可配置registerNameServerPeriod,1万到6万毫秒间。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}
相关文章:
![](https://www.ngui.cc/images/no-images.jpg)
【RocketMQ】源码详解:Broker启动流程
Broker启动 入口: org.apache.rocketmq.broker.BrokerStartup#main broker的启动主要分为两部分:1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是,这里的BrokerController相当于Broker的一个中央控制器类&…...
![](https://www.ngui.cc/images/no-images.jpg)
vue事件
1. 事件传参 <button click"clickEvt($event, 22)">点我</button>2. 事件修饰符 prevent:阻止默认事件stop:阻止事件冒泡(加到子元素)once:事件只触发一次capture:使用事件的捕获模…...
![](https://img-blog.csdnimg.cn/img_convert/d9170e1401299bd00c2459253ffcddf5.png)
研报精选230220
目录 【行业230220国信证券】银行业行业专题:经济复苏中的优质中小银行【行业230220国信证券】汽车行业周报(2023年第7周):吉利将发布新品牌“银河” ,2022年宇通纯电动客车获欧洲销量冠军【行业230220开源证券】商贸零…...
![](https://www.ngui.cc/images/no-images.jpg)
kubernetes sd configs配置详解
1.基于Kubernetes的服务发现 kubernetes_sd_config 这个是以角色(role)来定义收集的,Kubernetes SD配置允许从Kubernetes的RESTAPI中检索scrape目标,并始终与群集状态保持同步。 凡<role>必须是endpoints,service,pod&…...
![](https://www.ngui.cc/images/no-images.jpg)
Linux查看文件的命令
目录 1、tail 2、head 3、cat 4、more 5、sed 6、less Linux查看日志的命令有多种: tail、cat、tac、head、echo等,本文只介绍几种常用的方法。 1、tail 命令格式: tail[必要参数][选择参数][文件] -f 循环读取 -q 不显示处理信息 -v 显示详细的处理信…...
![](https://img-blog.csdnimg.cn/1327af1ef29146a5a3de394c5ac64156.png)
如何单独清除某个网页的缓存(reload)
有时候在自己服务器上调试的时候,刷新一直不更新,样式改了也看不到,就很烦 今天教你一个方法快速清除 F12 控制台情况下右击左上角的刷新 这三个分别代表: ①正常重新加载(Ctrl R): 正常重新加载 此方法,浏览器发送请求时会…...
![](https://img-blog.csdnimg.cn/img_convert/4b22c4403c644bb48c65276e16d3b9bf.png)
魔兽世界经典怀旧服务器架设教程
准备工具:MySQL服务端服务器最重要的你需要会技术、要不然都瞎扯 给你东西你也看不懂。教程开始:安装MySQL并创建数据库安装MySQL社区版,并配置SQL服务器。安装SQLyog。利用其登录,创建realmd、characters、mangos、scriptdev2数据…...
![](https://img-blog.csdnimg.cn/img_convert/1d27946fa2c1dd7e3a0887b1f4df6137.png)
Interview系列 - 05 Java|Iterator迭代器|集合继承体系|Set List Map接口特性|List实现类区别
文章目录01. 迭代器 Iterator 是什么?02. 迭代器 Iterator 有什么特点?03. 迭代器 Iterator 怎么使用?04. 如何边遍历边移除 Collection 中的元素?05. Iterator 和 ListIterator 有什么区别?06. 数组和集合的区别&…...
![](https://www.ngui.cc/images/no-images.jpg)
LeetCode 1769. 移动所有球到每个盒子所需的最小操作数
有 n 个盒子。给你一个长度为 n 的二进制字符串 boxes ,其中 boxes[i] 的值为 ‘0’ 表示第 i 个盒子是 空 的,而 boxes[i] 的值为 ‘1’ 表示盒子里有 一个 小球。 在一步操作中,你可以将 一个 小球从某个盒子移动到一个与之相邻的盒子中。…...
![](https://www.ngui.cc/images/no-images.jpg)
MKS SKIPR V1.0船长版(Voron 2.4 R2)配置简要笔记
第一次用MKS SKIPR V1.0,设置过程中,也不知道怎么回事,跟现有的资料有些出入。首先,基本的配置调试可以参考官方的使用说明。 MKS SKIPR V1.0 使用说明书 这个说明比较简单,很多深一点的东西没有提现,不过…...
![](https://img-blog.csdnimg.cn/3ab4ea0d46624371954c8a1fe805804d.png)
90后,转行软件测试3年,从月入7000+到月入过万,整理出的这一万字经验分享。
周一发工资了,到手12857.65,美滋滋 今年是我毕业参加工作的第3年,工资终于来到5位数了。上一家公司月薪7000,实际拿到手就6450左右,感觉今年真的是元气满满啊,工资翻倍,良好的人生开端。 想起…...
![](https://img-blog.csdnimg.cn/f43a590b591542a78147ea4539f3fe9b.png)
Java之关于String字符串笔试面试重点
目录 一.关于字符串的常量池 1.关于字符串产生的三种方式 2.关于字符串的常量池 3.直接赋值法和new的方式产生对象的区别 二.关于intern方法 1.情况一(已经包含) 2.情况二(已经包含) 3.情况三(未包含) 4.情况四 三.关于字符串的不可变性 1.了解字符串的不可变性 2.Str…...
![](https://img-blog.csdnimg.cn/4a24fdbf2cbe4d11bc39914851d3b71b.png)
mdio协议
1. 简介 MDIO接口中有特定的术语定义总线上的各种设备,驱动MDIO总线的设备被定义为站管理实体(STA),而被MDC管理的目标设备称为可被MDIO管理的设备(MMD)。 STA初始化MDIO所有的通信,同时负责驱动…...
![](https://www.ngui.cc/images/no-images.jpg)
kubectl命令
kubectl命令是操作 Kubernetes 集群的最直接和最高效的途径。 1、kubectl自动补全 $ source <(kubectl completion bash) # setup autocomplete in bash, bash-completion package should be installed first. $ source <(kubectl completion zsh) # setup autocomple…...
![](https://www.ngui.cc/images/no-images.jpg)
题库-JAVASE01
文章目录1.JAVA开发环境2.JAVA变量3.JAVA基本类型4.运算符和表达式5.分支结构6.循环结构7.数组8.方法1.JAVA开发环境 (单选题)在Java中,以下描述错误的是( ) A…class是源文件 B…java是编译前的源文件 C…class是编译后的文件 D.Java程序需…...
![](https://img-blog.csdnimg.cn/1839269acfa144ccab2ed4a8be4b5325.png)
Java序列化机制
Java序列化机制 概述 java中的序列化可能都停留在实现Serializable接口上,对于它里面的一些核心机制没有深入了解过。直到最近在项目中踩了一个坑,就是序列化对象添加一个字段以后,使用方系统报了反序列化失败,原因是我们双方的…...
![](https://img-blog.csdnimg.cn/img_convert/ea14d13b8e91f7af2323cde1a0bc223f.jpeg)
3款强大到离谱电脑软件,都是效率神器,从此远离加班
闲话少说,直接上狠货。 1、ImageGlass ImageGlass是一款值得吹爆的电脑图片浏览工具,使用极其方便,体积50M左右,非常小巧,功能却强大到离谱,ImageGlass打开图片的速度极快,实现快速不同图像间切…...
![](https://img-blog.csdnimg.cn/3d9ac50032e347f1bb36bbf7a190ae96.png)
【项目】Vue3+TS CMS 登录模块搭建
💭💭 ✨:Vue3 TS 💟:东非不开森的主页 💜: keep going💜💜 🌸: 如有错误或不足之处,希望可以指正,非常感谢😉 Vue3TS一、…...
![](https://www.ngui.cc/images/no-images.jpg)
Java 8 的那些常见写法
前言 现在Java已经发展到Java19版本了,由于Java后面一些版本,就开始商用收费了,所以目前绝大多数公司的JDK版本都是采用的之前稳定且免费的1.8版本,也就是Java8,这个版本已经能满足几乎所有业务的需求开发了ÿ…...
![](https://img-blog.csdnimg.cn/e13f500da02a4d4c99324ddc326e5749.png)
PyQt5数据库开发1 4.3 QSqlTableModel 之 相关槽函数的实现(多图长文详解)
目录 一、打开数据库表 1. 写打开数据库的槽函数 2. 运行后发现数据库可以打开了 3. ODBC配通了,数据库还是打不开 4. 写在tableView上显示数据库表的函数 5. 运行后发现表可以显示了 6. 代码分析 7. 添加列名称 8. 根据内容调整列宽 9. 备注:…...
![](https://img-blog.csdnimg.cn/4a93dcf091cc497b83d3cef1289cb03a.png)
QT 设计一个串口调试工具,用一个工程就能轻松解决,外加虚拟串口工具模拟调试,在日常工作中可类比模块间通信,非常详细建议收藏
QT 串口调试工具第一节 虚拟串口工具安装第二节 QT创建一个基于QWidget的项目第三节 UI界面设计第三节 项目头文件widget.h第四节 项目实现文件widget.cpp第五节 main函数第六节 编译结果重点第七节 使用QT打包程序,不安装QT的电脑可使用第一节 虚拟串口工具安装 -…...
![](https://img-blog.csdnimg.cn/ac31b6ae18b5490ca805ab80e8363f19.png)
OpenSumi 是信创开发云的首选
原文作者:行云创新技术总监 邓冰寒 引言 随着云原生应用的日益普及,开发上云也逐步被越来越多的厂商和开发者接受,在这个赛道国内外有不少玩家,国外的 GitHub Codespaces、CodeSandbox,GitPod、亚马逊 Cloud9…...
![](https://img-blog.csdnimg.cn/1b76b3fc6e264e17a8a9b1d4da9e3170.png)
JdbcTemplate常用方法解析
文章目录1.JdbcTemplate简介2.JdbcTemplate主要方法:3.常用方法介绍update()方法增删改query()查询方法1.JdbcTemplate简介 JdbcTemplate是Spring JDBC的核心类,借助该类提供的方法可以很方便的实现数据的增删改查。 Spring对数据库的操作在jdbc上面做…...
![](https://img-blog.csdnimg.cn/img_convert/847c79ab6f84697ee27dadd24fac0d4b.jpeg)
生物素标记试剂1869922-24-6,Alkyne-PEG3-Biotin PC,炔烃PEG3生物素PC
1、试剂基团反应特点(Reagent group reaction characteristics):PC alkyne-PEG3-Biotin含一个炔烃和一个 PEG 链接的可光裂解生物素基团。含 3 个单元 PEG 的 ADC linker,生物素本身是个游离的小分子,在生物实验中常常…...
![](https://img-blog.csdnimg.cn/b8ccec91b881418b80ab6f97e898aacf.png#pic_center)
CS224W课程学习笔记(三):DeepWalk算法原理与说明
引言 什么是图嵌入? 图嵌入(Graph Embedding,也叫Network Embedding) 是一种将图数据(通常为高维稠密的矩阵)映射为低微稠密向量的过程,能够很好地解决图数据难以高效输入机器学习算法的问题。…...
![](https://img-blog.csdnimg.cn/img_convert/c50878fb610105f0d2d365e5240b0171.jpeg)
rk3568 开发板Ubuntu系统说明
Ubuntu MinimalUbuntu Minimal系统基于Ubuntu 64bit系统构建,目前发布有Ubuntu18.04这个版本。与Ubuntu Desktop 相比具有以下特性:没有桌面环境,占用资源少,在简化网络管理之后,只需40M内存;针对嵌入式平台…...
![](https://www.ngui.cc/images/no-images.jpg)
Windows和Linux常用HASH算法使用命令
Windows和Linux常用hash算法使用命令 Windows,以文件xxx.zip为例 Windows 求文件 md5 certutil -hashfile xxx.zip md5Windows 求文件 sha1 certutil -hashfile xxx.zip sha1Windows 求文件 sha256 certutil -hashfile xxx.zip sha256Linux,以文件xxx.z…...
![](https://img-blog.csdnimg.cn/d81f979a56af4738807c847676311adb.png)
货仓选址 AcWing(JAVA)
在一条数轴上有 N家商店,它们的坐标分别为 A1∼AN。 现在需要在数轴上建立一家货仓,每天清晨,从货仓到每家商店都要运送商品。 为了提高效率,求把货仓建在何处,可以使得货仓到每家商店的距离之和最小。 输入格式&#…...
![](https://img-blog.csdnimg.cn/41f99543d5304e609275feb467c9c6c7.png)
SPI+DMA传输性能比较
本文章仅仅简单记录32单片机的SPIDMA驱动显示屏的性能测试,这里不花费时间介绍SPI和DMA。 硬件材料:SPI显示屏一个,32单片机 软件材料: 1.LCD的SPI驱动显示程序(SPI / SPIDMA): (1&a…...
![](https://img-blog.csdnimg.cn/943f5d5bc835432faf627f4ce6ce08fd.png)
Centos7系统编译Hadoop3.3.4
1、背景 最近在学习hadoop,此篇文章简单记录一下通过源码来编译hadoop。为什么要重新编译hadoop源码,是因为为了匹配不同操作系统的本地库环境。 2、编译源码 2.1 下载并解压源码 [roothadoop01 ~]# mkdir /opt/hadoop [roothadoop01 ~]# cd /opt/had…...
![](https://images0.cnblogs.com/blog/464052/201307/18161925-b7c2ab021d594fb88b3dea6e7de64cc0.png)
申请一个网站需要怎么做/怎么用手机制作网站
Fibonacci Time Limit: 1000/1000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others) Total Submission(s): 2087 Accepted Submission(s): 999 Problem Description 2007年到来了。经过2006年一年的修炼,数学神童zouyu终于把0到100000000的Fibonacci数列…...
![](https://img2018.cnblogs.com/blog/1239507/201811/1239507-20181126165131342-917378679.png)
天津网站建设哪里好/百度排名查询
框架名称是ui_auto_web,有bin、conf、lib、log、reports和webCase六个目录,lib目录下有core和page目录,page目录下又包含web目录,把每个功能的测试用例存放到webCase目录下,核心功能文件放在core目录下,rep…...
![](https://img-blog.csdnimg.cn/img_convert/21406af854593a765ea158b1ad477229.png)
做水印的网站/南昌seo快速排名
个人使用的人工智能产品个人/家用:Ems——帮你找到最合适的居住地Bridge Kitchen——教你一步步做菜的厨房助理UnifyID ——通过你走路、打字和坐姿进行身份认证的工具工作:Carly——帮你管理来电ETCH——帮你管理人际关系并且形成可搜索的数据库Findo …...
![](/images/no-images.jpg)
建设集团有限公司网站/热搜榜排名今日事件
我有一个问题,我不确定如何在假定DDD并使用C#/ EF Core时解决.简化情况:我们有2个聚合 – 项目和仓库.它们中的每一个都具有ExternalId(Guid)的身份以在外部(FE等)识别它,其也被视为其域身份.它还有数据库Id taht在数据库模型中表示它 – 实体模型和Db模型是同一类,…...
![](/images/no-images.jpg)
哪些网站用django做的/扬州seo
?数据库基础数据库:保存有组织的数据的容器表:某种特定类型数据的结构化清单列:表中的一个字段数据类型:所容许的数据的类型行:表中的一个记录主键:一列,其值能够唯一区分表中每个行SQL是结构化…...
![](https://img-blog.csdnimg.cn/4601f4b2aba24ef38f0bf14317da5759.png)
东营城乡建设信息网/seo查询 工具
题目描述 给你一个字符串 s ,请你返回满足以下条件的最长子字符串的长度:每个元音字母,即 ‘a’,‘e’,‘i’,‘o’,‘u’ ,在子字符串中都恰好出现了偶数次。 思路 ①由于问题…...