当前位置: 首页 > news >正文

RabbitMQ-客户端源码之AMQCommand

AMQCommand不是直接包含Method等成员变量的,而是通过CommandAssembler又做了一次封装。
接下来先看下CommandAssembler类。此类中有这些成员变量:

/** Current state, used to decide how to handle each incoming frame. */
private enum CAState {EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE
}
private CAState state;/** The method for this command */
private Method method;/** The content header for this command */
private AMQContentHeader contentHeader;/** The fragments of this command's content body - a list of byte[] */
private final List bodyN;
/** sum of the lengths of all fragments */
private int bodyLength;/** No bytes of content body not yet accumulated */
private long remainingBodyBytes;
  • CAState state标识这此Command目前的状态,是准备处理Method(EXPECTING_METHOD),还是处理Content header(EXPECTING_CONTENT_HEADER),还是准备处理Content body(EXPECTING_CONTENT_BODY),还是以及完成了(COMPLETE)。
  • Method method代表type=Method的AMQP帧
  • AMQContentHeader contentHeader代表type=Content header的AMQP帧
  • final List bodyN代表type=Content body的AMQP帧,就是真正的消息体(Message body)。
  • bodyLength就是消息体大小

这个类中除了构造函数,getMethod, getContentHeader, getContentBody,isComplete这个几个方法,最关键的方法就是:

public synchronized boolean handleFrame(Frame f) throws IOException
{switch (this.state) {case EXPECTING_METHOD:          consumeMethodFrame(f); break;case EXPECTING_CONTENT_HEADER:  consumeHeaderFrame(f); break;case EXPECTING_CONTENT_BODY:    consumeBodyFrame(f);   break;default:throw new AssertionError("Bad Command State " + this.state);}return isComplete();
}

这个方法主要是处理AQMP帧的,根据CAState state来处理相应状态类型的帧,然后赋值给相应的成员变量。
采用consumeMethodFrame(Frame f)方法举个例子:

private void consumeMethodFrame(Frame f) throws IOException {if (f.type == AMQP.FRAME_METHOD) {this.method = AMQImpl.readMethodFrom(f.getInputStream());this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;} else {throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);}
}

这个方法首先判断当前帧是否是Method帧(AMQP.FRAME_METHOD),然后调用AMQPImp.readMethodFrom的方法。就那Connection.Start这个真来将,它会从socket的输入流中读取:

public Start(MethodArgumentReader rdr) throws IOException {this(rdr.readOctet(), rdr.readOctet(), rdr.readTable(), rdr.readLongstr(), rdr.readLongstr());
}

对应于下图:

  • 第一个rdr.readOctet()是指Version-Magor:0
  • 第二个rdr.readOctet()是指Version-Minor:9
  • 第三个rdr.readTable()是指Server-Properties
  • 第四个rdr.readLongstr()是指Mechanisms
  • 第五个rdr.readLongstr()是指Locales

而MethodArgumentReader.readOctet()就是:

public final int readOctet()throws IOException
{clearBits();return in.readOctet();//in对象是DataInputStream对象
}

写到这里,思路再跳回来,知道了底层其实是Socket的DataInputStream,其上只是做了封装再封装
CommandAssembler 中的handleFrame这个方法只在AMQCommand中的:

private final CommandAssembler assembler;
public boolean handleFrame(Frame f) throws IOException {return this.assembler.handleFrame(f);
}

只在这个方法中调用。CommandAssembler只是对Method,Content-Header,Content-Body做了一下封装。下面继续回到AMQCommand这个类中来。
仔细阅读源码的同学会发现在handleFrame方法当遇到类似Basic.Publish时会有Method,Content-Header,Content-Body一起的报文,那么handleFrame处理完Method之后就直接返回了,没有完全处理完,这该如何是好?
这个就又要联系到AMQConnection中的MainLoop的内部类了。此类中的关键代码如下:

while (_running) {Frame frame = _frameHandler.readFrame();if (frame != null) {_missedHeartbeats = 0;if (frame.type == AMQP.FRAME_HEARTBEAT) {// Ignore it: we've already just reset the heartbeat counter.} else {if (frame.channel == 0) { // the special channel_channel0.handleFrame(frame);} else {if (isOpen()) {// If we're still _running, but not isOpen(), then we// must be quiescing, which means any inbound frames// for non-zero channels (and any inbound commands on// channel zero that aren't Connection.CloseOk) must// be discarded.ChannelManager cm = _channelManager;if (cm != null) {cm.getChannel(frame.channel).handleFrame(frame);}}}}} else {// Socket timeout waiting for a frame.// Maybe missed heartbeat.handleSocketTimeout();}
}

可以看到这是一个一直轮询读取Frame并处理Frame的过程。在遇到类似Basic.Publish这种带Method, Content-Header, Content-Body的类型的报文时,会循环处理,直到处理完成。注意这里的Method, Content-Header以及Content-Body都是看成单个Frame的,也就是这个while循环要三次,而不是将Basic.Publish看成一个帧。
上面调用的handleFrame方法是AMQChannel类中的(详细可以参考([[五]RabbitMQ-客户端源码之AMQChannel][RabbitMQ-_AMQChannel])):

public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);}
}

可以看到只有当AMQCommand的handleFrame方法返回true时,即执行完成之后才会继续处理。


AMQCommand也有getMethod, getContentHeader, getContentBody等方法,这些都是间接调用CommandAssembler类中相应的方法的。
AMQCommand中也有个特别重要的方法:

/*** Sends this command down the named channel on the channel's* connection, possibly in multiple frames.* @param channel the channel on which to transmit the command* @throws IOException if an error is encountered*/
public void transmit(AMQChannel channel) throws IOException {int channelNumber = channel.getChannelNumber();AMQConnection connection = channel.getConnection();synchronized (assembler) {Method m = this.assembler.getMethod();connection.writeFrame(m.toFrame(channelNumber));if (m.hasContent()) {byte[] body = this.assembler.getContentBody();connection.writeFrame(this.assembler.getContentHeader().toFrame(channelNumber, body.length));int frameMax = connection.getFrameMax();int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax- EMPTY_FRAME_SIZE;for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {int remaining = body.length - offset;int fragmentLength = (remaining < bodyPayloadMax) ? remaining: bodyPayloadMax;Frame frame = Frame.fromBodyFragment(channelNumber, body,offset, fragmentLength);connection.writeFrame(frame);}}}connection.flush();
}

这段主要通过传输AMQP帧的,通过AMQChannel获取到通信链路connection,然后将AMQCommand对象自身的method成员变量(或者包括content-header以及content-body)传送给broker。这段方法里还有判断payload大小是否超过broker端所设置的最大帧大小frameMax,即(frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE这段代码。当frameMax=0时则没有大小限制,当frameMax不为0时则按照payload拆分成若干的payload然后发送多个FRAME_BODY帧。

相关文章:

RabbitMQ-客户端源码之AMQCommand

AMQCommand不是直接包含Method等成员变量的&#xff0c;而是通过CommandAssembler又做了一次封装。 接下来先看下CommandAssembler类。此类中有这些成员变量&#xff1a; /** Current state, used to decide how to handle each incoming frame. */ private enum CAState {EXP…...

linux设置登录失败处理功能(密码错误次数限制、pam_tally2.so模块)和操作超时退出功能(/etc/profile)

一、登录失败处理功能策略 1、登录失败处理功能策略&#xff08;服务器终端&#xff09; &#xff08;1&#xff09;编辑系统/etc/pam.d/system-auth 文件&#xff0c;在 auth 字段所在的那一部分添加如下pam_tally2.so模块的策略参数&#xff1a; auth required pam_tally2…...

Centos7上Docker安装

文章目录1.Docker常识2.安装Docker1.卸载旧版本Docker2.安装Docker3.启动Docker4.配置镜像加速前天开学啦~所以可以回来继续卷了哈哈哈&#xff0c;放假在家效率不高&#xff0c;在学校事情也少点(^_−)☆昨天和今天学了学Docker相关的知识&#xff0c;也算是简单了解了下&…...

新瑞鹏“狂飙”,宠物医疗是门好生意吗?

宠物看病比人还贵&#xff0c;正在让不少年轻一族陷入尴尬境地。在知乎上&#xff0c;有个高赞提问叫“你愿意花光积蓄&#xff0c;给宠物治病吗”&#xff0c;这个在老一辈人看来不可思议的魔幻选择&#xff0c;真实地发生在当下的年轻人身上。提问底下&#xff0c;有人表示自…...

Spring循环依赖问题,Spring是如何解决循环依赖的?

文章目录一、什么是循环依赖1、代码实例2、重要信息二、源码分析1、初始化Student对Student中的ClassRoom进行Autowire操作2、Student的自动注入ClassRoom时&#xff0c;又对ClassRoom的初始化3、ClassRoom的初始化&#xff0c;又执行自动注入Student的逻辑4、Student注入Class…...

更改SAP GUI登录界面信息

在SAP GUI的登录界面&#xff0c;左部输入登录信息如客户端、用户名、密码等&#xff0c;右部空余部分可维护一些登录信息文本&#xff0c;如登录的产品、客户端说明及注意事项等&#xff0c;此项操作详见SAP Notes 205487 – Own text on SAPGui logon screen 维护文档使用的…...

分布式微服务架构下网络通信的底层实现原理

在分布式架构中&#xff0c;网络通信是底层基础&#xff0c;没有网络&#xff0c;也就没有所谓的分布式架构。只有通过网络才能使得一大片机器互相协作&#xff0c;共同完成一件事情。 同样&#xff0c;在大规模的系统架构中&#xff0c;应用吞吐量上不去、网络存在通信延迟、我…...

进大厂必备的Java面试八股文大全(2023最新精简易懂版,八股文中的八股文)

为什么同样是跳槽&#xff0c;有些人薪资能翻三倍&#xff1f;” 最近一个粉丝发出了灵魂拷问&#xff0c;类似的问题我收到过很多次&#xff0c;身边也确实有认识的同事、朋友们有非常成功的跳槽经历和收益&#xff0c;先说一个典型例子&#xff1a; 学弟小 A 工作一年半&am…...

都说测试行业饱和了,为什么我们公司给初级测试开到了12K?

故事起因&#xff1a; 最近我有个刚毕业的学生问我说&#xff1a;我感觉现在测试行业已经饱和了&#xff0c;也不是说饱和了&#xff0c;是初级的测试根本就没有公司要&#xff0c;哪怕你不要工资也没公司要你&#xff0c;测试刚学出来&#xff0c;没有任何的项目经验和工作经验…...

解决Idea启动项目失败,提示Error running ‘XXXApplication‘: Command line is too long

IDEA版本为&#xff1a;IntelliJ IDEA 2018.2 (Ultimate Edition)一、问题描述有时当我们使用IDEA&#xff0c;Run/Debug一个SpringBoot项目时&#xff0c;可能会启动失败&#xff0c;并提示以下错误。Error running XXXApplication: Command line is too long. Shorten comman…...

GB/T28181-2022针对H.265、AAC的说明和技术实现

GB/T28181-2022规范说明GB/T28181-2022相对来GB/T28181-2016针对H.265、AAC的更新如下&#xff1a;——更改了“联网系统通信协议结构图”&#xff0c;媒体流通道增加了 H.265、G.722.1、AAC&#xff08;见 4.3.1&#xff0c;2016 年版的 4.3.1&#xff09;。——增加了对 H.26…...

开关电源环路稳定性分析(11)——观察法找零极点

大家好&#xff0c;这里是大话硬件。 这篇文章主要是分享如何用观察法直接写出补偿网络中的零极点的表达式。 在前面的文章中&#xff0c;我们分别整理了OTA和OPA型的补偿网络&#xff0c;当时有下面的结论。 针对某个固定的补偿网络&#xff0c;我们可以用数学的方法推导补偿…...

焕新启航,「龙蜥大讲堂」2023 年度招募来了!13 场技术分享先睹为快

龙蜥大讲堂是龙蜥推出的系列技术直播活动&#xff0c;邀请龙蜥社区的开发者们分享围绕龙蜥技术展开&#xff0c;包括但不限于内核、编译器、机密计算、容器、储存等相关技术领域。欢迎社区开发者们积极参与&#xff0c;共享技术盛宴。往期回顾龙蜥社区技术系列直播截至目前已举…...

推广传单制作工具

临近节日如何制作推广活动呢&#xff1f;没有素材制作满减活动宣传单怎么办&#xff1f;小编教你如何使用在线设计工具乔拓云&#xff0c;轻松设计商品的专属满减活动宣传单&#xff0c;不仅设计简单&#xff0c;还能自动生成活动分享链接&#xff0c;只需跟着小编下面的设计步…...

软件设计(十一)数据结构(上)

线性结构 线性表 线性表是n个元素的有限序列&#xff0c;通常记为(a1&#xff0c;a2....an)&#xff0c;特点如下。 存在唯一的一个称作“第一个”的元素。存在位移的一个称作“最后一个”的元素。除了表头外&#xff0c;表中的每一个元素均只有唯一的直接前趋除了表尾外&…...

https协议

文章目录对称加密方案非对称加密方案对称加密方案非对称加密方案对称加密方案非对称加密方案数字证书因为HTTP是明文传输&#xff0c;所以会很有可能产生中间人攻击&#xff08;获取并篡改传输在客户端及服务端的信息并不被人发觉&#xff09;&#xff0c;HTTPS加密应运而生。 …...

深入浅出C语言——数据在内存中的存储

文章目录一、数据类型详细介绍1. C语言中的内置类型2. 类型的基本归类&#xff1a;二. 整形在内存中的存储1. 原码、反码、补码2. 大小端三.浮点数存储规则一、数据类型详细介绍 1. C语言中的内置类型 C语言的内置类型有char、short、int、long、long long、float、double&…...

在 Centos 上在线安装 GitLab

作为程序员&#xff0c;其中一个愿望是拥有一个自己的代码存储库。在支持私有部署的代码存储库产品中&#xff0c;GitLab 是比较著名的了&#xff0c;所以今天我总结了一下在 Centos 上安装 GitLab 的过程。 依赖 基础依赖 首先&#xff0c;需要安装部分基础的依赖&#xff…...

模型解释性:SHAP包的使用

本篇博客介绍另一种事后可解释性方法&#xff1a;SHAP(SHapley Additive exPlanation)方法。 1. Shapley值理论 Shapley值是博弈论中的一个概念&#xff0c;通过衡量联盟中各成员对联盟总目标的贡献程度&#xff0c;从而根据贡献程度来进行联盟成员的利益分配&#xff0c;避免…...

算法训练营 day45 动态规划 0-1背包理论 分割等和子集

算法训练营 day45 动态规划 0-1背包理论 分割等和子集 0-1背包理论 有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品只能用一次&#xff0c;求解将哪些物品装入背包里物品价值总和最大。 在下面的讲解中&…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密

在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

MODBUS TCP转CANopen 技术赋能高效协同作业

在现代工业自动化领域&#xff0c;MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步&#xff0c;这两种通讯协议也正在被逐步融合&#xff0c;形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...