RabbitMQ简单应用
概念
RabbitMQ 是一种流行的开源消息代理(Message Broker)软件,它实现了高级消息队列协议(AMQP - Advanced Message Queuing Protocol)。RabbitMQ 通过高效的消息传递机制,主要应用于分布式系统中解耦应用组件、异步消息发送、流量削峰等场景,可提高系统扩展性和稳定性。
RabbitMQ 的核心功能
-
消息队列
RabbitMQ 提供消息队列功能,用于存储和转发消息,确保生产者和消费者之间的解耦。 -
可靠性
RabbitMQ 支持消息持久化、确认机制和高可用性集群,保证消息在传递过程中的可靠性。 -
灵活的路由
借助交换机(Exchange)机制,RabbitMQ 支持灵活的消息路由规则,包括广播(fanout)、直连(direct)、主题(topic)等。 -
消息确认机制
消费者需要确认消息已被成功消费,未确认的消息可以重新投递(避免消息丢失)。 -
扩展性和高可用性
RabbitMQ 支持集群部署,可根据负载动态扩展,同时提供镜像队列功能,实现高可用性。 -
插件机制
RabbitMQ 支持丰富的插件,用于监控、身份验证、消息追踪等扩展功能。
RabbitMQ 的核心概念
-
消息(Message): RabbitMQ 处理的最小单位,包含消息头和消息体。 消息头描述消息的属性(如优先级、过期时间等),消息体是实际的数据内容。
-
生产者(Producer): 发送消息到 RabbitMQ 的应用程序。
-
消费者(Consumer): 从 RabbitMQ 中接收并处理消息的应用程序。
-
队列(Queue): 存储消息的容器,遵循先进先出(FIFO)规则。消息只能存储到队列中,消费者从队列中取出消息进行处理。
-
交换机(Exchange): 用于接收生产者发送的消息,并根据绑定规则将消息路由到队列。 常见交换机类型:
direct
:根据精确匹配的路由键转发消息。fanout
:将消息广播到所有绑定的队列。topic
:按模式匹配的路由键转发消息。headers
:根据消息头的属性匹配路由。 -
绑定(Binding): 交换机与队列之间的关系,指定消息如何从交换机路由到队列。
-
路由键(Routing Key): 用于匹配交换机和队列的绑定规则。
-
虚拟主机(Virtual Host,vhost): 类似于一个命名空间,用于隔离队列、交换机等资源。
-
连接和通道(Connection & Channel): 生产者和消费者通过连接与 RabbitMQ 交互,每个连接可包含多个通道,通道是实际读写消息的通信路径。
-
ACK 确认机制: 生产者和消费者可确认消息是否成功投递或处理。确认机制分为:
生产者确认
:确保消息发送到队列。消费者确认
:确保消息成功处理。
RabbitMQ 的工作流程
-
生产者发送消息到交换机
生产者通过指定交换机和路由键发送消息。 -
交换机将消息路由到队列
根据绑定规则,交换机将消息路由到一个或多个队列。 -
消费者从队列接收消息
消费者监听队列,并从队列中取出消息进行处理。 -
消息确认
消费者处理完成后,向 RabbitMQ 发送确认,RabbitMQ 删除该消息。若消费者未确认,RabbitMQ 可重新投递消息。
RabbitMQ 的应用场景
-
解耦微服务
RabbitMQ 在分布式架构中充当消息桥梁,避免服务之间的直接依赖。 -
异步任务处理
将耗时的任务放入队列,消费者后台处理,提升系统响应速度。 -
日志收集
使用 RabbitMQ 作为日志消息的中间件,集中处理和分析日志数据。 -
分布式系统的负载均衡
RabbitMQ 可将消息分发给多个消费者,实现任务的均衡处理。 -
实时消息推送
支持高并发的实时消息推送场景,如在线聊天、通知系统。
RabbitMQ 的优缺点
优点:
- 支持多种协议(AMQP、STOMP、MQTT 等)。
- 功能强大,支持复杂的消息路由。
- 高可靠性,支持持久化和集群模式。
- 插件机制丰富,便于扩展。
- 广泛支持多种编程语言(Java、Python、Go 等)。
缺点:
- 性能较 ActiveMQ、Kafka 稍低,不适合大数据流场景。
- 配置复杂,需要一定的学习成本。
- 占用资源较高,尤其是大量队列和消息积压时。
RabbitMQ 是一个功能强大、易用的消息中间件,适合需要可靠消息传递、灵活路由和高可用性的场景。通过其简单直观的架构,开发者可以轻松实现消息解耦、异步处理和分布式通信功能,从而大大提高系统的可扩展性和可靠性。
接下来通过java代码展示其简单应用。关于rabbitmq服务的安装,请参考linux安装rabbitmq
创建连接
创建一个maven项目,在
pom
添加如下依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency><!-- 导入slf4j相关,为解决控制台出现如下信息:SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
-->
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.32</version> <!-- 或使用其他版本 -->
</dependency>
<dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.6</version> <!-- 或使用其他版本 -->
</dependency>
<dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.2.6</version>
</dependency>
使用rabbitmq的连接工厂,来创建对
rabbitmq-server
的连接:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 连接工具,建立与RabbitMQ服务的连接**/
public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址,也就是安装rabbitmq的服务器ipfactory.setHost("192.168.137.200");//端口factory.setPort(5672);//设置虚拟机名称,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq,这里使用默认虚拟机factory.setVirtualHost("/");//设置用户名factory.setUsername("admin");//设置密码factory.setPassword("admin");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}}
虚拟机信息在管理控制台页面中如下
在Admin
页签的Virtual Host
一栏中,Name
即为虚拟机名字(‘’/" 为rabbitmq的默认虚拟机),Users
为该虚拟机中的用户:
想要添加新的虚拟机,可以通过上图中的Add virtual host
按钮进行添加
用户信息在管理控制台页面中如下
在Admin
页签的Users
一栏中,想要添加新的用户,可以通过下图中的Add user
按钮进行添加,用户可分配的权限为Admin、Monitoring、Policymaker、Management、Impersonator、None:
Admin (管理员权限)
赋予用户完全的管理权限,可以执行几乎所有操作,适用于需要对 RabbitMQ 系统进行全面管理和配置的用户。
包括的权限:
- 创建、删除、管理队列、交换机、绑定和虚拟主机。
- 配置和管理用户权限和角色。
- 配置 RabbitMQ 集群、插件和策略。
- 查看和修改 RabbitMQ 的所有设置。
Monitoring (监控权限)
允许用户查看 RabbitMQ 的监控信息,但不能进行任何修改操作。适用于需要查看 RabbitMQ 系统运行状况、但不需要做出修改的用户(如运维人员、监控人员)
包括的权限:
- 查看队列、交换机、连接、通道的状态信息。
- 查看消息流、消息队列的深度和消费者等监控数据。
- 查看系统的资源使用情况(如内存、磁盘、CPU 使用等)。
Policymaker (策略管理权限)
允许用户管理 RabbitMQ 中的策略,适用于负责 RabbitMQ 策略配置(如队列策略、镜像策略等)的用户。
包括的权限:
- 创建、删除和修改虚拟主机的策略。
- 配置消息队列的生命周期、镜像策略、磁盘空间限制等。
- 不允许进行其他管理操作,如修改队列、交换机、绑定等。
Management (管理界面权限)
允许用户访问和使用 RabbitMQ 的管理控制台,查看系统状态和配置信息,但不包括修改操作。适用于需要监控和查看 RabbitMQ 系统状态,但不需要对系统做修改的用户。
包括的权限:
- 访问 RabbitMQ 的管理界面。
- 查看管理控制台的所有信息(如队列、交换机、连接、用户等)。
- 不允许执行创建、删除、修改等操作。
Impersonator (伪装权限)
允许用户以其他用户的身份执行操作,但不具有实际的权限修改能力。适用于需要代替其他用户执行操作或进行调试的用户。
包括的权限:
- 可以 "伪装" 成为其他用户,从而以该用户的权限来执行操作。通常,用于临时授予某些操作权限。
- 这种权限通常用于管理审计或系统调试。
None (无权限)
此权限不授予任何权限,适用于不需要访问 RabbitMQ 系统的用户,或者是仅用作某些临时操作的用户。
包括的权限:
- 不允许用户访问管理页面,执行任何操作。
- 该用户几乎不具有任何权限,不能进行查看或修改操作。
简单模式
RabbitMQ 的简单模式(Simple模式) 是消息队列的一种基本模式,该模式对应一个生产者与一个消费者。在简单模式下,消息生产者将消息发送到队列中,然后由消费者从队列中取出消息进行处理。
简单模式的基本概念
- 生产者(Producer):负责发送消息的应用程序或服务。生产者将消息发送到指定的队列中。
- 队列(Queue):消息的存储区域。队列在 RabbitMQ 服务器上,生产者将消息发送到队列,消费者从队列中获取消息。
- 消费者(Consumer):负责接收和处理消息的应用程序或服务。消费者从队列中获取消息进行处理。
- RabbitMQ 服务器:负责管理队列并协调消息的发送和接收。
简单模式的工作流程如下:
- 生产者连接到 RabbitMQ 服务器。
- 创建通道:生产者通过连接创建一个通道,用于声明队列及发布消息。
- 声明队列:生产者在发送消息之前,会利用通道声明一个队列。声明队列的作用是确保队列存在,如果队列不存在会创建队列;如果队列已存在,则跳过创建。
- 生产者发送消息到队列:生产者使用
basicPublish
方法将消息发送到指定的队列中。 - 消费者连接到 RabbitMQ 服务器。
- 消费者从队列中获取消息:消费者从指定队列中接收消息,进行消费处理。
- 确认消息(ACK):在消费完成后,消费者会发送确认信号(ACK),告知 RabbitMQ 消息已经处理完毕。这样 RabbitMQ 可以将消息从队列中删除。ACK可设置自动回复或手动回复。
生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;public class SimpleProducer {//队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {/*** 声明队列,参数明细如下:* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 消息内容String message = "Hello World";/*** 使用通道将消息发布到队列中,参数说明如下:* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文* */channel.basicPublish("", QUEUE_NAME, null, message.getBytes());} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者代码
消费者连接rabbitmq后,依然需要声明队列,因为需要确保队列的存在
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;public class SimpleConsumer {//要与生产者的队列名保持一致private final static String QUEUE_NAME = "simple_queue";public static void main(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(QUEUE_NAME, true, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先运行生产者主方法,运行后查看管理页面的
Queues and Streams
页签:
红框Messages
部分即代表队列中的消息, Ready
、Unacked
和 Total
的含义如下:
-
Ready:表示队列中已经准备好可以被消费者消费的消息数量。即这些消息还没有被任何消费者接收。
-
Unacked(未确认的消息):表示已经被消费者接收但还没有被确认(acknowledged)的消息数量。这意味着这些消息被消费者消费后,还未发送确认,因此 RabbitMQ 会等待消费者确认消息处理完成。如果消费者断开连接或未确认,RabbitMQ 会将这些消息重新放回队列中,以便被其他消费者重新消费。
-
Total:表示队列中的消息总数,是
Ready
和Unacked
两者的总和。
图中:
Ready
为 1,表示有 1 条消息在队列中等待被消费。Unacked
为 0,表示没有消息被消费者接收且未确认。Total
为 1,表示队列中的消息总数为 1。
再运行消费者主方法,运行后查看管理页面的
Queues and Streams
页签:
红框Messages
部分都为0,代表消息都已被消费
消费者代码控制台输出:
接收消息: Hello World!
工作模式
在 RabbitMQ 的工作队列模式(Work Queue / Task Queue)中,一个生产者会对应多个消费者。
消息分发给多个消费者的方式主要有两种:轮询分配 和 公平分配。
轮询分配(Round-robin Dispatching)
原理:
- 默认情况下,RabbitMQ 将消息以 轮询的方式 均匀地分发给所有消费者,消息的分配模式是一个消费者分配一条,直至消息消费完成。
- 每个消费者都会轮流收到消息,而不会考虑消费者当前的工作负载。
特点:
- 无视消费者处理能力:
RabbitMQ 不会关心某个消费者是否已经忙碌或是否处理得更快,而是严格地轮流发送消息。 - 简单高效:
实现方式简单,但在消费者性能不均的情况下,可能导致某些消费者负载过高或过低。
公平分配(Fair Dispatching)
原理:
- 公平分配遵循能者多劳的原则,核心是基于 消费者的繁忙程度 分发消息。
- RabbitMQ 通过 消息确认(ACK)机制 来检测消费者是否空闲。
- 如果消费者在当前未完成上一个任务,则不会分配新的任务给该消费者。
特点:
-
消费者负载感知:
RabbitMQ 根据消费者的负载情况分发消息,而不是简单地轮流发送,这种方式确保消息只发送到空闲的消费者,避免让忙碌的消费者承担额外的负担。 -
消息确认机制(ACK):
消费者需要显式地向 RabbitMQ 确认(ACK)已成功处理一条消息。
未确认的消息(比如因消费者挂掉或处理时间过长)会重新投递到其他消费者,确保消息不会丢失。
-
基于 QoS 的限流控制:
使用
basicQos
参数(如basicQos(1)
)限制 RabbitMQ 在未收到消费者确认时不发送新的消息。
轮询发送
创建一个生产者,两个消费者来演示轮询发送效果:
- 每个消费者都添加
channel.basicQos(1)
,来保证每次只接收一条消息,用于演示轮询效果- 消费者一添加线程阻塞1秒,消费者二不添加,观察二者得到的消息数量是否一致
生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;public class WorkProducer {//队列名称private final static String QUEUE_NAME = "work_queue";public static void main(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {/*** 声明队列,参数明细如下:* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//循环发送10个消息for (int i = 0; i < 10; i++) {// 消息内容String message = "工作模式消息-Hello World-" + i;/*** 使用通道将消息发布到队列中,参数说明如下:* 1.第一个参数是exchange,即交换机名称,工作模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)* 2.第二个参数是routingKey,即路由键,工作模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文* */channel.basicPublish("", QUEUE_NAME, null, message.getBytes());}} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者一
import com.rabbitmq.client.*;
import util.ConnectionUtil;import java.io.IOException;public class WorkConsumerOne {//要与生产者的队列名保持一致private final static String QUEUE_NAME = "work_queue";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("工作模式消费者1接收消息: " + msg + "!");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(QUEUE_NAME, true, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
消费者二
import com.rabbitmq.client.*;
import util.ConnectionUtil;import java.io.IOException;public class WorkConsumerTwo {//要与生产者的队列名保持一致private final static String QUEUE_NAME = "work_queue";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("工作模式消费者2接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(QUEUE_NAME, true, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动两个消费者,最后启动生产者
消费者一控制台输出
工作模式消费者1接收消息: 工作模式消息-Hello World-0!
工作模式消费者1接收消息: 工作模式消息-Hello World-2!
工作模式消费者1接收消息: 工作模式消息-Hello World-4!
工作模式消费者1接收消息: 工作模式消息-Hello World-6!
工作模式消费者1接收消息: 工作模式消息-Hello World-8!
消费者二控制台输出
工作模式消费者2接收消息: 工作模式消息-Hello World-1!
工作模式消费者2接收消息: 工作模式消息-Hello World-3!
工作模式消费者2接收消息: 工作模式消息-Hello World-5!
工作模式消费者2接收消息: 工作模式消息-Hello World-7!
工作模式消费者2接收消息: 工作模式消息-Hello World-9!
这里看到,生产者一共发送了10条消息到队列中,即便消费者一添加了线程阻塞方法来延缓执行,两个消费者接收到的消息数量依然相同。
公平分发
启用公平分配的设置:
要实现公平分配,需要修改以下两个参数:
-
basicQos
参数:- 用于限制 RabbitMQ 在消费者未确认消息时,不会发送新的消息。
- 设置为
basicQos(1)
表示每次只分发一条消息,消费者处理并确认(ACK)后,才会继续分发下一条消息。
-
消息确认(Manual ACK):
- 需要将消费者改为 手动确认模式。
- 当消费者处理完消息后,手动发送一个 ACK 来告诉 RabbitMQ,消息已经处理完成。
- 如果消息没有确认(如消费者挂掉),RabbitMQ 会重新将消息发送给其他消费者。
创建一个生产者,两个消费者来演示轮询发送效果:
- 每个消费者都添加
channel.basicQos(1)
,来保证每次只接收一条消息,用于演示轮询效果- 每个消费者都添加
channel.basicAck(envelope.getDeliveryTag(),false)
实现手动确认- 消费者一添加线程阻塞1秒,消费者二不添加,观察二者得到的消息数量
生产者代码
生产者代码与轮询模式的生产者相同
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;public class WorkProducer {//队列名称private final static String QUEUE_NAME = "work_queue";public static void main(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {/*** 声明队列,参数明细如下:* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//循环发送10个消息for (int i = 0; i < 10; i++) {// 消息内容String message = "工作模式消息-Hello World-" + i;/*** 使用通道将消息发布到队列中,参数说明如下:* 1.第一个参数是exchange,即交换机名称,工作模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)* 2.第二个参数是routingKey,即路由键,工作模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文* */channel.basicPublish("", QUEUE_NAME, null, message.getBytes());}} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者一
在消费方法中加入Thread.sleep(1000)
,让消费者一的消息处理变慢。
import com.rabbitmq.client.*;
import util.ConnectionUtil;import java.io.IOException;public class PubWorkConsumerOne {//要与生产者的队列名保持一致private final static String QUEUE_NAME = "work_queue";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("工作模式消费者1接收消息: " + msg + "!");//手动返回ackchannel.basicAck(envelope.getDeliveryTag(),false);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(QUEUE_NAME, false, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
消费者二
无线程阻塞,正常处理消息,以观察处理结果
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;public class PubWorkConsumerTwo {//要与生产者的队列名保持一致private final static String QUEUE_NAME = "work_queue";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("工作模式消费者2接收消息: " + msg + "!");//手动返回ackchannel.basicAck(envelope.getDeliveryTag(),false);}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(QUEUE_NAME, false, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动两个消费者,最后启动生产者
消费者一
工作模式消费者1接收消息: 工作模式消息-Hello World-1!
工作模式消费者1接收消息: 工作模式消息-Hello World-2!
消费者二
工作模式消费者2接收消息: 工作模式消息-Hello World-0!
工作模式消费者2接收消息: 工作模式消息-Hello World-3!
工作模式消费者2接收消息: 工作模式消息-Hello World-4!
工作模式消费者2接收消息: 工作模式消息-Hello World-5!
工作模式消费者2接收消息: 工作模式消息-Hello World-6!
工作模式消费者2接收消息: 工作模式消息-Hello World-7!
工作模式消费者2接收消息: 工作模式消息-Hello World-8!
工作模式消费者2接收消息: 工作模式消息-Hello World-9!
由于消费者一中存在线程阻塞,消费者二没有,消费者二处理更快。所以根据能者多劳原则,消费者二会处理更多的消息。
两者对比
轮询与公平分发对比
特性 | 轮询分配 | 公平分配 |
---|---|---|
分配机制 | 严格轮流,无视消费者负载 | 根据消费者工作量分配 |
消息确认机制 | 可选(默认自动 ACK) | 必须手动确认 |
适用场景 | 消费者处理能力相当的场景 | 消费者处理能力差异大的场景 |
优点 | 实现简单,消息均匀分配 | 分配更智能,避免过载 |
缺点 | 消费者容易过载或空闲 | 稍微复杂,需要手动 ACK |
- 轮询分配 适用于简单任务,且消费者负载相近的场景。
- 公平分配 适用于任务复杂度不同、消费者能力差异较大的场景,是生产中更常见的做法,因为它可以更好地利用系统资源并避免消息堆积。
广播模式
RabbitMQ 的 广播模式 是一种特殊的消息分发模式,使用 Fanout Exchange(扇形交换机) 实现。它可以将消息广播到所有绑定到该交换机的队列中,所有消费者都会接收到消息。
注意:广播模式下是一个消费者对应一个队列(如上图),并通过一个交换机将消息分发给多个绑定的队列来实现广播
工作机制
- 交换机类型:
fanout
(扇形交换机)。 - 消息分发规则:
- 扇形交换机忽略路由键(Routing Key),不关心消息的具体内容。
- 绑定到交换机的所有队列都能接收到消息,进而将消息分发给队列的消费者,无论绑定时是否指定了路由键。
- 消息流程:
- 生产者:发送消息到 Fanout Exchange。
- 交换机:将消息复制并广播到绑定的所有队列中。
- 消费者:从对应队列中获取消息进行消费。
特性
- 无条件广播:所有绑定到交换机的队列都能接收消息,队列对应的消费者都会消费消息。
- 路由键无效:Fanout Exchange 不会检查或使用路由键。
- 动态绑定:队列可以在交换机创建后动态绑定或解绑。
场景举例
- 群发通知:多个消费者需要同时收到一条通知,比如发布新闻、推送更新等。
- 日志处理:多个系统模块需要接收相同的日志信息以进行分析或处理。
- 实时监控:比如系统状态的实时监控,需要广播到多个模块进行处理。
优缺点
优点
- 高效广播:生产者只需发送一次消息,交换机负责广播,降低了生产者的复杂性。
- 解耦设计:生产者无需知道消费者的具体信息,消费者动态绑定队列即可。
- 支持多消费者:一个消息可以被多个消费者消费。
缺点
- 所有绑定队列都接收消息:无选择性,可能导致某些消费者接收到不需要的消息。
- 消息积压风险:如果某个队列的消费者处理速度较慢,可能导致队列堆积。
特点总结
- Fanout Exchange 忽略路由键,直接广播消息。
- 消息广播给所有绑定队列,支持多个消费者消费相同消息。
- 常用于群发通知、日志处理等场景。
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;public class FanoutProducer {//交换机名称private final static String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) {//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 fanout-广播类型** */channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 消息内容String message = "广播模式消息-Hello World";/*** 使用通道将消息发布到队列中,参数说明如下:* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播* 2.第二个参数是routingKey,即路由键,广播模式下不使用路由键,会把消息发布给所有绑定交换机的队列* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文* */channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes());} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}
}
消费者一
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;public class FanoutConsumerOne {//交换机名称private final static String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 fanout-广播类型** */channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明临时队列String queueName = channel.queueDeclare().getQueue();/*** 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列* 从左到右的参数分别是:队列名、交换机名、路由键** */channel.queueBind(queueName,EXCHANGE_NAME,"");//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("广播模式消费者1接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(queueName, true, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
运行消费者一后查看管理界面,会多一个临时队列:
消费者二
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;public class FanoutConsumerTwo {//交换机名称private final static String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 fanout-广播类型** */channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明临时队列临时队列String queueName = channel.queueDeclare().getQueue();/*** 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列* 从左到右的参数分别是:队列名、交换机名、路由键** */channel.queueBind(queueName,EXCHANGE_NAME,"");//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("广播模式消费者2接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(queueName, true, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
运行消费者二后查看管理界面,也会多一个临时队列:
测试
先启动两个消费者,再启动生产者,可以看到两个消费者都得到了消息
消费者一控制台输出:
广播模式消费者1接收消息: 广播模式消息-Hello World!
消费者二控制台输出:
广播模式消费者2接收消息: 广播模式消息-Hello World!
此时去管理界面查看交换机,多了新创建的fanout_exchange
:
结束两个消费者的运行后,临时队列消失:
Direct模式
RabbitMQ 的 Direct 模式 是最常用的消息路由模式之一,适用于精确匹配路由键的场景。在 Direct 模式下,队列会通过路由键与交换机进行绑定。发布消息时,需要指定路由键进行发布,交换机会将消息发送到与该路由键精确匹配的队列。
注意: Direct 模式中一个队列对应一个消费者,交换机通过路由键将消息发布到不同的队列中由消费者消费
Direct 模式的特点
-
精确匹配:
消息发布的路由键必须与队列绑定的路由键完全一致,消息才能被路由到该队列。
不支持模糊匹配。 -
消息定向投递:
用于发送消息到特定的队列,实现消息的点对点投递。
如果消息发布使用的路由键没有任何对应绑定的队列,消息会被丢弃(除非使用备用交换机)。 -
支持多个队列绑定:
多个队列可以使用相同的路由键绑定到同一个交换机,消息会同时发送到所有匹配的队列。
Direct 模式的核心概念
-
路由键(Routing Key):消息发送时指定的字符串,用于指示消息的目标。是 Direct 模式中消息路由的唯一依据。
-
交换机(Exchange):Direct 模式使用
direct
类型的交换机。 -
队列(Queue):消息最终被路由到的存储位置,消费者从队列中获取消息进行处理。
Direct 模式的工作原理
-
创建交换机和队列:生产者创建一个类型为
direct
的交换机,并创建需要的队列。 -
绑定队列:队列通过路由键绑定到交换机。
-
发送消息:生产者发送消息时指定路由键。
-
路由消息:交换机会根据消息的路由键将消息路由到对应绑定的队列。
Direct 模式的应用场景
-
任务分发:将不同类型的任务发送到不同的队列,由专门的消费者处理。
-
日志系统:按日志级别(如
info
、error
、debug
)发送消息到不同的队列。 -
定向通知:给特定用户或特定服务发送消息。
Direct 模式的优缺点
优点:
- 简单易用,逻辑清晰。
- 精确匹配路由键,适合点对点场景。
- 高效且易于维护。
缺点:
- 灵活性相对较低,仅支持精确匹配。
- 无法实现广播或模糊匹配场景(需要结合其他模式)。
生产者
声明交换机后,通过两个路由键来发布不同的消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;public class DirectProducer {//交换机名称private final static String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) {//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 direct-路由模式** */channel.exchangeDeclare(EXCHANGE_NAME,"direct");//创建两个路由键String routingkey1 = "info";String routingkey2 = "error";/*** 使用通道将消息发布到队列中,参数说明如下:* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播* 2.第二个参数是routingKey,即路由键,direct模式下交换机会通过路由键发布消息,只有通过该路由键绑定到交换机的队列才会接收到消息* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文** 这里使用两个路由键来发布两个消息* */channel.basicPublish(EXCHANGE_NAME,routingkey1,null, "路由模式消息-info".getBytes());channel.basicPublish(EXCHANGE_NAME,routingkey2,null, "路由模式消息-error".getBytes());} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}
}
消费者一
声明两个队列,通过与生产者相同的路由键来绑定生产者的交换机,来接收生产者消息
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;public class DirectConsumerOne {//交换机名称private final static String EXCHANGE_NAME = "direct_exchange";//用于接收info路由键所发布消息的队列private final static String QUEUE_INFO = "queue_info";//用于接收error路由键所发布消息的队列private final static String QUEUE_ERROR = "queue_error";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 direct-路由模式** */channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明队列channel.queueDeclare(QUEUE_INFO, true, false, false, null);channel.queueDeclare(QUEUE_ERROR, true, false, false, null);/*** 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列* 从左到右的参数分别是:队列名、交换机名、绑定的路由键** 这里使用不同的路由键,同时将两个队列绑定到交换机* */channel.queueBind(QUEUE_INFO, EXCHANGE_NAME, "info");channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME, "error");//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用** @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body, "utf-8");System.out.println("路由模式消费者1接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中** 这里同时消费两个队列的消息*/channel.basicConsume(QUEUE_INFO, true, consumer);channel.basicConsume(QUEUE_ERROR, true, consumer);} catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
消费者二
声明一个队列,使用生产者不存在的路由键来绑定生产者的交换机,观察是否会收到生产者发布的消息
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;public class DirectConsumerTwo {//交换机名称private final static String EXCHANGE_NAME = "direct_exchange";//队列private final static String QUEUE_OTHER = "queue_other";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 direct-路由模式** */channel.exchangeDeclare(EXCHANGE_NAME, "direct");//声明队列channel.queueDeclare(QUEUE_OTHER, true, false, false, null);/*** 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列* 从左到右的参数分别是:队列名、交换机名、绑定的路由键** */channel.queueBind(QUEUE_OTHER, EXCHANGE_NAME, "other");//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用** @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body, "utf-8");System.out.println("路由模式消费者2接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(QUEUE_OTHER, true, consumer);} catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动两个消费者,再启动生产者,效果为消费者一控制台有输出,消费者二控制台无输出。
消费者一控制台输出如下:
路由模式消费者1接收消息: 路由模式消息-info!
路由模式消费者1接收消息: 路由模式消息-error!
因为direct路由模式根据路由键来进行精确匹配,生产者并没有用与消费者二相同的路由键发布消息,所以消费者二没有收到任何消息。
下图是路由模式创建的交换机及队列
Topic模式
RabbitMQ 的 Topic 模式 是一种基于主题路由的消息模式,允许使用路由键进行模糊匹配来发布消息。相比 Direct 模式,Topic 模式提供了更灵活的消息路由机制,适用于更复杂的场景。
Topic 模式的核心特点
-
模糊匹配:
- 消息的路由键可以是一个点分隔的字符串(如
order.created.us
),通过绑定键中的通配符来实现模糊匹配。
- 消息的路由键可以是一个点分隔的字符串(如
-
支持通配符:
*
:匹配一个单词(由点.
分隔)。#
:匹配零个或多个单词。
-
灵活性高:
- 消息可以根据多级主题(如区域、服务类型、操作类型等)进行分类和路由。
-
广播与定向的结合:
- 可以实现精确匹配(类似 Direct 模式)或主题广播(类似 Fanout 模式)。
Topic 模式的核心概念
-
交换机(Exchange):Topic 模式使用
topic
类型的交换机。 -
路由键(Routing Key):消息发送、绑定交换机与队列时指定的键,通常是点分隔的多级字符串(如
log.info
、order.created.us
)。 -
队列(Queue):接收和存储消息。
Topic 模式的工作原理
-
生产者发送消息:生产者向
topic
类型的交换机发送消息,并指定路由键。 -
队列绑定规则:队列通过路由键与交换机绑定,路由键可以使用通配符来定义匹配规则。
-
交换机路由消息:交换机会根据消息的路由键与队列进行匹配,将符合条件的消息发送到对应队列。
Topic 模式的通配符规则
-
*
(星号):- 匹配一个单词(由
.
分隔)。 - 例如:
- 消息路由键:
log.info
- 队列绑定路由键:
log.*
- 匹配成功。
- 消息路由键:
- 匹配一个单词(由
-
#
(井号):- 匹配零个或多个单词。
- 例如:
- 路由键:
order.created.us
- 队列绑定路由键:
order.#
- 匹配成功。
- 路由键:
Topic 模式的应用场景
-
日志系统:按照日志的类别(如
info
、error
、warning
)或模块(如auth
、order
)路由消息。 -
分布式任务:根据任务的类型或区域分发任务(如
order.created.us
)。 -
通知系统:按照不同主题(如用户通知、系统警报)发送消息。
Topic 模式的优缺点
优点:
- 支持复杂的消息路由规则。
- 灵活性高,适合动态场景。
- 支持广播和定向投递的结合。
缺点:
- 配置复杂度略高。
- 对通配符匹配的性能要求高,可能影响路由效率。
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;public class TopicProducer {//交换机名称private final static String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) {//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 topic-模糊匹配路由模式** */channel.exchangeDeclare(EXCHANGE_NAME,"topic");//创建两个路由键String routingkey1 = "message.info.one";String routingkey2 = "message.error.one";/*** 使用通道将消息发布到队列中,参数说明如下:* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播* 2.第二个参数是routingKey,即路由键,topic模式下交换机会通过路由键发布消息,队列绑定时可通过模糊匹配路由键来接收消息* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文** 这里使用两个路由键来发布两个消息* */channel.basicPublish(EXCHANGE_NAME,routingkey1,null, "topic模式消息-info".getBytes());channel.basicPublish(EXCHANGE_NAME,routingkey2,null, "topic模式消息-error".getBytes());} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者一
import com.rabbitmq.client.*;
import util.ConnectionUtil;import java.io.IOException;public class TopicConsumerOne {//交换机名称private final static String EXCHANGE_NAME = "topic_exchange";//用于接收info路由键所发布消息的队列private final static String QUEUE_INFO = "topic_queue_info";//用于接收error路由键所发布消息的队列private final static String QUEUE_ERROR = "topic_queue_error";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 topic-模糊匹配路由模式** */channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 声明队列channel.queueDeclare(QUEUE_INFO, true, false, false, null);channel.queueDeclare(QUEUE_ERROR, true, false, false, null);/*** 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列* 从左到右的参数分别是:队列名、交换机名、绑定的路由键** 这里使用不同的路由键,同时将两个队列绑定到交换机* *//*** 使用通配符路由键来绑定交换机与队列:** 通配符* * (star) can substitute for exactly one word. 匹配不多不少恰好1个词* # (hash) can substitute for zero or more words. 匹配零个、一个或多个词* 如:* audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等* audit.* 只能匹配 audit.irs* */channel.queueBind(QUEUE_INFO, EXCHANGE_NAME, "*.info.#");channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME, "*.error.#");//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用** @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body, "utf-8");System.out.println("topic模式消费者1接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中** 这里同时消费两个队列的消息*/channel.basicConsume(QUEUE_INFO, true, consumer);channel.basicConsume(QUEUE_ERROR, true, consumer);} catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
消费者二
import com.rabbitmq.client.*;
import util.ConnectionUtil;import java.io.IOException;public class TopicConsumerTwo {//交换机名称private final static String EXCHANGE_NAME = "topic_exchange";//用于接收info路由键所发布消息的队列private final static String QUEUE_INFO = "topic_queue_info";//用于接收error路由键所发布消息的队列private final static String QUEUE_ERROR = "topic_queue_error";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 从通道声明指定交换机* 参数1: 交换机名称,没有自动创建* 参数2: 交换机类型 topic-模糊匹配路由模式** */channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 声明队列channel.queueDeclare(QUEUE_INFO, true, false, false, null);channel.queueDeclare(QUEUE_ERROR, true, false, false, null);/*** 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列* 从左到右的参数分别是:队列名、交换机名、绑定的路由键** 这里使用不同的路由键,同时将两个队列绑定到交换机* *//*** 使用通配符路由键来绑定交换机与队列:** 通配符* * (star) can substitute for exactly one word. 匹配不多不少恰好1个词* # (hash) can substitute for zero or more words. 匹配零个、一个或多个词* 如:* audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等* audit.* 只能匹配 audit.irs* */channel.queueBind(QUEUE_INFO, EXCHANGE_NAME, "*.info");channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME, "*.error");//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用** @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body, "utf-8");System.out.println("topic模式消费者2接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中** 这里同时消费两个队列的消息*/channel.basicConsume(QUEUE_INFO, true, consumer);channel.basicConsume(QUEUE_ERROR, true, consumer);} catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动两个消费者,再启动生产者,效果为消费者一控制台有输出,消费者二控制台无输出。
生产者发布消息的路由键为
message.info.one
message.error.one
消费者一的路由键为
*.info.#
*.error.#
*
匹配不多不少恰好1个词,#
匹配零个、一个或多个词,所以消费者一可以匹配成功。而消费者二的路由键:
*.info
*.error
只能匹配类似于message.info与message.error格式的路由键,故无法接收到消息。
消费者一控制台输出:
topic模式消费者1接收消息: topic模式消息-info!
topic模式消费者1接收消息: topic模式消息-error!
下图为管理界面topic模式创建的交换机及队列
Rpc模式
RabbitMQ的RPC(Remote Procedure Call) 模式允许生产者发布消息后,接收消费者的回调信息(类似http的请求与响应),就像调用本地方法一样接收返回值。RabbitMQ 提供了一个简单但强大的机制来实现 RPC 功能。
RPC 模式的核心概念
-
客户端(Client):即生产者,发起 RPC 请求,发送消息到队列并等待服务端返回结果。
-
服务端(Server):即消费者,接收 RPC 请求,对请求进行处理并将结果返回给客户端。
-
队列:客户端(生产者)将请求发送到队列中,服务端监听该队列以接收请求。
-
回调队列(Callback Queue):客户端为接收服务端(消费者)返回的结果而设置的专用队列。
-
Correlation ID(相关 ID):用于标识每个 RPC 请求和其对应的响应,使客户端能正确处理返回结果。
RPC 模式的工作流程
-
客户端发送请求:
- 创建一个唯一的回调队列。
- 生成一个唯一的
Correlation ID
,用于标识请求。 - 将消息发送到指定的请求队列,并设置消息的
replyTo
属性为回调队列。
-
服务端处理请求:
- 从请求队列中获取消息。
- 执行处理逻辑并生成结果。
- 将结果发送到客户端指定的回调队列,带上原始消息的
Correlation ID
。
-
客户端接收响应:
- 监听回调队列。
- 检查返回消息的
Correlation ID
是否与请求的Correlation ID
匹配。 - 返回结果给调用方。
RPC 模式的优缺点
优点:
-
实现简单:通过 RabbitMQ 提供的基本功能可以实现完整的 RPC 流程。
-
松耦合:客户端和服务端无需直接通信,降低了依赖。
-
支持并发:多个服务端可以监听同一请求队列,实现任务负载均衡。
缺点:
-
性能限制:消息的发送和接收增加了额外的延迟,不适合高实时性要求的场景。
-
资源开销:每个请求需要单独的回调队列,消耗更多的资源。
-
缺乏强一致性保证:消息的丢失或服务端失败可能导致请求无响应。
应用场景
-
任务分发和结果收集:将复杂的任务分发到多个服务处理,收集处理结果。
-
远程调用:在微服务架构中,调用其他服务的接口。
-
计算密集型任务:将大规模计算任务分发到多个节点执行。
生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class RpcProducer {//rpc队列private final static String REQUEST_QUEUE = "rpc_queue";public static void main(String[] args) {//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {//定义临时队列,用于消息回调String replyQueueName = channel.queueDeclare().getQueue();final String corrId = UUID.randomUUID().toString(); // 生成唯一的 Correlation ID// 设置请求属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName) // 设置回调队列.build();//发送请求到请求队列channel.basicPublish("", REQUEST_QUEUE, props, "Rpc模式消息-Hello World".getBytes("UTF-8"));//创建阻塞队列用于接收响应final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//消费回调队列中的消息String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(delivery.getBody(), "UTF-8")); // 放入响应队列}}, consumerTag -> {});//等待响应并返回String result = response.take();//取消订阅回调队列channel.basicCancel(ctag);System.out.println("回调消息:"+result);} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;public class RpcConsumer {//rpc队列private final static String REQUEST_QUEUE = "rpc_queue";public static void main(String[] args) {//这里不关闭连接,否则接收不到消息并无法回调,测试完成后在IDEA中手动结束主方法即可try {//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);// 设置每次只处理一个消息channel.basicQos(1);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用** @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg = new String(body, "utf-8");System.out.println("rpc模式消费者接收消息: " + msg + "!");// 发送响应到回调队列AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", properties.getReplyTo(), replyProps, "Rpc消费者接收成功".getBytes("UTF-8"));channel.basicAck(envelope.getDeliveryTag(), false);}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中** 这里同时消费两个队列的消息*/channel.basicConsume(REQUEST_QUEUE, false, consumer);} catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动生产者,再启动消费者
消费者控制台输出:
rpc模式消费者接收消息: Rpc模式消息-Hello World!
生产者控制台输出:
回调消息:Rpc消费者接收成功
生产者启动并发布消息后,会等待消费者的回调消息,当消费者成功消费后,生产者接收到回调消息并打印控制台
过期时间设置
过期时间TTL
表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL
较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL
值,就成为dead message被投递到死信队列, 消费者将无法再收到该消息。
设置队列过期时间
这里以简单模式为例演示
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
import java.util.HashMap;
import java.util.Map;public class QueueTtlProducer {//队列名称private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {/*** channel.queueDeclare()方法为声明队列,参数明细如下:* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/Map<String,Object> arguments = new HashMap<>();arguments.put("x-message-ttl",10000);//x-message-ttl为队列的超时属性,这里设置为10秒过期channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);// 消息内容String message = "Hello World";/*** 使用通道将消息发布到队列中,参数说明如下:* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文* */channel.basicPublish("", QUEUE_NAME, null, message.getBytes());} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;public class QueueTtlConsumer {//要与生产者的队列名保持一致private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {// 声明队列Map<String,Object> arguments = new HashMap<>();arguments.put("x-message-ttl",10000);//x-message-ttl为队列的超时属性,这里设置为10秒过期channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("超时队列接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(QUEUE_NAME, true, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
进行测试
先启动生产者,查看管理页面,多出了声明的ttl_queue
队列,拥有超时属性,并且队列内存在一个未消费的消息:
不启动消费者,等待10秒后页面自动刷新,消息已消失:
生产者启动10秒后再启动消费者,其控制台无任何输出信息,如果在10秒内启动消费者,则消费者会收到消息:
超时队列接收消息: Hello World!
设置消息过期时间
消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。
生产者
依旧以简单模式为例,这里使用BasicProperties
来为消息设置超时时间:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;public class MessageTtlProducer {//队列名称private final static String QUEUE_NAME = "messageTtl_queue";public static void main(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用上面创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {//声明队列时不设置超时channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 消息内容String message = "Hello World";//设置消息的过期时间,此处相当于为每一个消息单独设置属性AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentEncoding("UTF-8") // 编码方式.expiration("10000")// 过期时间.build();/*** 使用通道将消息发布到队列中,参数说明如下:* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文* */channel.basicPublish("", QUEUE_NAME, basicProperties, message.getBytes());} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;public class MessageTtlConsumer {//要与生产者的队列名保持一致private final static String QUEUE_NAME = "messageTtl_queue";public static void main(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry (//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("超时队列接收消息: " + msg + "!");}};/*** 通过通道监听队列消息并实现消费,参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中*/channel.basicConsume(QUEUE_NAME, true, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动生产者,查看管理页面看到新创建的队列,由于只是为消息设置超时,队列没有了蓝色的TTL标识:
如果在生产者设置的超时时间内启动消费者,则其控制台输出如下,否则消息超时消费者接收不到:
超时队列接收消息: Hello World!
其他属性设置
上面列举了队列与消息的超时属性设置,除此之外还有很多其他可设置项
队列扩展属性
常见队列属性
在 RabbitMQ 中,队列可以通过 queueDeclare
的 arguments
参数来配置扩展属性。以下是常见的队列扩展属性及其含义:
x-message-ttl
- 含义:设置队列中消息的存活时间(以毫秒为单位)。
- 用法:
arguments.put("x-message-ttl", 10000);
消息超过这个时间未被消费就会过期并被移除队列。
x-expires
- 含义:设置队列的存活时间(以毫秒为单位)。如果队列在指定时间内未被使用(没有消费者连接、没有消息存储等),队列将被自动删除。
- 用法:
arguments.put("x-expires", 60000);
队列将在 60 秒后自动删除。
x-max-length
- 含义:限制队列中最大消息数量。如果队列中的消息数达到限制,新发布的消息将被丢弃或替换最早的消息(与
x-overflow
配合使用)。 - 用法:
arguments.put("x-max-length", 1000);
队列最多存储 1000 条消息。
x-max-length-bytes
- 含义:限制队列中消息总大小(以字节为单位)。如果总大小超过限制,新发布的消息将被丢弃或替换最早的消息。
- 用法:
arguments.put("x-max-length-bytes", 10485760);
队列的消息总大小限制为 10 MB。
x-overflow
- 含义:设置队列的溢出行为,当队列达到
x-max-length
或x-max-length-bytes
时的处理方式。 - 取值:
"drop-head"
:丢弃最早的消息(FIFO 式移除)。"reject-publish"
:拒绝新发布的消息。
- 用法:
arguments.put("x-overflow", "drop-head");
x-dead-letter-exchange
- 含义:设置队列的死信交换机。队列中的死信消息(如过期、被拒绝、队列满等)将被转发到指定的交换机。
- 用法:
arguments.put("x-dead-letter-exchange", "dead_exchange");
当消息变成死信时,它们将路由到dead_exchange
。
x-dead-letter-routing-key
- 含义:设置死信消息的路由键(配合
x-dead-letter-exchange
使用)。 - 用法:
arguments.put("x-dead-letter-routing-key", "dead_key");
死信消息将使用dead_key
进行路由。
x-max-priority
- 含义:设置队列的最大优先级,启用消息优先级队列。
- 用法:
arguments.put("x-max-priority", 10);
队列支持消息优先级,优先级范围为 0 到 10。
x-queue-mode
- 含义:设置队列模式。
- 取值:
"default"
:默认模式,所有消息存储在内存和磁盘上。"lazy"
:惰性模式,尽可能将消息存储到磁盘以减少内存消耗。
- 用法:
arguments.put("x-queue-mode", "lazy");
队列切换到惰性模式。
x-queue-master-locator
- 含义:在 RabbitMQ 集群环境中,指定队列主副本的位置策略。
- 取值:
"min-masters"
:选择负载最低的节点作为主副本。"client-local"
:选择与客户端最近的节点作为主副本。
- 用法:
arguments.put("x-queue-master-locator", "min-masters");
自定义插件属性
- RabbitMQ 支持通过插件扩展的属性,例如延迟消息插件(
rabbitmq-delayed-message-exchange
)会引入新的属性:x-delayed-type
:指定延迟队列的交换机类型(如direct
、fanout
、topic
)。- 用法:
arguments.put("x-delayed-type", "direct");
示例代码
以下代码示例展示了如何为队列配置多种属性:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.HashMap;
import java.util.Map;public class QueueDeclareExample {private final static String QUEUE_NAME = "example_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 配置队列属性Map<String, Object> arguments = new HashMap<>();arguments.put("x-message-ttl", 60000); // 消息过期时间arguments.put("x-max-length", 100); // 最大消息数arguments.put("x-dead-letter-exchange", "dead_exchange"); // 死信交换机arguments.put("x-queue-mode", "lazy"); // 惰性队列// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);System.out.println("Queue declared with custom properties.");}}
}
注意事项
- 属性顺序依赖:某些属性依赖其他属性,例如
x-dead-letter-routing-key
需要同时设置x-dead-letter-exchange
。 - 持久化:队列的持久化属性与扩展属性分开设置,队列扩展属性不会影响持久化行为。
- 集群环境:某些属性(如
x-queue-master-locator
)仅在集群环境中有效。
通过合理配置队列属性,可以更好地满足业务需求并提升 RabbitMQ 的性能和可靠性。
消息扩展属性
在 RabbitMQ 中,可以通过 AMQP.BasicProperties
配置发布消息时的多种属性。除了示例代码中设置的消息过期时间 (expiration
) 外,还可以为消息配置其他重要属性。
常见消息属性
contentType
(内容类型)
-
描述:指定消息的 MIME 类型,例如
text/plain
、application/json
等。 -
示例:
basicProperties = new AMQP.BasicProperties.Builder().contentType("application/json").build();
contentEncoding
(内容编码)
-
描述:指定消息内容的编码方式,例如
UTF-8
、gzip
等。 -
示例:
basicProperties = new AMQP.BasicProperties.Builder().contentEncoding("UTF-8").build();
expiration
(过期时间)
-
描述:设置消息的生存时间(以毫秒为单位)。消息在队列中超过指定时间后会变为死信。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder().expiration("60000") // 60 秒.build();
priority
(优先级)
-
描述:指定消息的优先级,配合队列的
x-max-priority
属性使用。值范围是 0(最低优先级)到队列配置的最大优先级。 -
示例:
basicProperties = new AMQP.BasicProperties.Builder().priority(5).build();
correlationId
(关联 ID)
-
描述:用于将请求和响应进行关联,通常在 RPC 模式中使用。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder().correlationId("12345").build();
replyTo
(回调队列名称)
-
描述:指定响应消息的回调队列,用于 RPC 模式。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder().replyTo("response_queue").build();
messageId
(消息 ID)
-
描述:消息的唯一标识符,用于幂等性校验或追踪。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder().messageId("msg-001").build();
timestamp
(时间戳)
-
描述:消息的创建时间,通常由生产者设置。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder().timestamp(new Date()).build();
type
(消息类型)
-
描述:指定消息的类型,用于消费者区分不同消息的处理逻辑。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder().type("order_created").build();
userId
(用户 ID)
-
描述:用于验证发布消息的用户。RabbitMQ 会检查
userId
是否与连接的用户一致。 -
示例:
basicProperties = new AMQP.BasicProperties.Builder().userId("guest").build();
appId
(应用 ID)
-
描述:标识发布消息的应用程序。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder().appId("my_app").build();
headers
(自定义头部信息)
-
描述:消息的元数据,存储为键值对格式,可以传递扩展信息。
-
示例:
Map<String, Object> headers = new HashMap<>(); headers.put("source", "web"); headers.put("destination", "api"); basicProperties = new AMQP.BasicProperties.Builder().headers(headers).build();
完整示例代码
以下代码展示了如何为消息设置多种属性并发布到队列:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;import java.util.Date;
import java.util.HashMap;
import java.util.Map;public class MessagePropertiesProducer {private final static String QUEUE_NAME = "properties_queue";public static void main(String[] args) {try (Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 消息内容String message = "Hello RabbitMQ with properties!";// 配置消息属性Map<String, Object> headers = new HashMap<>();headers.put("format", "json");headers.put("source", "application");AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType("application/json").contentEncoding("UTF-8").expiration("60000").priority(5).correlationId("12345").replyTo("response_queue").messageId("msg-001").timestamp(new Date()).type("custom_message").userId("guest").appId("my_app").headers(headers).build();// 发布消息channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());System.out.println("Message sent with properties!");} catch (Exception e) {e.printStackTrace();}}
}
注意事项
- 属性优先级:队列的属性可能会覆盖消息的属性。例如,队列的
x-message-ttl
会优先于消息的expiration
。 - 类型匹配:某些属性需要特定格式,如
priority
必须是整数,headers
是键值对。 - 用户权限:使用
userId
属性时,RabbitMQ 会严格校验用户身份,需确保设置正确。
通过设置这些属性,可以增强消息的功能性和可靠性,满足更多业务需求。
死信队列
概念
死信队列是 RabbitMQ 中的一种特殊队列,用于存储被拒绝或无法被正常处理的消息。消息变成“死信”并被转发到死信队列,方便后续分析和处理。
死信队列是消息处理失败时的补救措施。可以使用死信队列进行日志分析、故障排查或重新投递机制。
消息变成死信的三种情况
-
消息被消费者拒绝且
requeue=false
:- 消费者显式拒绝消息(使用
channel.basicReject
或channel.basicNack
),并指定不重新入队。
- 消费者显式拒绝消息(使用
-
消息在队列中TTL(Time-To-Live)过期:
- 设置消息或队列的 TTL,当消息超时未被消费时,进入死信队列。
-
队列达到最大长度限制:
- 队列中消息数量超过最大限制,最早的消息被丢弃,转发到死信队列。
死信队列配置
配置死信队列需要为队列设置以下参数:
x-dead-letter-exchange
:
指定死信队列对应的交换机。x-dead-letter-routing-key
(可选):
指定死信消息在死信交换机上的路由键。
生产者示例
声明一个正常的交换机,用于投递消息,使用direct模式。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;public class DLXProducer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) {try (Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel()) {// 声明正常交换机channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");String routingKey = "normal_key";String message = "测试死信消息";// 发送消息channel.basicPublish(NORMAL_EXCHANGE, routingKey, null, message.getBytes());System.out.println("发送消息: " + message);} catch (Exception e) {e.printStackTrace();}}
}
消费者一
声明正常的交换机与队列,再声明死信交换机与队列,然后进行绑定
私信队列使用步骤:
- 声明正常的交换机与正常队列,通过一个路由键将二者绑定。
- 声明死信交换机与死信队列,通过一个路由键将二者绑定。
- 为正常的队列配置扩展参数:
x-dead-letter-exchange
与x-dead-letter-routing-key
,分别为所声明的死信交换机与死信路由键。
代码展示
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;public class DLXConsumer {//普通队列private static final String NORMAL_QUEUE = "normal_queue";//普通交换机private static final String NORMAL_EXCHANGE = "normal_exchange";//普通路由键private static final String NORMAL_KEY= "normal_key";//死信队列private static final String DLX_QUEUE = "dlx_queue";//死信交换机private static final String DLX_EXCHANGE = "dlx_exchange";//死信路由键private static final String DLX_KEY= "dlx_key";public static void main(String[] args) {try{//创建连接与通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明普通交换机与死信交换机channel.exchangeDeclare(DLX_EXCHANGE, "direct");channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");// 声明死信队列channel.queueDeclare(DLX_QUEUE, true, false, false, null);// 声明普通队列并为其配置死信交换机及死信路由键,该队列的消息变为'死信'后会被投入死信交换机Map<String,Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange",DLX_EXCHANGE);// 指定死信交换机arguments.put("x-dead-letter-routing-key",DLX_KEY);// 指定死信路由键channel.queueDeclare(NORMAL_QUEUE, true, false, false,arguments);//绑定死信交换机与死信队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_KEY);//绑定普通交换机与普通队列channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_KEY);// 消费普通队列消息,并模拟拒绝消息,autoAck设置为false来进行手动消息确认channel.basicConsume(NORMAL_QUEUE, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("接收到正常队列消息: " + message);// 拒绝消息并将其发送到死信队列channel.basicReject(envelope.getDeliveryTag(), false);}});} catch (Exception e) {e.printStackTrace();}}
}
消费者二
监听死信队列,消费其中的消息
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;/*** 用于消费死信队列中的消息* */
public class DQConsumer {//前面声明的死信队列private static final String DLX_QUEUE = "dlx_queue";public static void main(String[] args){try {//使用之前创建的连接工具,来获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(DLX_QUEUE, true, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg = new String(body,"utf-8");System.out.println("接收消息: " + msg + "!");}};//消费消息channel.basicConsume(DLX_QUEUE, true, consumer);}catch (Exception ex) {ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动消费者DLXConsumer,然后启动生产者DLXProducer,生产者启动后会将消息进行发布,然后DLXConsumer中会拒绝消息,此时消息被放入声明的死信队列中,下图的Ready
为1,表示死信队列中存有1条未消费的消息:
然后启动用于消费死信队列的消费者DQConsumer,其控制台输出:
接收消息: 测试死信消息!
然后在管理页面等待5秒自动刷新,结果如下:
相关文章:
RabbitMQ简单应用
概念 RabbitMQ 是一种流行的开源消息代理(Message Broker)软件,它实现了高级消息队列协议(AMQP - Advanced Message Queuing Protocol)。RabbitMQ 通过高效的消息传递机制,主要应用于分布式系统中解耦应用…...
使用LUKS对Linux磁盘进行加密
前言 本实验用于日常学习用,如需对存有重要数据的磁盘进行操作,请做好数据备份工作。 此实验只是使用LUKS工具的冰山一角,后续还会有更多功能等待探索。 LUKS(Linux Unified Key Setup)是Linux系统中用于磁盘加密的一…...
戴尔电脑安装centos7系统遇到的问题
1,找不到启动盘(Operation System Loader signature found in SecureBoot exclusion database(‘dbx’).All bootable devices failed secure Boot Verification) 关闭 Secure Boot(推荐): 进入 BIOS/UEFI…...
3.4.SynchronousMethodHandler组件之ResponseHandler
前言 feign发送完请求后, 拿到返回结果, 那么这个返回结果肯定是需要经过框架进一步处理然后再返回到调用者的, 其中ResponseHandler就是用来处理这个返回结果的, 这也是符合正常思维的处理方式, 例如springmvc部分在调用在controller端点前后都会增加扩展点。 从图中可以看得…...
Linux 下进程的状态
操作系统中常见进程状态 在操作系统中有六种常见进程状态: 新建状态: 进程正在被创建. 此时操作系统会为进程分配资源, 如: 内存空间等, 进行初始化就绪状态: 进程已经准备好运行了, 只需要等待被调度, 获取 CPU 资源就可以执行了, 操作系统中可能同时存在多个进程处于就绪状…...
【计算机网络】核心部分复习
目录 交换机 v.s. 路由器OSI七层更实用的TCP/IP四层TCPUDP 交换机 v.s. 路由器 交换机-MAC地址 链接设备和设备 路由器- IP地址 链接局域网和局域网 OSI七层 物理层:传输设备。原始电信号比特流。数据链路层:代表是交换机。物理地址寻址,交…...
Spring Boot开发实战:从入门到构建高效应用
Spring Boot 是 Java 开发者构建微服务、Web 应用和后端服务的首选框架之一。其凭借开箱即用的特性、大量的自动化配置和灵活的扩展性,极大简化了开发流程。本文将以实战为核心,从基础到高级,全面探讨 Spring Boot 的应用开发。 一、Spring B…...
pyshark安装使用,ubuntu:20.04
1.容器创建 命令 docker run -d --name pyshark -v D:\src:/root/share ubuntu:2004 /bin/bash -c "while true;do sleep 1000;done" 用于创建并启动一个新的 Docker 容器。 docker run -d --name pyshark -v D:\src:/root/share ubuntu:2004 /bin/bash -c "w…...
基本功能实现
目录 1、环境搭建 2、按键控制灯&电机 LED 电机 垂直按键(机械按键) 3、串口调试功能 4、定时器延时和定时器中断 5、振动强弱调节 6、万年历 7、五方向按键 1、原理及分析 2、程序设计 1、环境搭建 需求: 搭建一个STM32F411CEU6工程 分析: C / C 宏定义栏…...
《那个让服务器“跳舞”的bug》
在程序的世界里,bug 就像隐藏在暗处的小怪兽,时不时跳出来捣乱。而在我的职业生涯中,有一个bug让我至今难忘,它不仅让项目差点夭折,还让我熬了无数个通宵。这个故事发生在一个风和日丽的下午,我们正在开发一…...
Python 网络爬虫进阶:动态网页爬取与反爬机制应对
在上一篇文章中,我们学习了如何使用 Python 构建一个基本的网络爬虫。然而,在实际应用中,许多网站使用动态内容加载或实现反爬机制来阻止未经授权的抓取。因此,本篇文章将深入探讨以下进阶主题: 如何处理动态加载的网…...
创建可直接用 root 用户 ssh 登陆的 Docker 镜像
有时候我们在 Mac OS X 或 Windows 平台下需要开发以 Linux 为运行时的应用,IDE 或可直接使用 Docker 容器,或 SSH 远程连接。本地命令行下操作虽然可以用 docker exec 连接正在运行的容器,但 IDE 远程连接的话 SSH 总是一种较为通用的连接方…...
wordpress 中添加图片放大功能
功能描述 使用 Fancybox 实现图片放大和灯箱效果。自动为文章内容中的图片添加链接,使其支持 Fancybox。修改了 header.php 和 footer.php 以引入必要的 CSS 和 JS 文件。在 functions.php 中通过过滤器自动为图片添加 data-fancybox 属性。 最终代码 1. 修改 hea…...
数据结构 (7)线性表的链式存储
前言 线性表是一种基本的数据结构,用于存储线性序列的元素。线性表的存储方式主要有两种:顺序存储和链式存储。链式存储,即链表,是一种非常灵活和高效的存储方式,特别适用于需要频繁插入和删除操作的场景。 链表的基本…...
库的操作.
创建、删除数据库 创建语法: CREATE DATABASE [IF NOT EXISTS] db_name[ ]是可选项,IF NOT EXISTS 是表明如果不存在才能创建数据库 //查看数据库,假设7行 show databases; //创建数据库 --- 本质在Linux创建一个目录 create database databa…...
Vue进阶之Vue CLI服务—@vue/cli-service Vuex
Vue CLI服务—vue/cli-service & Vuex vue/cli-service初识bin/vue-cli-service.js代码执行解读 Vuexgenerator/index.jsstore/index.js插件化的能力怎么引入呢? vue/cli-service 初识 第一块是上一个讲述的cli是把我们代码的配置项,各种各样的插件…...
导入100道注会cpa题的方法,导入试题,自己刷题
一、问题描述 复习备考的小伙伴们,往往希望能够利用零碎的时间和手上的试题,来复习和备考 用一个能够导入自己试题的刷题工具,既能加强练习又能利用好零碎时间,是一个不错的解决方案 目前市面上刷题工具存下这些问题 1、要收费…...
数据库操作、锁特性
1. DML、DDL和DQL是数据库操作语言的三种主要类型 1.1 DML(Data Manipulation Language)数据操纵语言 DML是用于检索、插入、更新和删除数据库中数据的SQL语句。 主要的DML语句包括: SELECT:用于查询数据库中的数据。 INSERT&a…...
学习笔记039——SpringBoot整合Redis
文章目录 1、Redis 基本操作Redis 默认有 16 个数据库,使用的是第 0 个,切换数据库添加数据/修改数据查询数据批量添加批量查询删除数据查询所有的 key清除当前数据库清除所有数据库查看 key 是否存在设置有效期查看有效期 2、Redis 数据类型String追加字…...
(笔记)简单了解ZYNQ
1、zynq首先是一个片上操作系统(Soc),结合了arm(PS)和fpga(PL)两部分组成 Zynq系统主要由两部分组成:PS(Processing System)和PL(Programmable L…...
大众点评小程序mtgsig1.2算法
测试效果: var e function _typeof(o) {return "function" typeof Symbol && "symbol" typeof Symbol.iterator? function (o) {return typeof o;}: function (o) {return o && "function" typeof Symbol &…...
七牛云AIGC内容安全方案助力企业合规创新
随着人工智能生成内容(AIGC)技术的飞速发展,内容审核的难度也随之急剧上升。在传统审核场景中,涉及色情、政治、恐怖主义等内容的标准相对清晰明确,但在AIGC的应用场景中,这些界限变得模糊且难以界定。用户可能通过交互性引导AI生成违规内容,为审核工作带来了前所未有的不可预测…...
.net的winfrom程序 窗体透明打开窗体时出现在屏幕右上角
窗体透明, 将Form的属性Opacity,由默认的100% 调整到 80%,这个数字越小越透明(尽量别低于50%,不信你试试看)! 打开窗体时出现在屏幕右上角 //构造函数 public frmCalendarList() {InitializeComponent();//打开窗体&…...
基于YOLOv8深度学习的智慧课堂教师上课行为检测系统研究与实现(PyQt5界面+数据集+训练代码)
随着人工智能技术的迅猛发展,智能课堂行为分析逐渐成为提高教学质量和提升教学效率的关键工具之一。在现代教学环境中,能够实时了解教师的课堂表现和行为,对于促进互动式教学和个性化辅导具有重要意义。传统的课堂行为分析依赖于人工观测&…...
使用 Tkinter 创建一个简单的 GUI 应用程序来合并视频和音频文件
使用 Tkinter 创建一个简单的 GUI 应用程序来合并视频和音频文件 Python 是一门强大的编程语言,它不仅可以用于数据处理、自动化脚本,还可以用于创建图形用户界面 (GUI) 应用程序。在本教程中,我们将使用 Python 的标准库模块 tkinter 创建一…...
【C++笔记】模板进阶
前言 各位读者朋友们大家好!上一期我们讲了stack、queue以及仿函数。先前我们讲过模板的初阶内容,这一期我们来更深入的学习一下模板。 一. 非类型模板参数 1.1 非类型模板参数 模板参数分为类型形参和类类型形参: 类型形参:…...
Soul App创始人张璐团队亮相GITEX GLOBAL 2024,展示多模态AI的交互创新
随着全球AI领域的竞争加剧,越来越多的科技巨头和创新企业纷纷致力于多模态AI的开发。2024年10月14日至18日,GITEX GLOBAL海湾信息技术博览会在迪拜举行,吸引了超过6700家全球科技巨头和创新公司参与,展示了智能互联、人工智能等领域的新成果。 此次展会中,Soul App创始人张璐团…...
ffmpeg.wasm 在浏览器运行ffmpeg操作视频
利用ffmpeg.wasm,可以在浏览器里运行ffmpeg,实现对音视频的操作 参考链接: https://blog.csdn.net/jchsgwbr/article/details/143252044 https://gitee.com/CXBalCai/ffmpeg-template 其他参考 https://github.com/ffmpegwasm/ffmpeg.wasm https://b…...
用Python爬虫“偷窥”1688商品详情:一场数据的奇妙冒险
引言:数据的宝藏 在这个信息爆炸的时代,数据就像是一座座等待挖掘的宝藏。而对于我们这些电商界的探险家来说,1688上的商品详情就是那些闪闪发光的金子。今天,我们将化身为数据的海盗,用Python这把锋利的剑࿰…...
CentOS上如何离线批量自动化部署zabbix 7.0版本客户端
CentOS上如何离线批量自动化部署zabbix 7.0版本客户端 管理的服务器大部分都是CentOS操作系统,版本主要是CentOS 7。因为监控服务器需要,要在前两天搭建的Zabbix 7.0系统上把这些CentOS 7系统都监控起来。因为服务器数量众多,而且有些服务器…...
wordpress 雪花插件/河南疫情最新情况
cut 命令在Linux和Unix中的作用是从文件中的每一行中截取出一些部分,并输出到标准输出中。我们可以使用 cut 命令从一行字符串中于以字节,字符,字段(分隔符)等单位截取一部分内容出来。 在本文中,我们通过…...
做商城网站服务器配置怎么选择/网络营销到底是干嘛的
今天为大家带来一篇SQL语句的常用语法 。对学习MYSQL数据库很有帮助,给大家做个参考吧。01当某一字段的值希望通过其它字值显示出来时(记录转换),可通过下面的语句实现:case Type when 1 then 普通通道 when 2 then 高端通道 end as Type其中…...
怎么创建音乐网站/推广关键词怎么设置
最短路径分析属于ArcGIS的网络分析范畴。而ArcGIS的网络分析分为两类,分别是基于几何网络和网络数据集的网络分析。它们都可以实现最短路径功能。下面先介绍基于几何网络的最短路径分析的实现。以后会陆续介绍基于网络数据集的最短路径分析以及这两种方法的区别。 几…...
萝岗网站建设/腾讯企点app
在海底捞、西贝等餐饮企业涨价之后,妹子们用来续命的奶茶也开始涨价了,近一个月内,喜茶、奈雪、CoCo、一点点部分产品都涨价1-4元。奶茶涨价,别拿原材料涨价当借口事实上,从 2月中旬开始,喜茶旗下的豆豆波波…...
bilibili推广网站/诊断网站seo现状的方法
文章目录一、使用idea构建项目二、项目结构三、编写第一个程序Hello World四、配置项目的热部署五、单元测试一、使用idea构建项目 1、选择 File -> New —> Project… 弹出新建项目的框 2、选择 Spring Initializr,Next 也会出现上述类似的配置界面…...
无锡seo网站管理/扬州网络推广哪家好
Samba原理和配置 个人原创,转载请注明,否则追究法律责任。 一,原理及安装 1,Samba是在Linux和UNIX系统上实现在局域网上共享文件一种通信协议,它为局域网内的不同计算机之间提供文件等资源的共享服务。 2,Samba访问…...