圣辉友联网站建设/seo关键词排名优化
【RabbitMQ入门-单实例安装&5种简单模式实现通讯过程】
- 一、环境说明
- 二、安装RabbitMQ
- 三、用户权限及Virtual Host设置
- 四、5种简单模式实现通讯过程的实现
- 五、小结
一、环境说明
- 安装环境:虚拟机VMWare + Centos7.6 + Maven3.6.3 + JDK1.8
- RabbitMQ版本:rabbitmq-server-3.8.8-1.el7.noarch.rpm
二、安装RabbitMQ
具体安装过程,可参考:CentOS7安装RabbitMQ(rpm包方式)
三、用户权限及Virtual Host设置
-
用户角色创建
RabbitMQ在安装好后,可以访问http://localhost:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:
默认情况下,访问RabbitMQ服务的用户名和密码都是"guest",这个账户有限制,默认只能通过本地网络(如localhost)访问,远程网络访问受限,使用默认的用户 guest / guest (此也为管理员用户)登陆,会发现无法登陆,报错:User can only log in via localhost。那是因为默认是限制了guest用户只能在本机登陆,也就是只能登陆localhost:15672。所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。添加新用户,用户名为"sujiangming",密码为"openGauss@1234",该步骤需要在rabbitmq所在Linux服务器上执行,执行命令如下:
rabbitmqctl add_user sujiangming openGauss@1234
为root用户设置所有权限,且设置用户为管理员角色,执行如下命令:
rabbitmqctl set_permissions -p / root ".*" ".*" ".*" rabbitmqctl set_user_tags sujiangming administrator
重新登陆,正常可以登录,如图:
补充说明:有关角色
超级管理员(administrator)
:可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。监控者(monitoring)
:可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)策略制定者(policymaker)
:可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。普通管理者(management)
:仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。其他角色
:无法登陆管理控制台,通常就是普通的生产者和消费者。
-
Virtual Host设置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理。在RabbitMQ中有
虚拟消息服务器Virtual Host
,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange
、queue
、message
不能互通。 相当于mysql的db。Virtual Name一般以/
开头。- 创建Virtual Hosts
2. 设置Virtual Hosts权限,可以给其他用户授权,如root,如下图所示:
权限参数说明
:- user:用户名
- configure :一个正则表达式,用户对符合该正则表达式的所有资源拥有 configure 操作的权限
- write:一个正则表达式,用户对符合该正则表达式的所有资源拥有 write 操作的权限
- read:一个正则表达式,用户对符合该正则表达式的所有资源拥有 read 操作的权限
- 创建Virtual Hosts
四、5种简单模式实现通讯过程的实现
-
在IDEA中创建maven工程,添加依赖到pom.xml中,项目结构如下图所示;
在pom.xml
中添加如下内容:<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><amqp-version>5.6.0</amqp-version><slf4j.version>1.7.25</slf4j.version> </properties> <dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>${amqp-version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency> </dependencies>
创建com.rabbitmq.utils.CommonUtils工具类,代码如下:
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 连接rabbitmq工具类*/ public class ConnectionUtils {public static Connection getConnection() throws IOException, TimeoutException {//1、创建链接工厂对象ConnectionFactory factory = new ConnectionFactory();//2、设置RabbitMQ服务主机地址factory.setHost("192.168.36.132");//3、设置RabbitMQ服务端口factory.setPort(5672);//4、设置虚拟主机名字factory.setVirtualHost("/vhtest");//5、设置用户连接名factory.setUsername("sujiangming");//6、设置链接密码factory.setPassword("openGauss@1234");return factory.newConnection();} }
-
第一种:简单模式
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
Queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。创建
com.rabbitmq.simplest.SimpleProducer
类,代码如下:import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;/*** //1、创建链接工厂对象-factory=newConnectionFactory()* //2、设置RabbitMQ服务主机地址,默认localhost-factory.setHost("localhost")* //3、设置RabbitMQ服务端口,默认-1-factory.setPort(5672)* //4、设置虚拟主机名字,默认/-factory.setVirtualHost("szitheima")* //5、设置用户连接名,默认guest-factory.setUsername("admin")* //6、设置链接密码,默认guest-factory.setPassword("admin")* //7、创建链接-connection=factory.newConnection()* //8、创建频道-channel=connection.createChannel()* //9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)* //10、创建消息-Stringm=xxx* //11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)* //12、关闭资源-channel.close();connection.close()*/ public class SimpleProducer {private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare("simple_queue",true,false,false,null);System.out.println("=====>>>开始消息发送<<<=====");for (int i = 0; i < 10; i++) {//10、创建消息-Stringm=xxxString message = "我是第"+ i + "消息,我喜欢的数字是:" + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish("","simple_queue",null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");} }
创建
com.rabbitmq.simplest.SimpleConsumer
类,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;/*** 消费者*/ public class SimpleConsumer {private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare("simple_queue",true,false,false,null);//10、创建消费者Consumer callback = new DefaultConsumer(channel){/*** @param consumerTag 消费者标签,在channel.basicConsume时候可以指定* @param envelope 信封,消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息(生产者的发送时指定)* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("=====>>>读取到的消息<<<=====");System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息/*** 消息消费* 参数1:队列名称* 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答* 参数3:消息接收到后回调*/channel.basicConsume("simple_queue",true,callback);} }
运行测试:
- 先启动消费者SimpleConsumer类,让消费者等待接收消费
- 再启动SimpleProducer类,让生产者发送10条消息,运行结果如下
-
第二种:Work queues工作队列模式
Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况,使用工作队列模式使用多个消费者可以提高任务处理的速度。
Work Queues特点:在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。创建生产者
com.rabbitmq.workqueues.WorkQueuesProducer
,代码如下:import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.utils.ConnectionUtils;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class WorkQueuesProducer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();System.out.println("=====>>>开始消息发送<<<=====");for (int i = 0; i < 100; i++) {//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare("work_queue",true,false,false,null);//10、创建消息-Stringm=xxxString message = "我是第"+ i + "WorkQueues消息,我喜欢的数字是:" + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish("","work_queue",null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");} }
创建第一个生产者
com.rabbitmq.workqueues.WorkQueuesConsumer
,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class WorkQueuesConsumer {private static final Logger logger = LoggerFactory.getLogger(WorkQueuesConsumer.class);public static final String WORK_QUEUES_NAME="work_queue";public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(WORK_QUEUES_NAME,true,false,false,null);//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("=====>>>读取到的消息<<<=====");System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(WORK_QUEUES_NAME,true,callback);} }
创建第二个生产者
com.rabbitmq.workqueues.WorkQueuesConsumer2
,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class WorkQueuesConsumer2 {private static final Logger logger = LoggerFactory.getLogger(WorkQueuesConsumer2.class);public static final String WORK_QUEUES_NAME="work_queue";public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(WORK_QUEUES_NAME,true,false,false,null);//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("=====>>>读取到的消息<<<=====");System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(WORK_QUEUES_NAME,true,callback);} }
运行测试
:启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
-
第三种:Publish&Subscribe发布订阅模式
在发布订阅模型中,多了一个x(exchange)角色,而且过程略有变化。
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接受者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!创建生产者
com.rabbitmq.publishsubscribe.FanoutProducer
,代码如下:import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.utils.ConnectionUtils;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class FanoutProducer {public static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//9、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);System.out.println("=====>>>开始消息发送<<<=====");for (int i = 10; i < 100; i++) {//10、创建消息-Stringm=xxxString message = "我是第 【"+ i + "】 fanout_exchange 消息,我喜欢的数字是: " + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");} }
创建第一个消费者
com.rabbitmq.publishsubscribe.FanoutConsumer01
,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class FanoutConsumer01 {private static final Logger logger = LoggerFactory.getLogger(FanoutConsumer01.class);public static final String QUEUES_NAME="fanout_queue1";public static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,"");//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);} }
创建第二个消费者
com.rabbitmq.publishsubscribe.FanoutConsumer02
,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class FanoutConsumer02 {private static final Logger logger = LoggerFactory.getLogger(FanoutConsumer02.class);public static final String QUEUES_NAME="fanout_queue2";public static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,"");//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);} }
注意
:
绑定交换机的前提是得先有这个交换机,所以得先执行一次生产者,如果没有这个交换机就执行消费者绑定交换机的话会报错.执行完两个消费者再执行生产者后,就会看到两个消费者都消费这一条消息了。运行测试
:启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
测试结论
:交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与work队列模式的区别
1、work队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,work队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式的消费者需要设置队列和交换机的绑定,work队列模式不需要设置,实际上work队列模式会将队列绑 定到默认的交换机 。 -
第四种:Routing路由模式
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
路由模式特点
:
1.队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2.消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息创建生产者
com.rabbitmq.routing.RoutingProducer
,代码如下:import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.utils.ConnectionUtils;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class RoutingProducer {public static final String EXCHANGE_NAME = "routing_exchange";public static final String ROUTING_LOG_ERROR = "log.error";public static final String ROUTING_LOG_INFO = "log.info";public static final String ROUTING_LOG_WARNING= "log.warning";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//9、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);System.out.println("=====>>>开始消息发送<<<=====");for (int i = 100; i < 1000; i++) {String routingKey = "";//发送消息的时候根据相关逻辑指定相应的routing key。switch (i%3){case 0: //假设i=0,为error消息routingKey = ROUTING_LOG_ERROR;break;case 1: //假设i=1,为info消息routingKey = ROUTING_LOG_INFO;break;case 2: //假设i=2,为warning消息routingKey = ROUTING_LOG_WARNING;break;}//10、创建消息-Stringm=xxxString message = "我是第 【"+ i + "】 "+ EXCHANGE_NAME +" 消息,我喜欢的数字是: " + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");} }
创建第一个消费者
com.rabbitmq.routing.RoutingConsumer01
,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class RoutingConsumer01 {private static final Logger logger = LoggerFactory.getLogger(RoutingConsumer01.class);public static final String QUEUES_NAME="routing_queue1";public static final String EXCHANGE_NAME = RoutingProducer.EXCHANGE_NAME;public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,RoutingProducer.ROUTING_LOG_ERROR);//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey: " + routingKey +" ,exchange: " + exchange +" ,deliveryTag: " + deliveryTag +" ,message: " + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);} }
创建第二个消费者
com.rabbitmq.routing.RoutingConsumer02
,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class RoutingConsumer02 {private static final Logger logger = LoggerFactory.getLogger(RoutingConsumer02.class);public static final String QUEUES_NAME="routing_queue2";public static final String EXCHANGE_NAME = RoutingProducer.EXCHANGE_NAME;public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,RoutingProducer.ROUTING_LOG_ERROR);channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,RoutingProducer.ROUTING_LOG_INFO);channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,RoutingProducer.ROUTING_LOG_WARNING);//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey: " + routingKey +" ,exchange: " + exchange +" ,deliveryTag: " + deliveryTag +" ,message: " + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);} }
运行测试:
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key,对应队列的消息;到达按照需要接收的效果。
测试结论
:Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
-
第五种:Topics主题模式
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以“ . ”分割,例如: item.insert
通配符规则
:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
创建生产者
com.rabbitmq.topics.TopicProducer
,代码如下:import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.utils.ConnectionUtils;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class TopicProducer {public static final String EXCHANGE_NAME = "topic_exchange";public static final String ROUTING_LOG_ERROR = "log.error";public static final String ROUTING_LOG_INFO = "log.info";public static final String ROUTING_LOG_INFO_ADD = "log.info.add";public static final String ROUTING_LOG_INFO_UPDATE = "log.info.update";public static final String ROUTING_LOG_WARNING= "log.warning";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//9、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);System.out.println("=====>>>开始消息发送<<<=====");for (int i = 0; i < 100; i++) {String routingKey = "";//发送消息的时候根据相关逻辑指定相应的routing key。switch (i%5){case 0: //假设i=0,为error消息routingKey = ROUTING_LOG_ERROR;break;case 1: //假设i=1,为info消息routingKey = ROUTING_LOG_INFO;break;case 2: //假设i=2,为warning消息routingKey = ROUTING_LOG_WARNING;break;case 3: //假设i=3,为log.info.add消息routingKey = ROUTING_LOG_INFO_ADD;break;case 4: //假设i=4,为log.info.update消息routingKey = ROUTING_LOG_INFO_UPDATE;break;}//10、创建消息-Stringm=xxxString message = "我是第 【"+ i + "】 "+ EXCHANGE_NAME +" 消息,我喜欢的数字是: " + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");} }
创建第一个消费者
com.rabbitmq.topics.TopicConsumer01
,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class TopicConsumer01 {private static final Logger logger = LoggerFactory.getLogger(TopicConsumer01.class);public static final String QUEUES_NAME="topic_queue1";public static final String EXCHANGE_NAME = TopicProducer.EXCHANGE_NAME;public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串]) topic支持通配符方式// log.* 表示匹配log.后面一个,如log.error,log.info等channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,"log.*");//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey: " + routingKey +" ,exchange: " + exchange +" ,deliveryTag: " + deliveryTag +" ,message: " + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);} }
创建第二个消费者
com.rabbitmq.topics.TopicConsumer01
,代码如下:import com.rabbitmq.client.*; import com.rabbitmq.utils.ConnectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class TopicConsumer02 {private static final Logger logger = LoggerFactory.getLogger(TopicConsumer02.class);public static final String QUEUES_NAME="topic_queue2";public static final String EXCHANGE_NAME = TopicProducer.EXCHANGE_NAME;public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串]) topic支持通配符方式// log.# 表示匹配log.后面一个或者多个词,如log.info,log.info.add等channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,"log.#");//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey: " + routingKey +" ,exchange: " + exchange +" ,deliveryTag: " + deliveryTag +" ,message: " + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);} }
运行测试:
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
测试小结:
Topic主题模式可以实现 Publish/Subscribe发布订阅模式 和 Routing路由模式 的双重功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
五、小结
- RabbitMQ五种工作模式小结:
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列 - 学习资料总结
本文参考了来自网络上的资料,如有侵权,请及时联系博主进行删除。
本文仅是博主本人在学习过程中作为学习笔记使用,常言道:好记性不如烂笔头。如本文对您有所帮助,请您动动发财的手指给博主点个赞
,谢谢您的阅读~~~
相关文章:

【RabbitMQ上手——单实例安装5种简单模式实现通讯过程】
【RabbitMQ入门-单实例安装&5种简单模式实现通讯过程】 一、环境说明二、安装RabbitMQ三、用户权限及Virtual Host设置四、5种简单模式实现通讯过程的实现五、小结 一、环境说明 安装环境:虚拟机VMWare Centos7.6 Maven3.6.3 JDK1.8RabbitMQ版本:…...

python+pytest接口自动化之HTTP协议基础
HTTP协议简介 HTTP 即 HyperText Transfer Protocol(超文本传输协议),是互联网上应用最为广泛的一种网络协议。所有的 WWW 文件都必须遵守这个标准。 设计 HTTP 最初的目的是为了提供一种发布和接收 HTML 页面的方法。HTTP 协议在 OSI 模型…...

【技巧】如何保护PowerPoint不被改动?
PPT,也就是PowerPoint,是很多小伙伴在工作生活中经常用到的图形演示文稿软件。 做好PPT后,担心自己不小心改动了或者不想他人随意更改,我们可以如何保护PPT呢?下面小编就来分享两个常用的方法: 1. 将PPT改…...

【APITable】教程:创建并运行一个自建小程序
1.进入APITable,在想要创建小程序的看板页面点击右上角的【小程序】,进入小程序编辑页面。 2.创建一个新的小程序区。 点击【 添加小程序】 点击创建小程序,选择模板,输入名字。 3.确定后进入小程序部署引导页面。 4.打开Xshell 7…...

使用MyBatis操作数据库
hi,大家好,今天为大家带来MyBatis操作数据库的知识 文章目录 🐷1.根据MyBatis操作数据库🧊1.1查询操作🍇1.1.1无参查询🍇1.1.2有参查询 🧊1.2删除操作🧊1.3修改操作🧊1.4增加操作🧊…...

SSM(Vue3+ElementPlus+Axios+SSM前后端分离)--功能实现[五]
文章目录 SSM--功能实现实现功能09-带条件查询分页显示列表需求分析/图解思路分析代码实现测试分页条件查询带条件分页查询显示效果 实现功能10-添加家居表单前端校验需求分析/图解思路分析代码实现完成测试测试页面效果 实现功能11-添加家居表单后端校验需求分析/图解思路分析…...

Qt应用程序窗体最大化失效问题的解决方法
记录一个在Qt开发过程中遇到的问题: 【问题描述】在showEvent中调用showMaximized(),应用程序窗体仍然无法最大化。 【定位分析】在Qt应用程序中,如果窗体最大化失效,可能是因为在窗体的showEvent事件中使用了showMaximized()方…...

python怎么判断变量的数据类型
在编程的世界里,了解各种数据类型是非常重要的。在Python中,有着丰富的数据类型用于存储和处理不同类型的数据。掌握这些数据类型的定义和作用,我们能够更好地在程序中管理和操作数据,提高代码的效率和可读性。 Python中常见的数据…...

FastAPI 构建 API 高性能的 web 框架(二)
上一篇 FastAPI 构建 API 高性能的 web 框架(一)是把LLM模型使用Fastapi的一些例子,本篇简单来看一下FastAPI的一些细节。 有中文官方文档:fastapi中文文档 假如你想将应用程序部署到生产环境,你可能要执行以下操作&a…...

如何实现 Java SpringBoot 自动验证入参数据的有效性
Java SpringBoot 通过javax.validation.constraints下的注解,实现入参数据自动验证 如果碰到 NotEmpty 否则不生效,注意看下 RequestBody 前面是否加上了Valid Validation常用注解汇总 Constraint详细信息Null被注释的元素必须为 nullNotNull被注释的元…...

golang学习随记
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 go学习快捷键及快速生成代码片段go基础循环流程控制关键字切片,拷贝函数闭包 defer语句格式化输出go语言随机数rand.seed() 包管理并发编程goroutinecha…...

【PCL-6】PCL基于凹凸型的分割算法(LCCP)
凹凸型分割算法适用于颜色类似、棱角分明的物体场景分割。LCCP方法不依赖点云颜色,只使用空间信息和法线信息。 算法流程: 1、基于超体聚类的过分割; 2、在超体聚类的基础上再聚类。 算法思路: 1、基于CC和SC判断凹凸性&…...

多进程并发服务器
文章目录 思路问题多进程并发回环服务器代码客户端代码 思路 每当一个客户端连接服务器后,创建一个子进程负责与该客户端通信,客户端断开连接之后,服务器回收子进程资源。 问题 问题1:父进程阻塞在等待连接(accept())处…...

2021秋招总结
2021 秋招总结 作为星球第一批准备秋招的人,经过这几个月的面试之后,感觉也算是有一些小小的经验了吧,就做一个简单的记录,希望能够为星球中准备秋招的伙伴们提供一些参考吧~ 序 4月初加入星球,到9月底,一…...

Linux6.34 Kubernetes yaml文件详解
文章目录 计算机系统5G云计算第三章 LINUX Kubernetes yaml文件详解一、yaml文件概述1.查看 api 资源版本标签2.写一个yaml文件demo 计算机系统 5G云计算 第三章 LINUX Kubernetes yaml文件详解 一、yaml文件概述 Kubernetes 支持 YAML 和 JSON 格式管理资源对象 JSON 格式…...

防火墙笔记
什么是防火墙 在计算机网络中是指设置在可信任的内部网络和不可信任的外部网络之间的屏障,通过强化边界控制保障内容安全,同时不妨碍内部对外部的访问。 20世纪80年代,最早的防火墙几乎与路由器同时出现,第一代防火墙主要基于包过…...

使用代码下载开源的大模型文件示例以及中文微调llama资源汇总:
一、下载示例 from huggingface_hub import snapshot_downloadrepo_id "THUDM/chatglm2-6b" local_dir ./chatglm2-6b/ cache_dir local_dir "/cache" while True:try:snapshot_download(cache_dircache_dir,local_dirlocal_dir,repo_idrepo_id,loca…...

Wav2vec2 论文阅读看到的一些问题
Wav2vec2 论文阅读看到的一些问题 这里只是简单的思考一下论文的一些问题,不是论文解读。 Q1. 为什么wav2vec依旧需要Transformer来做推理,而不直接使用VQ生成的内容? A1. Transformer在更长的序列上有更好的编码效果,例如论文也写…...

爬虫学习记录(持续更新)
一、问题记录 1.使用webdriver报错AttributeError: str object has no attribute capabilities 解决:目前使用的selenium版本是4.11.2,可以不必设置driver.exe的路径,selenium可以自己处理浏览器和驱动程序,因此,使用…...

libevent源码学习1---创建event
libevent源码学习1—创建event Libevent是一个用于开发可扩展性网络服务器的基于事件驱动(event-driven)模型的非阻塞网络库。安装请参考ubuntu下载安装libevent event_base 使用 libevent 函数之前需要分配一个或者多个 event_base 结构体。每个 event_base 结构体持有一个…...

Python类的设计
Python类的设计 # 定义一个闹钟类 class Clock:__cureen_keyNone # 私有成员不能改变和使用def __init__(self, id, price): # 类对象是立即自动执行self.id idself.price pricedef ring(self):import winsound # 内置声音方法winsound.Beep(2000,3000)clock1 Clock(…...

微信小程序的项目解构
视频链接 黑马程序员前端微信小程序开发教程,微信小程序从基础到发布全流程_企业级商城实战(含uni-app项目多端部署)_哔哩哔哩_bilibili 接口文档 https://www.escook.cn/docs-uni-shop/mds/1.start.html 1:微信小程序宿主环境 1:常见的宿…...

【Archaius技术专题】「Netflix原生态」动态化配置服务之微服务配置组件变色龙
前提介绍 如果要设计开发一套微服务基础架构,参数化配置是一个非常重要的点,而Netflix也开源了一个叫变色龙Archaius的配置中心客户端,而且Archaius可以说是比其他客户端具备更多生产级特性,也更灵活。*在NetflixOSS微服务技术栈…...

python条件分支和循环语句
python中没有{}的写法,一般时通过缩进的方式来确定分支和循环需要执行的代码块。 if 需要判断的条件表达式:条件成立时的动作 elif 需要判断的条件表达式:条件成立时的动作 else:动作for 变量 in 迭代对象:动作 示例: while 退出条件:动作...

工具推荐:Wireshark网络协议分析工具(对比tcpdump)
文章首发地址 Wireshark是一款开源的网络协议分析工具,可以捕获网络数据包并对其进行详细的分析和解释。下面是Wireshark的详细介绍: Wireshark 工作原理 Wireshark通过捕获网络接口上的数据包,将其转换为可读的格式,并在界面…...

[OnWork.Tools]系列 04-快捷启动
简介 主要功能是将常用的软件拖动到软件中,实现快速点击启动,结合软件设置中的设置的快捷键,可以快速呼出对应的面板,使用快捷键快速启动应用 拖拽内容 拖拽快捷方式到面板,双击快速打开 拖拽文件方式到面板,双击快速打开 拖拽文件夹到面板双击快速打开 拖拽项目调整顺序 右…...

如何将项目挂后台运行?【nohup和tmux】
挂后台运行,防止霸屏。 线上的程序不会将日志输出到控制台,而是输出到日志文件,方便运维查阅信息。 一.nohup--挂后台运行的命令 //nohup--英文全称no hang up,可以后台运行指定命令 //hello.log是指将日志输出到hello.log文件 …...

什么是进程、线程、协程
什么是进程? 我们都知道计算机的核心是CPU,它承担了所有的计算任务;而操作系统是计算机的管理者,它负责任务的调度、资源的分配和管理,统领整个计算机硬件;应用程序则是具有某种功能的程序,程序…...

Python爬虫——selenium_访问元素信息
from selenium import webdriver# 创建浏览器对象 path files/chromedriver.exe browser webdriver.Chrome(path)# 访问地址 url https://www.baidu.com browser.get(url)input browser.find_element_by_id(su)获取元素属性 .get_attribute(class)print(input.get_attribu…...

Linux 文件基本属性
Linux 文件基本属性 Linux 系统是一种典型的多用户系统,不同的用户处于不同的地位,拥有不同的权限。 为了保护系统的安全性,Linux 系统对不同的用户访问同一文件(包括目录文件)的权限做了不同的规定。 在 Linux 中我…...