在线考试响应式网站模板下载/发布软文的平台
目录
- RabbitMQ认识
- 概念
- 使用场景
- 优点
- AMQP协议
- JMS
- RabbitMQ安装
- 安装elang
- 安装RabbitMQ
- 安装管理插件
- 登录RabbitMQ
- 消息队列的工作流程
- RabbitMQ常用模型
- HelloWorld-基本消息模型
- 生产者发送消息
- 导包
- 获取链接工具类
- 消息的生产者
- 消费者消费消息
- 模拟消费者
- 手动签收消息
- Work Queues
- Sender
- Consume1
- Consume2
- 订阅模型-FANOUT-广播
- Sender
- Consume1
- Consume2
- 订阅模型-Direct-定向
- Sender
- Consume1
- Consume2
- 订阅模型-Topic-通配符
- Sender
- Consume1
- Consume2
- 总结
- SpringBoot集成RabbitMQ
- 导包
- yml
- config
- producer
- consumer
RabbitMQ认识
概念
MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。官方地址:http://www.rabbitmq.com/
使用场景
优点
任务异步处理:
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。(丢进去由接收方分别异步处理)
消除峰值:
异步化提速(发消息),提高系统稳定性(多系统调用),服务解耦(5-10个服务),排序保证,消除峰值
(放入队列中不用马上都处理完,有中间状态,消息分发后可由多个订阅方分别异步处理)
服务解耦:
应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
(将单体业务拆分为生产者,消息队列和消费者)
AMQP协议
AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
(其他Python,C#,PHP也都能用)
JMS
JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的。
(只能Java用,基本已经被摒弃)
RabbitMQ安装
安装elang
otp_win64_20.2.exe
配置环境变量
安装RabbitMQ
rabbitmq-server-3.7.4.exe
可通过任务管理器或开始菜单启动或关闭服务
安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到RabbitMQ的sbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management , 安装成功后重新启动RabbitMQ
(开启可视化界面)
重启MQ
登录RabbitMQ
进入浏览器,输入:http://localhost:15672,初始账号和密码:guest/guest
消息队列的工作流程
RabbitMQ常用模型
HelloWorld-基本消息模型
一个生产者与一个消费者
生产者发送消息
导包
<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><!--和springboot2.0.5对应--><version>5.4.1</version></dependency>
</dependencies>
获取链接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//端口,和管理端端口15672不一样,管理端是另外一台网页版的系统,5672才是MQ本身factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/");//集群的时候才用这个参数factory.setUsername("guest");factory.setPassword("guest");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
}
消息的生产者
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//消息的生产者
public class Sender {public static final String HELLO_QUEUE="hello_queue";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列(hello这里用默认的交换机)/* String queue :队列的名字,可自定义, boolean durable: 持久化, boolean exclusive:是否独占;大家都能用,传false,boolean autoDelete: 用完即删;关了就没了,消费者还要拿,所以传false,Map<String, Object> arguments:没有其他要传的属性就传false */channel.queueDeclare(HELLO_QUEUE, true, false, false, null);String msg="今天中午吃啥";//4.发送消息channel.basicPublish("", HELLO_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}
消费者消费消息
模拟消费者
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性System.out.println("消息内容:"+new String(body));System.out.println("消费完成----------------");}};//3.监听队列/*queue :队列名字autoAck:自动签收Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}
只要消费者不关,生产者发一次消息消费者就自动监听消费一次消息
手动签收消息
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
// System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("消费完成----------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);// 第二个参数为是否同时签收多个,传false}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收不等于消费成功,处理逻辑走完没有报错才算签收成功Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}
Work Queues
一个生产者与多个消费者。
默认轮询,也可以改成能者多劳
Sender
//消息的生产者
/*如果有多个消费者监听同一个队列,默认轮询*/
public class Sender {public static final String WORK_QUEUE="work_queue";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列/*String queue :队列的名字boolean durable: 持久化boolean exclusive:是否独占boolean autoDelete: 用完即删Map<String, Object> arguments*/channel.queueDeclare(WORK_QUEUE, true, false, false, null);String msg="今天中午吃啥";//4.发送消息channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
public class Consume1 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
// channel.basicQos(1);//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}
Consume2
//模拟消费者
public class Consume2 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
// channel.basicQos(1);//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);System.out.println("消息内容:"+new String(body));
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}
订阅模型-FANOUT-广播
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
Sender
//消息的生产者
/*变化1.不创建 队列2.创建交换机3.给交换机发送消息*/
public class Sender {public static final String FANOUT_EXCHANGE="fanout_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume1 {public static final String FANOUT_QUEUE1="fanout_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE1, Sender.FANOUT_EXCHANGE, "");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume2 {public static final String FANOUT_QUEUE2="fanout_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE2, Sender.FANOUT_EXCHANGE, "");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE2,false,callback);}
}
订阅模型-Direct-定向
把消息交给符合指定routing key 的队列 一堆或一个
Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static final String DIRECT_EXCHANGE="direct_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(DIRECT_EXCHANGE, "dept", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public static final String DIRECT_QUEUE1="direct_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE1, Sender.DIRECT_EXCHANGE, "emp.delete");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public static final String DIRECT_QUEUE2="direct_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE2, Sender.DIRECT_EXCHANGE, "dept");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE2,false,callback);}
}
订阅模型-Topic-通配符
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static final String TOPIC_EXCHANGE="topic_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(TOPIC_EXCHANGE, "user.insert.add.pubilsh", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public static final String TOPIC_QUEUE1="topic_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE, "user.#");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public static final String TOPIC_QUEUE2="topic_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE, "email.*");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE2,false,callback);}
}
总结
01_hello生产者 1.获取连接 2.获取通道 3.创建队列 4.发送消息消费者 1.获取连接 2.获取通道 3.监听队列 (并回调)02_workqueue 默认轮询 可修改(能者多劳)生产者 1.获取连接 2.获取通道 3.创建队列 4.发送消息消费者 1.获取连接 2.获取通道 3.监听队列 (并回调)03_fanout 广播 将消息交给所有绑定到交换机的队列(多个消费者都能收到)生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)04_direct 定向 把消息交给符合指定 routing key 的队列 一堆或一个生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)05_topic 通配符 把消息交给符合routing pattern (路由模式) 的队列 一堆或一个生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)
SpringBoot集成RabbitMQ
导包
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--spirngboot集成rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
yml
server:port: 44000
spring:application:name: test‐rabbitmq‐producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtualHost: /listener:simple:acknowledge-mode: manual #手动签收prefetch: 1 #消费者的消息并发处理数量#publisher-confirms: true #消息发送到交换机失败回调#publisher-returns: true #消息发送到队列失败回调template:mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
config
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_SPRINGBOOT="exchange_springboot";public static final String QUEUE1_SPRINGBOOT="queue1_springboot";public static final String QUEUE2_SPRINGBOOT="queue2_springboot";//创建一个交换机@Beanpublic Exchange createExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();}//创建两个队列@Beanpublic Queue createQueue1(){return new Queue(QUEUE1_SPRINGBOOT,true);}@Beanpublic Queue createQueue2(){return new Queue(QUEUE2_SPRINGBOOT,true);}//把交换机和队列绑定到一起@Beanpublic Binding bind1(){return BindingBuilder.bind(createQueue1()).to(createExchange()).with("user.*").noargs();}@Beanpublic Binding bind2(){return BindingBuilder.bind(createQueue2()).to(createExchange()).with("email.*").noargs();}//消费者 还原对象方式(从MQ里取出json转为对象)@Bean("rabbitListenerContainerFactory")public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setPrefetchCount(1);return factory;}//放到消息队列里面的转换(转为json存进MQ)@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}
producer
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = App.class)
@RunWith(SpringRunner.class)
public class Sender {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test(){/*问题:多系统之间 信息交互 传递对象解决方案:转换为json存储实现:1.fastjson 对象 - josn (作业)2.重写转换器模式*/for (int i = 0; i < 5; i++) {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT, "email.save", new User(1L,"文达"));}System.out.println("消息发送完毕");}
}
consumer
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;//消费者
@Component
public class Consu {@RabbitListener(queues = {RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory = "rabbitListenerContainerFactory")//用这个转换器接public void user(@Payload User user, Channel channel, Message message) throws IOException {System.out.println(message);System.out.println("user队列:"+user);//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = {RabbitMQConfig.QUEUE2_SPRINGBOOT})public void email(@Payload User user,Channel channel,Message message ) throws IOException {System.out.println(message);System.out.println("email队列:"+user);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
队列内容可传string,entity序列化对象,json对象,
相关文章:

B080-RabbitMQ
目录 RabbitMQ认识概念使用场景优点AMQP协议JMS RabbitMQ安装安装elang安装RabbitMQ安装管理插件登录RabbitMQ消息队列的工作流程 RabbitMQ常用模型HelloWorld-基本消息模型生产者发送消息导包获取链接工具类消息的生产者 消费者消费消息模拟消费者手动签收消息 Work QueuesSen…...

关于岛屿的三道leetcode原题:岛屿周长、岛屿数量、统计子岛屿
题1:岛屿周长 给定一个 row x col 的二维网格地图 grid ,其中:gridi 1 表示陆地, gridi 0 表示水域。 网格中的格子 水平和垂直 方向相连(对角线方向不相连)。整个网格被水完全包围,但其中恰…...

lintcode 1081 · 贴纸拼单词【hard 递归+记忆化搜索才能通过】
题目 https://www.lintcode.com/problem/1081/ 给出N种不同类型的贴纸。 每个贴纸上都写有一个小写英文单词。 通过裁剪贴纸上的所有字母并重排序来拼出字符串target。 每种贴纸可以使用多次,假定每种贴纸数量无限。 拼出target最少需要多少张贴纸?如果…...

HarmonyOS/OpenHarmony(Stage模型)应用开发单一手势(二)
三、拖动手势(PanGesture) .PanGestureOptions(value?:{ fingers?:number; direction?:PanDirection; distance?:number}) 拖动手势用于触发拖动手势事件,滑动达到最小滑动距离(默认值为5vp)时拖动手势识别成功&am…...

计算机毕设之基于Python+django+MySQL可视化的学习系统的设计与实现
系统阐述的是使用可视化的学习系统的设计与实现,对于Python、B/S结构、MySql进行了较为深入的学习与应用。主要针对系统的设计,描述,实现和分析与测试方面来表明开发的过程。开发中使用了 django框架和MySql数据库技术搭建系统的整体架构。利…...

Kotlin inline、noinline、crossinline 深入解析
主要内容: inline 高价函数的原理分析Non-local returns noinlinecrossinline inline 如果有C语言基础的,inline 修饰一个函数表示该函数是一个内联函数。编译时,编译器会将内联函数的函数体拷贝到调用的地方。我们先看下在一个普通的 kot…...

在 CentOS 7 / RHEL 7 上安装 Python 3.11
原文链接:https://computingforgeeks.com/install-python-3-on-centos-rhel-7/ Python 是一种高级解释性编程语言,已被用于各种应用程序开发,并在近年来获得了巨大的流行。Python 可用于编写广泛的应用程序,包括 Web 开发、数据分…...

SVN基本使用笔记——广州云科
简介 SVN是什么? 代码版本管理工具 它能记住你每次的修改 查看所有的修改记录 恢复到任何历史版本 恢复己经删除的文件 SVN跟Git比,有什么优势 使用简单,上手快 目录级权限控制,企业安全必备 子目录Checkout,减少不必要的文件检出…...

python爬虫-Selenium
一、Selenium简介 Selenium是一个用于Web应用程序测试的工具,Selenium 测试直接运行在浏览器中,就像真正的用户在操作一样。模拟浏览器功能,自动执行网页中的js代码,实现动态加载。 二、环境配置 1、查看本机电脑谷歌浏览器的版…...

flutter plugins插件【一】【FlutterJsonBeanFactory】
1、FlutterJsonBeanFactory 在Setting->Tools->FlutterJsonBeanFactory里边自定义实体类的后缀,默认是entity 复制json到粘贴板,右键自己要存放实体的目录,可以看到JsonToDartBeanAction Class Name是实体名字,会默认加上…...

系统中出现大量不可中断进程和僵尸进程(理论)
一 进程状态 当 iowait 升高时,进程很可能因为得不到硬件的响应,而长时间处于不可中断状态。从 ps 或者 top 命令的输出中,你可以发现它们都处于 D 状态,也就是不可中断状态(Uninterruptible Sleep)。 R …...

L1-012 计算指数(Python实现) 测试点全过
前言: {\color{Blue}前言:} 前言:本系列题使用的是“PTA中的团体程序设计天梯赛——练习集”的题库,难度有L1、L2、L3三个等级,分别对应团体程序设计天梯赛的三个难度,如有需要可以直接查看对应专栏。发布个…...

String、StringBuffer、StringBuilder的区别
String、StringBuffer、StringBuilder的区别 String的内容不可修改,StringBuffer与StringBuilder的内容可以修改.StringBuffer与StringBuilder(更快)大部分功能是相似的StringBuffer采用同步处理,属于线程安全操作;而S…...

.net基础概念
1. .NET Framework .NET Framework开发平台包含公共语言运行库(CLR)和基类库(BCL),前者负载管理代码的执行,后者提供了丰富的类库来构建应用程序。.NET Framework仅支持Windows平台 2. Mono 由于.NET Framework支支持windows环境,因此社区…...

电缆工厂 3D 可视化管控系统 | 智慧工厂
近年来,我国各类器材制造业已经开始向数字化生产转型,使得生产流程变得更加精准高效。通过应用智能设备、物联网和大数据分析等技术,企业可以更好地监控生产线上的运行和质量情况,及时发现和解决问题,从而提高生产效率…...

bazel高效使用和调优
Bazel 为了正确性和高性能,做了很多优秀的设计,那么我们如何正确的使用这些能力,让我们的构建性能“起飞”呢, 我们将从本地研发和 CI pipeline 两种场景进行分析。 本地研发 本地研发通常采用默认的 Bazel 配置即可,…...

【实训项目】传道学习助手APP设计
1.设计摘要 跨入21世纪以来,伴随着时代的飞速发展,国民对教育的重视度也有了进一步的提升。我们不难发现虽然很多学习内容有学习资料或者答案,但是这些内容并不能达到让所有求学的人对所需知识进行完全地理解与掌握。所以我们需要进行提问与求助。那么一…...

短信验证码服务
使用的是 阿里云 阿里云官网 1.找到 左上角侧边栏 -云通信 -短信服务 2.在快速学习测试处 ,按照步骤完成快速学习,绑定要测试的手机号,选专用 【测试模板】,自定义模板需要人工审核,要一个工作日 3.右上角 获取 Acces…...

windows如何更改/禁用系统更新
提示:首先说明这属于将更新时间更改,不过你可以的将更新时间更改为十年一百年 废话不多说开始正文: 1.首先:winR打开运行,输入regedit,进入注册表编辑器 2.进入编辑器后依次点击:HKEY_LOCAL_MACHINE\SOFT…...

Clion 使用ffmpeg 学习1 开发环境配置
Clion 使用ffmpeg 学习1 开发环境配置 一、准备工作1. 准备环境2. 下载FFmpeg 二、操作步骤1. Clion 新建一个C项目2. 修改 CMakeLists.txt3. 修改配置4. 运行测试5. 打印rtsp 流信息的 demo 一、准备工作 在视频处理和多媒体应用程序开发中,FFmpeg 是一个强大的开…...

浏览器连不上 Flink WebUI 8081 端口
安装 flink-1.17.0 后,start-cluster.sh 启动,发现浏览器连不上 Flink WebUI 的8081端口。 问题排查: command R,输入cmd,检查宿主机能否ping通虚拟机,发现能ping通。 检查是否有flink以外的任务占用8081…...

Doris集群安装部署(1.2.4.1 release)
此文阅读需要有Linux和服务器硬件基础!某些内容写的不是特别细,如果常见的linux基础命令tar、uzip、mv、mkdir、系统包的安装等等,以文字带过了,这样可以减少文章篇幅。官方的安装部署方式一定要好好看一下,最好是尝试…...

对HashMap的value做升序、降序
public class MapUtils {// Map的value值降序排序public static <K, V extends Comparable<? super V>> Map<K, V> sortDescend(Map<K, V> map) {List<Map.Entry<K, V>> list new ArrayList<>(map.entrySet());list.sort((o1, o2)…...

算法面试-深度学习基础面试题整理-AIGC相关(2023.9.01开始,持续更新...)
1、stable diffusion和GAN哪个好?为什么 ? Stable diffusion是一种基于随机微分方程的生成方法,它通过逐步增加噪声来扰动原始图像,直到完全随机化。然后,它通过逐步减少噪声来恢复图像,同时使用一个神经网…...

Python、PHP和Java下的反序列化漏洞复现实例
环境准备 这篇文章旨在用于网络安全学习,请勿进行任何非法行为,否则后果自负。 python反序列化 p83 CTF夺旗 Python考点SST&反序列化&字符串_正经人_____的博客-CSDN博客 php反序列化 p84 CTF夺旗-PHP弱类型&异或取反&序列化&…...

html的使用
一,HBuilder –1,使用 直接解压就可以用, 创建项目: 直接点击 新建项目,输入项目名和选中项目存放位置,创建. 创建资源: 选中项目,右键,新建… 二,HTML –1,概述 是超文本标记语言,专门用来制作网页的. 超文本: 网页中可以包含各种类型的元素.包括: 文字,数字,符号,图片,音频,…...

docker linux(centos 7) 安装
这是个目录 1:安装1:手动安装(适用于centos7)之一2:手动安装(适用于centos7)之二3:一键安装docker4:二进制安装1:下载二进制包2:解压3:移动文件4:后台运行docker5:测试 dicker命令表999:遇到的问…...

C语言sizeof和strlen的区别?
sizeof和strlen有什么区别? sizeof本质是运算符(sizoof既是关键字也是运算符,不是函数哈),而strlen就是函数。sizeof后面如果是类型,则必须加括号,如果是变量,可以不加括号。 sizeof…...

小文智能GPT助手介绍
如何使用小文交互的GPT助手,让AI更加智能,适用更多场景? 在小文智能最新推出的4.0版本,有一个新功能,叫做GPT助手。GPT助手,顾名思义,即在小文智能的场景中,接入ChatGPT,…...

SpringBoot使用i18n国际化
使用的SpringBoot版本是2.3.5 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.5.RELEASE</version><relativePath/> </parent> 一、简单测试…...