当前位置: 首页 > news >正文

RabbitMQ-客户端源码之AMQChannel

AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法:

/*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as* usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.* @param command the command to handle asynchronously* @return true if we handled the command; otherwise the caller should consider it "unhandled"*/
public abstract boolean processAsync(Command command) throws IOException;

有关processAsync()这个方法的会在介绍ChannelN类的时候详细阐述([[八]RabbitMQ-客户端源码之ChannelN][RabbitMQ-_ChannelN])。


首先来说下AMQChannel的成员变量:

protected final Object _channelMutex = new Object();
/** The connection this channel is associated with. */
private final AMQConnection _connection;
/** This channel's channel number. */
private final int _channelNumber;
/** Command being assembled */
private AMQCommand _command = new AMQCommand();
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcContinuation _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
  • _channelMutex这个是内部用来当对象锁的,没有实际的意义,可忽略
  • _connection是指AMQConnection这个对象。
  • _channelNumber是指channel number, 这个应该不用多解释了吧。通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
  • _command是内部处理使用的对象,调用AMQCommand的方法来处理一些东西。
  • _activeRpc是指当前未处理完的rpc请求(the current outstanding rpc request)。
  • _blockContent 是在Channel.Flow里用到的,其余情况都是false
    在AMQChannel的构造函数中,只有两个参数:AMQConnection connection以及int channelNumber.

AMQChannel中有个handleFrame方法:

/*** Private API - When the Connection receives a Frame for this* channel, it passes it to this method.* @param frame the incoming frame* @throws IOException if an error is encountered*/
public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);}
}/*** Private API - handle a command which has been assembled* @throws IOException if there's any problem** @param command the incoming command* @throws IOException*/
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {// First, offer the command to the asynchronous-command// handling mechanism, which gets to act as a filter on the// incoming command stream.  If processAsync() returns true,// the command has been dealt with by the filter and so should// not be processed further.  It will return true for// asynchronous commands (deliveries/returns/other events),// and false for commands that should be passed on to some// waiting RPC continuation.if (!processAsync(command)) {// The filter decided not to handle/consume the command,// so it must be some reply to an earlier RPC.nextOutstandingRpc().handleCommand(command);markRpcFinished();}
}

这个在[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]有所介绍,主要是用来处理Frame帧的,当调用AMQCommand的handleFrame处理之后返回为true是,即处理完毕时继续调用handleCompleteInboundCommand方法。这其中也牵涉到AMQConnection的MainLoop内部类,具体可以看看:[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]。


AMQChannel中有很多方法带有rpc的字样,这来做一个整理。
首先是:

public void enqueueRpc(RpcContinuation k)
{synchronized (_channelMutex) {boolean waitClearedInterruptStatus = false;while (_activeRpc != null) {try {_channelMutex.wait();} catch (InterruptedException e) {waitClearedInterruptStatus = true;}}if (waitClearedInterruptStatus) {Thread.currentThread().interrupt();}_activeRpc = k;}
}

这个方法在AMQConnection.start()方法中有过使用:_channel0.enqueueRpc(conStartBroker)。这个方法就是将参数付给成员变量_activeRpc,至于这个RpcContinuation到底是个什么gui,我们下面再讲。
继续下一个方法:

public boolean isOutstandingRpc()
{synchronized (_channelMutex) {return (_activeRpc != null);}
}

这个方法是判断一下当前的_activeRpc是否为null,为null则为false,否则为true。看方法的名字应该猜出大半。
下面一个方法:

public RpcContinuation nextOutstandingRpc()
{synchronized (_channelMutex) {RpcContinuation result = _activeRpc;_activeRpc = null;_channelMutex.notifyAll();return result;}
}

方法将当前的_activeRpc返回,并置AQMChannel的_activeRpc为null。

接下来几个方法联系性很强:

/*** Protected API - sends a {@link Method} to the broker and waits for the* next in-bound Command from the broker: only for use from* non-connection-MainLoop threads!*/
public AMQCommand rpc(Method m)throws IOException, ShutdownSignalException
{return privateRpc(m);
}public AMQCommand rpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {return privateRpc(m, timeout);
}private AMQCommand privateRpc(Method m)throws IOException, ShutdownSignalException
{SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);// At this point, the request method has been sent, and we// should wait for the reply to arrive.//// Calling getReply() on the continuation puts us to sleep// until the connection's reader-thread throws the reply over// the fence.return k.getReply();
}private AMQCommand privateRpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);return k.getReply(timeout);
}public void rpc(Method m, RpcContinuation k)throws IOException
{synchronized (_channelMutex) {ensureIsOpen();quiescingRpc(m, k);}
}public void quiescingRpc(Method m, RpcContinuation k)throws IOException
{synchronized (_channelMutex) {enqueueRpc(k);quiescingTransmit(m);}
}

主要是看最后一个方法——quiescingRpc.这个方法说白就两行代码:
enqueueRpc(k);是将由privateRpc等方法内部创建的SimpleBlockingRpcContinuation对象附给当前的AQMChannel对象的成员变量_activeRpc
关于quiescingTransmit(m)就要接下去看了:

public void quiescingTransmit(Method m) throws IOException {synchronized (_channelMutex) {quiescingTransmit(new AMQCommand(m));}
}
public void quiescingTransmit(AMQCommand c) throws IOException {synchronized (_channelMutex) {if (c.getMethod().hasContent()) {while (_blockContent) {try {_channelMutex.wait();} catch (InterruptedException e) {}// This is to catch a situation when the thread wakes up during// shutdown. Currently, no command that has content is allowed// to send anything in a closing state.ensureIsOpen();}}c.transmit(this);}
}

上面代码只需要看: c.transmit(this);这一句,其余的都是摆设。看到这里,就调用了AMQCommand的transmit方法,这个transmit方法就是讲AMQChannel中封装的内容发给broker,然后等待broker返回,进而通过之前附值的_activeRpc来处理回传的帧。

虽然之前在AMQConnection([[二]RabbitMQ-客户端源码之AMQConnection][RabbitMQ-_AMQConnection])中详细讲述了start()方法,但是这里还是要来拿这个来举例这个AMQChannel中的rpc怎么使用
在AMQConnection中有这么一段代码:

Method method = (challenge == null)? new AMQP.Connection.StartOk.Builder().clientProperties(_clientProperties).mechanism(sm.getName()).response(response).build(): new AMQP.Connection.SecureOk.Builder().response(response).build();try {Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();if (serverResponse instanceof AMQP.Connection.Tune) {connTune = (AMQP.Connection.Tune) serverResponse;} else {challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();response = sm.handleChallenge(challenge, this.username, this.password);}

客户端将Method封装成Connection.StartOk帧之后等待broker返回Connection.Tune帧。
此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。

注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切的来说是MainLoop线程处理之后返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。

相关文章:

RabbitMQ-客户端源码之AMQChannel

AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法: /*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true…...

注意力机制(SE,ECA,CBAM) Pytorch代码

注意力机制1 SENet2 ECANet3 CBAM3.1 通道注意力3.2 空间注意力3.3 CBAM4 展示网络层具体信息1 SENet SE注意力机制(Squeeze-and-Excitation Networks):是一种通道类型的注意力机制,就是在通道维度上增加注意力机制,主要内容是是…...

Vue2笔记03 脚手架(项目结构),常用属性配置,ToDoList(本地存储,组件通信)

Vue脚手架 vue-cli 向下兼容可以选择较高版本 初始化 全局安装脚手架 npm install -g vue/cli 创建项目:切换到项目所在目录 vue create xxx 按照指引选择vue版本 创建成功 根据指引依次输入上面指令即可运行项目 也可使用vue ui在界面上完成创建&…...

Java程序的执行顺序、简述对线程池的理解

点个关注,必回关 文章目录一、Java程序是如何执行的二、合理利用线程池能够带来三个好处一、Java程序是如何执行的 我们日常的工作中都使用开发工具(IntelliJ IDEA 或 Eclipse 等)可以很方便的调试程序,或者是通 过打包工具把项目…...

【前言】嵌入式系统简介

随手拍拍💁‍♂️📷 日期: 2022.12.01 地点: 杭州 介绍: 2022.11.30下午两点时,杭州下了一场特别大的雪。隔天的12月路过食堂时,边上的井盖上发现了这个小雪人。此时边上的雪已经融化殆尽,只有这个雪人依旧维持着原状⛄…...

React设计原理—1框架原理

阅读前须知 本文是笔者学习卡颂的《React设计原理》的读书笔记,对书中有价值内容以Q&A方式进行呈现,同时结合了自己的理解🤔阅读时推荐先看问题,想想自己的答案,再和答案比对一下本文属于前端框架科普,…...

(C00034)基于Springboot+html前后端分离技术的宿舍管理系统-有文档

基于Springboothtml技术的宿舍管理系统-有文档项目简介项目获取开发环境项目技术运行截图项目简介 基于Springboothtml的前后端分离技术的宿舍管理系统项目为了方便对学生宿舍进行管理而设计,分为后勤、宿管、学生三种用户,后勤对整体宿舍进行管理、宿管…...

Flink面试题

一 基础篇Flink的执行图有哪几种?分别有什么作用Flink中的执行图一般是可以分为四类,按照生成顺序分别为:StreamGraph-> JobGraph-> ExecutionGraph->物理执行图。1)StreamGraph顾名思义,这里代表的是我们编写…...

Python学习笔记

前言:又从仓库翻出来了一些以前总结的文档,以下内容是我初学Python时网上找的或是图书馆借书抄写的笔记,现在再看有点零散不成体系,但是也还是纪念一下子吧。 Python学习笔记 对于初学编程的人来说,Python可以缩短编…...

最适合入门的100个深度学习实战项目

🚨注意🚨:最近经粉丝反馈,发现有些订阅者将此专栏内容进行二次售卖,特在此声明,本专栏内容仅供学习,不得以任何方式进行售卖,未经作者许可不得对本专栏内容行使发表权、署名权、修改…...

AssertionError: 618 columns passed, passed data had 508 columns【已解决】

问题描述 程序中断,报错如下AssertionError: 618 columns passed, passed data had 508 columns Exception has occurred: ValueError 618 columns passed, passed data had 508 columns AssertionError: 618 columns passed, passed data had 508 columnsThe abo…...

166_技巧_Power BI 窗口函数处理连续发生业务问题

166_技巧_Power BI 窗口函数处理连续发生业务问题 一、背景 在生产经营的数据监控中,会有一类指标需要监控是否连续发生,从而根据其在设定区间中的连续频次来评价业务。 例如: 员工连续迟到天数。销售金额连续上升或者下降。用户连续登陆…...

电子科技大学人工智能期末复习笔记(五):机器学习

目录 前言 监督学习 vs 无监督学习 回归 vs 分类 Regression vs Classification 训练集 vs 测试集 vs 验证集 泛化和过拟合 Generalization & Overfitting 线性分类器 Linear Classifiers 激活函数 - 概率决策 ⚠线性回归 决策树 Decision Trees 决策树构建递归…...

使用DDD指导业务设计的总结思考

领域驱动设计(DDD) 是 Eric Evans 提出的一种软件设计方法和思想,主要解决业务系统的设计和建模。DDD 有大量难以理解的概念,尤其是翻译的原因,某些词汇非常生涩,例如:模型、限界上下文、聚合、…...

面试官问:如何确保缓存和数据库的一致性?

如果你对这个问题有过研究,应该可以发现这个问题其实很好回答,如果第一次听到或者第一次遇到这个问题,估计会有点懵,今天我们来聊聊这个话题。 1、问题分析 首先我们来看看为什么会有这个问题! 我们在日常开发中&am…...

16.数据库Redis

一、基本概念 Redis(Remote Dictionary Server)译为“远程字典服务”,它是一款基于内存实现的键值型 NoSQL 数据库, 通常也被称为数据结构服务器,这是因为它可以存储多种数据类型,比如 string(字…...

【Redis高级-集群分片】

单机安装Redis首先需要安装Redis所需要的依赖:yum install -y gcc tclRedis安装包上传到虚拟机的任意目录:我放到了/tmp目录:解压缩:tar -zxvf /tmp/redis-6.2.4.tar.gz -C /tmp解压后:进入redis目录:cd /t…...

CSDN - CSDN27题解

文章目录幸运数字题目描述解题思路AC代码投篮题目描述解题思路AC代码通货膨胀-x国货币题目描述解题思路AC代码最后一位题目描述解题思路AC代码CSDN编程竞赛报名地址:https://edu.csdn.net/contest/detail/41 这次题目描述刚开始好像有些问题,之后被修正了…...

docker拉取mysql

搜索mysql版本docker search mysql搜索获赞数(星星数量) 大于 1000 的镜像docker search --filterstars1000 mysql搜索官方发布的版本docker search --filter is-officialtrue mysql搜索版本号docker search mysql57拉取docker pull devbeta/mysql57查看下载镜像docker images启…...

在Linux上安装Python3

记录:373场景:在CentOS 7.9操作系统上,安装Python-3.8.9环境。版本:JDK 1.8 Python-3.8.9官网地址:https://www.python.org下载地址:https://www.python.org/ftp/python/1.安装基础依赖1.1安装gcc(1)安装命…...

23 种设计模式的通俗解释,看完秒懂

01 工厂方法 追 MM 少不了请吃饭了,麦当劳的鸡翅和肯德基的鸡翅都是 MM 爱吃的东西,虽然口味有所不同,但不管你带 MM 去麦当劳或肯德基,只管向服务员说「来四个鸡翅」就行了。麦当劳和肯德基就是生产鸡翅的 Factory 工厂模式&…...

如何做好需求管理?经验方法、模型、工具

需求管理能力是衡量产品经理能力的一个重要指标。因为需求是产品的基石,只有选取恰当的方法进行需求分析及管理,才能更好的构建产品方案,从而输出精准的产品定义。结合本人学习和自身经验,打算将需求管理分”需求挖掘”、”需求分…...

怎么用期货做风险对冲(如何利用期货对冲风险)

不同期货市场的同一期货品种的对冲交易怎么做 不同 期货市场 的同一期货品种的 对冲交易 。 因为地域和 制度环境 不同,同一种期货品种在不同市场的同一时间的价格很可能是不一样的,并且也是在不断变化的。 这样在一个市场做多头买进&#xff0…...

C++标准模板库type_traits源码剖析

一、type_traits源码介绍 1、type_traits是C11提供的模板元基础库。 2、type_traits可实现在编译期计算。包括添加修饰、萃取、判断查询、类型推导等等功能。 3、type_traits提供了编译期的true和false。 二、type_traits的作用 1、根据不同类型,模板匹配不同版本…...

Python获取公众号(pc客户端)数据,使用Fiddler抓包工具

前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 今天来教大家如何使用Fiddler抓包工具,获取公众号(PC客户端)的数据。 Fiddler是一个http协议调试代理工具,它能够记录并检查所有你的电脑和互联网之间的http通讯,…...

Maven进阶

这里写目录标题1.分模块开发1.1 模块更新后,会造成的影响2.依赖管理2.1 依赖传递2.2 可选依赖(隐藏自己的依赖,不让别人用)2.3 排除依赖(用别人的资源,把不用的去了)3.聚合与继承3.1 为什么要使用聚合工程?3.2 聚合工程开发2.1 聚合工程三级目录1.分模块开发 我们之前做的项目…...

AXI实战(一)-为AXI总线搭建简单的仿真测试环境

AXI实战(一)-搭建简单仿真环境 看完在本文后,你将可能拥有: 一个可以仿真AXI/AXI_Lite总线的完美主端(Master)或从端(Slave)一个使用SystemVerilog仿真模块的船信体验小何的AXI实战系列开更了,以下是初定的大纲安排: 欢迎感兴趣的朋友关注并支持,以下为正文部分 文章目录…...

数据库管理-第五十六期 监控(20230210)

数据库管理 2023-02-10第五十六期 监控1 怎么监控2 直观3 历史分析4 另一个BUG总结第五十六期 监控 春节后的7天班过后就来到了2月份,本周对之前发现X8M上的那个bug进行补丁修复和协助从12.2迁移了一套PDB到这个一体机上面,2次割接。这周还和原厂老大哥…...

测试开发,测试架构师为什么能拿50 60k呢需要掌握哪些技能呢

这篇文章是软件工程系列知识总结的第五篇,同样我会以自己的理解来阐述软件工程中关于架构设计相关的知识。相比于我们常见的研发架构师,测试架构师是近几年才出现的一个岗位,当然岗位title其实没有特殊的含义,在我看来测试架构师其…...

Miniblink 入门

miniblink官网:入门之前强烈建议将Miniblink介绍仔细看一遍。 MB内核组件标准版接口文档:这里列举了所有的api以及简单的说明,但是本人建议还是看wke.h更方便,里面都是宏实现的,直接搜相关函数即可。 mb demo下载和参…...

河南商丘疫情最新政策/seo是什么意思 为什么要做seo

Symantec Endpoint Protection v11.0.5002.333 简体中文企业版 【现象描述】:为了内网安全,内网可以上外网的一台服务器上面部署了网络版symantec杀毒软件,版本是 v11.0.5002.333 简体中文企业版;由它生成的客户端,分发…...

wordpress缺少临时文件夹/百度网页版下载安装

备份一下手打快排的写法。 只能算是备份吧,没有解释快排的思想,以 int 为例。 没有考虑什么特殊情况,比如传入不合法指针等 /* 模仿一波 C 的 sort() 的接口* 只传排序区间的首尾指针*/ void quick_sort(int *left, int *right) {/* 枢轴值…...

网站引导页怎么做的/软文推广发稿

idea中,我当前设置的是eclipse的快捷键(从eclipse转过来的) 一般情况下,查看类的子类CtrlT 如何以树的形式查看完整继承关系,快捷键:F4 效果如下: 尤其从根节点查看的时候,完整的继承…...

淮安做网站的公司/百度网盘下载的文件在哪

分形树绘制 v1.0 五角星绘制: 使用tutle库在Python中绘制简单图形 案例分析: import turtle:引入绘制图形的turtle库利用turtle库中的函数进行编程——模型编程上机实验: 1 """2 作者:李舵3 功…...

东莞电子产品网站建设/青岛seo关键词

android ViewPager滑动事件讲解 今天在做项目的时候,由于要处理viewPager页面滑动的事件,所以对其进行了一个小小的研究: 首先ViewPager在处理滑动事件的时候要用到OnPageChangeListener OnPageChangeListener这个接口需要实现三个方法&#…...

县建设局 协会网站/域名关键词排名查询

中新网1月17日电 据欧联网援引欧联通讯社报道,当地时间1月15日晚,一名搭乘意大利航空公司班机的30岁埃及男子,试图强行滞留意大利未果后被遣返。男子遭遣返登机后趁机舱关门之际跳机逃往机场起降区域,引发机场大乱被迫临时关闭&am…...