当前位置: 首页 > news >正文

RabbitMQ从原理到实战—基于Golang【万字详解】

文章目录

  • 前言
  • 一、MQ是什么?
    • 优势
    • 劣势
  • 二、MQ的用途
    • 1、应用解耦
    • 2、异步加速
    • 3、削峰填谷
    • 4、消息分发
  • 三、RabbitMQ是什么
    • 1、AMQP 协议
    • 2、RabbitMQ 包含的要素
    • 3、RabbitMQ 基础架构
  • 四、实战
    • 1、Simple模式(即最简单的收发模式)
    • 2、Work Queues 模型
    • 3、Publish/Subscribe 模型
    • 4、Routing 模型
    • 5、Topics 模型


前言

最近秋招开始找工作,顺便回顾消息队列并且总结。

一、MQ是什么?

消息队列(Message Queue)是一种在应用程序之间传递消息的通信模式。它通过在发送者和接收者之间建立一个消息队列来实现异步通信和解耦。

在消息队列模式中,发送者(Producer)将消息发送到一个中间件(Message Broker)中的消息队列,而接收者(Consumer)则从该队列中接收和处理消息。这种方式使得发送者和接收者可以独立地进行处理,而无需直接交互,从而实现解耦。发送者和接收者只需要知道如何与消息队列进行通信,而不需要知道彼此的存在。

优势

1. 异步通信:发送者将消息放入队列后即可继续进行其他操作,无需等待接收者的响应。接收者可以在合适的时候从队列中获取消息进行处理,实现了异步通信模式。

2. 解耦:发送者和接收者之间通过消息队列进行通信,彼此之间不直接耦合。发送者只需将消息发送到队列中,而不需要知道消息是如何被处理的。接收者只需从队列中获取消息进行处理,而不需要知道消息的来源。

3. 可靠性传输:消息队列通常提供持久化机制,确保消息在发送和接收过程中不会丢失。即使接收者暂时不可用,消息也会在队列中等待,直到接收者准备好接收为止。

4. 扩展性:消息队列可以支持多个发送者和接收者,实现系统的扩展性和高并发处理能力。

5. 缓冲和削峰填谷:通过将消息缓存到队列中,可以平衡发送者和接收者之间的处理速度差异,从而避免系统过载。

消息队列在分布式系统、微服务架构、异步任务处理、事件驱动架构等场景中被广泛应用。一些常见的消息队列系统包括RabbitMQ、Apache Kafka、ActiveMQ、Amazon SQS等。它们提供了丰富的功能和配置选项,可以根据应用需求选择合适的消息队列实现。

劣势

系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

二、MQ的用途

四个用途
应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性
消息分发:提高系统灵活性

1、应用解耦

应用解耦是指通过使用消息队列等中间件来降低应用程序之间的直接依赖性,从而实现独立开发、部署和升级的能力。通过解耦,每个应用程序可以通过消息队列发送和接收消息,而不需要了解其他应用程序的具体实现细节。通过应用解耦,可以实现系统的松耦合架构,提高系统的可维护性、扩展性和容错性。
没有使用MQ:
在这里插入图片描述

  • 系统的耦合性越高,容错性就越低,可维护性就越低。
  • 在这里插入图片描述使用 MQ 使得应用间解耦,提升容错性和可维护性。

2、异步加速

异步提速是指通过将耗时的操作转化为异步执行,从而提高系统的响应速度和吞吐量。通过异步处理,应用程序可以在等待某个操作完成的同时继续执行其他任务,而不需要阻塞等待结果返回。
例如,当一个应用程序需要进行网络请求并等待响应时,如果采用同步方式,应用程序会被阻塞,直到响应返回才能继续执行其他任务。而通过异步方式,应用程序可以继续执行其他任务,不需要等待网络请求的结果返回。这样可以提高系统的响应速度,使用户获得更好的体验。
没有使用MQ:
在这里插入图片描述
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
使用MQ:
在这里插入图片描述

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。不需要的等待完成

3、削峰填谷

削峰填谷是一种通过平衡系统负载,减轻峰值压力和填充低谷时的资源利用率的技术。它的目标是在系统负载波动较大的情况下,合理利用资源,确保系统的稳定性和高效性。
没有使用MQ:
在这里插入图片描述

使用MQ:
在这里插入图片描述在这里插入图片描述
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做填谷。简单来说就是慢慢分发
使用MQ后,可以提高系统稳定性。

4、消息分发

消息分发是一种将消息从发送者传递到接收者的机制,它在异步系统和事件驱动架构中起着重要的作用。消息分发可以实现解耦和灵活性,允许不同组件或模块之间通过消息进行通信,从而实现系统的松耦合和可扩展性。
下面是消息分发的一些关键概念和示例:

发布者(Publisher):发布者是消息分发系统中的发送者,它负责生成并发布消息。发布者将消息发送到消息分发系统,而不需要知道消息的具体接收者。

订阅者(Subscriber):订阅者是消息分发系统中的接收者,它通过订阅特定的消息或消息类型来表明自己对消息的兴趣。当有匹配的消息到达时,消息分发系统会将消息传递给订阅者。

主题(Topic):主题是消息分发系统中用于分类和组织消息的标识符或名称。发布者可以将消息发布到特定的主题,而订阅者可以选择订阅感兴趣的主题。通过主题,可以实现消息的细粒度过滤和选择性订阅。

三、RabbitMQ是什么

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。

1、AMQP 协议

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。
AMQP三层协议:
Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer:中间层,主要负责客户端命会发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。

AMQP组件:
交换器(Exchange):消息代理服务器中用于把消息路由到队列的组件。
队列(queue):用来存储消息的数据结构,位于硬盘或内存中。
绑定(Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。

2、RabbitMQ 包含的要素

生产者:消息队列创建者,发送消息到MQ
消费者:连接到RabbitMQ,订阅到队列上,消费消息,持续订阅和单条订阅
消息:包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且RabbitMQ用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁

3、RabbitMQ 基础架构

在这里插入图片描述
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。是生产者、消费者与RabbitMQ通信的渠道,生产者publish或是消费者subscribe 一个队列都是通过信道来通信的。
信道是建立在TCP连接上的虚拟连接,就是说RabbitMQ在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ都有一个唯一的ID,保证了信道私有性,对应上唯一的线程使用。
Exchange交换机:message 到达 broker 的第一站**,根据分发规则,匹配查询表中的 routing key,分发消息到queue中去。生产者将消息发送到交换器,有交换器将消息路由到一个或者多个队中。当路由不到时,或返回给生产者或直接丟弃。
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding信息被保存到 exchange 中的查询表中,用于 message 的分发依据

四、实战

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

1、Simple模式(即最简单的收发模式)

消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
消费者:

package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 声明一个队列queue, err := ch.QueueDeclare("hello", // 队列名false,   // 持久性false,   // 自动删除false,   // 独占false,   // 等待服务器确认nil,     // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 消费消息msgs, err := ch.Consume(queue.Name, // 队列名"",         // 消费者标签true,       // 自动确认false,      // 独占false,      // 不等待服务器确认false,      // 参数)if err != nil {log.Fatalf("无法注册消费者:%s", err)}// 处理接收到的消息for msg := range msgs {log.Printf("接收到消息:%s", msg.Body)}
}

上述代码首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"heo"的队列。接下来,通过ch.Consume函数注册一个消费者,用于从队列中接收消息。在fo循环中,我们处理接收到的消息,这里只是简单地打印出来。
生产者:

package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 声明一个队列queue, err := ch.QueueDeclare("hello", // 队列名false,   // 持久性false,   // 自动删除false,   // 独占false,   // 等待服务器确认nil,     // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 发送消息body := "Hello, RabbitMQ!"err = ch.Publish("",         // 交换机queue.Name, // 队列名false,      // 必须发送到队列false,      // 不等待服务器确认amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),},)if err != nil {log.Fatalf("无法发送消息:%s", err)}log.Printf("消息已发送:%s", body)
}

上述代码与消费者程序类似,首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"hello"的队列。接下来,通过ch.Publishi函数向队列发送一条消息。

2、Work Queues 模型

消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关[syncronize]保证一条消息只能被一个消费者使用)。
让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
消费者:

package mainimport ("fmt""log""math/rand""time""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 启动多个消费者并行处理任务for i := 1; i <= 3; i++ {go startConsumer(i, ch)}// 阻塞主进程select {}
}func generateTask(id int) string {time.Sleep(time.Duration(rand.Intn(3)) * time.Second)return fmt.Sprintf("Task %d", id)
}func startConsumer(id int, ch *amqp.Channel) {// 声明一个队列queue, err := ch.QueueDeclare("tasks_queue", // 队列名true,          // 持久性false,         // 自动删除false,         // 独占false,         // 等待服务器确认nil,           // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 消费任务msgs, err := ch.Consume(queue.Name, // 队列名"",         // 消费者标签false,      // 手动确认false,      // 不等待服务器确认false,      // 不使用内置的参数false,      // 参数nil,           // 参数)if err != nil {log.Fatalf("无法注册消费者:%s", err)}for msg := range msgs {task := string(msg.Body)log.Printf("消费者 %d 接收到任务:%s", id, task)log.Printf("消费者 %d 完成任务:%s", id, task)// 手动确认任务已处理msg.Ack(false)}
}

利用协城启动多个消费者进行消费。
结果如下:
在这里插入图片描述

3、Publish/Subscribe 模型

每个消费者监听自己的队列。
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
在RabbitMQ的Publish/Subscribe模型中,生产者将消息发送到交换机,交换机负责将消息广播给所有绑定到它上面的队列。消费者创建队列并将其绑定到交换机上,从而接收交换机发送的消息。这样,一个消息可以被多个消费者接收。
在这里插入图片描述

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs",   // 交换机名称"fanout", // 交换机类型true,     // 是否持久化false,    // 是否自动删除false,    // 是否内部使用false,    // 是否等待服务器响应nil,      // 其他属性)failOnError(err, "Failed to declare an exchange")// 发布消息到交换机body := "Hello, RabbitMQ!"err = ch.Publish("logs", // 交换机名称"",     // 路由键,留空表示广播给所有队列false,  // 是否等待服务器响应false,  // 其他属性amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),},)failOnError(err, "Failed to publish a message")log.Printf("Message sent: %s", body)
}

连接到RabbitMQ服务器,声明了一个名为"logs"的交换机,并通过调用ch.Publish方法将消息发布到交换机上。
在示例代码中,通过指定交换机名称为"logs",路由键为空字符串,消息将被广播给所有绑定到该交换机的队列。

package mainimport ("fmt""log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs",   // 交换机名称"fanout", // 交换机类型true,     // 是否持久化false,    // 是否自动删除false,    // 是否内部使用false,    // 是否等待服务器响应nil,      // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 将队列绑定到交换机上err = ch.QueueBind(q.Name, // 队列名称"",     // 路由键,留空表示接收交换机的所有消息"logs", // 交换机名称false,  // 是否等待服务器响应nil,    // 其他属性)failOnError(err, "Failed to bind a queue")// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否独占模式(仅限于当前连接)false,  // 是否等待服务器响应false,  // 其他属性nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}

它连接到RabbitMQ服务器,声明一个fanout类型的交换机(Exchange),创建一个临时队列,将队列绑定到交换机上,并订阅消息。

在示例代码中,创建的交换机名为"logs",交换机类型为"fanout",表示消息将被广播给所有绑定到该交换机的队列。

消费者创建了一个临时队列,并将其绑定到交换机上,这样交换机就会将消息发送到该队列中。

4、Routing 模型

在fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在这里插入图片描述

在Direct模型下:

1、队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2、消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
3、Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息。
生产者

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct",      // 交换机类型true,          // 是否持久化false,         // 是否自动删除false,         // 是否内部使用false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to declare an exchange")// 从命令行参数获取要发送的路由键和消息内容if len(os.Args) < 3 {log.Fatalf("Usage: %s [info] [message]", os.Args[0])}severity := os.Args[1]message := strings.Join(os.Args[2:], " ")// 发布消息到交换机,并指定路由键err = ch.Publish("logs_direct", // 交换机名称severity,      // 路由键false,         // 是否等待服务器响应false,         // 是否立即将消息写入磁盘amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)failOnError(err, "Failed to publish a message")log.Printf("Sent message: %s", message)
}

它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),并通过指定路由键将消息发布到交换机。

在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。

生产者从命令行参数获取要发送的路由键和消息内容。路由键可以是任意字符串,用于标识消息的类型或者级别。消息内容可以是任意文本。
消费者

package mainimport ("fmt""log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct",      // 交换机类型true,          // 是否持久化false,         // 是否自动删除false,         // 是否内部使用false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 从命令行参数获取要绑定的路由键if len(os.Args) < 2 {log.Fatalf("Usage: %s [info] [warning] [error]", os.Args[0])}severities := os.Args[1:]// 将队列绑定到交换机上,并指定要接收的路由键for _, severity := range severities {err = ch.QueueBind(q.Name,        // 队列名称severity,      // 路由键"logs_direct", // 交换机名称false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to bind a queue")}// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否独占模式(仅限于当前连接)false,  // 是否等待服务器响应false,  // 其他属性nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}

上述代码实现了一个Routing模型的消费者。它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),创建一个临时队列,并将队列绑定到交换机上,同时指定要接收的路由键。

在RabbitMQ的Routing模型中,生产者将消息发送到交换机,并在发送消息时指定一个路由键(routing key)。交换机根据路由键将消息发送给与之绑定的队列。消费者创建队列并将其绑定到交换机上,并通过指定要接收的路由键来选择性地接收消息。

在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。

消费者创建了一个临时队列,并通过循环将该队列绑定到交换机上,并指定要接收的路由键。路由键可以是任意字符串,用于标识消息的类型或者级别。在示例中,我们通过命令行参数传入要绑定的路由键。

最后,消费者通过调用ch.Consume方法订阅消息。该方法返回一个消息通道msgs,消费者可以从该通道接收到消息。在示例中,我们使用一个goroutine来异步接收消息,并在收到消息时打印出来。

5、Topics 模型

在这里插入图片描述
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

统配符
* 匹配不多不少恰好1个词
# 匹配一个或多个词

如:
fan.# 匹配 fan.one.two 或者 fan.one 等
fan.* 只能匹配 fan.one
生产者

func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic",      // 交换机类型true,         // 是否持久化false,        // 是否自动删除false,        // 是否内部使用false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to declare an exchange")// 定义要发送的消息的路由键和内容routingKey := "example.key.das"message := "Hello, RabbitMQ!"// 发布消息到交换机,并指定路由键err = ch.Publish("logs_topic", // 交换机名称routingKey,   // 路由键false,        // 是否等待服务器响应false,        // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)failOnError(err, "Failed to publish a message")log.Printf("Sent message: %s", message)
}

消费者


func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic",      // 交换机类型true,         // 是否持久化false,        // 是否自动删除false,        // 是否内部使用false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 将队列绑定到交换机上,并指定要接收的路由键err = ch.QueueBind(q.Name,       // 队列名称"example.#",  // 路由键,可以使用通配符*匹配多个单词"logs_topic", // 交换机名称false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to bind a queue")// 创建一个消费者通道msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否排他消费者false,  // 是否阻塞false,  // 是否等待服务器响应nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收和处理消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages...")<-forever
}

相关文章:

RabbitMQ从原理到实战—基于Golang【万字详解】

文章目录 前言一、MQ是什么&#xff1f;优势劣势 二、MQ的用途1、应用解耦2、异步加速3、削峰填谷4、消息分发 三、RabbitMQ是什么1、AMQP 协议2、RabbitMQ 包含的要素3、RabbitMQ 基础架构 四、实战1、Simple模式(即最简单的收发模式)2、Work Queues 模型3、Publish/Subscribe…...

机器学习——KNN算法

1、&#xff1a;前提知识 KNN算法是机器学习算法中用于分类或者回归的算法&#xff0c;KNN全称为K nearest neighbour&#xff08;又称为K-近邻算法&#xff09; 原理&#xff1a;K-近邻算法采用测量不同特征值之间的距离的方法进行分类。 优点&#xff1a;精度高 缺点&…...

Kali 软件管理测试案例

案例1 &#xff1a;显示目录树 tree ┌──(root㉿kali)-[~] └─# tree --help usage: tree [-acdfghilnpqrstuvxACDFJQNSUX] [-L level [-R]] [-H baseHREF][-T title] [-o filename] [-P pattern] [-I pattern] [--gitignore][--gitfile[]file] [--matchdirs] [--metafirs…...

【分布式】Zookeeper

Java开发者视角下的Zookeeper—— 在什么场景下使用&#xff0c;怎么用 可以参考&#xff1a;https://zhuanlan.zhihu.com/p/62526102 Zookeeper是什么&#xff1f; ZooKeeper 是一个分布式的&#xff0c;开放源码的分布式应用程序协同服务。ZooKeeper 的设计目标是将那些复…...

ScheduleJS Crack,新的“信息列”水平滚动功能

ScheduleJS Crack,新的“信息列”水平滚动功能 增加了对Angular 16的支持 新的“信息列”水平滚动功能。 新的“信息列”固定功能。 添加了输入属性以处理组件模板中的偶数和奇数ScheduleRowPlainBackgroundColor以及CSS变量。 改进了“信息列”和角度甘特组件的类型。 Schedul…...

curl封装

一。由于工作的原因&#xff0c;需要对curl做一些封装&#xff0c;附加上我们的证书&#xff0c;提供给第三个C和jAVA使用。 二。头文件封闭四个函数&#xff0c;get&#xff0c;post&#xff0c;download&#xff0c;upload #ifndef CURLHTTP_H #define CURLHTTP_H#include …...

C语言数据类型和变量

C语言数据类型和变量 数据类型分类内置类型【C语言本身就具有的类型】自定义类型【自己来创建类型】取值范围 变量变量的创建变量创建的语法形式变量的分类全局变量局部变量 栈区、堆区、静态区 算术操作符赋值操作符连续赋值复合赋值符 单目操作符&#xff1a;、--、、-强制类…...

分布式训练 最小化部署docker swarm + docker-compose落地方案

目录 背景&#xff1a; 前提条件&#xff1a; 一、docker环境初始化配置 1. 安装nvidia-docker2 2. 安装docker-compose工具 3. 获取GPU UUID 4. 修改docker runtime为nvidia&#xff0c;指定机器的UUID 二、docker-swarm 环境安装 1. 初始化swarm管理节点 2. 加入工…...

QT学习笔记-开发环境编译Qt MySql数据库驱动与交叉编译Qt MySql数据库驱动

QT学习笔记-开发环境编译Qt MySql数据库驱动与交叉编译Qt MySql数据库驱动 0、背景1、基本环境2、开发环境编译Qt MySql数据库驱动2.1 依赖说明2.2 MySQL驱动编译过程 3、交叉编译Qt MySql数据库驱动3.1 依赖说明3.3.1 如何在交叉编译服务器上找到mysql.h及相关头文件3.3.2 如果…...

QT使用QXlsx实现数据验证与Excel公式操作 QT基础入门【Excel的操作】

准备环境:QT中使用QtXlsx库的三种方法 1、公式操作写单行公式 //右值初始化Format rAlign;rAlign.setHorizontalAlignment(Format::AlignRight);//左值初始化Format lAlign;lAlign.setHorizontalAlignment(Format::AlignLeft);xlsx.write("B3", 40, lAlign);xlsx.wr…...

renrenfast Vue2 打包发布

1、修改 static/config/index-prod.js 文件 // api接口请求地址 window.SITE_CONFIG[baseUrl] http://192.168.1.86:8080/renren-fast; /*** 生产环境*/ ;(function () {window.SITE_CONFIG {};// api接口请求地址window.SITE_CONFIG[baseUrl] http://192.16…...

NoSQL数据库介绍+Redis部署

目录 一、NoSQL概述 1、数据的高并发读写 2、海量数据的高效率存储和访问 3、数据库的高扩展和高可用 二、NoSQL的类别 1、键值存储数据库 2、列存储数据库 3、文档型数据库 4、图形化数据库 三、分布式数据库中的CAP原理 1、传统的ACID 1&#xff09;、A--原子性 …...

【mindspore学习】环境配置

本次实验搭配的环境是 CUDA 11.6 CUDNN v8.9.4 TensorRT-8.4.1.5 mindspore 2.1.0。 1、配置 Nvidia 显卡驱动 如果原来的主机已经安装了 nvidia 驱动&#xff0c;为避免版本的冲突&#xff0c;建议先清除掉旧的 nvidia驱动 sudo apt-get --purge remove nvidia* sudo apt…...

基于shell脚本对aliyun npm仓库(https://packages.aliyun.com)登录认证

文章目录 基于shell脚本对阿里云npm仓库&#xff08;https://packages.aliyun.com&#xff09;登录认证食用人群食用方式 基于shell脚本对阿里云npm仓库&#xff08;https://packages.aliyun.com&#xff09;登录认证 食用人群 由于一些安全的原因&#xff0c;某些企业可能会…...

K8s Pod 安全认知:从openshift SCC 到 PSP 弃用以及现在的 PSA

写在前面 简单整理,博文内容涉及: PSP 的由来PSA 的发展PSA 使用认知不涉及使用,用于了解 Pod 安全 API 资源理解不足小伙伴帮忙指正对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是…...

提高企业会计效率,选择Manager for Mac(企业会计软件)

作为一家企业&#xff0c;良好的财务管理是保持业务运转的关键。而选择一款适合自己企业的会计软件&#xff0c;能够帮助提高会计效率、减少错误和节约时间。在众多的选择中&#xff0c;Manager for Mac(企业会计软件)是一款值得考虑的优秀软件。 首先&#xff0c;Manager for…...

软考:中级软件设计师:信息系统的安全属性,对称加密和非对称加密,信息摘要,数字签名技术,数字信封与PGP

软考&#xff1a;中级软件设计师:信息系统的安全属性 提示&#xff1a;系列被面试官问的问题&#xff0c;我自己当时不会&#xff0c;所以下来自己复盘一下&#xff0c;认真学习和总结&#xff0c;以应对未来更多的可能性 关于互联网大厂的笔试面试&#xff0c;都是需要细心准…...

Vue3中reactive响应式失效的问题

情景阐述 弹窗内部有一个挑选框&#xff0c;要通过请求接口获取挑选框下面可供选择的数据。 这是一个很简单的情境&#xff0c;我立刻有了自己的思路。如果实现搜索&#xff0c;数据较少可以直接用elementplus自带的filter。如果数据较多&#xff0c;就需要传val&#xff0c;…...

lamp

LAMP 环境 指的是在 Linux 操作系统中分别安装 Apache 网页服务器、MySQL 数据库服务器和 PHP 开发服务器&#xff0c;以及一些对应的扩展软件。AMP也支持win操作系统 &#xff08;sccm 域升级版&#xff09; LAMP架构是目前成熟的企业网站应用模式之一&#xff0c;指的是协同…...

LeetCode 周赛上分之旅 #42 当 LeetCode 考树上倍增,出题的趋势在变化吗

⭐️ 本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 和 BaguTree Pro 知识星球提问。 学习数据结构与算法的关键在于掌握问题背后的算法思维框架&#xff0c;你的思考越抽象&#xff0c;它能覆盖的问题域就越广&#xff0c;理解难度…...

Qt 自定义菜单 托盘菜单

托盘菜单实现&#xff1a;通过QSystemTrayIconQMenuQAction即可完美实现&#xff01; 实现方式&#xff1a;createActions用于创建菜单、菜单项,translateActions用于设置文本、实现多语化&#xff0c;translateAccount用于设置用户空间配额。 void TrayMenu::createActions(…...

channel并发编程

不要通过共享内存通信&#xff0c;要通过通信共享内存。 channel是golang并发编程中一种重要的数据结构&#xff0c;用于多个goroutine之间进行通信。 我们通常可以把channel想象成一个传送带&#xff0c;将goroutine想象成传送带周边的人&#xff0c;一个传送带的上游放上物品…...

苹果新健康专利:利用 iPhone、Apple Watch 来分析佩戴者的呼吸情况

根据美国商标和专利局&#xff08;USPTO&#xff09;公示的清单&#xff0c;苹果获得了一项健康相关的技术专利&#xff0c;可以利用 iPhone、Apple Watch 来分析佩戴者的呼吸系统。 苹果在专利中概述了一种测量用户呼吸功能的系统&#xff0c;通过 iPhone 上的光学感测单元&am…...

数据分析基础-数据可视化02-不同数据类型的可视化概念及原则

将数据空间映射到颜色空间。 数据空间&#xff1a;连续或分类 数据可以被划分为两个主要的数据空间&#xff1a;连续数据和分类数据。这两种数据空间有不同的特点和适用的分析方法。 连续数据&#xff08;Continuous Data&#xff09;&#xff1a; 连续数据是指可以在某个范…...

QT项目使用Qss的总结

什么是QSS QSS称为Qt Style Sheets也就是Qt样式表&#xff0c;它是Qt提供的一种用来自定义控件外观的机制。QSS大量参考了CSS的内容&#xff0c;只不过QSS的功能比CSS要弱很多&#xff0c;体现在选择器要少&#xff0c;可以使用的QSS属性也要少很多&#xff0c;并且并不是所有…...

suricata初体验+wireshark流量分析

目录 一、suricata介绍 1.下载安装 2.如何使用-攻击模拟 二、wireshark流量分析 1.wireshark过滤器使用 2.wireshark其他使用 一、suricata介绍 1.下载安装 通过官网下载suricata&#xff0c;根据官网步骤进行安装。 官网地址&#xff1a; https://documentation.wazuh.…...

机器学习:异常检测实战

文章目录 Anomaly Detection目录任务介绍数据集方法评估Baseline报告报告评价标准 Anomaly Detection 目录 任务介绍 无监督的异常检测 数据集 方法 autoencode 是否能够还原出原始类型图片&#xff0c;基于重构loss来判断是否正常 重构误差当作异常分数 评估 采用ROC和AUC…...

数据结构1

数据结构是计算机科学中存储和组织数据的一种方式&#xff0c;它定义了数据的表示方式和对数据进行操作的方法&#xff0c;常见的数据结构包括数组、栈、链表、队列、树、图等。 目录 一、常见的数据结构 1.数组 2.栈 3.队列 4.链表 5.树 6.图 一、常见的数据结构 1.数…...

自然语言处理学习笔记(七)————字典树效率改进

目录 1. 首字散列其余二分的字典树 2.双数组字典树 3.AC自动机(多模式匹配) &#xff08;1&#xff09;goto表 &#xff08;2&#xff09;output表 &#xff08;3&#xff09;fail表 4.基于双数组字典树的AC自动机 字典树的数据结构在以上的切分算法中已经很快了&#x…...

forEach和map有什么区别,使用场景?

forEach和map有什么区别&#xff0c;使用场景&#xff1f; 区别什么意思&#xff1f;forEach: 不直接改变原始数组&#xff0c;但可以在回调中更改原始数组。 区别 forEach 和 map 都是数组的常用方法&#xff0c;但它们有不同的目的和用法。下面是它们之间的主要区别以及各自…...

做外贸哪个网站比较好/搜索排名广告营销怎么做

雷锋网(公众号&#xff1a;雷锋网)按&#xff1a;数据科学、大数据和物联网正在以令人炫目的速度发展和演进&#xff0c;而商业界正以缓慢的速度将更多来自不同渠道的数据整合起来&#xff0c;并能从中洞察更多信息。本文是 Andrew Dipper 对数据科学行业2017年的展望&#xff…...

保定模板建站软件/网站友情链接检测

一、两种方法实现多线程&#xff1a; 很多时候程序员无法选择何时在代码中出现线程&#xff0c;但是在基本的Web库类、Servlet中&#xff0c;多线程都是很重要的实现并发的方式。 1.继承Thread类 class 类名 extends Thread{ run(){//覆盖run方法 线程处理的程序 } } /*** …...

网站开发怎么自动获取位置/常见的网络营销模式

目录 一、scala运算符本质 二、算术运算符 三、逻辑运算符 四、关系运算符 注&#xff1a;scala 与equals区别 五、赋值运算符 六、位运算符 一、scala运算符本质 在Scala中其实是没有运算符的&#xff0c;所有运算符都是方法。 1&#xff09;当调用对象的方法时&#…...

企业服务网/vue seo优化

课前作业如果你对 promise 、reflux 还不那么熟悉&#xff0c;请先行了解他们是什么&#xff0c;有什么用。ReFlux细说大白话讲解Promise现状分析首先我们来絮叨絮叨后端同学是怎么写代码的。首先絮叨一下经典问题&#xff0c;MVC用户看到viewview ——————————》 cont…...

wordpress 自定义鼠标/链接是什么意思

创建2张表 一张t_shuiguo 水果表 一张t_supermarket 超市表现在我要查一个超市的各区水果价格的汇总如下: 表A那么首先水果表 是可以动态添加的 所有A表中的列 是动态的 先不考虑先看下静态的 如果就是这么4个水果那么SQL可以这么写 (参考了网上一些列子)-- 静态sqlselect ifnu…...

如何在360做网站SEO/网络营销岗位

转自&#xff1a;https://blog.csdn.net/z69183787/article/details/48933481 自从开始使用Maven管理项目&#xff0c;最近在配置MyBatis的Mapper&#xff0c;在Eclipse上调试时都是正常的&#xff0c;但是最近把项目迁移到 IntelliJ IDEA 上后发现不管是直接用Jetty调试&#…...