【RocketMQ】源码详解:生产者启动与消息发送流程
消息发送
生产者启动
入口 : org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
生产者在调用send()方法发送消息之前,需要调用start进行启动, 生产者启动过程中会启动一些服务和线程
启动过程中会启动MQClientInstance, 这个实例是针对一个项目的全部生产者消费者, 而不是单个的生产者或消费者
MQClientInstance内部会启动一些服务和定时任务,如netty服务、内部生产者服务等
启动方法最后,则会发送心跳包给broker
生产者启动: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查配置,主要是生产者组名this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 创建 MQClientInstance 实例, 消费者启动时也有这一步(对于每个客户端来说, 只有一个客户端实例(一个项目有多个生产者、消费者))this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 将当前生产者注册到MQClientInstance中的producerTableboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException();}// 自动创建topic的配置this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {/** 启动 MQClientInstance* netty服务、各种定时任务、拉取消息服务、rebalanceService服务*/mQClientFactory.start();}log.info("the producer [{}] start OK);this.serviceState = ServiceState.RUNNING;break;// ...省略default:break;}// 发送心跳信息给所有broker。this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 启动扫描 超时请求 的定时任务,this.startScheduledTask();}
客户端实例启动: org.apache.rocketmq.client.impl.factory.MQClientInstance#start
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// netty服务this.mQClientAPIImpl.start();// 启动各种定时任务this.startScheduledTask();// 拉取消息服务,针对消费者this.pullMessageService.start();// 重平衡服务,针对消费者this.rebalanceService.start();this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}
消息发送流程
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
调用任一发送方法后,会一路调用到sendDefaultImpl方法
首先会检查消费者状态和消息的格式是否正确
之后会进入一个循环来发送消息,同步消息的循环次数为3次,即可以重试两次,其余消息只发送一次
在循环中,首先会按照轮询的方法选择一个queue进行发送,若发送出现异常则退出当前循环进入下一次循环(若开启故障延迟还会更新broker的故障表,设置隔离时间,隔离时间根据 MQFaultStrategy类中的latencyMax和notAvailableDuration数组进行判断,如其中超时在0.55s - 1s内则隔离30s)
在重新获取queue时,若开启故障延迟,在选择时则会选择【不在故障列表中,或者在故障列表但是时间已经过了其下一次可用的时间点的可用broker】,以实现高可用。若未开启故障延迟,则会传入上一次选择的broker,在这次选择时避开,选择方式也是轮询。
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 检查生产者状态this.makeSureStateOK();/*** 检查消息格式是否合法:* 1. msg是否为null* 2. topic 是否为空、长度是否大于127、字符串是否有非法字符、是否是系统topic(比如延时topic)* 3. 消息体 是否为空、大小是否大于4MB*/Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取topic路由信息(存在哪些broker上), 首先获取本地缓存的,若没有则获取nameServer的TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 计算最大发送次数,同步模式为3,即默认允许重试2次,可更改重试次数// 其他模式为1,即不允许重试,不可更改。int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {/*** 如果mq为空则说明第一次进入,则不存在lastBrokerName* 否则,说明为循环进入,则上一次发送失败,则获取上一次失败的brokerName*/String lastBrokerName = null == mq ? null : mq.getBrokerName();/*** 选择一个queue** selectOneMessageQueue方法内,可选故障转移为开启, 需要sendLatencyFaultEnable设置为true* 开启:* 对于请求响应较慢的broker,可以在一段时间内将其状态置为不可用(下方catch中有调用的updateFaultItem方法)* 消息队列选择时,会过滤掉mq认为不可用的broker,以此来避免不断向宕机的broker发送消息* 选取一个延迟较短的broker,实现消息发送高可用。* 不开启:* 则传入lastBrokerName,即不会再次选择上次发送失败的broker**/MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// 这里调用并传入false,是为了在发送时间超过550ms时,把broker置为故障,// 隔离时间根据 MQFaultStrategy类中的latencyMax和notAvailableDuration数组进行判断,如其中超时在0.55s - 1s内则隔离30sthis.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();// 异常传入为true,表示隔离时间采用默认的30sthis.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;}// ... 省略代码
选择queue : org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 判断是否启用故障延迟机制,默认不启用if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 轮询获取到一个MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果该broker不在故障列表中,或者在故障列表但是时间已经过了其下一次可用的时间点,则为可用,直接返回if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}// 到这里说明全部不正常// 没有选出无故障的mq,那么从故障集合中随机选择一个final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 如果写队列数大于0,那么选择该brokerint writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {// 如果写队列数小于0,那么移除该brokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}// 上面都没有返回,则采用轮询的方式选择return tpInfo.selectOneMessageQueue();}// 默认不启用return tpInfo.selectOneMessageQueue(lastBrokerName);
}
更新延时表: org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
判断延时时间: org.apache.rocketmq.client.latency.MQFaultStrategy#computeNotAvailableDuration
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {// 若isolation为true则默认延时30s,否则调用方法根据超时时间来获取延时时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);//更新故障记录表this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30 * 1000L, 60 * 1000L, 120 * 1000L/* 2min */, 180000L/* 3min */, 600000L/* 10min */};private long computeNotAvailableDuration(final long currentLatency) {/*** 根据latencyMax和notAvailableDuration的下标一一对应,若超时时间大于等于notAvailableDuration,则延时latencyMax对应下标的时间* 小于0.55s : 0s* [0.55s,1s) : 30s* [1s,2s) : 60s* ....省略*/for (int i = latencyMax.length - 1; i >= 0; i--) {if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}
相关文章:
【RocketMQ】源码详解:生产者启动与消息发送流程
消息发送 生产者启动 入口 : org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean) 生产者在调用send()方法发送消息之前,需要调用start进行启动, 生产者启动过程中会启动一些服务和线程 启动过程中会启动MQClientInstance, 这个实例是针对一个项…...
信息安全(一)
思维导图 一、AES加解密 1.概述 1.1 概念 AES: 高级加密标准(Advanced Encryption Standard)是一种对称加密的区块加密标准。 (1)替代DES的新一代分组加密算法 (2)支持三种长度密钥&#x…...
企业多会场视频直播(主会场、分会场直播)实例效果
阿酷TONY 2023-2-16 长沙 活动直播做多会场切换功能(主会场、分会场、会场一、会场二、会场三自由切换) 企业多会场视频直播(主会场、分会场直播)实例效果 特点:支持PC端,也支持移动端观看,会…...
线性代数速览(一)行列式
文章目录行列式🌻 行列式的定义🌼 行列式的性质🌷 一些定理🥀 行列式的计算🌺 克莱姆法则行列式 行列式的本质,就是一个数值。 🌻 行列式的定义 有三种定义:1、按行展开ÿ…...
恭喜山东翰林“智慧园区管理系统”获易知微可视化设计大赛二等奖
数字化经济发展是全球经济发展的重中之重,“数字孪生(Digital Twin)”这一词汇正在成为学术界和产业界的一个热点。数字孪生作为近年来的新兴技术,其与国民经济各产业融合不断深化,推动着各大产业数字化、网络化、智能…...
gulp简单使用
gulp gulp的核心理念是task runner 可以定义自己的一系列任务 等待任务被执行 基于文件stream的构建流 我们可以使用gulp的插件体系来完成某些任务 webpack的核心理念是module bundler webpack是一个模块化的打包工具 可以使用各种各样的loader来加载不同的模块 可以使用各种…...
ce认证机构如何选择?
CE认证想必大家都已经有所了解,它是产品进入欧盟销售的通行证,那么我们在办理CE认证时该怎么进行选择?带大家了解一下CE认证机构,以及该怎么去进行选择? 以下信息由证果果编辑整理,更多认证机构信息请到证果果网站查看。找机构…...
全网招募P图高手!阿里巴巴持续训练鉴假AI
P过的证件如何鉴定为真?三千万网友都晒出了与梅西的合影?图像编辑技术的普及让人人都能P图,但也带来“假图”识别难题,甚至是欺诈问题。 为此,阿里安全联合华中科技大学国家防伪工程中心、国际文档分析识别方向的唯一顶…...
webrtc QOS笔记一 Neteq直方图算法浅读
webrtc QOS笔记一 Neteq直方图算法浅读 文章目录webrtc QOS笔记一 Neteq直方图算法浅读Histogram Algorithm获取目标延迟遗忘因子曲线Histogram Algorithm DelayManager::Update()->Histogram::Add() 会根据计算的iat_packet(inter arrival times, 实际包间间隔 / 打包时长…...
细分和切入点
本文重点介绍做SEO网站细分和切入点的方法:当我们的行业和关键词竞争性比较大的时候,我们可以考虑对行业或者产品做细分,从而找到切入点。可以按照以下三个方面进行细分。1、按城市细分例如:A:餐饮培训,当前…...
iOS创建Universal Link
iOS 9之前,一直使用的是URL Schemes技术来从外部对App进行跳转,但是iOS系统中进行URL Schemes跳转的时候如果没有安装App,会提示无法打开页面的提示。 iOS 9之后起可以使用Universal Links技术进行跳转页面,这是一种体验更加完美的…...
RuoYi-Vue搭建(若依)
项目简介 RuoYi-Vue基于SpringBootVue前后端分离的Java快速开发框架1.前端采用Vue、Element UI2.后端采用Spring Boot、Spring Security、Redis & Jwt3.权限认证使用Jwt,支持多终端认证系统4.支持加载动态权限菜单,多方式轻松权限控制5.高效率开发&a…...
进程组和用处
进程组:一个或多个进程的集合,进程组id是一个正整数。组长进程:进程组id 进程id组长进程可以创建一个进程组,创建该进程组的进程,终止了,只要进程组有一个进程存在,进程组就存在,与…...
Nacos集群+Nginx负载均衡
搭建Nacos集群 注意: 3个或3个以上Nacos节点才能构成集群。要求服务器内存分配最好大于6G以上(如果不够则需修改nacos启动脚本中的默认内存配置)根据nacos自带的mysql建库脚本建立对应数据库(/conf/nacos-mysql.sql)如果是三台服…...
TypeScript 学习之类型兼容
TypeScript 的类型兼容性是基于结构子类型的。 结构类型是一种只使用其成员来描述类型的方式。 interface Named {name: string; }class Person {name: string; }let p: Named; p new Person();// 赋值成功,因为都是结构类型,只要Person 类型的包含 Nam…...
Linux软件管理RPM
目录 前言 RPM软件管理程序:rpm RPM默认安装的路径 PRM讲解前准备工作 RPM安装(install) RPM查询(query) RPM卸载(erase) RPM升级与更新(upgrade/freshen) RPM重…...
01背包问题
背包问题的递归解决过程如下: 第一步明确思路 在解决问题之前,为描述方便,首先定义一些变量:Vi表示第 i 个物品的价值,Wi表示第 i 个物品的体积,定义V(i,j):当前背包容量 j,前 i 个…...
14_FreeRTOS二值信号量
目录 信号量的简介 队列与信号量的对比 二值信号量 二值信号量相关API函数 实验源码 信号量的简介 信号量是一种解决同步问题的机制,可以实现对共享资源的有序访问。 假设有一个人需要在停车场停车 1.首先判断停车场是否还有空车位(判断信号量是否有资源) 2.停车场正好…...
JavaScript随手笔记---轮播图(点击切换)
💌 所属专栏:【JavaScript随手笔记】 😀 作 者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! &#…...
机器人学 markdown数学公式常用语法
参考链接1 本文包含了markdown常用的数学公式,按照目录可查询选用 初始类 行内数学公式均用两个符号包裹行间数学公式均用两个符号包裹 行间数学公式均用两个符号包裹行间数学公式均用两个符号包裹,用于表示重要的、需在行间单独列出的公式 $行内数学…...
如何使用 Python 语言来编码和解码 JSON 对象
JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式,易于人阅读和编写。 JSON 函数 使用 JSON 函数需要导入 json 库:import json。 函数 描述 json.dumps 将 Python 对象编码成 JSON 字符串 json.loads 将已编码的 JSON 字符串解码为 Pyth…...
【蓝桥云课】求正整数的约数个数
一、求正整数n的约数个数 方法一(常用算法):从1到n逐一判断其能否整除n,若能整除n即为n的约数,否则不是n的约数。 方法二:从1到n\sqrt{n}n逐一判断是否为n的约数,当n\sqrt{n}n为n的约数时,个数加1&…...
刷题记录: wannafly25 E 牛客NC19469 01串 [线段树维护动态dp]
传送门:牛客 题目描述: Bieber拥有一个长度为n的01 串,他每次会选出这个串的一个子串作为曲谱唱歌,考虑该子串从左 往右读所组成的二进制数P。 Bieber每一秒歌唱可以让P增加或减少 2 的 k次方(k由Bieber选 定),但必须…...
懂九转大肠的微软New Bing 内测申请教程
最近微软的New Bing开放内测了,网上已经有拿到内测资格的大佬们对比了ChatGPT和New Bing。对比结果是New Bing比ChatGPT更强大。来看看具体对比例子吧 1.时效性更强 ChatGPT的库比较老,跟不上时事,比如你问它九转大肠的梗,ChatG…...
WRAN翻译
基于小波的图像超分辨残差注意力网络 Wavelet-based residual attention network for image super-resolution 代码: https://github.com/xueshengke/WRANSR-keras 摘要: 图像超分辨率技术是图像处理和计算机视觉领域的一项基础技术。近年来,…...
ROS学习笔记——第二章 ROS通信机制
主要跟着[1]学习ros::Rate r(1); //错误,应改为ros::Rate r(10);[2]对Topic通信打的比方很形象,便于理解记忆。[3]有整个过程的图片,对于初学者更加友好[4]对发布者的代码注释非常好,方便进一步学习此外CMake官方文档可以查询相关…...
MacOS Pytorch 机器学习环境搭建
学习 Pytorch ,首先要搭建好环境,这里将采用 Anoconda Pytorch PyCharm 来一起构建 Pytorch 学习环境。 1. Anoconda 安装与环境创建 Anoconda 官方介绍:提供了在一台机器上执行 Python/R 数据科学和机器学习的最简单方法。 为什么最简单…...
项目——博客系统
文章目录项目优点项目创建创建相应的目录,文件,表,导入前端资源实现common工具类实现拦截器验证用户登录实现统一数据返回格式实现加盐加密类实现encrypt方法实现decrypt方法实现SessionUtil类实现注册页面实现前端代码实现后端代码实现登录页…...
PHP(14)会话技术
PHP(14)会话技术一、概念二、分类三、cookie技术1. cookie的基本使用2. cookie的生命周期3. cookie的作用范围4. cookie的跨子域5. cookie的数组数据四、session1. session原理2. session基本使用3. session配置4. 销毁session一、概念 HTTP协议是一种无…...
对JAVA 中“指针“理解
对于Java中的指针,以下典型案例会让你对指针的理解更加深刻。 首先对于: 系统自动分配对应空间储存数字 1,这个空间被变量名称b所指向即: b ——> 1 变量名称 空间 明…...
网站上的办公网站怎么做/全网营销培训
HTML5中提供了跨域加载数据的方法,让我们得以从JSONP或者Flash中介等各种绕行方案中解脱出来,更加顺畅地与服务器交流。另一方面,因为PHP是最好的语言……所以在它与Node.js之间,我选择前者作为后端语言开发内容服务。这篇文章记录…...
动态网站建设试题/青岛网站建设制作
寒假工作坊Python&Stata数据分析课寒假工作坊现在开始招生了,有兴趣的同学和老师可以戳进来了解课程安排 1月9-10日 Python爬虫&文本数据分析(模块Ⅰ) 1月11-16日 Stata 应用能力提升与实证前沿(模块Ⅱ) 地点浙江 杭州(浙江工…...
天猫网站做真丝服装批发/平台开发
中断一、已有的含义1、半中间发生阻隔、停顿或故障而断开。2、是指在计算机执行期间,系统内发生任何非寻常的或非预期的急需处理事件,使得CPU暂时中断当前正在执行的程序而转去执行相应的事件处理程序。待处理完毕后又返回原来被中断处继续执行或调度新的…...
四川省第十五公司/上海seo网站推广
最初了解 NMock 是从 《程序员》2004.12上的 董洵 所写的 《将单元测试进行到底—Mock Object 浅析》,那真是一片不错的文章! 在文章末尾的参考资料中,有一篇 MSDN杂志的文章Mock Objects to the Rescue! Test Your .NET Code with NMock &a…...
wordpress英文主题破解版/互联网营销师含金量
QPushButton 在任何 GUI 设计中,命令按钮都是最重要和最常用的控件。 任何计算机用户都熟悉带有保存、打开、确定、是、否和取消等作为标题的按钮。 在 PyQt API 中,QPushButton 类对象提供了一个按钮,当单击该按钮时,可以对其进行编程以调用某个函数。 QPushButton 类从…...
做网站实例/百度指数数据分析平台
一、什么是Nginx? Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器,其特点是占有内存少,并发能力强,中国大陆使用nginx网站用户有:百度、京东、新浪、网易、腾讯、淘宝等。…...