当前位置: 首页 > news >正文

2.7日学习打卡----初学RabbitMQ(二)

2.7日学习打卡

在这里插入图片描述
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则
——JMS,用于操作消息中间件。JMS即Java消息服务
(JavaMessage Service)应用程序接口,是一个Java平台中关于面
向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多
MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没
有实现JMS规范,但是开源社区有JMS的实现包。

创建项目

# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

创建普通maven项目,添加RabbitMQ依赖:

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqpclient</artifactId><version>5.14.0</version></dependency>
</dependencies>

一. RabbitMQ 简单模式在这里插入图片描述

P:生产者,也就是要发送消息的程序

C:消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

特点:

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

生产者代码实现

步骤:

  1. 创建连接工厂ConnectionFactory
  2. 设置工厂的参数
  3. 创建连接 Connection
  4. 创建管道 Channel
  5. 简单模式中没有交换机exchange,所以不用创建(RabbitMQ会使用默认的交换机!)
  6. 创建队列 queue
  7. 设置发送内容,使用channal.basicPublish()发送
  8. 释放资源

代码实现

package com.jjy.mq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//使用自己的服务器ip地址connectionFactory.setHost("192.168.66.100");//rabbitmq的默认端口5672connectionFactory.setPort(5672);//用户名connectionFactory.setUsername("jjy");//密码connectionFactory.setPassword("jjy");//虚拟机connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建队列,如果队列已存在,则使用该队列/**//     * 参数1:队列名//     * 参数2:是否持久化,true表示MQ重启后队列还在。//     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问//     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列//     * 参数5:其他额外参数//     */channel.queueDeclare("simple_queue",false,false,false,null);//5.发送消息String mesg="hello rabbitmq";/*** 参数1:交换机名,""表示默认交换机* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/channel.basicPublish("","simple_queue",null,mesg.getBytes());//6.关闭资源(信道和连接)channel.close();connection.close();System.out.println("发送成功");}
}

消费者代码实现

在这里插入图片描述

步骤:

1.创建连接工厂ConnectionFactory
2.设置工厂参数
3.创建连接
4.创建信道
前四步代码基本是一致的,需要注意的是生产者与消费者的Channel是不同Connection中的!不是同一个对象.
5. 最简单的模型没有交换机exchange,所以此处RabbitMQ会使用默认的交换机
6. 接收消息,有一个回调方法 channel.basicConsume()

代码实现

package com.jjy.mq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.监听队列/*** 参数1:监听的队列名* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费*/channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body,"UTF-8");System.out.println("接受消息,消息为:"+message);}});//}
}

二. RabbitMQ 工作队列模式

在这里插入图片描述
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该
模式也使用direct交换机,应用于处理消息较多的情况。特点如
下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

生产者代码实现

代码实现

package com.jjy.mq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建队列,持久化队列channel.queueDeclare("work_queue",true,false,false,null);// 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中for(int i=0;i<100;i++){channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("你好,这是今天的第"+i+"条消息").getBytes());}// 6.关闭资源channel.close();connection.close();}
}

消费者代码实现

在这里插入图片描述

消费者1:

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1消费消息,消息为:" + message);}});}
}

消费者2

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer2 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者2消费消息,消息为:" + message);}});}}

消费者3

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer3 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者3消费消息,消息为:" + message);}});}}

三. RabbitMQ 发布订阅模式

在这里插入图片描述
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电
商网站的同一条促销信息需要短信发送、邮件发送、站内信发送
等。此时可以使用发布订阅模式(Publish/Subscribe)
特点:

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic(常用):通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失
在这里插入图片描述

生产者代码实现

与之前的步骤相比,多了创建交换机和绑定交换机与队列的操作

代码实现

package com.jjy.mq.publish;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*exchangeDeclare(String exchange,                  -- 交换机的名称String type,                      -- 交换机的类型,4种枚举(direct,fanout,topic,headers)boolean durable,                  -- 持久化boolean autoDelete,               -- 自动删除boolean internal,                 -- 内部使用,基本是falseMap<String, Object> arguments)    -- 参数*/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);//5.创建队列//短信队列channel.queueDeclare("SEND_MAIL",true,false,false,null);//消息队列channel.queueDeclare("SEND_MESSAGE",true,false,false,null);//站内信息channel.queueDeclare("SEND_STATION",true,false,false,null);//6.交换机绑定队列/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL","exchange_fanout","");channel.queueBind("SEND_MESSAGE","exchange_fanout","");channel.queueBind("SEND_STATION","exchange_fanout","");//7.发送消息for (int i = 1; i <= 10 ; i++) {channel.basicPublish("exchange_fanout","",null,("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));}//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

接下来编写三个消费者,分别监听各自的队列。
//站内信消费者

package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

邮件消费者

 
package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

短信消费者

package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

也可以使用工作队列+发布订阅模式同时使用,两个消费者同时监听
一个队列:


// 短信消费者2
public class CustomerMessage2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.0.162");connectionFactory.setPort(5672);connectionFactory.setUsername("itbaizhan");connectionFactory.setPassword("itbaizhan");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信2:"+message);}});}
}

两个不一样的系统,对同一条消息做不一样的处理

发布订阅模式与工作队列模式的区别
(1)工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机

(2)发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)

(3)发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

四. RabbitMQ 路由模式

在这里插入图片描述
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多
时候,不是所有消息都无差别的发布到所有队列中。比如电商网站
的促销活动,双十一大促可能会发布到所有队列;而一些小的促销
活动为了节约成本,只发布到站内信队列。此时需要使用路由模式
(Routing)完成这一需求。
特点:

  1. 每个队列绑定路由关键字RoutingKey
  2. 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模
    式使用direct交换机。

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
在这里插入图片描述

生产者代码实现

package com.jjy.mq.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);// 5.创建队列channel.queueDeclare("SEND_MAIL2",true,false,false,null);channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);channel.queueDeclare("SEND_STATION2",true,false,false,null);//6.交换机绑定队列/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL2","exchange_routing","import");channel.queueBind("SEND_MESSAGE2","exchange_routing","import");channel.queueBind("SEND_STATION2","exchange_routing","import");channel.queueBind("SEND_STATION2","exchange_routing","normal");//7.发送消息channel.basicPublish("exchange_routing","import",null,"双十一大促活动".getBytes());channel.basicPublish("exchange_routing","normal",null,"小型促销活动".getBytes());//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

站内信消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

短信消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

邮件消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

总的来说就一句话:

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

五. RabbitMQ 通配符模式

在这里插入图片描述
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活,使用topic交换
机.
通配符规则

  1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。
  2. 队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。

生产者代码实现

在这里插入图片描述
代码实现

package com.jjy.mq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);// 5.创建队列channel.queueDeclare("SEND_MAIL3",true,false,false,null);channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);channel.queueDeclare("SEND_STATION3",true,false,false,null);//6.交换机绑定队列channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");//7.发送消息channel.basicPublish("exchange_topic","mail.message.station",null,"双十一大促活动".getBytes());channel.basicPublish("exchange_topic","station",null,"小型促销活动".getBytes());//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

站内信消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

短信消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

邮件消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

总述:topics模式比routing模式要更加灵活,笼统的说就是routing模式加上通配符

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

相关文章:

2.7日学习打卡----初学RabbitMQ(二)

2.7日学习打卡 JMS 由于MQ产品很多&#xff0c;操作方式各有不同&#xff0c;于是JAVA提供了一套规则 ——JMS&#xff0c;用于操作消息中间件。JMS即Java消息服务 &#xff08;JavaMessage Service&#xff09;应用程序接口&#xff0c;是一个Java平台中关于面 向消息中间件的…...

【工作学习 day04】 9. uniapp 页面和组件的生命周期

问题描述 uniapp常用的有&#xff1a;页面和组件&#xff0c;并且页面和组件各自有各自的生命周期函数&#xff0c;那么在页面/组件请求数据时&#xff0c;是用created呢&#xff0c;还是用onLoad呢&#xff1f; 先说结论: 组件使用组件的生命周期&#xff0c;页面使用页面的…...

Mysql-数据库优化-客户端连接参数

客户端参数 原文地址 # 连接池配置 # 初始化连接数 spring.datasource.druid.initial-size1 # 最小空闲连接数&#xff0c;一般设置和initial-size一致 spring.datasource.druid.min-idle1 # 最大活动连接数&#xff0c;一个数据库能够支撑最大的连接数是多少呢&#xff1f; …...

【十二】【C++】vector用法的探究

vector类创建对象 /*vector类创建对象*/ #if 1 #define _CRT_SECURE_NO_WARNINGS#include <iostream> using namespace std; #include <vector> #include <algorithm> #include <crtdbg.h>class Date {public:Date(int year 1900, int month 1, int …...

Docker 基本介绍

Docker 基本介绍 镜像 Docker镜像就是一个只读的模板。 例如&#xff1a;一个镜像可以包含一个完整的ubuntu操作系统环境&#xff0c;里面仅安装了Apache或用户需要的其它应用 程序。 镜像可以用来创建Docker容器。Docker提供了一个很简单的机制来创建镜像或者更新现有的镜…...

CentOS 7 安装 install abiword

安装 1.下载noarch安装包 wget http://repo.iotti.biz/CentOS/7/noarch/lux-release-7-1.noarch.rpm 2.安装noarch rpm -Uvh lux-release-7-1.noarch.rpm 3.安装abiword yum -y install abiword...

开源的直播平台

​​​​​​直播平台系统界面介绍 开源一套直播平台 私信可获取源码...

ChatGPT 变懒最新解释!或和系统Prompt太长有关

大家好我是二狗。 ChatGPT变懒这件事又有了最新解释了。 这两天&#xff0c;推特用户Dylan Patel发文表示&#xff1a; 你想知道为什么 ChatGPT 和 6 个月前相比会如此糟糕吗&#xff1f; 那是因为ChatGPT系统Prompt是竟然包含1700 tokens&#xff0c;看看这个prompt里面有多…...

书生·浦语大模型第三课作业

基础作业&#xff1a; 复现课程知识库助手搭建过程 (截图) 进阶作业&#xff1a; 选择一个垂直领域&#xff0c;收集该领域的专业资料构建专业知识库&#xff0c;并搭建专业问答助手&#xff0c;并在 OpenXLab 上成功部署&#xff08;截图&#xff0c;并提供应用地址&#x…...

【Redis笔记】分布式锁及4种常见实现方法

线程锁 主要用来给方法、代码块加锁。当某个方法或代码使用锁&#xff0c;在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果&#xff0c;因为线程锁的实现在根本上是依靠线程之间共享内存实现的&#xff0c;比如Synchronized、Lock等。 进程锁 控制同…...

SpringMVC第一天

一、SpringMVC简介 1 SpringMVC概述 1.1 SpringMVC概述 SpringMVC是一种基于Java实现MVC模型的轻量级Web框架 优点 使用简单&#xff0c;开发便捷&#xff08;相比于Servlet&#xff09; 灵活性强 2 入门案例【重点】 问题导入 在Controller中如何定义访问路径&#xff…...

如何利用腾讯工蜂提升广告推广和用户运营效率

无代码开发&#xff1a;腾讯工蜂的连接优势 在广告推广和用户运营中&#xff0c;腾讯工蜂的无代码开发优势让广告系统和用户运营系统能够轻松地实现无需API开发的集成。这使得没有专业编程技能的工作人员也能通过腾讯工蜂的用户友好界面&#xff0c;实现系统的快速连接和集成&…...

【QT+QGIS跨平台编译】之三十二:【MiniZip+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

文章目录 一、MiniZip介绍二、文件下载三、文件分析四、pro文件五、编译实践一、MiniZip介绍 MiniZip是一个轻量级的开源库,用于创建、读取和操作ZIP文件格式的压缩文件。它提供了一组简单而灵活的API,可以方便地在应用程序中进行ZIP文件的压缩和解压操作。 MiniZip的主要特…...

OLAP技术的发展及趋势简述

这里写自定义目录标题 历史发展基于电子表格的数据分析基于传统数据库的数据分析基于大数据的数据分析 当下的现状OLAP技术的分类MOLAPROLAPHOLAP 主流的OLAP引擎新技术的普及内存向量计算列式数据存储及交换增量查询多源融合计算下推物化视图 发展趋势智能化分析多源融合和自动…...

stupid_brain

前言&#xff1a; 本文用于记录本人AI新手期间犯的各种错误&#xff0c;时常更新。 正文开始&#xff1a; 读取数据的num_worker设置过少&#xff0c;以至于训练速度卡在读取数据上。训练集数据处理&#xff1a;数据增强有利于解决过拟合问题。模型&#xff1a;relu少写、batc…...

C# BackgroundWorker的使用

C# 中的 BackgroundWorker 类是 .NET Framework 提供的一个组件&#xff0c;用于在后台线程上异步执行长时间运行的操作&#xff0c;同时保持与用户界面&#xff08;UI&#xff09;的交互&#xff0c;如更新进度信息或处理取消请求。这使得可以轻松地在不冻结UI的情况下执行耗时…...

(简单有案例)前端实现主题切换、动态换肤的两种简单方式

目录 背景 &#xff08;强烈推荐&#xff09;实现方法1&#xff1a;CSS中原生变量“--”和var() 步骤1&#xff1a;定义css变量 步骤2&#xff1a;使用CSS变量 步骤3&#xff1a;切换主题 实现完整案例 实现方法2&#xff1a;link 动态引入 背景 我们需要做一个功能&#…...

wyh的迷宫

涉及知识点&#xff1a;求迷宫能否到达终点的&#xff0c;而不是求路径数的&#xff0c;用bfs时可以不用重置状态数组&#xff08;回溯&#xff09;。 题目描述 给你一个n*m的迷宫&#xff0c;这个迷宫中有以下几个标识&#xff1a; s代表起点 t代表终点 x代表障碍物 .代…...

AWS云用户创建

问题 需要给工友创建AWS云的用户&#xff0c;这里假设使用分配给自己AWS开发者IAM账号&#xff0c;给别人创建aws IAM账号。 登录系统 打开页面&#xff1a;https://xxx.signin.aws.amazon.com/console&#xff0c;使用分配的开发者账号登录。如下图&#xff1a; 创建用户…...

微信小程序(三十七)选项点击高亮效果

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.选择性渲染类 2.以数字为需渲染内容&#xff08;数量&#xff09; 源码&#xff1a; index.wxml <view class"Area"><!-- {{activeNumindex?Active:}}是选择性添加类名进行渲染 -->&l…...

通过Demo学WPF—数据绑定(二)

准备 今天学习的Demo是Data Binding中的Linq&#xff1a; 创建一个空白解决方案&#xff0c;然后添加现有项目&#xff0c;选择Linq&#xff0c;解决方案如下所示&#xff1a; 查看这个Demo的效果&#xff1a; 开始学习这个Demo xaml部分 查看MainWindow.xaml&#xff1a; …...

数据湖的整体思路

湖本质上是一个集中化&#xff0c;中心化的&#xff0c;一体化的存储技术&#xff0c;并且在其之上追求技术架构的统一化&#xff0c;如流批一体&#xff0c;服务分析一体化。 当数据湖成为中心&#xff0c;那么就可以围湖而建“数据服务环”&#xff0c;环上的服务包括了数仓、…...

51单片机 跑马灯

#include <reg52.h>//毫秒级延时函数 void delay(int z) {int x,y;for(x z; x > 0; x--)for(y 114; y > 0 ; y--); }sbit LED1 P1^0x0; sbit LED2 P1^0x1; sbit LED3 P1^0x2; sbit LED4 P1^0x3; sbit LED5 P1^0x4; sbit LED6 P1^0x5; sbit LED7 P1^0x6; s…...

迎新年年终总结

迎新年年终总结 1、除夕迎新年登高有感 1、除夕迎新年登高有感 除旧岁&#xff0c;迎新年。凭栏立&#xff0c;意阑珊。 天空阔&#xff0c;世道艰。唯自强&#xff0c;可彼岸。 于2024年2月9日 10:51。...

一台服务器可以支持多少TCP连接

前言 ​ 在linux系统中一切皆文件&#xff0c;每当有一个tcp连接建立&#xff0c;那么就会打开一个文件描述符。在Linux系统中&#xff0c;文件描述符打开的个数是有限制的&#xff0c;当超过这个限制的时候内核就会跑出too many open files异常。 ​ linux上能打开的最大文件…...

svg基础(六)滤镜-图像,光照效果(漫反射,镜面反射),组合

1 feImage&#xff1a;图像滤镜 feImage 滤镜从外部来源取得图像数据&#xff0c;并提供像素数据作为输出&#xff08;意味着如果外部来源是一个 SVG 图像&#xff0c;这个图像将被栅格化。&#xff09; 1.1 用法: <feImage x"" y"" width"&quo…...

电脑数据误删如何恢复?9 个Windows 数据恢复方案

无论您是由于软件或硬件故障、网络犯罪还是意外删除而丢失数据&#xff0c;数据丢失都会带来压力和令人不快。 如今的企业通常将其重要数据存储在云或硬盘上。但在执行其中任何一项操作之前&#xff0c;您很有可能会丢失数据。 数据丢失的主要原因是意外删除&#xff0c;任何…...

【doghead】uv_loop_t的创建及线程执行

worker测试程序,类似mediasoup对uv的使用,是one loop per thread 。创建一个UVLoop 就可以创建一个uv_loop_t Transport 创建一个: 试验配置创建一个: UvLoop 封装了libuv的uv_loop_t ,作为共享指针提供 对uv_loop_t 创建并初始化...

云计算运营模式介绍

目录 一、云计算运营模式概述 1.1 概述 二、云计算服务角色 2.1 角色划分 2.1.1 云服务提供商 2.1.2 云服务消费者 2.1.3 云服务代理商 2.1.4 云计算审计员 2.1.5 云服务承运商 三、云计算责任模型 3.1 云计算服务模式与责任关系图 3.2 云计算服务模式与责任关系解析…...

物资捐赠管理系统

文章目录 物资捐赠管理系统一、项目演示二、项目介绍三、系统部分功能截图四、部分代码展示五、底部获取项目&#xff08;9.9&#xffe5;带走&#xff09; 物资捐赠管理系统 一、项目演示 爱心捐赠系统 二、项目介绍 基于springboot的爱心捐赠管理系统 开发语言&#xff1a…...

YOLOv8改进 | 检测头篇 | 独创RFAHead检测头超分辨率重构检测头(适用Pose、分割、目标检测)

一、本文介绍 本文给大家带来的改进机制是RFAHead,该检测头为我独家全网首发,本文主要利用将空间注意力机制与卷积操作相结合的卷积RFAConv来优化检测头,其核心在于优化卷积核的工作方式,特别是在处理感受野内的空间特征时。RFAConv主要的优点就是增加模型的特征提取能力,…...

私有化部署一个吃豆人小游戏

目录 效果 安装步骤 1.安装并启动httpd 2.下载代码 3.启动httpd 使用 效果 安装步骤 1.安装并启动httpd yum -y install httpd 2.下载代码 进入目录 cd /var/www/html/ 下载 git clone https://gitee.com/WangZhe168_admin/pacman-canvas.git 3.启动httpd syste…...

社区店经营管理新思路:提升业绩的秘诀

作为一名资深的鲜奶吧创业者&#xff0c;我深知在社区经营一家店铺所面临的挑战与机遇。经过5年的探索与实践&#xff0c;我总结出了一套提升社区店业绩的秘诀&#xff0c;今天就和大家分享一下。 一、明确目标客户群体&#xff0c;精准定位 在社区开店&#xff0c;首先要明确…...

统一数据格式返回,统一异常处理

目录 1.统一数据格式返回 2.统一异常处理 3.接口返回String类型问题 1.统一数据格式返回 添加ControllerAdvice注解实现ResponseBodyAdvice接口重写supports方法&#xff0c;beforeBodyWrite方法 /*** 统一数据格式返回的保底类 对于一些非对象的数据的再统一 即非对象的封…...

arm 平台安装snort3

本文来自原创,转载请说明来源。谢谢配合。 选择初衷 最近在学习渗透相关课程,回想起曾经拥有自己的域名和服务器的经历。不幸的是,服务器被注入了木马文件,起初并没有察觉。直到我加入了定时任务,才发现了这个问题。当时我下定决心要打造一个安全的网站,以保护自己的网…...

【Ubuntu 20.04/22.04 LTS】最新 esp-matter SDK 软件编译环境搭建步骤

仓库链接&#xff1a;esp-matter SDK官方软件说明&#xff1a;ESP Matter Programming Guide官方参考文档&#xff1a;使用 Matter-SDK 快速搭建 Matter 环境 (Linux) 环境要求 Ubuntu 20.04 或 Ubuntu22.04网络环境支持访问 Gihub 在安装 esp-matter SDK 软件编译环境之前&a…...

【C语言】案例:输出n位水仙花数

1.题目 输入一个整数n&#xff0c;输出所有n位的水仙花数 2.代码 #include <stdio.h> #include <math.h>// 计算数字的位数 int countDigits(int num) {int count 0;while (num ! 0) {num / 10;count;}return count; }// 计算水仙花数 void findNarcissisticNu…...

代码随想录算法训练营第四十六天(动态规划篇)|01背包(滚动数组方法)

01背包&#xff08;滚动数组方法&#xff09; 学习资料&#xff1a;代码随想录 (programmercarl.com) 题目链接&#xff08;和上次一样&#xff09;&#xff1a;题目页面 (kamacoder.com) 思路 使用一维滚动数组代替二维数组。二维数组的解法记录在&#xff1a;代码随想录算…...

【QT+QGIS跨平台编译】之三十:【NetCDF+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

文章目录 一、NetCDF介绍二、文件下载三、文件分析四、pro文件4.1 netcdf34.2 netcdf44.3 netcdf五、编译实践一、NetCDF介绍 NetCDF(Network Common Data Form)是一种用于存储和处理科学数据的文件格式和库。它提供了一种自描述、可移植和可扩展的方式来组织多维数据,并支…...

从0开始图形学(光栅化)

前言 说起图形学&#xff0c;很多人就会提到OpenGL&#xff0c;但其实两者并不是同一个东西。引入了OpenGL加重了学习的难度和成本&#xff0c;使得一些原理并不直观。可能你知道向量&#xff0c;矩阵&#xff0c;纹理&#xff0c;重心坐标等概念&#xff0c;但就是不知道这些概…...

B站弹幕分析系统

视频展示&#xff0c;请点击。 尚硅谷案例 utllib的基本使用 # 使用urllib来获取百度首页的源码 import urllib.request# (1)定义一个url 就是你要访问的地址 url http://www.baidu.com# (2)模拟浏览器先服务器发送请求 response响应 response urllib.request.urlopen(url)…...

戴上HUAWEI WATCH GT 4,解锁龙年新玩法

春节将至&#xff0c;华为WATCH GT 4作为一款颜值和实力并存的手表&#xff0c;能为节日增添了不少趣味和便利。无论你是钟情于龙年表盘或定制属于自己的表盘&#xff0c;还是过年用来抢红包或远程操控手机拍全家福等等&#xff0c;它都能成为你的“玩伴”。接下来&#xff0c;…...

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之StepperItem组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之StepperItem组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、StepperItem组件 用作Stepper组件的页面子组件。 子组件 无。 接口 St…...

2024-02-08 Unity 编辑器开发之编辑器拓展1 —— 自定义菜单栏与窗口

文章目录 1 特殊文件夹 Editor2 在 Unity 菜单栏中添加自定义页签3 在 Hierarchy 窗口中添加自定义页签4 在 Project 窗口中添加自定义页签5 在菜单栏的 Component 菜单添加脚本6 在 Inspector 为脚本右键添加菜单7 加入快捷键8 小结 1 特殊文件夹 Editor ​ Editor 文件夹是 …...

Intellij IDEA各种调试+开发中常见bug

Intellij IDEA中使用好Debug&#xff0c;主要包括如下内容&#xff1a; 一、Debug开篇 ①、以Debug模式启动服务&#xff0c;左边的一个按钮则是以Run模式启动。在开发中&#xff0c;我一般会直接启动Debug模式&#xff0c;方便随时调试代码。 ②、断点&#xff1a;在左边行…...

文件上传-Webshell

Webshell简介 webshell就是以aspphpjsp或者cgi等网页文件形式存在的一种命令执行环境&#xff0c;也可以将其称做为一种网页木马后门。 攻击者可通过这种网页后门获得网站服务器操作权限&#xff0c;控制网站服务器以进行上传下载文件、查看数据库、执行命令等… 什么是木马 …...

掌握虚拟化与网络配置之道:深入浅出VMware及远程管理技巧

目录 虚拟机介绍 虚拟机的关键字 服务器架构的发展 为什么用虚拟机VMware 虚拟机和阿里云的区别 功能角度 价格因素 应用场景 优势方面 找到windows的服务管理 配置VMware 关于VMware安装的几个服务 vmware如何修改各种网络配置 关于NAT的详细信息(了解) NAT(网…...

【漏洞复现】狮子鱼CMS某SQL注入漏洞

Nx01 产品简介 狮子鱼CMS&#xff08;Content Management System&#xff09;是一种网站管理系统&#xff0c;它旨在帮助用户更轻松地创建和管理网站。该系统拥有用户友好的界面和丰富的功能&#xff0c;包括页面管理、博客、新闻、产品展示等。通过简单直观的管理界面&#xf…...

Python学习之路-Tornado基础:安全应用

Python学习之路-Tornado基础:安全应用 Cookie 对于RequestHandler&#xff0c;除了在初始Tornado中讲到的之外&#xff0c;还提供了操作cookie的方法。 设置 set_cookie(name, value, domainNone, expiresNone, path‘/’, expires_daysNone) 参数说明&#xff1a; 参数名…...

6.0 Zookeeper session 基本原理详解教程

客户端与服务端之间的连接是基于 TCP 长连接&#xff0c;client 端连接 server 端默认的 2181 端口&#xff0c;也就 是 session 会话。 从第一次连接建立开始&#xff0c;客户端开始会话的生命周期&#xff0c;客户端向服务端的ping包请求&#xff0c;每个会话都可以设置一个…...