基于golang多消息队列中间件的封装nsq,rabbitmq,kafka
基于golang多消息队列中间件的封装nsq,rabbitmq,kafka
场景
在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中间件;
接口模型
这个模型的核心思想是消息队列的核心功能生产者生产消息方法和消费者消费消息,任何消息队列都必须有这两个功能;根据如下代码消息队列中间件是可扩展的,只需在实例化消息队列对象那里添加新消息队列的实现;
// MQer 消息队列接口
type MQer interface {Producer(topic string, data []byte)Consumer(topic, channel string, ch chan []byte, f func(b []byte))
}// NewMQ 实例化消息队列对象
func NewMQ() MQer {switch conf.Conf.Default.Mq { // mq 设置的类型case "nsq":return new(MQNsqService)case "rabbit":return new(MQRabbitService)case "kafka":return new(MQKafkaService)default:return new(MQNsqService)}
}/*
配置文件结构设计mqType: "" # nsq, rabbit, kafka 这三个值然当然了是可扩展的nsq:producer: ""consumer: ""rabbit:addr: ""user: ""password: ""kafka:addr: ""
*/
各个消息队列的实现
1. 依赖库
- nsq : github.com/nsqio/go-nsq
- rabbitmq : github.com/streadway/amqp
- kafka : github.com/Shopify/sarama
2. nsq
nsq结构体
// MQNsqService NSQ消息队列
type MQNsqService struct {
}
生产者
// Producer 生产者
func (m *MQNsqService) Producer(topic string, data []byte) {nsqConf := &nsq.Config{}client, err := nsq.NewProducer(nsqServer, nsqConf)if err != nil {log.Error("[nsq]无法连接到队列")return}log.DebugF(fmt.Sprintf("[生产消息] topic : %s --> %s", topic, string(data)))err = client.Publish(topic, data)if err != nil {log.Error("[生产消息] 失败 : " + err.Error())}
}
消费者
var (nsqServer = conf.Conf.Default.Nsq.Producer // nsqServer
)// Consumer 消费者
func (m *MQNsqService) Consumer(topic, channel string, ch chan []byte, f func(b []byte)) {mh, err := NewMessageHandler(nsqServer, channel)if err != nil {log.Error(err)return}go func() {mh.SetMaxInFlight(1000)mh.Registry(topic, ch)}()go func() {for {select {case s := <-ch:f(s)}}}()log.DebugF("[NSQ] ServerID:%v => %v started", channel, topic)
}// MessageHandler MessageHandler
type MessageHandler struct {msgChan chan *goNsq.Messagestop boolnsqServer stringChannel stringmaxInFlight int
}// NewMessageHandler return new MessageHandler
func NewMessageHandler(nsqServer string, channel string) (mh *MessageHandler, err error) {if nsqServer == "" {err = fmt.Errorf("[NSQ] need nsq server")return}mh = &MessageHandler{msgChan: make(chan *goNsq.Message, 1024),stop: false,nsqServer: nsqServer,Channel: channel,}return
}// Registry register nsq topic
func (m *MessageHandler) Registry(topic string, ch chan []byte) {config := goNsq.NewConfig()if m.maxInFlight > 0 {config.MaxInFlight = m.maxInFlight}consumer, err := goNsq.NewConsumer(topic, m.Channel, config)if err != nil {panic(err)}consumer.SetLogger(nil, 0)consumer.AddHandler(goNsq.HandlerFunc(m.handlerMessage))err = consumer.ConnectToNSQLookupd(m.nsqServer)if err != nil {panic(err)}m.process(ch)
}
- rabbitmq
结构体
// MQRabbitService Rabbit消息队列
type MQRabbitService struct {
}
生产者
// Producer 生产者
func (m *MQRabbitService) Producer(topic string, data []byte) {mq, err := NewRabbitMQPubSub(topic)if err != nil {log.Error("[rabbit]无法连接到队列")return}//defer mq.Destroy()log.DebugF(fmt.Sprintf("[生产消息] topic : %s --> %s", topic, string(data)))err = mq.PublishPub(data)if err != nil {log.Error("[生产消息] 失败 : " + err.Error())}
}// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例 (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err := NewRabbitMQ("", exchangeName, "", "")if mq == nil || err != nil {return nil, err}//获取connectionmq.conn, err = amqp.Dial(mq.MqUrl)mq.failOnErr(err, "failed to connect mq!")if mq.conn == nil || err != nil {return nil, err}//获取channelmq.channel, err = mq.conn.Channel()mq.failOnErr(err, "failed to open a channel!")return mq, err
}...其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq
消费者
// Consumer 消费者
func (m *MQRabbitService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {mh, err := NewRabbitMQPubSub(topic)if err != nil {log.Error("[rabbit]无法连接到队列")return}msg := mh.RegistryReceiveSub()go func(m <-chan amqp.Delivery) {for {select {case s := <-m:f(s.Body)}}}(msg)log.DebugF("[Rabbit] ServerID:%v => %v started", serverId, topic)
}// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例 (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err := NewRabbitMQ("", exchangeName, "", "")if mq == nil || err != nil {return nil, err}//获取connectionmq.conn, err = amqp.Dial(mq.MqUrl)mq.failOnErr(err, "failed to connect mq!")if mq.conn == nil || err != nil {return nil, err}//获取channelmq.channel, err = mq.conn.Channel()mq.failOnErr(err, "failed to open a channel!")return mq, err
}... 其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq
- kafka
结构体
// MQKafkaService Kafka消息队列
type MQKafkaService struct {
}
生产者
func (m *MQKafkaService) Producer(topic string, data []byte) {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follower都确认config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,我们默认设置32个分区config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回// 构造一个消息msg := &sarama.ProducerMessage{}msg.Topic = topicmsg.Value = sarama.ByteEncoder(data)// 连接kafkaclient, err := sarama.NewSyncProducer(kafkaServer, config)if err != nil {log.Error("Producer closed, err:", err)return}defer client.Close()// 发送消息pid, offset, err := client.SendMessage(msg)if err != nil {log.Error("send msg failed, err:", err)return}log.InfoF("pid:%v offset:%v\n", pid, offset)
}
消费者
// Consumer 消费者
func (m *MQKafkaService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {var wg sync.WaitGroupconsumer, err := sarama.NewConsumer(kafkaServer, nil)if err != nil {log.ErrorF("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("task-status-data") // 通过topic获取到所有的分区if err != nil {log.Error("Failed to get the list of partition: ", err)return}log.Info(partitionList)for partition := range partitionList { // 遍历所有的分区pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者if err != nil {log.ErrorF("Failed to start consumer for partition %d: %s\n", partition, err)}wg.Add(1)go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值for msg := range pc.Messages() { // 阻塞直到有值发送过来,然后再继续等待log.DebugF("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))f(msg.Value)}defer pc.AsyncClose()wg.Done()}(pc)}wg.Wait()consumer.Close()
}
总结
golang的接口是一种抽象类型,是对其他类型行为的概括与抽象,从语法角度来看,接口是一组方法定义的集合,文本的封装使用了golang接口这一特性,把所有的消息队列中间件抽象为一个MQer拥有生产和消费两个方法,具体的各个消息队列中间件去实现这两个方法即可,最明显的优点在于扩展性,解耦性,选择性,维护性这几个表象上。
完整代码
https://github.com/mangenotwork/common/tree/main/mq
你的星星是我分享的最大动力 : )
相关文章:
基于golang多消息队列中间件的封装nsq,rabbitmq,kafka
基于golang多消息队列中间件的封装nsq,rabbitmq,kafka 场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个…...
【第一阶段】kotlin的函数
函数头 fun main() {getMethod("zhangsan",22) }//kotlin语言默认是public,kotlin更规范,先有输入( getMethod(name:String,age:Int))再有输出(Int[返回值]) private fun getMethod(name:String,age:Int): Int{println("我叫…...
PAM安全配置-用户密码锁定策略
PAM是一个用于实现身份验证的模块化系统,可以在操作系统中的不同服务和应用程序中使用。 pam_faillock模块 pam_faillock模块用来实现账号锁定功能,它可以在一定的认证失败次数后锁定用户账号,防止暴力破解密码攻击。 常见选项 deny&…...
AndroidManifest.xml日常笔记
1 Bundle介绍 Bundle主要用于传递数据;它保存的数据,是以key-value(键值对)的形式存在的。 我们经常使用Bundle在Activity之间传递数据,传递的数据可以是boolean、byte、int、long、float、double、string等基本类型或它们对应的数组…...
SpringBoot异步框架
参考:解剖SpringBoot异步线程池框架_哔哩哔哩_bilibili 1、 为什么要用异步框架,它解决什么问题? 在SpringBoot的日常开发中,一般都是同步调用的。但经常有特殊业务需要做异步来处理,例如:注册新用户&…...
导出LLaMA ChatGlm2等LLM模型为onnx
通过onnx模型可以在支持onnx推理的推理引擎上进行推理,从而可以将LLM部署在更加广泛的平台上面。此外还可以具有避免pytorch依赖,获得更好的性能等优势。 这篇博客(大模型LLaMa及周边项目(二) - 知乎)进行…...
C++项目:在线五子棋对战网页版--匹配对战模块开发
玩家匹配是根据自己的天梯分数进行匹配的,而服务器中将玩家天梯分数分为三个档次: 1. 普通:天梯分数小于2000分 2. 高手:天梯分数介于2000~3000分之间 3. 大神:天梯分数大于3000分 当玩家进行对战匹配时,服…...
ssh 连接断开,正在执行的shell脚本也被中断了
背景 最近在训练chatGLM,一次训练经常要花掉近2个小时,但是由于网络不稳定,经常ssh莫名的断开,导致训练不得不重新开启,这就很浪费时间了 解决方案 下面教大家一种在后台执行命令的方案,即使你ssh连接断…...
UML 用例图,类图,时序图,活动图
UML之用例图,类图,时序图,活动图_用例图 时序图_siyan985的博客-CSDN博客 https://www.cnblogs.com/GumpYan/p/14734357.html 用例图与类图 - 简书...
Java 面试题2023
Java core JVM 1、JVM内存模型 2、JVM运行时内存分配 3、如何确定当前对象是个垃圾 4、GCrooot 包括哪些? 5、JVM对象头包含哪些部分 6、GC算法有哪些 7、JVM中类的加载机制 8、分代收集算法 9、JDK1.8 和 1.7做了哪些优化 10、内存泄漏和内存溢出有什么区别 11、J…...
【CSS3】CSS3 动画 ④ ( 使用动画制作地图热点图 )
文章目录 一、需求说明二、动画代码分析1、地图背景设置2、热点动画位置测量3、热点动画布局分析4、动画定义5、小圆点实现6、波纹效果盒子实现7、延迟动画设置 三、代码示例 一、需求说明 实现如下效果 , 在一张地图上 , 以某个位置为中心点 , 向四周发散 ; 核心 是实现 向四周…...
命令模式(Command)
命令模式是一种行为设计模式,可将一个请求封装为一个对象,用不同的请求将方法参数化,从而实现延迟请求执行或将其放入队列中或记录请求日志,以及支持可撤销操作。其别名为动作(Action)模式或事务(Transaction)模式。 Command is …...
Dapper 微型orm的光
介绍 Dapper是一个轻量级的ORM(对象关系映射)框架,它可以方便地将数据库查询结果映射到.NET对象上,同时也支持执行原生SQL查询。下面我将详细介绍Dapper的使用方法。 安装Dapper 首先,你需要通过NuGet包管理器将Dap…...
Mysql随心记--第一篇
MylSAM:查询速度快,有较好的索引优化和数据压缩技术,但是它不支持事务 InnoDB:它支持事务,并且提供行级的锁定,应用也相当广泛 docker ps -a --filter "ancestormysql" 查看linux中创建了多少个d…...
使用dockerfile安装各种服务组件
使用dockerfile安装各种服务组件 elasticsearch、minio、mongodb、nacos、redis 一、使用dockerfile安装elasticsearch:7.8.0 1、Dockerfile文件 FROM elasticsearch:7.8.0 #添加分词器 ADD elasticsearch-analysis-ik /usr/share/elasticsearch/plugins/elasticsearch-anal…...
如何简单的无人直播
环境搭建 ffmpeg安装,我这里用的是centos搭建的,其他平台可以自己百度 yum -y install wgetwget --no-check-certificate https://www.johnvansickle.com/ffmpeg/old-releases/ffmpeg-4.0.3-64bit-static.tar.xztar -xJf ffmpeg-4.0.3-64bit-static.ta…...
【基于HBase和ElasticSearch构建大数据实时检索项目】
基于HBase和ElasticSearch构建大数据实时检索项目 一、项目说明二、环境搭建三、编写程序四、测试流程 一、项目说明 利用HBase存储海量数据,解决海量数据存储和实时更新查询的问题;利用ElasticSearch作为HBase索引,加快大数据集中实时查询数…...
ProComponent 用法学习
相信很多同学都用过 Ant Design 这一 react 著名组件库,而 ProComponents 则是在 antd 之上进行封装的页面级组件库(指一个组件就可以搞定一个页面)。它同时也是 Ant Design Pro 中后台框架所用的主要组件库。如果你手上有要用 react 开发的中…...
巨人互动|Google海外户Google Analytics的优缺点是什么?
Google Analytics是一个由谷歌开发的网站分析工具,旨在帮助网站和移动应用程序运营者收集和分析数据,以更好地了解用户行为和改进业务。虽然Google Analytics具有许多优势,但也存在一些缺点。在本文中,我们将探讨Google Analytics…...
MySQL数据库的操作
MySQL 连接服务器 库的操作创建数据库数据库删除查看数据库进入数据库查看所在的数据库修改数据库显示创建语句查看连接情况 表的操作创建表查看数据库所有的表查看表的详细信息查看创建表时的详细信息删除表修改表名向表中插入数据在表结构中新增一列对表结构数据的修改删除表…...
人工智能行业岗位一览
人工智能行业的岗位薪资高、待遇好、涨薪快已经是公开的事实,那么在人工智能行业中具体有哪些职业岗位呢?对于普通人来说,想要入行人工智能又有哪些机会呢? 下面是人工智能领域中的一部分职业岗位,随着技术的不断发展&…...
《Linux运维实战:Docker基础总结》
一、简介 1、docker的基本结构是什么,包含哪些组件? docker的基本机构是c/s模式,即客户端/服务端模式。 由docker客户端和docker守护进程组成。docker客户端通过命令行或其它工具使用docker sdk与docker守护进程通信,发送容器管理…...
Clash 意外退出后 chrome / google 谷歌 浏览器无法连接互联网
解决方案: 以管理员模式打开命令行,输入:netsh winsock reset ,然后重启电脑 如果还不行的话, 在 chromevs中选中 设置>隐私和安全>安全>使用安全 dns> 使用您当前的服务提供商 即可...
89 | Python人工智能篇 —— 深度学习算法 Keras 实现 MNIST分类
本教程将带您深入探索Keras,一个开源的深度学习框架,用于构建人工神经网络模型。我们将一步步引导您掌握Keras的核心概念和基本用法,学习如何构建和训练深度学习模型,以及如何将其应用于实际问题中。 文章目录 Keras 构建实际mnist图像分类案例.1. 介绍2. 环境搭建3. 数据准…...
每天一道leetcode:剑指 Offer 32 - III. 从上到下打印二叉树 III(中等广度优先遍历)
今日份题目: 请实现一个函数按照之字形顺序打印二叉树,即第一行按照从左到右的顺序打印,第二层按照从右到左的顺序打印,第三行再按照从左到右的顺序打印,其他行以此类推。 示例 给定二叉树: [3,9,20,null,null,15,7…...
day10 快速排序 方法重载 和 方法递推
方法重载 斐波拉契数列问题 使用重载思想解决 public static int method(int n){if (n 2 ){return 1 ;}return (n-1)*2method(n-1);}public static int f(int n){if (n 1){return 1;}if (n 2){return 2;}return f(n-1)f(n-2);} 快速排序 思维很简单,类似二…...
Qt 6. 其他类调用Ui中的控件
1. 把主类指针this传给其他类,tcpClientSocket new TcpClient(this); //ex2.cpp #include "ex2.h" #include "ui_ex2.h"Ex2::Ex2(QWidget *parent): QDialog(parent), ui(new Ui::Ex2) {ui->setupUi(this);tcpClientSocket new TcpClient…...
PHP 的不同版本(src 版、nts 版和 win 版)之间的区别和共同点。
在下载php时会有很多版本供我们选择,PHP 的不同版本(src 版、nts 版和 win 版)之间的区别又是什么呢。 src 版本:src 版本指的是 PHP 的源代码版本,您需要自行编译并安装它。这个版本通常用于自定义编译、开发环境和高…...
3 vue的if语法
vue的if语法是相当于一个标签的属性来写进去的,比如说<h1 v-if“”>。要注意的是if语句里可以自动从数据层取值的,比如<h1 v-if"message">,这里就会自动把key为message的值取过来,而如果要传一个字符串&…...
python基础3——流程控制
文章目录 一、操作符1.1 比较操作符1.2 逻辑操作符1.3 成员操作符1.4 身份操作符 二、流程控制2.1 条件判断2.2 循环语句2.2.1 for循环2.2.2 while循环 2.3 continue与break语句2.4 文件操作函数 三、函数3.1 定义函数3.2 作用域3.3 闭包3.4 函数装饰器3.5 内建函数 一、操作符…...
短网址生成系统源码/搜索引擎优化培训中心
WebSocket 以前没用过,之前写过一篇博客是基于原生socket的(查看)比较复杂,慎入。今天另外一个APP需要接websocket了,然后便找到了facebook的 SocketRocket 框架,然后用了一天时间接上了,完成了…...
常州做网站找哪家好/免费外贸接单平台
2019独角兽企业重金招聘Python工程师标准>>> 我们在编写代码的时候,总会遇到一些需要反复使用的代码片段。这时候就需要反复的复制和黏贴,大大影响效率。我们利用Sublime Text的snippet功能,就能很好的解决这一问题。通俗的讲&…...
本地旅游网站模版/公司网站推广方法
python神器 Jupyter Notbook 简介 Jupyter Notebook是基于网页的用于交互计算的应用程序。其可被应用于全过程计算:开发、文档编写、运行代码和展示结果。 Jupyter Notebook官方 简而言之,Jupyter Notebook是以网页的形式打开,可以在网页页面…...
wordpress上传gif/个人引流推广怎么做
Taro.showToast(option) 显示消息提示框 Taro.showToast({title: 成功,icon: success,duration: 2000 })Taro.showModal(option) 显示模态对话框 Taro.showModal({title: 提示,content: 这是一个模态弹窗,success: function (res) {if (res.confirm) {console.log(用户点击…...
自己建网站怎么赚钱/海南seo顾问服务
👇👇关注后回复 “进群” ,拉你进程序员交流群👇👇来源:cnblogs.com/peiyu1988.html01前言2019年初,我通过一整天的笔试及面试加入一家(某一线城市国资委全资控股)某集团…...
网页设计就业/企业关键词优化公司
2019独角兽企业重金招聘Python工程师标准>>> 基本思路:通过一个队列保存最近10次的登录记录,下一次登录请求来时,通过与队列中的第一条进行比较,如果在1分钟之内则拒绝请求,否则移除队首元素,将…...