当前位置: 首页 > news >正文

消息队列10:为RabbitMq添加连接池

环境:

  • win11
  • rabbitmq-3.8.17
  • .net 6.0
  • RabbitMQ.Client 6.8.1
  • vs2022

安装RabbitMq环境参照:

  • window下安装rabbitmq
  • linux下安装rabbitmq

问题:rabbitmq的c#客户端没有自带连接池,所以需要手动实现。

简易实现如下:

using RabbitMQ.Client;
using System.Collections.Concurrent;
using System.Text;//测试调用
var channel = await ChannelPool.Default.GetChannelAsync("guid", () =>
{var factory = new ConnectionFactory(){HostName = "localhost",Port = 5672,UserName = "test",Password = "123456",VirtualHost = "/",};return Task.FromResult(factory.CreateConnection());
});
try
{var body = Encoding.UTF8.GetBytes("{\"Name\":\"tom\"}");channel.RawChannel.BasicPublish(exchange: "", routingKey: "test-queue", body: body);
}
finally
{channel.Return();
}#region 连接池
/// <summary>
/// rabbitmq 本身没有提供链接池, 且 IModel 的建立和释放也需要发送请求, 所以建立 connection 轮训机制和 IModel 的缓冲池机制<br/>
/// 参考: <seealso href="https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-and-channel-lifespan"/>
/// </summary>
public class ChannelPool
{public static ChannelPool Default = new(8, 50);private int connectionCount;private int channelCountPerConnection;public ChannelPool(int connectionCount = 1, int channelCountPerConnection = 5){if (connectionCount > 0) this.connectionCount = connectionCount;if (channelCountPerConnection > 0) this.channelCountPerConnection = channelCountPerConnection;}public class ChannelItem{public int ConnectionIndex { get; set; }public HostItem CacheHost { get; set; }public IModel RawChannel { get; set; }public void Return() => CacheHost.ChannelPools[ConnectionIndex].Return(this);}public class HostItem{public SemaphoreSlim HostLocker { get; set; }public List<IConnection> Connections { get; set; }public int CurrentConnectionIndex { get; set; }public List<SemaphoreSlim> ConnectionLockers { get; set; }public List<EasyPool<ChannelItem>> ChannelPools { get; set; }}#region EasyPoolpublic sealed class EasyPool<T> : IDisposable where T : class{private readonly ConcurrentBag<T> _pool;private readonly Func<T> _factory;private readonly int _maxCount;public EasyPool(Func<T> factory, int maxCount){_factory = factory;_maxCount = maxCount;_pool = new ConcurrentBag<T>();}public T Get(){if (!_pool.TryTake(out var result)) return _factory();return result;}public bool Return(T item){if (_pool.Count >= _maxCount){if (item is IDisposable disposable) try { disposable.Dispose(); } catch { }return false;}_pool.Add(item);return true;}public void Dispose(){T result;while (_pool.TryTake(out result)){if (result is IDisposable disposable){try { disposable.Dispose(); } catch { }}}}}#endregionprivate readonly Dictionary<string, HostItem> _cacheHosts = new();public async Task<ChannelItem> GetChannelAsync(string key, Func<Task<IConnection>> connectionFactoty){var connectionCount = this.connectionCount;var maxChannelCountPerConnection = this.channelCountPerConnection;//获取 HostItemif (!_cacheHosts.TryGetValue(key, out var cacheHost)){lock (_cacheHosts){if (!_cacheHosts.TryGetValue(key, out cacheHost)){cacheHost = new HostItem{HostLocker = new(1, 1),CurrentConnectionIndex = -1,Connections = new List<IConnection>(connectionCount),ConnectionLockers = new List<SemaphoreSlim>(connectionCount),ChannelPools = new List<EasyPool<ChannelItem>>(connectionCount),};for (int i = 0; i < connectionCount; i++){cacheHost.Connections.Add(null);cacheHost.ConnectionLockers.Add(new(1, 1));var idx = i;cacheHost.ChannelPools.Add(new EasyPool<ChannelItem>(() => new ChannelItem{ConnectionIndex = idx,RawChannel = cacheHost.Connections[idx].CreateModel(),CacheHost = cacheHost}, maxChannelCountPerConnection));}_cacheHosts.Add(key, cacheHost);}}}//轮训得到连接索引await cacheHost.HostLocker.WaitAsync();int connectionIdx;try{connectionIdx = ++cacheHost.CurrentConnectionIndex;if (connectionIdx >= connectionCount) cacheHost.CurrentConnectionIndex = connectionIdx = connectionIdx % connectionCount;}finally{try { cacheHost.HostLocker.Release(); } catch { }}//检查是否初始化链接var conn = cacheHost.Connections[connectionIdx];if (conn == null){var connectionLocker = cacheHost.ConnectionLockers[connectionIdx];await connectionLocker.WaitAsync();try{conn = cacheHost.Connections[connectionIdx];if (conn == null){conn = await connectionFactoty();cacheHost.Connections[connectionIdx] = conn;}}finally{try { connectionLocker.Release(); } catch { }}}//得到 Channelreturn cacheHost.ChannelPools[connectionIdx].Get();}
}
#endregion

相关文章:

消息队列10:为RabbitMq添加连接池

环境&#xff1a; win11rabbitmq-3.8.17.net 6.0RabbitMQ.Client 6.8.1vs2022 安装RabbitMq环境参照&#xff1a; window下安装rabbitmqlinux下安装rabbitmq 问题&#xff1a;rabbitmq的c#客户端没有自带连接池&#xff0c;所以需要手动实现。 简易实现如下&#xff1a; usi…...

在使用 Docker 时,用户可能会遇到各种常见的错误和问题

在使用 Docker 时&#xff0c;用户可能会遇到各种常见的错误和问题。以下是一些需要注意的常见错误及其可能的解决方案&#xff1a; 1. 权限问题 在 Linux 系统上运行 Docker 命令时&#xff0c;可能会遇到权限不足的问题。解决这个问题通常有两种方法&#xff1a; 使用 sud…...

MinIO使用客户端进行桶和对象的管理

MinIO使用客户端进行桶和对象的管理 minio安装完成后&#xff0c;除了自带的webui管理界面&#xff0c;还可以使用官方配套的客户端mc进行管理。除此之外&#xff0c;还可以使用第三方客户端s3browser也可以完成对象和桶的生命周期管理。 1. 官方客户端mc MinIO客户端 mc 命…...

数据库管理-第244期 一次无法switchover的故障处理(20240928)

数据库管理244期 2024-09-28 数据库管理-第244期 一次无法switchover的故障处理&#xff08;20240928&#xff09;1 问题展现2 问题排查与处理2.1 问题12.2 问题2 3 问题分析4 总结 数据库管理-第244期 一次无法switchover的故障处理&#xff08;20240928&#xff09; 作者&…...

太绝了死磕这本大模型神书!

今天给大家推荐一本大模型神书&#xff0c;就是这本&#xff1a;《大语言模型&#xff1a;基础与前沿》 书籍介绍&#xff1a; 本书深入阐述了大语言模型的基本概念和算法、研究前沿以及应用&#xff0c;涵盖大语言模型的广泛主题&#xff0c;从基础到前沿&#xff0c;从方法…...

Kevin‘s notes about Qt---Episode 6 不同类中创建同一对象

问题描述 使用场景 现在在我的Qt界面中需要同时使用采集卡的AI(Analog Input)和AO(Analog Output)功能,均已分别调通,但是像之前一样通过创建两个类,然后分别在两个线程中进行操作的方式并不能实现。 原本写法 头文件 art_ao.h 核心代码如下: #ifndef ART_AO_H #defi…...

YOLOv9改进策略【Conv和Transformer】| AssemFormer 结合卷积与 Transformer 优势,弥补传统方法不足

一、本文介绍 本文记录的是利用AssemFormer优化YOLOv9的目标检测网络模型。传统卷积和池化操作会导致信息丢失和压缩缺陷,且传统的注意力机制通常产生固定维度的注意力图,忽略了背景中的丰富上下文信息。本文的利用AssemFormer改进YOLOv9,以在特征传递和融合过程中增加多尺…...

Git 的安装和配置

Git 是跨平台的&#xff0c;可以在 Windows&#xff0c;Linux、Unix 和 Mac 各几大平台上使用 由于笔者主要是使用 Windows&#xff0c;其他平台下安装 Git 的方法暂且不表&#xff08;可参考廖雪峰老师的博客&#xff1a;安装 Git&#xff09; ‍ Windows 安装 Git 从 Git…...

InternVL 微调实践

任务 follow 教学文档和视频使用QLoRA进行微调模型&#xff0c;复现微调效果&#xff0c;并能成功讲出梗图. 复现过程 参考教程部署&#xff1a;https://github.com/InternLM/Tutorial/blob/camp3/docs/L2/InternVL/joke_readme.md 训练 合并权重&&模型转换 pyth…...

自然语言处理在人工智能领域的发展历程,以及NLP重点模型介绍

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下自然语言处理在人工智能领域的发展历程&#xff0c;以及NLP重点模型介绍。本文详细介绍了自然语言处理的发展历程&#xff0c;同时深入探讨了各种自然语言处理模型的原理与应用。文章首先回顾了自然语言处理技术的发…...

Replit Agent:AI驱动的全自动化软件开发革命

目录 引言Replit Agent核心功能使用场景与优势最新版本更新处理复杂项目的能力常见问题解决方案支持的编程语言和技术栈与其他AI编程工具的比较结语 引言 在人工智能快速发展的今天&#xff0c;软件开发领域正经历着前所未有的变革。Replit Agent作为AI初创公司Replit推出的…...

SAP调用发起泛微OA流程

SAP调用泛微Servlet接口&#xff0c;发起流程 编写servlet接口&#xff0c;给SAP调用 public class SAPCreateWorkflow extends HttpServlet{private static final long serialVersionUID 1L;public void doPost(HttpServletRequest request, HttpServletResponse response)…...

JAVA毕业设计184—基于Java+Springboot+vue3的企业信用信息管理系统(源代码+数据库)

毕设所有选题&#xff1a; https://blog.csdn.net/2303_76227485/article/details/131104075 基于JavaSpringbootvue3的企业信用信息管理系统(源代码数据库)184 一、系统介绍 本项目前后端分离(可以改为ssm版本)&#xff0c;分为用户、管理员两种角色 1、用户&#xff1a; …...

webshell-HTTP常见特征

一、总体特点 二、蚁剑 数据中可以看到一些明文字符串函数&#xff0c;响应中可以看到响应的明文数据。 ant特征以及对数据base64可以解码 chr类别的会出现大量的chr编码 大量的百分号字符 三、哥斯拉 第一个请求包很大 响应为0 密钥被拆分到数据前后 响应包cookie带&#xf…...

docker简单熟悉

‌Docker 容器和‌虚拟机区别‌ Docker容器与虚拟机的主要区别在于虚拟化层次和资源占用&#xff1a; ‌虚拟化层次‌&#xff1a;Docker容器在操作系统级别进行虚拟化&#xff0c;共享宿主机的内核&#xff1b;而虚拟机在硬件级别进行虚拟化&#xff0c;每个虚拟机都拥有独立…...

《深海迷航》风灵月影修改器进阶教程:揭秘海底无限奥秘

潜入《深海迷航》那神秘莫测的海底世界&#xff0c;风灵月影修改器将成为你探索未知的得力助手。 遵循以下步骤&#xff0c;解锁无尽资源与生存优势&#xff1a; 1.安装与启动&#xff1a; 确保从安全源下载风灵月影修改器并安装完毕。启动游戏后&#xff0c;随即开启修改器&…...

为什么说函数传递参数最好小于四个

有一个建议说时函数传递参数最好不超过四个&#xff0c;原因有一个是参数太多难以维护&#xff0c;另一个重要的原因就是函数传递小于四个参数时候效率会更高&#xff0c;其实这个说法也不全对&#xff0c;在不同的结构下不太一样&#xff0c;也不一定是4 其实那么下面将探究函…...

三维立体自然资源“一张图”

随着信息技术的发展&#xff0c;自然资源管理迎来了新的机遇与挑战。在众多技术中&#xff0c;“三维立体自然资源‘一张图’”的概念尤为引人注目。它不仅代表了地理信息科学领域的最新成果&#xff0c;也为自然资源的有效管理和可持续利用提供了强有力的支持。本文将探讨这一…...

语言的重定向

输入输出重定向是相当有意思的一门技术&#xff0c;比如有的人每个月的收入自动转10%到支付宝&#xff0c;20%进了老婆的账户。这么有效益的事情&#xff0c;基本所有的操作系统都支持&#xff0c;本质上它不是编程语言特性&#xff0c;编程语言只是为了更方便调用操作系统的重…...

Snap 发布新一代 AR 眼镜,有什么特别之处?

Snap 发布新一代 AR 眼镜&#xff0c;有什么特别之处&#xff1f; Snap 简介 新一代的 AR 眼镜特点 Snap 简介 Snap 公司成立于 2010 年&#xff0c;2017 年美国东部时间 3 月 2 日上午 11 时许&#xff0c;在纽交所正式挂牌交易&#xff0c;股票代码为 “SNAP”。其旗下的核…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

反射获取方法和属性

Java反射获取方法 在Java中&#xff0c;反射&#xff08;Reflection&#xff09;是一种强大的机制&#xff0c;允许程序在运行时访问和操作类的内部属性和方法。通过反射&#xff0c;可以动态地创建对象、调用方法、改变属性值&#xff0c;这在很多Java框架中如Spring和Hiberna…...

selenium学习实战【Python爬虫】

selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录&#xff0c;但是由于这个树组件的节点越来越多&#xff0c;导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多&#xff0c;导致的浏览器卡顿&#xff0c;这里很明显就需要用到虚拟列表的技术&…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

JVM 内存结构 详解

内存结构 运行时数据区&#xff1a; Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器&#xff1a; ​ 线程私有&#xff0c;程序控制流的指示器&#xff0c;分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 ​ 每个线程都有一个程序计数…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...

python爬虫——气象数据爬取

一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用&#xff1a; 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests&#xff1a;发送 …...

MySQL 主从同步异常处理

阅读原文&#xff1a;https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主&#xff0c;遇到的这个错误&#xff1a; Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一&#xff0c;通常表示&#xff…...

CppCon 2015 学习:REFLECTION TECHNIQUES IN C++

关于 Reflection&#xff08;反射&#xff09; 这个概念&#xff0c;总结一下&#xff1a; Reflection&#xff08;反射&#xff09;是什么&#xff1f; 反射是对类型的自我检查能力&#xff08;Introspection&#xff09; 可以查看类的成员变量、成员函数等信息。反射允许枚…...