当前位置: 首页 > news >正文

RocketMQ Broker消息处理流程及部分源码解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年2月10日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 消息处理流程
  • 消息存储目录结构
  • `SendMessage`源码
    • `processRequest`
    • `sendMessage`

消息处理流程

  1. SendMessageProcessor处理类接收到消息
  2. DefaultMessageStore实例将消息变成IndexFileConsumeQueueCommitLog对象
  3. 上述对象转成内存映射对象后进行落盘
    在这里插入图片描述

消息存储目录结构

RocketMQ的文件存储在store文件夹里,里面包含commitlogconfigconsumerqueueindex文件夹和abortcheckpoint两个文件。

文件夹:

  • commitlog存储写入到commitLog的消息内容
  • config存储配置信息
  • consumerqueue存储消费者队列信息
  • index存储消息队列的索引文件

文件:

  • abort标记RocketMQ是否正常退出
  • checkpoint存储commitlogconfigconsumerqueueindex文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   ├── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerOffset.json
│   ├── delayOffset.json
│   ├── subscriptionGroup.json
│   ├── topics.json
├── consumequeue
│   ├── TopicA
│   ├── TopicB
│   ├── TopicC
├── index
│   ├── 00000000000000000000
│   ├── 00000000001073741824

RocketMQ内有专门对应磁盘上存储文件的封装类:

  1. CommitLog:对应commitlog文件
  2. ConsumeQueue:对应consumerqueue文件
  3. IndexFile:对应index文件
  4. MappedFile:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘
  5. MappedFileQueue:连续物理存储的封装类,可以通过offset快速定位消息所在的MappedFile
  6. MappedFileBuff:堆外内存

SendMessage源码

SendMessageProcessor是接收消息的一个钩子函数,该类的对象将会处理发送到Broker的消息

processRequest

主要流程已在代码片注释中给出:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {private List<ConsumeMessageHook> consumeMessageHookList;public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);}public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析请求体SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 建立消息上下文mqtraceContext = buildMsgContext(ctx, requestHeader);// 发送消息前的逻辑this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}/*** ......**/}

根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader,然后调用asyncSend(Batch)Message方法进行消息的发送。

该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage方法,如果是接收消息则调用pullMessage拉取消息的方法。


sendMessage

消息发送给Broker服务器时,调用的是sendMessage方法接收并存储消息,主要流程已在代码片注释中给出:

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 初始化响应final RemotingCommand response = preSend(ctx, request, requestHeader);// 构建响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}// 设置消息体数据msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 获取Broker集群名称String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);// 同步等待消息存储成功if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {// 异步msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {// Broker拒绝接收消息if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

相关文章:

RocketMQ Broker消息处理流程及部分源码解析

&#x1f34a; Java学习&#xff1a;Java从入门到精通总结 &#x1f34a; 深入浅出RocketMQ设计思想&#xff1a;深入浅出RocketMQ设计思想 &#x1f34a; 绝对不一样的职场干货&#xff1a;大厂最佳实践经验指南 &#x1f4c6; 最近更新&#xff1a;2023年2月10日 &#x…...

Java面试题:Java集合框架

文章目录一、Java集合框架二、Java集合特性三、各集合类的使用ArrayListLinkedListHashSetHashSet源码解析对源码进行总结HashSet可同步HashSet的使用HashMap四、Iterator迭代器五、遍历集合元素的若干方式参考文章&#xff1a;Hash详解参考文章&#xff1a;深入浅出学Java——…...

时间之间的比较与计算相差年、月、日、小时、分钟、毫秒、纳秒以及判断闰年--LocalDateTime

如何把String/Date转成LocalDateTime参考String、Date与LocalDate、LocalTime、LocalDateTime之间互转 String、Date、LocalDateTime、Calendar与时间戳之间互相转化参考String、Date、LocalDateTime、Calendar与时间戳之间互相转化 比较方法介绍 isBefore(ChronoLocalDateT…...

PyTorch学习笔记:nn.L1Loss——L1损失

PyTorch学习笔记&#xff1a;nn.L1Loss——L1损失 torch.nn.L1Loss(size_averageNone, reduceNone, reductionmean)功能&#xff1a;创建一个绝对值误差损失函数&#xff0c;即L1损失&#xff1a; l(x,y)L{l1,…,lN}T,ln∣xn−yn∣l(x,y)L\{l_1,\dots,l_N\}^T,l_n|x_n-y_n| l(…...

Java程序设计-ssm企业财务管理系统设计与实现

摘要系统设计系统实现开发环境&#xff1a;摘要 对于企业集来说,财务管理的地位很重要。随着计算机和网络在企业中的广泛应用&#xff0c;企业发展速度在不断加快&#xff0c;在这种市场竞争冲击下企业财务管理系统必须优先发展&#xff0c;这样才能保证在竞争中处于优势地位。…...

疑难杂症篇(二十一)--Ubuntu18.04安装usb-cam过程出现的问题

对Ubuntu18.04{\rm Ubuntu 18.04}Ubuntu18.04环境下的ROS{\rm ROS}ROS的melodic{\rm melodic}melodic版本安装usb−cam{\rm usb-cam}usb−cam过程出现的两个常见问题提出解决方案。 1.问题1&#xff1a;usb-cam功能包编译时出现"未定义的引用"的问题 问题描述&#…...

npm-npm i XX --save 和--save-dev

之前使用npm i XX --save 和--save-dev 没太在意&#xff0c;就想记录一下&#xff0c;查到一篇比较全的(链接&#xff1a;NPM install -save 和 -save-dev 傻傻分不清)&#xff0c;直接看好了&#xff0c;哈哈~ # 安装模块到项目目录下 npm install moduleName # -g 的意思是…...

可重构或可调谐微波滤波器技术

电子可重构&#xff0c;或者说电调微波滤波器由于其在改善现在及未来微波系统容量中不断提高的重要性而正吸引着人们越来越多的关注来对其进行研究和开发。例如&#xff0c;崭露头脚的超宽带&#xff08;UWB&#xff09;技术要求使用很宽的无线电频谱。然而&#xff0c;作为资源…...

医院智能化解决方案-门(急)诊、医技、智能化项目解决方案

【版权声明】本资料来源网络&#xff0c;知识分享&#xff0c;仅供个人学习&#xff0c;请勿商用。【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间删除&#xff01;完整资料领取见文末&#xff0c;部分资料内容&#xff1a;篇幅有限&#xff0c;无法完全展…...

判断元素是否在可视区域

前言 在日常开发中&#xff0c;我们经常需要判断目标元素是否在视窗之内或者和视窗的距离小于一个值&#xff08;例如 100 px&#xff09;&#xff0c;从而实现一些常用的功能&#xff0c;例如&#xff1a; 图片的懒加载列表的无限滚动计算广告元素的曝光情况可点击链接的预加…...

告别传统繁杂的采购合同管理 打造企业自动化采购管理模式

随着企业竞争日趋激烈&#xff0c;采购成本压力剧增&#xff0c;企业对于采购合同管理更加严格&#xff0c;从而把控物资成本。对于任何一家企业采购来说&#xff0c;规范化合同的全面管理&#xff0c;是采购活动中重要的一个环节。 但在如今&#xff0c;依旧有很多企业采购合…...

【prism】路由事件映射到Command命令

在之前的一篇文章中&#xff0c;我介绍了普通的自定义事件&#xff1a; 【wpf】自定义事件总结&#xff08;Action&#xff0c; EventHandler&#xff09;_code bean的博客-CSDN博客_wpf action可以说通过Action和EventHandle&#xff0c;自定义事件是相当的方便简单了。https…...

面向对象的基本概念和方法

面向对象的开发方法在近几十年见得以广泛应用&#xff0c;我们常见的Java语言就是一种典型的面向对象的开发语言。然而&#xff0c;面向对象的概念较为复杂&#xff0c;知识点也很细碎&#xff0c;本文整理了面向对象的基本概念和方法&#xff0c;供大家参考。为了便于读者理解…...

数据可视化大屏百度地图绘制行政区域标注实战案例解析(个性化地图、标注、视频、控件、定位、检索)

百度地图开发系列目录 数据可视化大屏应急管理综合指挥调度系统完整案例详解&#xff08;PHP-API、Echarts、百度地图&#xff09;数据可视化大屏百度地图API开发&#xff1a;停车场分布标注和检索静态版百度地图高级开发&#xff1a;map.getDistance计算多点之间的距离并输入…...

1.面向对象和类的关系?2.什么是Promise、3.Promise和async、await的关系

面向对象:面向对象是一种编程思想&#xff08;oop&#xff09;。(Js里面所有的东西都可以看做对象&#xff0c;Js它是基于原型的面向对象语言&#xff0c;采用原型的方式来构造对象)很多个具有相同属性和行为的对象就可以抽象为类&#xff0c;对象是类的一个实例。JavaScript在…...

【程序化天空盒】过程记录01:日月 天空渐变 大气散射

1 日月 SunAndMoon 昼夜的话肯定少不了太阳和月亮&#xff0c;太阳和月亮实现的道理是一样的&#xff0c;只不过是月亮比太阳多了一个需要控制月牙程度&#xff08;or添加贴图&#xff09;的细节~ 1.1 Sun 太阳的话很简单&#xff0c;直接在shader里实现一个太阳跟随平行光旋…...

无线通信中的轨道角动量

目录 一. 前言 二. 如何传输 三. 如何产生 3.1 螺旋结构器件 &#xff08;1&#xff09;螺旋相位板 &#xff08;2&#xff09;螺旋抛物面天线 3.2 超表面 3.3 天线阵列 3.3.1 相控阵 3.3.2 时控阵 四. 如何识别 一. 前言 轨道角动量&#xff1a;Orbital Angular M…...

以后更新功能,再也不用App发版了!智能小程序将为开发者最大化减负

在 IoT 时代&#xff0c;越来越多的企业意识到打造自有 App 对于品牌的重要性。作为智能设备不可或缺的控制终端&#xff0c;App 具备连接用户、完善服务、精细化运营用户的独特优势&#xff0c;可帮助企业大大提升品牌竞争力。 为了帮助品牌企业打造更具个性化、差异化的智能…...

C++之类模板全特化和偏特化

类模板类模板是通用类的描述&#xff0c;使用任意类型&#xff08;泛型&#xff09;来描述类的定义。使用类模板的时候&#xff0c;指定具体的数据类型&#xff0c;让编译器生成该类型的类定义。注意&#xff1a;函数模板中可以不指定具体数据类型&#xff0c;让编译器自动推到…...

Python 手写数字识别 MNIST数据集下载失败

目录 一、MNIST数据集下载失败 1 失败的解决办法&#xff08;经验教训&#xff09;&#xff1a; 2 亲测有效的解决方法&#xff1a; 一、MNIST数据集下载失败 场景复现&#xff1a;想要pytorchMINIST数据集来实现手写数字识别&#xff0c;首先就是进行MNIST数据集的下载&am…...

华为机试题:HJ61 放苹果(python)

文章目录博主精品专栏导航知识点详解1、input()&#xff1a;获取控制台&#xff08;任意形式&#xff09;的输入。输出均为字符串类型。1.1、input() 与 list(input()) 的区别、及其相互转换方法2、print() &#xff1a;打印输出。3、整型int() &#xff1a;将指定进制&#xf…...

【论文速递】ICCV2021 - 基于超相关压缩实现实时高精度的小样本语义分割

【论文速递】ICCV2021 - 基于超相关压缩的小样本语义分割 【论文原文】&#xff1a;Hypercorrelation Squeeze for Few-Shot Segmentation 【作者信息】&#xff1a;Juhong Min Dahyun Kang Minsu Cho 获取地址&#xff1a;https://openaccess.thecvf.com/content/ICCV2021/…...

单例模式(Singleton Pattern)

目录 1.什么是单例模式&#xff1a; 2.单例模式存在的原因&#xff1a; 3.单例模式的优缺点&#xff1a; 4.创建方式&#xff1a; 1. 单线程单例模式立即创建&#xff08;饿汉式&#xff09;&#xff1a; 2. 单线程单例模式延迟创建&#xff08;懒汉式&#xff09;&#xf…...

docker file和compose

文章目录1.dockerfile&#xff08;单机脚本&#xff09;1.概念2.原理3.dockerfile核心四步4.命令2.docker compose1.概念2.注意事项3.常用字段4.常用命令1.dockerfile&#xff08;单机脚本&#xff09; 1.概念 通过脚本&#xff0c;生成一个镜像&#xff0c;并运行对应的容器…...

如何解决thinkphp验证码不能显示问题?

thinkPHP做验证码这一块,可以使用自带的验证码扩展,具体步骤如下: 一、安装扩展 composer require topthink/think-captcha 二、模版中使用 将原来静态页面的验证码图片替换为{:captcha_img()},这个会自动生成验证码图片。 <div>{:captcha_img()}</div> 或者 &…...

Vue极简使用

Vue安装Vue模板语法安装Vue 安装nodejs 这里我安装的是14.5.4版本 https://nodejs.org/download/release/v14.15.4/解压后配置一下环境变量就行 安装cnpm镜像 (这个安装的版本可能过高&#xff0c;后面安装Vue可能出问题) npm install -g cnpm --registryhttps://registry…...

【Nacos】Nacos配置中心服务端源码分析

上文说了Nacos配置中心客户端的源码流程&#xff0c;这篇介绍下Nacos配置中心服务端的源码。 服务端的启动 先来看服务启动时干了啥&#xff1f; init()方法上面有PostConstruct&#xff0c;该方法会在ExternalDumpService实例化后执行。 com.alibaba.nacos.config.server.s…...

第十五章 栅格数据重分类、栅格计算器、插值分析

文章目录第十五章 栅格数据分析第一章 栅格数据重分类第一节 栅格数据重分类第二节 栅格重分类的使用第三节 重分类的使用中的空值使用第四节 重分类的案例&#xff1a;分类统计面积第五节 坡度矢量分级图生成第二章 栅格计算器第一节 栅格计算器介绍第二节 栅格计算器使用第三…...

CS5260测试版|CS5260demoboard|typec转VGA参考PCB原理图

CS5260测试版|CS5260demoboard|typec转VGA参考PCB原理图 CS5260是一款高度集成的TYPEC转VGA转换方案芯片。 CS5260输出端接口:外接高清VGA设备如:显示器投影机电视带高清的设备&#xff0c;广泛应用于 笔记本Macbook Air 12寸USB3.1输出端对外接高清VGA设备如:显示器投影机电视…...

winform开发心得

最近一直在从事winform的开发&#xff0c;每次都是需要从网上查找资料才能对应具体风格要求&#xff0c;现在总结一下。 ui方面可以使用CSkin对应的一套ui&#xff0c;使用步骤 1.在窗口界面&#xff0c;工具箱空白处点击右键&#xff0c;弹出菜单有个”选择项“&#xff0c;点…...

做视频网站收入/推广引流吸引人的标题

g的编译选项介绍&#xff1a; -WI的理解&#xff0c;gcc的-WI,xxx选项似乎是在 gcc 中使用 ld 链接选项时候的编译器选项 -L: “链接” 的时候&#xff0c;去找的链接库的目录 - rpath&#xff08;或 - R &#xff0c;这似乎是一个内容&#xff09;&#xff0c;意思是“运行…...

直播网站是怎么做的/东莞网络推广排名

1、什么是虚悬镜像&#xff1f; 构建和删除镜像时出现一些错误&#xff0c;导致仓库&#xff08;REPOSITORY&#xff09;和标签&#xff08;TAG&#xff09;都是NONE 2、自己写一个虚悬镜像 3、查看docker容器中存在的虚悬镜像 docker image ls -f danglingtrue4、删除docker…...

湖南省住房和城乡建设厅网站/南宁seo排名首页

本文价值与收获 看完本文后,您将能够作出下面的界面 看完本文您将掌握的技能 绘制虚线设置虚线宽度和颜色实战需求 我需要创建一条虚线。我尝试通过创建带有虚线笔触的Rectangle视图来解决这个问题。但是,将矩形的高度设置为1时,会导致出现一条双线,因为它同时显示了视图…...

有哪些做投行网站/quark搜索引擎入口

导读&#xff1a;谈到锁住&#xff0c;大家应该都熟悉&#xff0c;有朋友问台式电脑键盘锁是哪个键&#xff0c;还有人问台式电脑键盘被锁住按什么键恢复&#xff0c;这到底是咋回事&#xff1f;事实上台式电脑键盘被锁住按什么键恢复呢&#xff0c;以下是小编为你精心整理的台…...

南通优普网站建设团队/百度图片识别搜索

相关操作学习记录备忘录 echo offrem 1、添加winrar压缩软件到系统环境变量&#xff0c;才可以压缩文件 rem 2、设置变量 不能有空格 "set a 123"(报错) "set a123"(正确) rem 3、强制删除文件夹 /s /q rem 4、重命名文件 第二个参数必须是文件…...

网易做相册旅游网站/福建省人民政府

npm install less less-loader --save-dev 有可能报错&#xff1a; 版本原因&#xff1a; 降低less-loader版本 npm install less-loader4.1.0...