RabbitMQ-客户端源码之AMQChannel
AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法:
/*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as* usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.* @param command the command to handle asynchronously* @return true if we handled the command; otherwise the caller should consider it "unhandled"*/
public abstract boolean processAsync(Command command) throws IOException;
有关processAsync()这个方法的会在介绍ChannelN类的时候详细阐述([[八]RabbitMQ-客户端源码之ChannelN][RabbitMQ-_ChannelN])。
首先来说下AMQChannel的成员变量:
protected final Object _channelMutex = new Object();
/** The connection this channel is associated with. */
private final AMQConnection _connection;
/** This channel's channel number. */
private final int _channelNumber;
/** Command being assembled */
private AMQCommand _command = new AMQCommand();
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcContinuation _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
- _channelMutex这个是内部用来当对象锁的,没有实际的意义,可忽略
- _connection是指AMQConnection这个对象。
- _channelNumber是指channel number, 这个应该不用多解释了吧。通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
- _command是内部处理使用的对象,调用AMQCommand的方法来处理一些东西。
- _activeRpc是指当前未处理完的rpc请求(the current outstanding rpc request)。
- _blockContent 是在Channel.Flow里用到的,其余情况都是false
在AMQChannel的构造函数中,只有两个参数:AMQConnection connection以及int channelNumber.
AMQChannel中有个handleFrame方法:
/*** Private API - When the Connection receives a Frame for this* channel, it passes it to this method.* @param frame the incoming frame* @throws IOException if an error is encountered*/
public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);}
}/*** Private API - handle a command which has been assembled* @throws IOException if there's any problem** @param command the incoming command* @throws IOException*/
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {// First, offer the command to the asynchronous-command// handling mechanism, which gets to act as a filter on the// incoming command stream. If processAsync() returns true,// the command has been dealt with by the filter and so should// not be processed further. It will return true for// asynchronous commands (deliveries/returns/other events),// and false for commands that should be passed on to some// waiting RPC continuation.if (!processAsync(command)) {// The filter decided not to handle/consume the command,// so it must be some reply to an earlier RPC.nextOutstandingRpc().handleCommand(command);markRpcFinished();}
}
这个在[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]有所介绍,主要是用来处理Frame帧的,当调用AMQCommand的handleFrame处理之后返回为true是,即处理完毕时继续调用handleCompleteInboundCommand方法。这其中也牵涉到AMQConnection的MainLoop内部类,具体可以看看:[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]。
AMQChannel中有很多方法带有rpc的字样,这来做一个整理。
首先是:
public void enqueueRpc(RpcContinuation k)
{synchronized (_channelMutex) {boolean waitClearedInterruptStatus = false;while (_activeRpc != null) {try {_channelMutex.wait();} catch (InterruptedException e) {waitClearedInterruptStatus = true;}}if (waitClearedInterruptStatus) {Thread.currentThread().interrupt();}_activeRpc = k;}
}
这个方法在AMQConnection.start()方法中有过使用:_channel0.enqueueRpc(conStartBroker)。这个方法就是将参数付给成员变量_activeRpc,至于这个RpcContinuation到底是个什么gui,我们下面再讲。
继续下一个方法:
public boolean isOutstandingRpc()
{synchronized (_channelMutex) {return (_activeRpc != null);}
}
这个方法是判断一下当前的_activeRpc是否为null,为null则为false,否则为true。看方法的名字应该猜出大半。
下面一个方法:
public RpcContinuation nextOutstandingRpc()
{synchronized (_channelMutex) {RpcContinuation result = _activeRpc;_activeRpc = null;_channelMutex.notifyAll();return result;}
}
方法将当前的_activeRpc返回,并置AQMChannel的_activeRpc为null。
接下来几个方法联系性很强:
/*** Protected API - sends a {@link Method} to the broker and waits for the* next in-bound Command from the broker: only for use from* non-connection-MainLoop threads!*/
public AMQCommand rpc(Method m)throws IOException, ShutdownSignalException
{return privateRpc(m);
}public AMQCommand rpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {return privateRpc(m, timeout);
}private AMQCommand privateRpc(Method m)throws IOException, ShutdownSignalException
{SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);// At this point, the request method has been sent, and we// should wait for the reply to arrive.//// Calling getReply() on the continuation puts us to sleep// until the connection's reader-thread throws the reply over// the fence.return k.getReply();
}private AMQCommand privateRpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);return k.getReply(timeout);
}public void rpc(Method m, RpcContinuation k)throws IOException
{synchronized (_channelMutex) {ensureIsOpen();quiescingRpc(m, k);}
}public void quiescingRpc(Method m, RpcContinuation k)throws IOException
{synchronized (_channelMutex) {enqueueRpc(k);quiescingTransmit(m);}
}
主要是看最后一个方法——quiescingRpc.这个方法说白就两行代码:
enqueueRpc(k);是将由privateRpc等方法内部创建的SimpleBlockingRpcContinuation对象附给当前的AQMChannel对象的成员变量_activeRpc
关于quiescingTransmit(m)就要接下去看了:
public void quiescingTransmit(Method m) throws IOException {synchronized (_channelMutex) {quiescingTransmit(new AMQCommand(m));}
}
public void quiescingTransmit(AMQCommand c) throws IOException {synchronized (_channelMutex) {if (c.getMethod().hasContent()) {while (_blockContent) {try {_channelMutex.wait();} catch (InterruptedException e) {}// This is to catch a situation when the thread wakes up during// shutdown. Currently, no command that has content is allowed// to send anything in a closing state.ensureIsOpen();}}c.transmit(this);}
}
上面代码只需要看: c.transmit(this);这一句,其余的都是摆设。看到这里,就调用了AMQCommand的transmit方法,这个transmit方法就是讲AMQChannel中封装的内容发给broker,然后等待broker返回,进而通过之前附值的_activeRpc来处理回传的帧。
虽然之前在AMQConnection([[二]RabbitMQ-客户端源码之AMQConnection][RabbitMQ-_AMQConnection])中详细讲述了start()方法,但是这里还是要来拿这个来举例这个AMQChannel中的rpc怎么使用
在AMQConnection中有这么一段代码:
Method method = (challenge == null)? new AMQP.Connection.StartOk.Builder().clientProperties(_clientProperties).mechanism(sm.getName()).response(response).build(): new AMQP.Connection.SecureOk.Builder().response(response).build();try {Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();if (serverResponse instanceof AMQP.Connection.Tune) {connTune = (AMQP.Connection.Tune) serverResponse;} else {challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();response = sm.handleChallenge(challenge, this.username, this.password);}
客户端将Method封装成Connection.StartOk帧之后等待broker返回Connection.Tune帧。
此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。
注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切的来说是MainLoop线程处理之后返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。
相关文章:
RabbitMQ-客户端源码之AMQChannel
AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法: /*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true…...

注意力机制(SE,ECA,CBAM) Pytorch代码
注意力机制1 SENet2 ECANet3 CBAM3.1 通道注意力3.2 空间注意力3.3 CBAM4 展示网络层具体信息1 SENet SE注意力机制(Squeeze-and-Excitation Networks):是一种通道类型的注意力机制,就是在通道维度上增加注意力机制,主要内容是是…...

Vue2笔记03 脚手架(项目结构),常用属性配置,ToDoList(本地存储,组件通信)
Vue脚手架 vue-cli 向下兼容可以选择较高版本 初始化 全局安装脚手架 npm install -g vue/cli 创建项目:切换到项目所在目录 vue create xxx 按照指引选择vue版本 创建成功 根据指引依次输入上面指令即可运行项目 也可使用vue ui在界面上完成创建&…...

Java程序的执行顺序、简述对线程池的理解
点个关注,必回关 文章目录一、Java程序是如何执行的二、合理利用线程池能够带来三个好处一、Java程序是如何执行的 我们日常的工作中都使用开发工具(IntelliJ IDEA 或 Eclipse 等)可以很方便的调试程序,或者是通 过打包工具把项目…...

【前言】嵌入式系统简介
随手拍拍💁♂️📷 日期: 2022.12.01 地点: 杭州 介绍: 2022.11.30下午两点时,杭州下了一场特别大的雪。隔天的12月路过食堂时,边上的井盖上发现了这个小雪人。此时边上的雪已经融化殆尽,只有这个雪人依旧维持着原状⛄…...

React设计原理—1框架原理
阅读前须知 本文是笔者学习卡颂的《React设计原理》的读书笔记,对书中有价值内容以Q&A方式进行呈现,同时结合了自己的理解🤔阅读时推荐先看问题,想想自己的答案,再和答案比对一下本文属于前端框架科普,…...

(C00034)基于Springboot+html前后端分离技术的宿舍管理系统-有文档
基于Springboothtml技术的宿舍管理系统-有文档项目简介项目获取开发环境项目技术运行截图项目简介 基于Springboothtml的前后端分离技术的宿舍管理系统项目为了方便对学生宿舍进行管理而设计,分为后勤、宿管、学生三种用户,后勤对整体宿舍进行管理、宿管…...

Flink面试题
一 基础篇Flink的执行图有哪几种?分别有什么作用Flink中的执行图一般是可以分为四类,按照生成顺序分别为:StreamGraph-> JobGraph-> ExecutionGraph->物理执行图。1)StreamGraph顾名思义,这里代表的是我们编写…...

Python学习笔记
前言:又从仓库翻出来了一些以前总结的文档,以下内容是我初学Python时网上找的或是图书馆借书抄写的笔记,现在再看有点零散不成体系,但是也还是纪念一下子吧。 Python学习笔记 对于初学编程的人来说,Python可以缩短编…...

最适合入门的100个深度学习实战项目
🚨注意🚨:最近经粉丝反馈,发现有些订阅者将此专栏内容进行二次售卖,特在此声明,本专栏内容仅供学习,不得以任何方式进行售卖,未经作者许可不得对本专栏内容行使发表权、署名权、修改…...

AssertionError: 618 columns passed, passed data had 508 columns【已解决】
问题描述 程序中断,报错如下AssertionError: 618 columns passed, passed data had 508 columns Exception has occurred: ValueError 618 columns passed, passed data had 508 columns AssertionError: 618 columns passed, passed data had 508 columnsThe abo…...

166_技巧_Power BI 窗口函数处理连续发生业务问题
166_技巧_Power BI 窗口函数处理连续发生业务问题 一、背景 在生产经营的数据监控中,会有一类指标需要监控是否连续发生,从而根据其在设定区间中的连续频次来评价业务。 例如: 员工连续迟到天数。销售金额连续上升或者下降。用户连续登陆…...

电子科技大学人工智能期末复习笔记(五):机器学习
目录 前言 监督学习 vs 无监督学习 回归 vs 分类 Regression vs Classification 训练集 vs 测试集 vs 验证集 泛化和过拟合 Generalization & Overfitting 线性分类器 Linear Classifiers 激活函数 - 概率决策 ⚠线性回归 决策树 Decision Trees 决策树构建递归…...

使用DDD指导业务设计的总结思考
领域驱动设计(DDD) 是 Eric Evans 提出的一种软件设计方法和思想,主要解决业务系统的设计和建模。DDD 有大量难以理解的概念,尤其是翻译的原因,某些词汇非常生涩,例如:模型、限界上下文、聚合、…...

面试官问:如何确保缓存和数据库的一致性?
如果你对这个问题有过研究,应该可以发现这个问题其实很好回答,如果第一次听到或者第一次遇到这个问题,估计会有点懵,今天我们来聊聊这个话题。 1、问题分析 首先我们来看看为什么会有这个问题! 我们在日常开发中&am…...
16.数据库Redis
一、基本概念 Redis(Remote Dictionary Server)译为“远程字典服务”,它是一款基于内存实现的键值型 NoSQL 数据库, 通常也被称为数据结构服务器,这是因为它可以存储多种数据类型,比如 string(字…...

【Redis高级-集群分片】
单机安装Redis首先需要安装Redis所需要的依赖:yum install -y gcc tclRedis安装包上传到虚拟机的任意目录:我放到了/tmp目录:解压缩:tar -zxvf /tmp/redis-6.2.4.tar.gz -C /tmp解压后:进入redis目录:cd /t…...
CSDN - CSDN27题解
文章目录幸运数字题目描述解题思路AC代码投篮题目描述解题思路AC代码通货膨胀-x国货币题目描述解题思路AC代码最后一位题目描述解题思路AC代码CSDN编程竞赛报名地址:https://edu.csdn.net/contest/detail/41 这次题目描述刚开始好像有些问题,之后被修正了…...
docker拉取mysql
搜索mysql版本docker search mysql搜索获赞数(星星数量) 大于 1000 的镜像docker search --filterstars1000 mysql搜索官方发布的版本docker search --filter is-officialtrue mysql搜索版本号docker search mysql57拉取docker pull devbeta/mysql57查看下载镜像docker images启…...
在Linux上安装Python3
记录:373场景:在CentOS 7.9操作系统上,安装Python-3.8.9环境。版本:JDK 1.8 Python-3.8.9官网地址:https://www.python.org下载地址:https://www.python.org/ftp/python/1.安装基础依赖1.1安装gcc(1)安装命…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试
作者:Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位:中南大学地球科学与信息物理学院论文标题:BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接:https://arxiv.…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...

【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...