当前位置: 首页 > news >正文

rabbitMq------信道管理模块

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 信道管理的字段
  • 申明/删除交换机
  • 申明/删除队列
  • 绑定/解绑
  • 消息的发布
  • 消息确认
  • 订阅队列
  • 取消订阅
  • 信道内存管理类
    • 打开信道
    • 关闭信道/获取指定信道
  • 总结


前言

信道是在通信连接上更细粒度的一个划分,也就是一个通信连接上可以由多个信道,这些信道都是复用的同一条连接,为了充分利用资源。
在用户眼中没有了网络通信的概念了,相当于信道他屏蔽了底层的网络通信细节。
用户只需要使用信道提供的服务,不需要关心底层的网络通信细节。
在用户眼中信道和信道就是完全独立的。


信道管理的字段

需要有一个信道唯一标识。
信道关联的消费者,当信道关闭的时候,需要从消费者管理中销毁这个消费者。
信道关联的连接,在信道提供的操作中有一个订阅指定队列和给回复响应中。我们需要用到这个连接。
protobufCodec协议处理句柄和连接一样的用处。我们给客户端回复响应就是通过这个句柄中提供的send操作,他会为我们添加协议报头。
消费者管理句柄,信道提供的操作中需要使用到。
虚拟机管理局并,信道提供的操作中需要使用到。
线程池管理句柄,在收到消息后需要给客户端推送,把推送打包成一个任务,放入到线程池 。

class Channel{private:std::string _cid;                   // 信道唯一标识Consumer::ptr _consumer;            // 信道关联的消费者 muduo::net::TcpConnectionPtr _conn; // 信道关联的连接ProtobufCodecPtr _codec;            // protobuf协议处理句柄ConsumerManager::ptr _cmp;          // 消费者管理句柄VirtualHost::ptr _host;             // 虚拟机管理句柄ThreadPool::ptr _pool;              // 线程池管理句柄}

信道提供了10个操作供用户使用。分别是申明/删除交换机,申明/删除队列,
绑定/取消绑定,消息发布,消息确认,订阅队列和取消订阅。

申明/删除交换机

信道这里都是收到的一个一个的请求,我们从请求中提取所需字段,然后通过虚拟机句柄,消费者句柄来进行一个操作。

 // 声明/销毁交换机void declareExchange(const declareExchangeRequestPtr &req){bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(), req->durable(), req->auto_delete(), req->args());return basicResponse(ret, req->rid(), req->cid());}void deleteExchange(const deleteExchangeRequestPtr &req){_host->deleteExchange(req->exchange_name());return basicResponse(true, req->rid(), req->cid());}

可以看到信道会调用一个basicResponse接口来进行一个响应.
这个响应是通过protobufCodeC来进行的,通过他提供的一个send接口来进行发送,需要传入信道的连接和响应结构对象。

  // 给客户端回复响应
void basicResponse(bool ok, const std::string &rid, const std::string &cid)
{basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn, resp);
}

申明/删除队列

申明队列的时候也需要初始化队列消费者管理,队列信息管理已经在虚拟机管理中初始化了。
删除队列的时候也需要删除消费者管理对象。

// 创建/删除队列void declareQueue(const declareQueueRequestPtr &req){bool ret = _host->declareQueue(req->queue_name(), req->durable(), req->exclusive(), req->auto_delete(), req->args());if (ret == false){return basicResponse(false, req->rid(), req->cid());}// 初始化队列的消费者管理句柄_cmp->initQueueConsumer(req->queue_name());return basicResponse(true, req->rid(), req->cid());}void deleteQueue(const deleteQueueRequestPtr &req){//删除队列的同时也要删除队列的消费者_cmp->destroyQueueConsumer(req->queue_name());_host->deleteQueue(req->queue_name());return basicResponse(true, req->rid(), req->cid());}

绑定/解绑

// 绑定队列信息/解除绑定队列信息
void queueBind(const queueBindRequestPtr &req)
{_host->bind(req->exchange_name(), req->queue_name(), req->binding_key());return basicResponse(true, req->rid(), req->cid());
}
void queueUnBind(const queueUnBindRequestPtr &req)
{_host->unbind(req->exchange_name(), req->queue_name());return basicResponse(true, req->rid(), req->cid());
}

消息的发布

在服务端的信道收到了消息发布的请求后,需要获取到请求中的交换机字段,获取到交换机绑定的所有队列信息。然后通过路由匹配模块来进行匹配,匹配成功的队列就会通过虚拟机句柄进行消息发布操作,把消息插入到队列消息中的带推送链表中。

 // 消息的发布
void basicPublish(const basicPublishRequestPtr &req)
{// 获取要发布到的交换机Exchange::ptr ep = _host->selectExchange(req->exchange_name());if (ep == nullptr){return basicResponse(false, req->rid(), req->cid());}// 进行路由交换,判断消息可以发布到交换机绑定的哪个队列MsgQueueBindingMap mqbm = _host->getExchangeBindings(req->exchange_name());BasicProperties *properties = nullptr;std::string routing_key;if (req->has_properties()){properties = req->mutable_properties();routing_key = properties->routing_key();}for (auto &binding : mqbm){if (Router::route(ep->type, routing_key, binding.second->binding_key)){// DLOG("%d,routing_key:%s binding_key:%s",ep->type,routing_key.c_str(),binding.second->binding_key.c_str());// 将消息添加到队列消息中_host->basicPublish(binding.first, properties, req->body());// 向线程池中提添加一个消息消费任务(向指定队列订阅者推送消息)auto task = std::bind(&Channel::consume, this, binding.first);_pool->push(task);}}return basicResponse(true, req->rid(), req->cid());
}

然后向线程池中推送一个任务。这个任务就是向该队列的队列消费者进行消息的推送。首先从队列消息类中取出一条消息,然后从队列消费者中取出一个消费者。通过调用这个消费者中回调函数来进行一个时间推送。这个回调函数就是在订阅队列是服务器绑定的。最后如果消费者的确认应答标志位为1的话会进行消息确认。

 // 向指定队列的某个订阅者推送消息
void consume(const std::string &qname)
{// 1. 从队列中取出一条消息mq::MessagePtr mp = _host->basicConsume(qname);if (mp.get() == nullptr){DLOG("执行消费任务失败,%s 队列没有消息!", qname.c_str());return;}// 2. 从队列订阅者中取出一个订阅者mq::Consumer::ptr cp = _cmp->choose(qname);if (cp.get() == nullptr){DLOG("执行消费任务失败,%s 队列没有消费者!", qname.c_str());return;}// 3. 调用订阅者对应的消息处理函数,实现消息的推送cp->_cb(cp->_ctag, mp->mutable_payload()->mutable_properties(), mp->payload().body());// 4. 判断如果订阅者是自动确认---不需要等待确认,直接删除消息,否则需要外部收到消息确认后再删除if (cp->_auto_ack)_host->basicAck(qname, mp->payload().properties().id());
}

消息确认

就是通过虚拟机管理句柄调用队列消息提供的操作,

 // 消息的确认void basicAck(const basicAckRequestPtr &req){_host->basicAck(req->queue_name(), req->message_id());return basicResponse(true, req->rid(), req->cid());}

订阅队列

订阅客户端可以通过信道提供的basicConsume服务来订阅一个队列。

// 订阅队列
void basicConsume(const basicConsumeRequestPtr &req)
{// 判断队列是否存在bool ret = _host->existsQueue(req->queue_name());if (ret == false){return basicResponse(false, req->rid(), req->cid());}// 创建队列的消费者
auto cb = std::bind(&Channel::callback, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3);
_consumer = _cmp->create(req->queue_name(), req->consumer_tag(), req->auto_ack(), cb);
return basicResponse(true, req->rid(), req->cid());
}

在服务端就需要创建一个消费者。而消费者中有一个回调函数,这个回调函数就是服务器绑定的。这个回调函数就是往客户端推送消息。

 // 当有订阅队列请求来时,消费者的回调函数
void callback(const std::string &tag, const BasicProperties *bp, const std::string &body)
{basicConsumerResponse resp;resp.set_cid(_cid);resp.set_consumer_tag(tag);resp.set_body(body);if (bp){resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, resp);
}

取消订阅

取消订阅就是删除队列消费者中的指定消费者。

  // 取消订阅
void basicCancel(const basicCancelRequestPtr &req){_cmp->remove(req->consumer_tag(), req->queue_name());return basicResponse(true, req->rid(), req->cid());}

信道内存管理类

需要一个哈希表,信道唯一标识和信道对象的映射。
他提供打开信道,关闭信道和获取指定信道三个操作。

class ChannelManager
{
private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;
}

打开信道

这些参数都是连接管理传递进来的。创建一个信道管理对象。

 bool openChannel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const ThreadPool::ptr &pool) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it != _channels.end()) {DLOG("信道:%s 已经存在!", id.c_str());return false;}auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);_channels.insert(std::make_pair(id, channel));return true;
}

关闭信道/获取指定信道

void closeChannel(const std::string &id){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(id);}Channel::ptr getChannel(const std::string &id) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it == _channels.end()) {return Channel::ptr();}return it->second;}

总结

这个信道管理是服务端的信道管理,而在客户端也会有一个信道,客户端的信道和服务端的信道是一一对应的。但他们的操作却不一样,客户端的信道是为用户提供服务,他屏蔽饿底层的网络细节,用户只需要调用信道提供的操作,不需要关心网络通信。而服务器的信道就是在真正进行业务处理的操作的。

相关文章:

rabbitMq------信道管理模块

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言信道管理的字段申明/删除交换机申明/删除队列绑定/解绑消息的发布消息确认订阅队列取消订阅信道内存管理类打开信道关闭信道/获取指定信道 总结 前言 信道是在…...

如何只用 CSS 制作网格?

来源&#xff1a;how-to-make-a-grid-like-graph-paper-grid-with-just-css 在看 用于打印到纸张的 CSS 这篇文章时&#xff0c;对其中的网格比较好奇&#xff0c;作者提供了 stackoverflow 的链接&#xff0c;就看到了来源的这个问题和众多回复。本文从里面挑选了一些个人比较…...

Linux安装RabbitMQ安装

1. RabbitMQ介绍 1.1 RabbitMQ关键特性 异步消息传递&#xff1a;允许应用程序在不直接进行网络调用的情况下交换消息。 可靠性&#xff1a;支持消息持久化&#xff0c;确保消息不会在系统故障时丢失。 灵活的路由&#xff1a;支持多种路由选项&#xff0c;包括直接、主题、…...

SpringBoot驱动的社区医院信息管理平台

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理社区医院信息平台的相关信息成为必然。开发…...

MyBatis-Plus如何分页查询?

MyBatis-Plus提供了一种简单而强大的分页查询功能&#xff0c;可以通过使用Page对象和Mapper接口中的方法来实现。以下是分页查询的基本步骤&#xff1a; 添加分页插件依赖 确保你的项目中已经添加了MyBatis-Plus的分页插件依赖。 <dependency><groupId>com.bao…...

云原生之容器编排实践-OpenEuler23.09离线安装Kubernetes与KubeSphere

背景 有互联网的日子确实美好&#xff0c;不过有时候&#xff0c;仅仅是有时候&#xff0c;你可能会面临离线部署 Kubernetes 与 KubeSphere 集群的要求。。 我们借助由青云开源的容器平台&#xff0c; KubeSphere 来进行可视化的服务部署。 KubeSphere 是在 Kubernetes 之上…...

构建企业数字化转型的战略基石——TOGAF框架的深度解析

数字化时代的企业变革需求 在全球范围内&#xff0c;数字化转型已成为企业提高竞争力、优化运营流程、提升客户体验的核心战略。数字技术的迅猛发展&#xff0c;不仅改变了传统行业的运作模式&#xff0c;也迫使企业重新思考其业务架构和技术基础设施。TOGAF&#xff08;The O…...

docker -私有镜像仓库 - harbor安装

文章目录 1、镜像仓库简介2、Harbor简介3、下载与安装3.1、下载3.2、安装3.2.1、上传harbor-offline-installer-v2.8.2.tgz到虚拟机中解压并修改配置文件3.2.2、解压tgz包3.2.3、切换到解压缩后的目录下3.2.4、准备配置文件3.2.5、修改配置文件 4、启动Harbor5、启动关闭命令6、…...

头号积木玩家——软件工程专业职业生涯规划报告

说明&#xff1a;本报告为博主在浙江科技学院&#xff08;现浙江科技大学&#xff09;就读软件工程本科专业时&#xff0c;在必修课程《计算机导论》中撰写的报告。&#xff08;报告主体2021年11月定稿&#xff0c;有删改&#xff09; 标题说明&#xff1a;在电影《头号玩家》…...

Redis(初步认识和安装)

初识Redis 认识NoSQLSQL结构化&#xff1a;structure关联的&#xff1a;RelationalSQL查询ACID NoSQL非结构化无关联的非SQLBASE 认识Redis安装Redis 认识NoSQL SQL和NoSQL比较 SQL 结构化&#xff1a;structure 数据库中表的字段都有固定的结构 关联的&#xff1a;Relati…...

计算机网络:计算机网络概述:网络、互联网与因特网的区别

文章目录 网络、互联网与因特网的区别网络分类 互联网因特网基于 ISP 的多层次结构的互连网络因特网的标准化工作因特网管理机构因特网的组成 网络、互联网与因特网的区别 若干节点和链路互连形成网络&#xff0c;若干网络通过路由器互连形成互联网 互联网是全球范围内的网络…...

网络编程套接字TCP

前集回顾 上一篇博客中我们写了一个UDP的echo server&#xff0c;是一个回显服务器&#xff1a;请求是啥&#xff0c;响应就是啥 一个正常的服务器&#xff0c;要做三个事情&#xff1a; 读取请求并解析根据请求&#xff0c;计算响应把响应写回到客户端 DatagramPacket res…...

Git

Git-2.34.1-64-bitGit-2.34.1-64-bitTortoiseGit-2.4.0.2-64bitTortoiseGit-LanguagePack-2.4.0.0-64bit-zh_CN 下载Git-2.34.1-64-bit、TortoiseGit-2.4.0.2-64bit、TortoiseGit-LanguagePack-2.4.0.0-64bit-zh_CN&#xff0c;依次安装。 # 配置本地Git的用户名与邮箱 git c…...

【日常记录】现在遇到的Y7000P亮度无法调节问题,无需改动注册表进行调整的方法。

1、winR 2、输入&#xff1a;services.msc 3、找到下面红框内的服务 4、右键后&#xff0c;点击重启任务&#xff0c;重启任务后&#xff0c;再次按热键即可恢复亮度调节。...

ubuntu20.04.6 触摸屏一体机,外接视频流盒子开机输入登录密码触屏失灵问题解决方法

1. 首先直接运行xrandr命令&#xff0c;查看设备的相关信息&#xff1a; 运行之后会显示当前连接设备的屏幕信息&#xff0c;如下图&#xff0c;LVDS和VGA-0&#xff0c;而HDMI屏幕为disconnect&#xff0c;意为没有连接&#xff1a; 2. 设置开机主屏幕显示&#xff1a; xrand…...

师生健康信息管理:SpringBoot技术指南

第3章 系统分析 3.1 需求分析 师生健康信息管理系统主要是为了提高工作人员的工作效率和更方便快捷的满足用户&#xff0c;更好存储所有数据信息及快速方便的检索功能&#xff0c;对系统的各个模块是通过许多今天的发达系统做出合理的分析来确定考虑用户的可操作性&#xff0c;…...

手机/平板端 Wallpaper 动态壁纸文件获取及白嫖使用指南

Wallpaper 动态壁纸文件获取及使用指南 目录 壁纸文件获取手机 / 平板使用手机 / 平板效果预览注意事项PC/Mac 使用 1. 壁纸文件获取链接 链接&#xff1a;夸克网盘分享 复制链接到浏览器打开并转存下载即可。 &#xff08;主页往期视频的 4K 原图和 mpkg 动态壁纸文件&#xf…...

【软件工程】模块化思想概述

一、定义 模块化&#xff08;modularization&#xff09;&#xff0c;也称为模组化&#xff0c;是产品设计以及项目管理中被广泛使用的一种设计理念。 模块化是指解决一个复杂问题时自顶向下逐层把系统划分成若干模块的过程&#xff0c;有多种属性&#xff0c;分别反映其内部…...

解决方案:机器学习中,出现欠拟合和过拟合,这两种情况分别如何解决

文章目录 一、现象二、解决方案欠拟合&#xff08;Underfitting&#xff09;过拟合&#xff08;Overfitting&#xff09; 一、现象 在工作中&#xff0c;在机器学习中&#xff0c;出现欠拟合和过拟合的时候&#xff0c;需要有对应的解决方法&#xff0c;所以整理一下 二、解决…...

腾讯 25 届秋招算法工程师面经

最近已有不少大厂都在秋招宣讲了&#xff0c;也有一些在 Offer 发放阶段。 节前&#xff0c;我们邀请了一些互联网大厂朋友、今年参加社招和校招面试的同学。 针对新手如何入门算法岗、该如何准备面试攻略、面试常考点、大模型技术趋势、算法项目落地经验分享等热门话题进行了…...

MySQL 实验1:Windows 环境下 MySQL5.5 安装与配置

MySQL 实验1&#xff1a;Windows 环境下 MySQL5.5 安装与配置 目录 MySQL 实验1&#xff1a;Windows 环境下 MySQL5.5 安装与配置一、MySQL 软件的下载二、安装 MySQL三、配置 MySQL1、配置环境变量2、安装并启动 MySQL 服务3、设置 MySQL 字符集4、为 root 用户设置登录密码 一…...

开源黑科技!Fish Speech TTS模型完美支持8种语言

开源黑科技&#xff01;Fish Speech TTS模型完美支持8种语言 Fish Speech是一款神奇的AI语音克隆工具&#x1f3a4;&#xff0c;可快速模仿用户声音&#xff0c;支持八种语言&#x1f30d;&#xff0c;简单易用&#xff0c;适合所有人&#x1f476;。它在客服、新闻播报和在线…...

算法知识点————数论和链表

1、n数和 2数和 有序&#xff08;递增&#xff09;&#xff1a;头尾相加&#xff0c;和目标值比较无序&#xff1a;哈希表&#xff08;target - cur&#xff09; 多数和&#xff1a; ​ 先排序 拿一个数&#xff08;检测 i 和i-1 重复的不选择&#xff09; ​ 2数和问题 &am…...

NASA:ATLAS/ICESat-2 L3B 每日和每月网格极地海面高度异常 V003

目录 简介 摘要 代码 引用 网址推荐 0代码在线构建地图应用 机器学习 ATLAS/ICESat-2 L3B Daily and Monthly Gridded Polar Sea Surface Height Anomaly V003 ATLAS/ICESat-2 L3B 每日和每月网格极地海面高度异常 V003 简介 ATLAS/ICESat-2 L3B Daily and Monthly G…...

Java类设计模式

1、单例模式 核心&#xff1a;保证一个类只有一个对象&#xff0c;并且提供一个访问该实例的全局访问点 五种单例模式&#xff1a;主要&#xff1a;饿汉式&#xff1a;线程安全&#xff0c;调用效率高&#xff0c;不能延时加载懒汉式&#xff1a;线程安全&#xff0c;调用效率…...

Valhalla实现 使用Docker部署利用OSM(Mapbox)地图实现路径规划详细步骤

一. Valhalla基本概念 1. 背景介绍&#xff1a; 官网介绍文档&#xff1a;https://valhalla.github.io/valhalla/ Valhalla是一个开源的路由引擎&#xff0c;能够实现实时路径规划&#xff0c;处理大量请求返回最优路径。 基于 OSM 数据&#xff0c;结合灵活的多模式交通方式…...

blender解决缩放到某个距离就不能继续缩放

threejs中也存在同样的问题&#xff0c;原因相同&#xff0c;都是因为相机位置和相机观察点距离太近导致的。 threejs解决缩放到某个距离就不能继续缩放-CSDN博客 blender中的解决方案 1、视图中心->视图锁定->选择你想看的物体...

2022浙江省赛G I M

G - Easy Glide 题意 思路 由于数据范围比较小&#xff08;1e3&#xff09;,把所有的移动的时间转化为图论上的边权就可以了,再用dijkstra解决,注意如果用的是邻接表存的话要建双向边 代码 #include <map> #include <set> #include <queue> #include <…...

数据链路层 ——MAC

目录 MAC帧协议 mac地址 以太网帧格式 ARP协议 ARP报文格式​编辑 RARP 其他的网络服务或者协议 DNS ICMP协议 ping traceroute NAT技术 代理服务器 网络层负责规划转发路线&#xff0c;而链路层负责在网络节点之间的转发&#xff0c;也就是"一跳"的具体传输…...

在java中都是如何实现这些锁的?或者说都有哪些具体的结构实现

在Java中&#xff0c;多种锁机制的实现依赖于不同的类和接口。以下是一些常见的锁机制及其在Java中的具体实现&#xff1a; 1. 互斥锁&#xff08;Mutex&#xff09; 实现方式&#xff1a;Java中的互斥锁可以通过synchronized关键字或ReentrantLock类来实现。synchronized关键…...

网站推广费用ihanshi/什么软件可以排名次

当你需要将DWG或者DXF格式的CAD图纸转为BMP图片格式的时候&#xff0c;你会怎么做呢&#xff1f;在网上找格式转换的软件&#xff1f;先截图再修改格式&#xff1f;......其实&#xff0c;并不需要那么麻烦&#xff0c;因为轻量级CAD绘图软件——浩辰CAD看图王电脑版中直接就有…...

网站开发技术三大件/优化seo方案

nginx访问日志 查看nginx.conf文件 vim /usr/local/nginx/conf/nginx.conf 中间有一行是定义log的格式 log_format combined_realip $remote_addr $http_x_forwarded_for [$time_local] $host "$request_uri" $status "$http_referer" "$http_user_ag…...

wordpress qoob/seo算法

使用场景&#xff1a;在操作应用时常见toast弹框&#xff0c;通过toast弹框信息的获取判断当前的某个操作是否成功 引用的包&#xff1a;from selenium.webdriver.support import expected_conditions as EC,\expected_conditions from selenium.webdriver.common.by import By…...

克隆网站首页做单页站几个文件/seo关键词怎么选

export PS1[\u\h:$PWD] 再分享一下我老师大神的人工智能教程吧。零基础&#xff01;通俗易懂&#xff01;风趣幽默&#xff01;还带黄段子&#xff01;希望你也加入到我们人工智能的队伍中来&#xff01;https://blog.csdn.net/jiangjunshow...

做网站用虚拟服务器可以吗/百度平台客服电话是多少

python 通过ansible 获取服务器基本信息&#xff1a;利用ansible的 setup 模块可以返回服务器的详细信息所有的信息都是以字典的格式显示的[roote tmp]# ansible 192.168.137.152 -m setup 192.168.137.152 | SUCCESS > {"ansible_facts": {"ansible_all_ip…...

衡阳网站建设衡阳千度网络/友链对网站seo有帮助吗

“微软实现企业信息系统远程客户端的安全技术及应用-石家庄站活动”召开在即&#xff01;热忱期待您的光临&#xff01;感谢您对本次活动的热情参与&#xff01; 城 市&#xff1a;石家庄 时 间&#xff1a;2005年11月16日 13&#xff1a;30-17&#xff1a…...