当前位置: 首页 > news >正文

golang rabbitMQ 生产者复用channel以及生产者组分发策略

引用的是rabbitMQ官方示例的库:github.com/rabbitmq/amqp091-go

在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的"代价"都是很高的,所以就要去实现如何复用这些连接,并要做到高效并可靠。

预期效果:

项目初始化构建时可以自定义选择生产者开启多个connection,每个connection可以启动多少个channel【都是全局复用的】,因为rabbitMQ所有的命令都是基本都是通过channel去操作完成的,所以这个channel很重要,也是我们想要复用的重点。

初始化创建完connection和channel后,当生产者需要发送一条消息的时候,我们可以通过一些策略去选择它发送到哪个connection和channel,我这里采用的就是随机选择,也可以采用哈希取模、轮询权重算法等,这个可以根据自身业务来做。

我简单画了一个效果图:

定义RabbitMQ结构体以及Config结构体

type Config struct {Host     stringPort     intUser     stringPassword string
}type RabbitMQ struct {ctx     context.Contextn       intm       *sync.MutexConn    *amqp.ConnectionChannel []*amqp.Channel
}

实例化RabbitMQ结构体

func (mq *RabbitMQ) New(config Config) (rabbitmq *RabbitMQ) {configString := fmt.Sprintf("amqp://%s:%s@%s:%d/", config.User, config.Password, config.Host, config.Port)conn, err := amqp.Dial(configString)if err != nil {log.Panicf("amqp connect error: %v \n", err)}rabbitmq = &RabbitMQ{ctx:  context.Background(),m:    &sync.Mutex{},Conn: conn,}return
}

一、创建消费者

// ConsumeWithWork rabbitmq消费消息[work模式 channelNums可以设置当前连接开启多少个channel]
func (mq *RabbitMQ) ConsumeWithWork(queueName string, channelNums int) {for i := 0; i < channelNums; i++ {go func(i int) {ch, err := mq.Conn.Channel()if err != nil {log.Panicf("amqp open a channel error: %v \n", err)}q, err := ch.QueueDeclare(queueName, // nametrue,      // durablefalse,     // delete when unusedfalse,     // exclusivefalse,     // no-waitnil,       // arguments)if err != nil {log.Panicf("amqp declare a queue error: %v \n", err)}err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global)if err != nil {log.Panicf("amqp set QoS error: %v \n", err)}msg, err := ch.Consume(q.Name, // queue"",     // consumerfalse,  // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)if err != nil {log.Panicf("amqp register a consumer error: %v \n", err)}log.Printf(" [work-%d] Waiting for messages. To exit press CTRL+C", i)for d := range msg {time.Sleep(2 * time.Second)fmt.Printf("[work-%d] Received a message: %s \n", i, d.Body)err = d.Ack(false)if err != nil {log.Printf("work_one Ack Err: %v", err)}}}(i)}var forever chan struct{}<-forever
}

二、创建生产者组

// NewPlusherGroups 创建生产者组
func NewPlusherGroups(config Config, connNums, channelNums int) (plusherGroups map[int]*RabbitMQ) {plusherGroups = make(map[int]*RabbitMQ, connNums)for i := 0; i < connNums; i++ {var rabbitmq *RabbitMQrabbitmq = rabbitmq.New(config)rabbitmq.n = ifor cN := 0; cN < channelNums; cN++ {ch, err := rabbitmq.Conn.Channel()if err != nil {log.Panicf("amqp open a channel error: %v \n", err)}rabbitmq.Channel = append(rabbitmq.Channel, ch)}plusherGroups[i] = rabbitmq}return
}

三、将消息随机分发给不同的connection、channel

// SendMessageWithWork 生产者发送消息[work模式+(many conn and many channel)]
func SendMessageWithWork(plusherGroups map[int]*RabbitMQ, queueName, body string) bool {if plusherGroups == nil {log.Panicln("SendMessageWithWork plusherGroups params is nil!")}rand.Seed(time.Now().UnixNano())//获取连接个数connNums := len(plusherGroups)//随机分配一个连接对象randConnIndex := rand.Intn(connNums)//选择随机分配的连接对象conn := plusherGroups[randConnIndex]//既然采用了发布者复用conn、channel的形式那么一定要加锁处理//这里为每个对象的操作进行加锁(非线程安全,不加锁会报错的)//至于在存在并发竞争的情况下会存在一定性能损耗,但是我们配置好适量的conn和channel这个基本可以忽略conn.m.Lock()defer conn.m.Unlock()//获取当前对象的channel个数channelNums := len(conn.Channel)//随机分配一个channel对象randChannelIndex := rand.Intn(channelNums)//选择随机分配的channelch := conn.Channel[randChannelIndex]q, err := ch.QueueDeclare(queueName, // nametrue,      // durablefalse,     // delete when unusedfalse,     // exclusivefalse,     // no-waitnil,       // arguments)if err != nil {log.Panicf("amqp declare a queue error: %v \n", err)}body = fmt.Sprintf("conn[%d] channel[%d] send message : %s", randConnIndex, randChannelIndex, body)err = ch.PublishWithContext(conn.ctx,"",     // exchangeq.Name, // routing keyfalse,  // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})if err != nil {log.Panicf("amqp publish a message error: %v \n", err)}return true
}

四、main函数调用消费者

package mainimport (rabbitmq "go-test/rabbitmq/package"
)func main()  {queueName := "task_queue"config := rabbitmq.Config{Host: "192.168.6.103",Port: 5672,User: "root",Password: "root",}var mq *rabbitmq.RabbitMQmq = mq.New(config)//开启N个消费者mq.ConsumeWithWork(queueName, 3)
}

五、main函数调用生产者组发送消息

package mainimport ("fmt""github.com/gin-gonic/gin"rabbitmq "go-test/rabbitmq/package""net/http""time"
)func main()  {var messageNo intqueueName := "task_queue"config := rabbitmq.Config{Host: "192.168.6.103",Port: 5672,User: "root",Password: "root",}//conn连接数connNums := 2//channel连接数channelNums := 3//启动N个不同conn的连接,并且每个连接对应的channel为N个的rabbitmq实例plusherGroup := rabbitmq.NewPlusherGroups(config, connNums, channelNums)e := gin.Default()e.GET("/", func(c *gin.Context) {body := fmt.Sprintf("这是第%d条消息...", messageNo)if rabbitmq.SendMessageWithWork(plusherGroup, queueName, body) == true {messageNo++c.JSON(200, gin.H{"code": 200,"msg": "success",})} else {c.JSON(200, gin.H{"code": 500,"msg": "error",})}})server := &http.Server{Addr:         ":18776",Handler:      e,ReadTimeout:  time.Minute,WriteTimeout: time.Minute,}if err := server.ListenAndServe(); err != nil {panic(any("HttpServer启动失败"))}
}

执行流程:

  1. 启动消费者进程

可以看到我们用3个协程开启了3个work,也就是对应了3个channel

  1. 启动生产者组进程

这里用的gin框架,正常启动

我们可以看到rabbitMQ的控制台中,一共3个连接,1个是消费者进程,另外2个是生产者组进程,这2个正好和我们上面配置的connNums参数匹配

我们可以看到rabbitMQ的控制台中,一共9个channel,3个是消费者进程,另外6个是生产者组进程,这6个正好和我们上面配置的channelNums参数匹配

  1. 调用发送消息

ab.exe -n 1000 -c 1000 http://127.0.0.1:18776/

我们来看消费者日志打印情况,标红的可以证明我们在发送消息时让生产者根据我们的随机分配策略选择connection和channel

相关文章:

golang rabbitMQ 生产者复用channel以及生产者组分发策略

引用的是rabbitMQ官方示例的库&#xff1a;github.com/rabbitmq/amqp091-go在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的"代价"都是很高的&#xff0c;所以就要去实现如何复用这些连接&#xff0c;并要做到高效并可靠。预期效果&#xff1a;项目初始化…...

掌握了这项技能的性能测试师,90%都升职加薪了

初入职场的新人该怎么做才能让自己快速成长&#xff1f;在公司一直做着手工测试&#xff0c;如何才能提升自己&#xff0c;避免陷入“只涨年龄不涨经验”的尴尬&#xff1f;做为一名软件测试工程师&#xff0c;我们不得不去面对这些问题&#xff0c;有的人找到了答案&#xff0…...

linux中crontab定时任务导致磁盘满和云监控未报警的的坑

一个后台开发者&#xff0c;兼职运维工作中&#xff0c;配置linux中crontab定时任务&#xff0c;导致磁盘满和云监控未报警的问题的坑。 1.磁盘满 使用命令 df -h2.问题排查 2.1排查日志 命令 cat /var/log/messages日志文件的默认路径是&#xff1a;/var/log 下面是几个…...

vscode中安装python运行调试环境

在运行代码之前&#xff0c;需要到微软商店下载安装python环境&#xff0c;35m&#xff0c;都是自动的。 1、安装python 的extensions插件。 ctrlshiftx 输入 python 后点击 install 按钮。 2、新建文件夹spider文件夹。 3、在新建文件夹spider下新建文件spider.py源代码。…...

【微服务】微服务架构超强讲解,通俗易懂

微服务架构目录一、微服务架构介绍二、出现和发展三、传统开发模式和微服务的区别四、微服务的具体特征五、面向服务的架构SOA&#xff08;service oriented architecture&#xff09;和微服务的区别1、SOA喜欢重用&#xff0c;微服务喜欢重写2、SOA喜欢水平服务&#xff0c;微…...

内核中的竞态产生的原因和解决方法

产生原因&#xff1a; 由于多进程对临界资源的抢占 根本原因&#xff1a; 1、对于单核处理器而言&#xff0c;内核支持抢占就会出现竞态 2、对于多核处理器而言&#xff0c;是核与核的竞态 3、进程与中断间存在竞态 4、arm开发板不会出现中断与中断间的竞态&#xff08;目前&am…...

【微服务】Elasticsearch文档索引库操作(二)

&#x1f697;Es学习第二站~ &#x1f6a9;Es学习起始站&#xff1a;【微服务】Elasticsearch概述&环境搭建(一) &#x1f6a9;本文已收录至专栏&#xff1a;微服务探索之旅 &#x1f44d;希望您能有所收获 一.索引库操作 索引库就类似数据库表&#xff0c;mapping映射就类…...

【论文速递】NAACL2022-DEGREE: 一种基于生成的数据高效事件抽取模型

【论文速递】NAACL2022-DEGREE: 一种基于生成的数据高效事件抽取模型 【论文原文】&#xff1a;DEGREE A Data-Efficient Generation-Based Event Extraction Mode 【作者信息】&#xff1a;I-Hung Hsu &#xff0c; Kuan-Hao Huang&#xff0c; Elizabeth Boschee &#xff…...

C++类和对象(下)

✨个人主页&#xff1a; Yohifo &#x1f389;所属专栏&#xff1a; C修行之路 &#x1f38a;每篇一句&#xff1a; 图片来源 I do not believe in taking the right decision. I take a decision and make it right. 我不相信什么正确的决定。我都是先做决定&#xff0c;然后把…...

Java常见的六种线程池、线程池-四种拒绝策略总结

点个关注&#xff0c;必回关 一、线程池的四种拒绝策略&#xff1a; CallerRunsPolicy - 当触发拒绝策略&#xff0c;只要线程池没有关闭的话&#xff0c;则使用调用线程直接运行任务。 一般并发比较小&#xff0c;性能要求不高&#xff0c;不允许失败。 但是&#xff0c;由于…...

Node=>Express中间件分类 学习4

1.中间件分类 应用级别的中间件路由级别的中间件错误级别的中间件Express 内置的中间件第三方的中间件 通过app.use&#xff08;&#xff09;或app.get&#xff08;&#xff09;或app.post&#xff08;&#xff09;绑定到app实力上的中间件&#xff0c;叫做应用级别的中间件 …...

在阿里当外包,是一种什么工作体验?

上周和在阿里做外包的朋友一起吃饭&#xff0c;朋友吃着吃着&#xff0c;就开启了吐槽模式。 他一边喝酒一边说&#xff0c;自己现在做着这份工作&#xff0c;实在看不到前途。 看他状态不佳&#xff0c;问了才知道&#xff0c;是手上的项目太磨人。 他们现在做的项目&#…...

Vue3快速入门【二】

Vue3快速入门一、传值父传子&#xff0c;子传父v-model二、插槽2.1、匿名插槽2.2、具名插槽2.3、插槽作用域2.4、插槽作用域案例2.4.1、初始布局2.4.2、插槽使用2.4.3、点击编辑按钮获取本行数据&#xff08;插槽作用域的使用&#xff09;2.4.4、类型书写优化2.4.5、全局接口抽…...

C++-类和对象(上)

类和对象&#xff08;上&#xff09;一&#xff0c;构造函数1&#xff0c;概念2&#xff0c;特性二&#xff0c;析构函数1&#xff0c;概念2&#xff0c;特性三&#xff0c;拷贝构造1&#xff0c;概念2&#xff0c;特性四&#xff0c;运算符重载1&#xff0c;概念2&#xff0c;…...

CAPL(vTESTStudio) - DoIP - TCP接收_04

TCP接收 函数介绍 TcpOpen函数...

联合培养博士经历对于国内就业有优势吗?

2023年国家留学基金委&#xff08;CSC&#xff09;申请在即&#xff0c;很多在读博士在关心申报的同时&#xff0c;也对联培经历能否有助于国内就业心中存疑&#xff0c;故此知识人网小编重点解答此问题。之前&#xff0c;我们在“CSC联合培养-国内在读博士出国的绝佳选择”一文…...

测试左移之需求质量

测试左移的由来 缺陷的修复成本逐步升高 下面是质量领域司空见惯的一张图&#xff0c;看图说话&#xff0c;容易得出&#xff1a;大部分缺陷都是早期引入的&#xff0c;同时大部分缺陷都是中晚期发现的&#xff0c;而缺陷发现的越晚&#xff0c;其修复成本就越高。因此&#…...

【数据结构初阶】第三节.顺序表详讲

文章目录 前言 一、顺序表的概念 二、顺序表功能接口概览 三、顺序表基本功能的实现 四、四大功能 1、增加数据 1.1 头插法&#xff1a; 1.2 尾插法 1.3 指定下标插入 2、删除数据 2.1 头删 2.2 尾删 2.3 指定下标删除 2.4 删除首次出现的指定元素 3、查找数据…...

新手小白适合做跨境电商吗?

今天的跨境电商已经逐渐成熟&#xff0c;靠运气赚钱的时代早已过去&#xff0c;馅饼不可能从天上掉下来&#xff0c;尤其是你想做一个没有货源的小白劝你醒醒。做跨境电商真的不容易&#xff0c;要想做&#xff0c;首先要分析自己是否适合做。米贸搜整理了以下资料&#xff0c;…...

Python搭建自己[IP代理池]

IP代理是什么&#xff1a;ip就是访问网页数据服务器位置信息&#xff0c;每一个主机或者网络都有一个自己IP信息为什么要使用代理ip&#xff1a;因为在向互联网发送请求中&#xff0c;网页端会识别客户端是真实用户还是爬虫程序&#xff0c;在今天以互联网为主导的世界中&#…...

pandas——plot()方法可视化

pandas——plot()方法可视化 作者&#xff1a;AOAIYI 创作不易&#xff0c;如果觉得文章不错或能帮助到你学习&#xff0c;记得点赞收藏评论哦 在此&#xff0c;感谢你的阅读 文章目录pandas——plot()方法可视化一、实验目的二、实验原理三、实验环境四、实验内容五、实验步骤…...

【Three.js基础】坐标轴辅助器、requestAnimationFrame处理动画、Clock时钟、resize页面尺寸(二)

&#x1f431; 个人主页&#xff1a;不叫猫先生 &#x1f64b;‍♂️ 作者简介&#xff1a;前端领域新星创作者、阿里云专家博主&#xff0c;专注于前端各领域技术&#xff0c;共同学习共同进步&#xff0c;一起加油呀&#xff01; &#x1f4ab;系列专栏&#xff1a;vue3从入门…...

C++之完美转发、移动语义(forward、move函数)

完美转发1. 在函数模板中&#xff0c;可以将自己的参数“完美”地转发给其它函数。所谓完美&#xff0c;即不仅能准确地转发参数的值&#xff0c;还能保证被转发参数的左、右值属性不变。2. C11标准引入了右值引用和移动语义&#xff0c;所以&#xff0c;能否实现完美转发&…...

LeetCode刷题系列 -- 48. 旋转图像

给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。你必须在 原地 旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。示例 1&#xff1a;输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]]输出&#…...

在多线程环境下使用哈希表

一.HashTable和HashMapHashTable是JDK1.0时创建的&#xff0c;其在创建时考虑到了多线程情况下存在的线程安全问题&#xff0c;但是其解决线程安全问题的思路也相对简单&#xff1a;在其众多实现方法上加上synchronized关键字&#xff08;效率较低&#xff09;&#xff0c;保证…...

【排序算法】堆排序(Heap Sort)

堆排序是指利用堆这种数据结构所设计的一种排序算法。堆是一个近似完全二叉树的结构&#xff0c;并同时满足堆积的性质&#xff1a;即子结点的键值或索引总是小于&#xff08;或者大于&#xff09;它的父节点。堆排序介绍学习堆排序之前&#xff0c;有必要了解堆&#xff01;若…...

分类预测 | Matlab实现SSA-RF和RF麻雀算法优化随机森林和随机森林多特征分类预测

分类预测 |Matlab实现SSA-RF和RF麻雀算法优化随机森林和随机森林多特征分类预测 目录分类预测 |Matlab实现SSA-RF和RF麻雀算法优化随机森林和随机森林多特征分类预测分类效果基本介绍模型描述程序设计参考资料分类效果 基本介绍 Matlab实现SSA-RF和RF麻雀算法优化随机森林和随机…...

Allegro如何添加ICT操作指导

Allegro如何添加ICT操作指导 当PCB板需要做飞针测试的时候,通常需要在PCB设计的时候给需要测试的网络添加上ICT。 如图: Allegro支持给网络添加ICT,具体操作如下 首先在库中创建一个阻焊开窗的过孔,比如via10-ict一般阻焊开窗的尺寸比盘单边大2mil 在PCB中选择Manufacture…...

软件架构设计(二)——领域架构、基于架构的软件开发方法

目录 一、架构描述语言 ADL 二、特定领域软件架构 DSSA 三、DSSA的三层次架构模型 . 四、基于架构的软件开发方法 (1)基于架构的软件设计(ABSD) (2)开发过程 一、架构描述语言 ADL ADL是一种形式化语言&#xff0c;它在底层语义模型的支持下&#xff0c;为软件系统概念体…...

数组常用方法(2)---数组遍历方法

1. forEach(cb) 回调函数中有三个参数&#xff0c;第一个是当前遍历项&#xff08;必须&#xff09;&#xff0c;第二个是索引&#xff0c;第三个是遍历的数组本身。forEach() 对于空数组不会执行回调函数。forEach()不会使用回调函数的返回值&#xff0c;返回值为undefined。…...

做电商网站都需要学什么/网站百度收录要多久

时间久了很容易忘&#xff0c;这里做个备份FastCGI协议php语言的实现&#xff0c;可以高效处理来自web端的动态请求php-fpm维护一个或者多个php-cgi进程池&#xff0c;处理请求时不需要频繁创建进程所以比传统的CGI协议要更高效技术架构 单master - 多workermaster 非阻塞&…...

wordpress5.1更新/公司品牌宣传

阅读文本大概需要3分钟。公司建设一个SaaS平台&#xff0c;用于发布各种企业级的SaaS应用&#xff0c;需要新增一个多租户版本的用户管理系统&#xff0c;下面归纳总结下整个多租户版本的用户管理数据模型设计。1、数据存储架构现有的多租户用户数据存储主要分为三种方式&#…...

17一起做网站广州/网站建设需要啥

目录1.ECSHOP商品比较,如何限制每次比较的数量 2.将ECSHOP购买记录的用户名的后几位字符用***代替 3.ECSHOP商品页,加入购物车弹出浮动层,仿淘宝效果 4.在ECSHOP会员中心显示冻结资金 5.ECSHOP商品页购买记录的每页条数如何修改 6.ECSHOP信息提示页面的跳转时间设置. 7.在ECSHO…...

濮阳做网站的价格/长沙seo推广优化

2019独角兽企业重金招聘Python工程师标准>>> php程序分纯的MVC模式&#xff0c;还有所谓过程化的框架&#xff0c;帝国cms和discuz所于过程化的框架&#xff0c;而shopnc属于纯MVC模式。 要说哪个好&#xff0c;还真很给判断各有各的优点吧。 过程化的好处是灵活&am…...

广广东网站建设/做一套二级域名网站怎么做

sar也是一个Linux下用于查看系统资源的命令。CentOS好像不自带这个命令&#xff0c;要先安装sysstat包。它包括了sar、iostat和mpstat这三个工具。sar主要用于监视CPU和磁盘I/O情况。安装&#xff1a;# yum install sysstat一般格式&#xff1a;sar [ options… ] [ [ ] ]常用选…...

wordpress 选择插件/公司网络推广排名定制

【PConline资讯】Win10宽带无法连接提示“调制解调器报告了一个错误”怎么解决&#xff1f; 怎么解决&#xff1f;最近一位Win10用户遇到宽带无法连接的情况&#xff0c;系统提示“调制解调器(或其他连接设备)报告了一个错误。”&#xff0c;这是怎么回事呢&#xff1f;出现这种…...