做视频网站视频/seo搜索引擎优化5
没有用过rocketmq,但是一直对RocketMQ的实现很感兴趣,本次阅读源码基于5.0.0
一、 nameserver
通过源码阅读发现,它的作用主要是当作一个注册中心,注册broker、topic等信息,维护topic以及broker队列的路由信息,通过netty监听端口来处理网络通信请求
1.1 服务配置加载以及netty服务启动
在namesrv模块下,可以看到有一个NamesrvStartup.java的类,有一个main方法来执行namesrv的启动:
public static void main(String[] args) {main0(args);}public static NamesrvController main0(String[] args) {try {NamesrvController controller = createNamesrvController(args);start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;}
首先是createNamesrvController 方法,主要作用是生成NamesrvController 启动配置:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());// 解析启动命令commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return null;}final NamesrvConfig namesrvConfig = new NamesrvConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();// 加载配置文件properties.load(in);// 通过配置内容,反射调用set方法转换为对应的配置类MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}if (commandLine.hasOption('p')) {// 通过配置内容,反射调用set方法转换为对应的配置类MixAll.printObjectProperties(null, namesrvConfig);MixAll.printObjectProperties(null, nettyServerConfig);MixAll.printObjectProperties(null, nettyClientConfig);System.exit(0);}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 配置日志打印LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;}
生成配置对象之后会执行start方法来启动,start方法内有一个initialize方法,主要作用是定义一些线程池以及定时任务,使用一个线程来定时每 5 * 1000 毫秒扫描路由信息上的broker,超过超过2分钟则认为broker不可用了,会进行销毁:
public boolean initialize() {this.kvConfigManager.load();// 生成Netty服务所需要的配置信息,并通过linux系统环境等因素是否使用epoll模式this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());// 定义线程池对象this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(),this.namesrvConfig.getDefaultThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.defaultThreadPoolQueue,new ThreadFactoryImpl("RemotingExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<T>(runnable, value);}};this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());// 定义线程池对象this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(),this.namesrvConfig.getClientRequestThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientRequestThreadPoolQueue,new ThreadFactoryImpl("ClientRequestExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<T>(runnable, value);}};this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);this.remotingClient.updateNameServerAddressList(Arrays.asList(RemotingUtil.getLocalAddress() + ":" + this.nettyServerConfig.getListenPort()));// 注册请求处理器,通过code判断使用哪个processorthis.registerProcessor();// 使用一个线程来定时每 5 * 1000 毫秒扫描路由信息上不存活的broker,超过2分钟则认为broker不可用了,会进行销毁this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,1, 10, TimeUnit.MINUTES);this.scheduledExecutorService.scheduleAtFixedRate(() -> {try {NamesrvController.this.printWaterMark();} catch (Throwable e) {LOGGER.error("printWaterMark error.", e);}}, 10, 1, TimeUnit.SECONDS);if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {LOGGER.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {LOGGER.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();}});} catch (Exception e) {LOGGER.warn("FileWatchService created error, can't load the certificate dynamically");}}return true;}
扫描不存活的broker方法:
public void scanNotActiveBroker() {try {log.info("start scanNotActiveBroker");for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {// 上一次的更新时间long last = next.getValue().getLastUpdateTimestamp();// 默认值是1000 * 60 * 2 毫秒,就是两分钟long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();if ((last + timeoutMillis) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);// 超时销毁对用的brokerthis.onChannelDestroy(next.getKey());}}} catch (Exception e) {log.error("scanNotActiveBroker exception", e);}}
配置信息以及定时等任务都生成好之后进行netty服务的start:
public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 准备定义好的handler,主要用来定义连接handler以及io请求handlerprepareSharableHandlers();ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 判断是否使用epoll模式io.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {// 通过netty监听端口,进行网络服务的启动ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 用来定时扫描返回表,对超时返回的信息进行删除以及超时回调处理this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}
nameServer会保存注册上来的topic信息表以及broker信息表,并进行定时维护,通过接收netty网络消息的code值来决定需要做什么处理,比如通过topic信息去查broker路由信息以及队列信息:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 从请求头中解析topic名称final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);// 不包含字符1if (requestHeader.getTopic().indexOf(GetRouteInfoRequestHeader.split) < 0) {// 通过routeInfoManager获取topic所在的broker节点信息以及队列信息TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());if (topicRouteData != null) {if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}// 把topic以及broker队列信息封装返回byte[] content = topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 如果为空,则返回topic不存在response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}String[] topics = requestHeader.getTopic().split(String.valueOf(GetRouteInfoRequestHeader.split));TopicRouteDatas topicRouteDatas = new TopicRouteDatas();// 多个topic信息进行封装返回for (String topic : topics) {TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(topic);if (topicRouteData == null) {continue;}if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic);topicRouteData.setOrderTopicConf(orderTopicConf);}topicRouteDatas.getTopics().put(topic, topicRouteData);}response.setBody(topicRouteDatas.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 比如如下的一些请求命令switch (request.getCode()) {case RequestCode.PUT_KV_CONFIG:return this.putKVConfig(ctx, request);case RequestCode.GET_KV_CONFIG:return this.getKVConfig(ctx, request);case RequestCode.DELETE_KV_CONFIG:return this.deleteKVConfig(ctx, request);case RequestCode.QUERY_DATA_VERSION:return this.queryBrokerTopicConfig(ctx, request);case RequestCode.REGISTER_BROKER:return this.registerBroker(ctx, request);case RequestCode.UNREGISTER_BROKER:return this.unregisterBroker(ctx, request);case RequestCode.BROKER_HEARTBEAT:return this.brokerHeartbeat(ctx, request);case RequestCode.GET_BROKER_MEMBER_GROUP:return this.getBrokerMemberGroup(ctx, request);case RequestCode.GET_BROKER_CLUSTER_INFO:return this.getBrokerClusterInfo(ctx, request);case RequestCode.WIPE_WRITE_PERM_OF_BROKER:return this.wipeWritePermOfBroker(ctx, request);case RequestCode.ADD_WRITE_PERM_OF_BROKER:return this.addWritePermOfBroker(ctx, request);case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:return this.getAllTopicListFromNameserver(ctx, request);case RequestCode.DELETE_TOPIC_IN_NAMESRV:return this.deleteTopicInNamesrv(ctx, request);case RequestCode.REGISTER_TOPIC_IN_NAMESRV:return this.registerTopicToNamesrv(ctx, request);case RequestCode.GET_KVLIST_BY_NAMESPACE:return this.getKVListByNamespace(ctx, request);case RequestCode.GET_TOPICS_BY_CLUSTER:return this.getTopicsByCluster(ctx, request);case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:return this.getSystemTopicListFromNs(ctx, request);case RequestCode.GET_UNIT_TOPIC_LIST:return this.getUnitTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:return this.getHasUnitSubTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:return this.getHasUnitSubUnUnitTopicList(ctx, request);case RequestCode.UPDATE_NAMESRV_CONFIG:return this.updateConfig(ctx, request);case RequestCode.GET_NAMESRV_CONFIG:return this.getConfig(ctx, request);case RequestCode.GET_CLIENT_CONFIG:return this.getClientConfigs(ctx, request);default:String error = " request type " + request.getCode() + " not supported";return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
二、broker
首先是createBrokerController方法去记载一些配置文件、解析启动命令参数等:
public static BrokerController createBrokerController(String[] args) {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));try {//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());// 解析启动参数commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new DefaultParser());if (null == commandLine) {System.exit(-1);}// broker的配置信息final BrokerConfig brokerConfig = new BrokerConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();final NettyClientConfig nettyClientConfig = new NettyClientConfig();// netty是否使用tls传输nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));// netty的监听端口nettyServerConfig.setListenPort(10911);final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}// 设置配置文件if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFileHelper.setFile(file);configFile = file;BrokerPathConfigHelper.setBrokerConfigPath(file);}}// 加载配置内容properties = configFileHelper.loadConfig();if (properties != null) {properties2SystemEnv(properties);// 通过反射获取对象的set方法设置对象属性MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 获取nameserver注册地址String namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {String[] addrArray = namesrvAddr.split(";");for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:// 如果brokerId为零则以master启动brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:// brokerId小于0则停止if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}if (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();// 设置日志文件夹System.setProperty("brokerLogDir", "");if (brokerConfig.isIsolateLogEnable()) {System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());}if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());}configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption('m')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);brokerConfig.setInBrokerContainer(false);// 生成BrokerControllerfinal BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);// 初始化配置,设置一些监视器模式函数boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;}
之后又是调用BrokerController的start函数,主要是调用startBasicService方法启动一些基础服务,主要是netty网络服务、消息处理服务:
protected void startBasicService() throws Exception {if (this.messageStore != null) {// 启动消息存储的服务this.messageStore.start();}if (remotingServerStartLatch != null) {// 阻塞等待需要的服务启动remotingServerStartLatch.await();}// 启动netty服务监听端口if (this.remotingServer != null) {this.remotingServer.start();}// 启动netty服务监听端口if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {if (brokerAttachedPlugin != null) {brokerAttachedPlugin.start();}}// 取消息处理器if (this.popMessageProcessor != null) {this.popMessageProcessor.getPopLongPollingService().start();this.popMessageProcessor.getPopBufferMergeService().start();this.popMessageProcessor.getQueueLockManager().start();}// 确认消息处理服务if (this.ackMessageProcessor != null) {this.ackMessageProcessor.startPopReviveService();}if (this.assignmentManager != null) {this.assignmentManager.start();}if (this.topicQueueMappingCleanService != null) {this.topicQueueMappingCleanService.start();}// 文件监视器服务if (this.fileWatchService != null) {this.fileWatchService.start();}if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}// 定时扫描不存活的channel信息并进行清理if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}// 执行shell脚本文件if (this.filterServerManager != null) {this.filterServerManager.start();}if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}if (this.escapeBridge != null) {this.escapeBridge.start();}if (this.brokerPreOnlineService != null) {this.brokerPreOnlineService.start();}//Init state version after messageStore initialized.this.topicConfigManager.initStateVersion();}
注册各种消息处理器,通过code来判断使用哪个消息处理器进行消息的处理以及返回:
public void registerProcessor() {/** SendMessageProcessor*/sendMessageProcessor.registerSendMessageHook(sendMessageHookList);sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, this.pullMessageProcessor, this.litePullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** PeekMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, this.peekMessageProcessor, this.pullMessageExecutor);/*** PopMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);/*** AckMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);/*** ChangeInvisibleTimeProcessor*/this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);/*** notificationProcessor*/this.remotingServer.registerProcessor(RequestCode.NOTIFICATION, this.notificationProcessor, this.pullMessageExecutor);/*** pollingInfoProcessor*/this.remotingServer.registerProcessor(RequestCode.POLLING_INFO, this.pollingInfoProcessor, this.pullMessageExecutor);/*** ReplyMessageProcessor*/replyMessageProcessor.registerSendMessageHook(sendMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** QueryAssignmentProcessor*/this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);}
基础服务启动之后向nameSerever注册broker信息以及同步broker组:
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {if (System.currentTimeMillis() < shouldStartTime) {BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);return;}if (isIsolated) {BrokerController.LOG.info("Skip register for broker is isolated");return;}// 向nameServer注册broker的topic等基础信息BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {BrokerController.LOG.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));if (this.brokerConfig.isEnableSlaveActingMaster()) {scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {if (isIsolated) {// 如果是单机模式直接返回return;}try {// 定时更新配置信息BrokerController.this.sendHeartbeat();} catch (Exception e) {BrokerController.LOG.error("sendHeartbeat Exception", e);}}}, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Override public void run2() {try {// 定时同步broker组内容、HA高可用内容BrokerController.this.syncBrokerMemberGroup();} catch (Throwable e) {BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);}}}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));}
三、producer
producer通过指定一个producerGroup来启动,默认启动流程主要执行了DefaultMQProducerImpl的start方法:
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:// 初始状态默认失败this.serviceState = ServiceState.START_FAILED;// 检查producerGroup是否存在、group大小是否超限等this.checkConfig();// 如果producer不为CLIENT_INNER_PRODUCER,变更实例名字为pid值if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 初始化参数,实例化netty配置信息以及消息处理器等,创建mq工厂this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 在缓存中注册这个producerboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {// 状态为新建状态this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 进行启动流程mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 每 1000 毫秒,定时扫描失效的请求并进行清理RequestFutureHolder.getInstance().startScheduledTask(this);}
主要也是加载配置 、更新缓存、添加定时清理失效请求的任务等,主要的代码在mQClientFactory.start()
方法
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 查找nameSrv地址信息if (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// 定时扫描与nameSrv的channel连接状态this.mQClientAPIImpl.start();// 定时更新nameSrv地址信息、定时更新topic路由信息、定时向所有的broker发送心跳检测、定时向broker获取消费者的消费情况this.startScheduledTask();// 拉取消息服务this.pullMessageService.start();this.rebalanceService.start();// 消息推送服务this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}
初始化服务之后进行消息发送,使用producer.send(msg)默认消息发送,broker收到消息之后取到typeCode为SEND_MESSAGE
= 10,执行SendMessageProcessor 的processRequest处理消息,执行各种钩子函数以及返回response,通过定时再将消息同步到slave节点中去
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();// 检查topic、消息体空值等情况Validators.checkMessage(msg, this.defaultMQProducer);// 随机数final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取topic的broker路由、队列信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();// 通过随机值对队列的长度进行取模,选择一个可用的消息队列进行发送MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 通过同步或者异步的方式发送小给brokersendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {continue;} else {if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}
四、consumer
与producer类似,初始状态设置group名,执行start方法
public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());// 初始状态默认值失败this.serviceState = ServiceState.START_FAILED;// 检查配置信息this.checkConfig();this.copySubscription();// 如果是集群方式消费,更改实例名为pid值if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}// 根据配置信息等,创建mq客户端工厂this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);// 设置消费者属性this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);if (this.pullAPIWrapper == null) {this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());}// 注册消息过滤的钩子函数this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {// 广播的形式,offset存储在本地客户端文件当中case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;// 集群的形式消费,需要去broker拿取最新的消费offset信息case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}// 加载消费offset信息this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());//POPTODO reuse Executor ?this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());//POPTODO reuse Executor ?this.consumeMessagePopService =new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}// 启动对应的服务,主要有定时清除失效消息this.consumeMessageService.start();// POPTODOthis.consumeMessagePopService.start();// 向缓存注册消费者信息内容boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 启动客户端工厂内容mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();// 向所有消费者发送心跳检测this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();}
主要的启动函数为mQClientFactory.start(),与所有broker进行连接,定时动态向nameServer获取配置内容与路由信息
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 查找nameSrv地址信息if (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// 定时扫描与nameSrv的channel连接状态this.mQClientAPIImpl.start();// 定时更新nameSrv地址信息、定时更新topic路由信息、定时向所有的broker发送心跳检测、定时向broker获取消费者的消费情况this.startScheduledTask();// 拉取消息服务this.pullMessageService.start();this.rebalanceService.start();// 消息推送服务this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}
相关文章:

RocketMQ源码阅读
没有用过rocketmq,但是一直对RocketMQ的实现很感兴趣,本次阅读源码基于5.0.0 一、 nameserver 通过源码阅读发现,它的作用主要是当作一个注册中心,注册broker、topic等信息,维护topic以及broker队列的路由信息&#…...

重磅 | 小O软件新品【鲸鱼地图】发布
千呼万唤始出来.......,小O系列软件又添新品【鲸鱼地图】!!! 2023年新年伊始,小O就投入到新品研发工作中,秉承“发现地理价值”理念,为用户提供更加好用、易用的地图软件产品,经过春…...

软考高级信息系统项目管理师系列之二十五:项目合同管理
软考高级信息系统项目管理师系列之二十五:项目合同管理 一、项目合同管理内容整理一、合同管理基本概念1.项目合同管理定义2.合同的分类3.合同类型选择4.合同内容二、合同管理过程1.合同管理过程的内容2.合同签订和履行管理3.合同变更和档案管理4.合同违约索赔管理项目合同管理…...

测试开发之Django实战示例 第十三章 上线
在上一章,为其他程序与我们的Web应用交互创建了RESTful API。本章将学习如何创建生产环境让我们的网站正式上线,主要内容有:配置生产环境创建自定义中间件实现自定义管理命令1创建生产环境现在该将Django项目正式部署到生产环境中了。我们将按…...

python实战应用讲解-【语法基础篇】Python中的数值类型(附示例代码)
目录 前言 数值类型 十六进制、八进制和二进制 Python 数值类型转换 数值和表达式 前言...

Git常用命令以及如何在IDEA中使用Git
前言Git是一个分布式版本控制工具,主要用于管理开发过程中的源代码文件(Java类、xml文件、html页面等)。Git在管理文件过程中会记录日志,方便回退到历史版本;Git存在分支的概念,一个项目可以有多个分支&…...

音乐播放器-- 以及数据库数据存储
运行环境 : java1.8 数据库以及代码编写工具 : sqlserver -- mysql 也可以 工具 eclipse 编码gbk窗体 : Swing使用了jaudiotagger 进行了音乐处理 图片展示 ----- 空闲时间 做出来玩的项目 部分功能还没有完善 完善了的功能 音乐 /// 主页 &a…...

[JAVA安全]Spring Messaging之CVE-2018-1270
漏洞简介 Spring 框架中通过spring-messaging 模块来实现 STOMP (Simple Text-Orientated Messaging Protocol),STOMP是一种封装 WebSocket的简单消息协议。攻击者可以通过建立WebSocket连接并发送一条消息造成远程代码执行, spring-messagin…...

CAN通信笔记-位时间、Tq及采样点同步
本文框架1.前言2. 位时间2.1 位时间定义2.2 位时间计算3. Tq3.1 Tq的计算3.1.1 举个例子3.2 位时间与Tq的换算4. 采样点同步4.1 硬同步4.2 重同步4.2.1 延长PBS1的重同步4.2.2 缩短PBS2的重同步1.前言 本篇记录些关于CAN的一些学习笔记,说实话CAN协议发展的已经非常…...

玩转 Kubernetes 配置管理:ConfigMap 和 Secret 实战演示
目录一、简介二、ConfigMap2.1 基于目录创建 ConfigMap2.2 基于文件创建 ConfigMap2.3 从环境文件创建 ConfigMap2.4 定义从文件创建 ConfigMap 时要使用的键2.5 根据字符串创建 ConfigMap三、Secret3.1 基于文件创建Secret3.2 基于字符串创建Secret3.3 yaml文件方式创建secret…...

Kubernetes
一、 kubernetes介绍 1.1 应用部署方式演变 在部署应用程序的方式上,主要经历了三个时代 传统部署:互联网早期,会直接将应用程序部署在物理机上 优点:简单,不需要其它技术的参与 缺点:不能为应用程序定义…...

从零开始 verilog 以太网交换机(三)MAC发送控制器的设计与实现
从零开始 verilog 以太网交换机(三)MAC发送控制器的设计与实现 🔈声明: 😃博主主页:王_嘻嘻的CSDN主页 🧨 从零开始 verilog 以太网交换机系列专栏:点击这里 🔑未经作者允…...

使用vector<char>作为输入缓冲区
一、引言 当我们编写代码:实现网络接收、读取文件内容等功能时,我们往往要在内存中开辟一个输入缓冲区(又名:input buffer/读缓冲区)来存贮接收到的数据。在C里面我们可以用如下方法开辟输入缓冲区。 ①使用C语言中的数组&#x…...

自己在网站搭建用到的一些网站
背景 以后可能很少做网站类的项目了,所以做个简单总结,把自己的一些经历和一些小工具做个记录 域名和主机 https://www.godaddy.com/zh-sg, 我之前的基本都是国际会议型的网站,所以就在gadaddy上买了主机和域名。目标群体在国内可以考虑腾…...

XLSReadWriteII5 Color 颜色l的调用和使用
XLSReadWriteII5 Color 颜色l的调用和使用 一、色彩三原色 自然界,颜色是由红、绿、蓝三色组成,人眼的可见的颜色,可以通过红、绿、蓝三色按照不同的比例合成产生。 任意一种颜色由这三种原色按照一定的比例混合出来。 二、Windows系…...

RT-Thread SP使用教程
RT-Thread SPI 使用教程 实验环境使用的是正点原子的潘多拉开发板。 SPI从机设备使用的是BMP280温湿度大气压传感器。 使用RT-Thread Studio搭建基础功能。 1. 创建工程 使用RT-Thread Studio IDE创建芯片级的工程。创建完成后,可以直接编译下载进行测试。 2.…...

LeetCode 2363. 合并相似的物品
给你两个二维整数数组 items1 和 items2 ,表示两个物品集合。每个数组 items 有以下特质: items[i] [valuei, weighti] 其中 valuei 表示第 i 件物品的 价值 ,weighti 表示第 i 件物品的 重量 。 items 中每件物品的价值都是 唯一的 。 请你…...

numpy 中常用的数据保存、fmt多个参数
在经常性读取大量的数值文件时(比如深度学习训练数据),可以考虑现将数据存储为Numpy格式,然后直接使用Numpy去读取,速度相比为转化前快很多 一、保存为二进制文件(.npy/.npz) (1)numpy.save(file, arr, allow_pickleTrue, fix_importsTrue) file:文件名…...

从0到1一步一步玩转openEuler--19 openEuler 管理服务-特性说明
文章目录19 管理服务-特性说明19.1 更快的启动速度19.2 提供按需启动能力19.3 采用cgroup特性跟踪和管理进程的生命周期19.4 启动挂载点和自动挂载的管理19.5 实现事务性依赖关系管理19.6 与SysV初始化脚本兼容19.7 能够对系统进行快照和恢复19 管理服务-特性说明 19.1 更快的…...

23美赛E题:光污染(ICM)完整思路Python代码
问题E(综合评价与仿真题):光污染(ICM) 背景 光污染用于描述过度或不良使用人造光。我们称之为光污染的一些现象包括光侵入、过度照明和光杂波。在大城市,太阳落山后,这些现象最容易在天空中看到;然而,它们也可能发生在更偏远的地区。 光污染会改变我们对夜空的看法,…...

快速排序的描述以及两种实现方案
一、快速排序描述 每一轮排序选择一个基准点(pivot)进行分区 1.1. 让小于基准点的元素的进入一个分区,大于基准点的元素的进入另一个分区 1.2. 当分区完成时,基准点元素的位置就是其最终位置在子分区内重复以上过程,直…...

算力引领 数“聚”韶关——第二届中国韶关大数据创新创业大赛圆满收官
为进一步促进数字经济领域创新创业发展,推动国家数据中心集群建设,构建大数据领域资源专业平台,促进大湾区大数据科技成果和创新创业人才转化落地,为韶关大数据领域创新型产业集群的打造、大数据科技成果和创新创业人才的转化落地…...

MySQL 记录锁+间隙锁可以防止删除操作而导致的幻读吗?
文章目录什么是幻读?实验验证加锁分析总结什么是幻读? 首先来看看 MySQL 文档是怎么定义幻读(Phantom Read)的: The so-called phantom problem occurs within a transaction when the same query produces different sets of r…...

【分库分表】企业级分库分表实战方案与详解(MySQL专栏启动)
📫作者简介:小明java问道之路,2022年度博客之星全国TOP3,专注于后端、中间件、计算机底层、架构设计演进与稳定性建设优化,文章内容兼具广度、深度、大厂技术方案,对待技术喜欢推理加验证,就职于…...

(考研湖科大教书匠计算机网络)第五章传输层-第五节:TCP拥塞控制
获取pdf:密码7281专栏目录首页:【专栏必读】考研湖科大教书匠计算机网络笔记导航 文章目录一:拥塞控制概述二:拥塞控制四大算法(1)慢开始和拥塞避免A:慢启动(slow start)…...

13.使用自动创建线程池的风险,要自己创建为好
自动创建线程池就是直接调用 Executors去new默认的那几个线程池,但是会出现一定的风险,线程池里面会用到队列,也会跟线程池自身有关,所以要从队列和线程池两个方面去解析。 1.了解线程池的队列 线程池的内部结构主要由四部分组成…...

【项目设计】—— 负载均衡式在线OJ平台
目录 一、项目的相关背景 二、所用技术栈和开发环境 三、项目的宏观结构 四、compile_server模块设计 1. 编译服务(compiler模块) 2. 运行服务(runner模块) 3. 编译并运行服务(compile_run模块) 4…...

Docker学习笔记
1:docker安装步骤Linux 2:docker安装步骤Windows 3:docker官方文档 4:docker官方远程仓库 docker常用命令 1: docker images----查看docker中安装的镜像 2: docker pull nginx------在docker中安装Nginx镜…...

【爬虫理论实战】详解常见头部反爬技巧与验证方式 | 有 Python 代码实现
以下是常见头部反爬技巧与验证方式的大纲: User-Agent 字段的伪装方式,Referer 字段的伪装方式,Cookie 字段的伪装方式。 文章目录1. ⛳️ 头部反爬技巧1.1. User-Agent 字段&User-Agent 的作用1.2. 常见 User-Agent 的特征1.3. User-Age…...

基于SpringBoot+Vue的鲜花商场管理系统
【辰兮要努力】:hello你好我是辰兮,很高兴你能来阅读,昵称是希望自己能不断精进,向着优秀程序员前行! 博客来源于项目以及编程中遇到的问题总结,偶尔会有读书分享,我会陆续更新Java前端、后台、…...