RabbitMQ从入门到精通之安装、通讯方式详解
文章目录
- RabbitMQ
- 一、RabbitMQ介绍
- 1.1 现存问题
- 一、RabbitMQ介绍
- 二、RabbitMQ安装
- 三、RabbitMQ架构
- 四、RabbitMQ通信方式
- 4.1 RabbitMQ提供的通讯方式
- 4.2 Helloworld 方式
- 4.2Work queues
- 4.3 Publish/Subscribe
- 4.4 Routing
- 4.5 Topics
- 4.6 RPC (了解)
- 五、Springboot 操作RabbitMQ
- 六、RabbitMQ保证消息可靠性
- 6.1、 保证消息一定送达到Exchange
- 6.2、保证消息可以路由到Queue中
- 6.3、保证队列持久化消息
- 6.4、保证消息者可以正常消费消息
- 6.5 SpringBoot实现上述操作
- 6.5.1 Confirm
- 6.5.2 Return
- 6.5.3 消息持久化
- 七、RabbitMQ死信队列 & 延迟交换机
- 7.4、准备Exchange&Queue
- 7.5、实现效果
- 八、RabbitMQ的集群
RabbitMQ
一、RabbitMQ介绍
1.1 现存问题
-
- 服务异步调用: 服务A如何保证异步请求一定能被服务B接收到并处理
- 服务异步调用: 服务A如何保证异步请求一定能被服务B接收到并处理
-
- 削峰: 海量请求,如何实现削峰的效果,将请求全部放到一个队列中,慢慢的消费,这个队列怎么实现?
-
- 服务解耦: 如何尽量的降低服务之间的耦合问题,如果在订单与积分和商家服务解构,需要一个队列,而这个队列依然需要实现上述两个情况功能。
一、RabbitMQ介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
AMQP协议:
Erlang:
Erlang在1991年由爱立信公司向用户推出了第一个版本,经过不断的改进完善和发展,在1996年爱立信又为所有的Erlang用户提供了一个非常实用且稳定的OTP软件库并在1998年发布了第一个开源版本。Erlang同时支持的操作系统有linux,windows,unix等,可以说适用于主流的操作系统上,尤其是它支持多核的特性非常适合多核CPU,而分布式特性也可以很好融合各种分布式集群。
二、RabbitMQ安装
docker-compose.yml
version: “3.1”
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:3.8.5
container_name: rabbitmq
restart: always
volumes:
- ./data/:/var/lib/rabbitmq/
ports:
- 5672:5672
- 15672:15672
docker-compose.yml文件内容:
镜像拉取完成后,直接在linux 内部执行: curl localhost:5672
执行后能够显示AMQP 字样的内容就说明执行成功了
执行 docker exec -it rabbitmq bash 命令 进入容器内部
cd opt/rabbitmq/ 目录下
执行cd plugins/sbin 命令进入目录下, 执行 ./rabbitmq-plugins enable rabbitmq_managent命令 启动rabbitmq 图形化界面
访问15672 端口,默认的账号密码是guest/guest
三、RabbitMQ架构
四、RabbitMQ通信方式
4.1 RabbitMQ提供的通讯方式
- Hello World :为了入门操作
- Work queues : 一个队列被多个消费者消费
- Publish/Subscribe:手动创建Exchange(FANOUT)
- Routing: 手动创建Exchange(DIRECT)
- Topics : 手动创建Exchange(TOPIC)
- RPC: RPC方式
- Publisher Confirms
4.2 Helloworld 方式
//工具类
public class RabbitMQConnectionUtil {public static final String RABBITMQ_HOST ="172.16.177.133";public static final int RABBITMQ_POST =5672;public static final String RABBITMQ_USERNAME ="guest";public static final String RABBITMQ_PASSWORD ="guest";public static final String RABBITMQ_VIRTUAL_HOST ="/";/*** 构建RabbitMQ的连接对象* @return*/public static Connection getConnection() throws Exception{//1.创建connection 工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(RABBITMQ_HOST);connectionFactory.setPort(RABBITMQ_POST);connectionFactory.setUsername(RABBITMQ_USERNAME);connectionFactory.setPassword(RABBITMQ_PASSWORD);connectionFactory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);//2.设置RabbitMQ的连接信息Connection connection = connectionFactory.newConnection();//3. 返回连接对象return connection;}
//生产者@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息者获取消息"+new String(body,"UTF-8"));}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);System.out.println("开始监听");System.in.read();}//消费者@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息String message="hello word!";channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功");System.in.read();}
4.2Work queues
一个队列中的消息,只会被一个消费者成功的消费,默认情况下,RabbitMQ的队列会将消息以轮询的方式交给不同的消费者消费,消费者拿到消息后,需要给RabbitMQ一个ack,RabbitMQ认为消费者已经拿到消息了
//消费者@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.1 设置消息的控制,一次拿几个消息
//#2 channel.basicQos(1);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//模拟业务执行时间Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息者获取消息"+new String(body,"UTF-8"));
//#1 channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
//#1 channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听");System.in.read();//消费者2@Testpublic void consume2() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.1 设置消息的控制,一次拿几个消息
//#2 channel.basicQos(1);//4. 监听消息 DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//模拟业务执行时间Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息者获取消息"+new String(body,"UTF-8"));//basicAck(标识,是否批量操作)
//#1 channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
//#1 channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听");System.in.read();}
//生产者public static final String QUEUE_NAME="work";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息for (int i=0;i<10;i++){String message="hello word!"+i;channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}System.out.println("消息发送成功");System.in.read();}
当两台消费者的消费能力不相同的时候,为了提高效率,就不能以轮询的方式进行分发,而是以消费者消费完成后手动传递ack 的方式进行下一个消息的分发,将== #1 #2 的代码 ==打开即可
操作步骤:
- 操作#1 让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息
- 操作#2 设置每次拿几个消息
4.3 Publish/Subscribe
自行创建路由器,并绑定队列
如何构建一个自定义交换机,并指定类型是FANOUT,让交换机和多个Queue绑定到一起
//生产者
public static final String EXCHANGE_NAME="pubsub";public static final String QUEUE_NAME1="pubsub-one";public static final String QUEUE_NAME2="pubsub-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"",null,"publish/subscribe!".getBytes());System.out.println("消息成功发送");}
4.4 Routing
DIRECT 类型的交换机,在绑定Exchange和Queue时,需要指定好routingKey同时在发送消息的时候,也指定routingkey,只有在routingkey 一致时,才会把指定的消息路由到指定的队列
public static final String EXCHANGE_NAME="routing";public static final String QUEUE_NAME1="routing-one";public static final String QUEUE_NAME2="routing-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.DIRECTchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林!".getBytes());channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());System.out.println("消息成功发送");}
4.5 Topics
topics 模式支持模糊匹配RoutingKey,就像是sql中的 like子句模糊查询,而路由模式等同于sql中的where子句等值查询
通过模糊路由到队列。该方式的Routing key必须具有固定格式:以 . 间隔的一串单词,比如:quick.orange.rabbit,Routing key 最多不能超过255byte。
交换机和队列的Binding key用通配符来表示,有两种语法:
- * 可以替代一个单词;
- # 可以替代 0 或多个单词;
例如 #.com.#
#可以表示0级或多级。xx.com、com.xx、com、xx.com.xx.xx、xx.xx.com.xx都可以例如 *.com. *
*表示一级,xx.com.xx 可以 ,com.xx 不可以,前面缺少一级,xx.com.xx.xx不可以,com后面只能有一级xx,最终格式必须是 xx.com.xx
public static final String EXCHANGE_NAME="topics";public static final String QUEUE_NAME1="topics-one";public static final String QUEUE_NAME2="topics-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.TOPICchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,// Topic类型的交换机,在和队列绑定时,需要以aaa.bbb.ccc 方式编写routingKey// 其中有两个特殊字符: *(相当于占位符),# (相当通配符)channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙子兔子!".getBytes());//匹配1、2channel.basicPublish(EXCHANGE_NAME,"small.while.rabbit",null,"小兔子!".getBytes());//匹配1、2channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog",null,"懒狗狗狗狗!".getBytes());//匹配 3System.out.println("消息成功发送");}
4.6 RPC (了解)
因为两个服务在交互时,可以尽量做到Client和server的结偶,通过RabbitMQ进行结藕操作
需要让client 发送消息时,携带两个属性,
- replyto告知server将相应信息放到哪个队列
- correlationId告知server 发送相应消息时,需要携带位置标识来告知client响应的消息
public class Publisher {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 发布消息String message="hello rpc!";String uuid = UUID.randomUUID().toString();AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();channel.basicPublish("",QUEUE_PUBLISHER,prop,message.getBytes());channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String id = properties.getCorrelationId();if(id!=null && id.equals(uuid)){System.out.println("接收到服务端的响应:"+new String(body));}channel.basicAck(envelope.getDeliveryTag(),false);}});System.out.println("消息发送成功");System.in.read();}
}public class Consumer {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息者获取消息"+new String(body,"UTF-8"));String resp = "获取到client发出的请求,这里是响应的信息";String respQueueName = properties.getReplyTo();String uuid = properties.getCorrelationId();AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().correlationId(uuid).build();channel.basicPublish("",respQueueName,prop,resp.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE_PUBLISHER,false,callback);System.out.println("开始监听");System.in.read();}
}
五、Springboot 操作RabbitMQ
- 创建项目
- 导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 配置rabbitmq信息
spring:rabbitmq:host: 172.16.177.133password: guestusername: guestport: 5672virtual-host: /
配置类声明队列
@Configuration
public class RabbitMQConfig {public static final String EXCHANGE="boot-exchange";public static final String QUEUE="boot-queue";public static final String ROUTING_KEY="*.black.*";@Beanpublic Exchange bootExchange(){return ExchangeBuilder.topicExchange(EXCHANGE).build();}@Beanpublic Queue bootQueue(){return QueueBuilder.durable(QUEUE).build();}@Beanpublic Binding bootBinding(Exchange bootExchange,Queue bootQueue){return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();}
}
生产者配置
@SpringBootTest
class SpringbootRabbitmqApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid contextLoads() {}@Testpublic void publisher(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");}@Testpublic void publiWithProps(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setCorrelationId("123");return message;}});System.out.println("消息发送成功2");}
}
消费者配置
@Component
public class ConsumeListener {/**** @param msg* @param channel 前提是配置好spring.rabbitmq.listener.simple.acknowledge-mode: manual #开启手动ack* @param message* @throws IOException*/@RabbitListener(queues = RabbitMQConfig.QUEUE)public void consume(String msg, Channel channel, Message message) throws IOException {System.out.println("队列消息为:"+msg);String correlationId = message.getMessageProperties().getCorrelationId();System.out.println("标识为:"+correlationId);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}
- 声明交换机&队列
六、RabbitMQ保证消息可靠性
confirm机制
可以通过confirm效果保证消息一定送达到Exchange,官方提供了三种,选择了对于效率影响最低的异步回调的效果
6.1、 保证消息一定送达到Exchange
使用confirm机制
public static final String QUEUE_NAME="confirms ";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//3.1 开启confirms 的异步回调channel.confirmSelect();String message="hello word!";//3.2 设置confirms的异步回调channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息成功发送到Exchange");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作");}});//3.3 设置return回调,确认消息是否路由到了队列channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息没有到指定队列时,做其他的补偿措施!!");}});//3.4、设置消息持久化AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化.build();//4. 发布消息channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());System.out.println("消息发送成功");System.in.read();}
6.2、保证消息可以路由到Queue中
使用return 机制
为了保证Exchange上的消息一定可以送达到Queue
//6.2设置return 回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息没有到指定队列时,做其他的补偿措施!!");}});
//7.在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return 回调
6.3、保证队列持久化消息
DeliveryMode设置消息持久化
//6.3、设置消息持久化AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化.build();//4. 发布消息channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());
6.4、保证消息者可以正常消费消息
详情看WorkQueue模式
6.5 SpringBoot实现上述操作
6.5.1 Confirm
- 编写配置文件开启Confirm机制
spring:
rabbitmq:
publisher-confirm-type: correlated # 新版本 开启confirm机制
publisher-confirms: true # 老版本
- 在发送消息时,配置RabbitTemplate
@Testpublic void publishWithConfirms() throws IOException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if(b){System.out.println("消息已经送达到交换机");}else{System.out.println("消息没有送到到Exchange,需要做一些补偿操作!");}}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();}
6.5.2 Return
- 编写配置文件开启Return机制
spring:
rabbitmq:
publisher-returns: true # 开启return机制
- 在发送消息时,配置RabbitTemplate
@Testpublic void publishWithReturn() throws IOException {rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {String msg = new String(message.getBody());System.out.println("消息失败:"+msg+"路由队列失败!!做补救操作");}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();}
6.5.3 消息持久化
//3.4、设置消息持久化
AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化 #1.build();//4. 发布消息, 将参数mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());
七、RabbitMQ死信队列 & 延迟交换机
###7.1、 消息被消费者拒绝,requeue设置为false
###7.2.1、发送消息时设置消息的生存时间,如果生存时间到了,还没有被消费。
###7.2.2、 也可以指定某个队列中所有消息的生存时间,如果生存时间到了,还没有被消费
###7.3、队列已经达到消息的最长长度后,再路由过来的消息直接变成死信
7.4、准备Exchange&Queue
@Configuration
public class DeadLetterConfig {public static final String NORMAL_EXCHANGE="normal-exchange";public static final String NORMAL_QUEUE="normal-queue";public static final String NORMAL_ROUTING_KEY="normal.#";public static final String DEAD_EXCHANGE="dead-exchange";public static final String DEAD_QUEUE="dead-queue";public static final String DEAD_ROUTING_KEY="dead.#";/**普通交换机*/@Beanpublic Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();}/**普通队列*/@Beanpublic Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列.build();}/**普通队列绑定路由*/@Beanpublic Binding normalBingding(Queue normalQueue,Exchange normalExchange){return BindingBuilder.bind(normalQueue).to(normalExchange).with(DEAD_ROUTING_KEY).noargs();}/**死信交换机*/@Beanpublic Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();}/**死信队列*/@Beanpublic Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE ).build();}/**绑定死信队列和交换机*/@Beanpublic Binding deadBinding(Queue deadQueue,Exchange deadExchange){return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}
}
7.5、实现效果
- 基于消费者进行reject 或者 nack 实现死信效果
@Component
public class DeadListener {/**** @param msg* @param channel 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual* @param message 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual*/@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)public void comsume(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到normal队列的消息:"+msg);
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}
}
- 消息的生存时间
八、RabbitMQ的集群
相关文章:

RabbitMQ从入门到精通之安装、通讯方式详解
文章目录 RabbitMQ一、RabbitMQ介绍1.1 现存问题 一、RabbitMQ介绍二、RabbitMQ安装三、RabbitMQ架构四、RabbitMQ通信方式4.1 RabbitMQ提供的通讯方式4.2 Helloworld 方式4.2Work queues4.3 Publish/Subscribe4.4 Routing4.5 Topics4.6 RPC (了解) 五、Springboot 操作RabbitM…...

植物大战僵尸植物表(二)
前言 此文章为“植物大战僵尸”专栏中的第007刊(2023年9月第六刊)。 提示: 1.用于无名版; 2.用于1代; 3.pvz指植物大战僵尸(Plants VS Zonbies)。 植物大战僵尸植物表 土豆雷窝瓜火炬树桩火爆辣椒杨…...

UML基础
统一建模语言(UML是 Unified Modeling Language的缩写, 是用来对软件系统进行可视化建模的一种语言。UML为面向对象开发系统的产品 进行说明、可视化、和编制文档的一种标准语言。 共有9种图 UML中的图其实不止九种 (相同的图还可能会有不同的名称), 这里的九种图是…...

C# void 关键字学习
C#中void关键字是System.Void的别名; 可以将 void 用作方法(或本地函数)的返回类型来指定该方法不返回值; 如果C#方法中没有参数,则不能将void用作参数;这是与C语言不同的,C语言有…...

OA与CRM与ORACLE
办公自动化(Office Automation,简称OA),是将计算机、通信等现代化技术运用到传统办公方式,进而形成的一种新型办公方式。办公自动化利用现代化设备和信息化技术,代替办公人员传统的部分手动或重复性业务活动…...

【C++杂货铺】探索list的底层实现
文章目录 一、list的介绍及使用1.1 list的介绍1.2 list的使用1.2.1 list的构造1.2.2 list iterator的使用1.2.3 list capacity(容量相关)1.2.4 list element access(元素访问)1.2.5 list modifiers(链表修改࿰…...

NX/UG二次开发—Parasolid—PK_BODY_pick_topols
最近在写一个判断圆孔深度和通盲状态的功能,发现PK_BODY_pick_topols射线函数可以设置到射线垂直距离,相当于一个圆柱空间,但在测试发现,R7的孔,设置: max_edge_dist 0.007; max_vertices 0.007; 结果测…...

【校招VIP】前端算法考点之大数据相关
考点介绍: 大数据的关键技术分为分析技术和处理技术,可用于大数据分析的关键技术主要包括A/B测试,关联规则挖掘,数据挖掘,集成学习,遗传算法,机器学习,自然语言处理,模式…...

Goland2023版新UI的debug模式调试框按钮功能说明
一、背景 Jetbrains家的IDE的UI基本都是一样的,debug模式的调试框按钮排列也是一致的,但是在我使用Goland2023版的新UI时,发现调试框的按钮变化还是很大的,有一些按钮被收起来了,如果看之前的博客会发现有一些文中的旧…...

【AIGC专题】Stable Diffusion 从入门到企业级应用0414
一、前言 本文是《Stable Diffusion 从入门到企业级应用实战》系列的第四部分能力进阶篇《Stable Diffusion ControlNet v1.1 图像精准控制》的第0414篇 利用Stable Diffusion ControlNet 法线贴图模型精准控制图像生成。本部分内容,位于整个Stable Diffusion生态体…...

汇编原理学习记录:物理地址=段地址*16+偏移地址
文章目录 知识点个人思考解释存储器大小为1MB段的最大占用存储为64KB物理地址段地址*16偏移地址 知识点 8086计算机拥有20根地址总线和16根数据总线,地址总线中的16根和数据总线存在复用 数据总线的数量决定了数据总线的宽度,决定了处理器的位数&#…...

mysql-2:安装mysql
MySQL安装 操作系统:CentOS 7MySQL:5.6 MySQL的卸载 查看MySQL软件 rpm -qa | grep mysqlyum repolist all | grep mysql 卸载MySQL 卸载mysql yum remove -y mysql mysql-libs mysql-common删除mysql下的数据文件 rm -rf /var/lib/mysql删除mys…...

gin框架
【狂神说】Gin框架一小时上手 | 快速转型GoWeb开发 | Go语言零基础教程_哔哩哔哩_bilibili 1.介绍 2.简单程序 1)gin.GET/POST/PUT/DELETE函数 Go Gin 简明教程 | 快速入门 | 极客兔兔 (geektutu.com) 我的理解是:这类函数就像是在监听接口一样&…...

Laravel 完整开源项目大全
原型项目 Laravel 5 Boilerplate —— 基于当前Laravel最新版本(Laravel 6.0)并集成Boilerplate的项目Laravel 5 Angular Material Starter —— 这是一个 Laravel 和 AngularJS 的原型项目(最高支持版本:5.3,长期未更…...

Spring MVC @Controller和@RequestMapping注解
Controller 注解 Controller 注解可以将一个普通的 Java 类标识成控制器(Controller)类,示例代码如下。 package net.biancheng.controller; import org.springframework.stereotype.Controller; Controller public class IndexController …...

软件架构之前后端分离架构服务器端高并发演进之路
软件架构之前后端分离架构&服务器端高并发演进之路 前后端分离架构从业务角度从质量属性从性能角度 服务器端关于不同并发量的演进之路1. 单体架构2. 第一次演进:应用服务器和数据库服务器分开部署3. 第二次演进:引入本地缓存和分部署缓存4. 第三次演…...

第4节-PhotoShop基础课程-Ps格式
文章目录 前言1.像素认识2. 图层认识1.图层有上下前后遮挡关系2.橡皮檫可以擦掉选择图层的像素3.新建图层4.新建删除图层 3. 分辨率的理解4. 图片格式A 前言 本章主要介绍PS常用格式 1.像素认识 下面每个格子就是像素 2. 图层认识 1.图层有上下前后遮挡关系 2.橡皮檫可以擦…...

C语言malloc函数学习
malloc的全称是memory allocation,中文叫动态内存分配,用于申请一块连续的指定大小的内存块区域,以void*类型返回分配的内存区域地址; 函数原型为void *malloc(unsigned int size),在内存的动态存储区中分配一个长度为…...

从零开始学习deepsort目标追踪算法----原理和代码详解
目录 1.目标追踪的主要步骤 2、传统sort算法的流程 3.Deepsort算法流程 4、目标追踪整体代码 4.1 Configs文件目录下: 4.2 deep_sort/deep_sort/deep目录下: 4.3 deep_sort/deep_sort/sort目录下: 运行demo: DeepSORT&…...

第三章 LInux多线程开发 3.1-3.5线程创建 终止 分离
创建线程:(好好记住 可能会叫写代码) 一般情况下,main函数所在的线程我们称之为主线程(main线程),其余创建的线程称之为子线程。 程序中默认只有一个进程,fork()函数调用,2进行 程序…...

空间曲线的参数方程
空间曲线的参数方程 二维直线 经过一点 P ( x 0 , y 0 ) P(x_0,y_0) P(x0,y0)的方向向量为 n ( c o s θ , s i n θ ) n(cos\theta,sin\theta) n(cosθ,sinθ)的直线参数方程为: [ x y …...

非华为机型如何体验HarmonyOS鸿蒙系统 刷写HarmonyOS鸿蒙GSI系统以及一些初步的bug修复
最近很多视频网站有非华为机型使用HarmonyOS鸿蒙系统的演示。其实大都是刷了HarmonyOS鸿蒙系统gsi系统。体验还可以。有些刷入后bug较多。那么这些机型是如何刷写gsi?可以参考我以往帖子 安卓玩机搞机-----没有第三方包 刷写第三方各种GSI系统 体验非官方系统_gsi刷…...

Flutter 生成小程序的混合 App 实践
一、背景 微信小程序发展的越来越快,目前小程序甚至取代了大部分 App 的生态位,公司的坑位不增反降,只能让原生应用开发兼顾或换岗进行小程序的开发。 以我的实际情况来讲,公司应用采用的 Flutter 框架,同样的功能不可避免的就会存在 Flutter 应用开发和微信小程序开发兼…...

利用 Python-user-agents 解析 User_Agent
利用 Python-user-agents 解析 User_Agen 需求分析 近期在尝试做一个登录日志的功能,及用户登录成功后我在后台进行一个用户的登录记录,两种解决方案: 由前端得到用户的手机型号,我在后台接收后在数据库进行保存使用User_Agent…...

Java版企业电子招标采购系统源码Spring Cloud + Spring Boot +二次开发+ MybatisPlus + Redis
功能描述 1、门户管理:所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含:招标公告、非招标公告、系统通知、政策法规。 2、立项管理:企业用户可对需要采购的项目进行立项申请,并提交审批,查看…...

Mybatis如何给字段起别名?
Mybatis如何给字段起别名? 假如有一个学生表,有一个字段是class,你的实体类变量肯定不能用class,那么如何起别名? 通过以下代码实现 Result(column "class",property "clas")mapper代码 pub…...

php对接AWS S3云存储,上传S3及访问权限问题
首先先下载sdk包 https://docs.aws.amazon.com/zh_cn/sdk-for-php/v3/developer-guide/getting-started_installation.html S3创建存储桶 去安全凭证-》创建访问秘钥 创建的时候会提示,主账号创建不安全,这个时候我们需要创建一个IAM账号来创建秘钥 创…...

java 实现单例模式
单例模式是一种设计模式,用于确保一个类只有一个实例,并提供一种全局访问该实例的方式。在Java中,可以使用多种方式来实现单例模式,下面整理了几种常见的实现方式。 饿汉式单例模式(Eager Initialization)&…...

minio文件服务器开启https
一、准备证书 你要有https安全证书,我的是适用于nginx的证书 私钥 xxxx.key 公钥 xxxx.pem 二、上传证书到minio服务器 然后看看你的minio docker 有没有把 /root/.minio 挂载在主机上,如果有那么把两个证书文件放在/root/.minio/certs目录里面。…...

每日刷题(回溯法经典问题之子集)
食用指南:本文为作者刷题中认为有必要记录的题目 前置知识:回溯法经典问题之组合 ♈️今日夜电波:想着你—郭顶 1:09 ━━━━━━️💟──────── 4:15 …...