手把手搭建springboot项目06-springboot整合RabbitMQ及其原理和应用场景
目录
- 前言
- 工作流程-灵魂画手
- 名词解释
- 交换机类型
- 一、安装
- 1.1 [RabbitMQ官网安装](https://www.rabbitmq.com/download.html)
- 1.2 Docker安装并启动
- 二、食用教程
- 2.1.导入依赖
- 2.2 添加配置
- 2.3 代码实现
- 2.3.1 直连(Direct)类型
- 2.3.2 引入消息手动确认机制
- 2.3.2 广播(Fanout)类型
- 2.3.3 主题(Topic)类型
- 三、实战应用场景
- 3.1 如何控制消息有序
- 3.2 保证消息不被重复消费(幂等性)
- 3.3 保证消息的可靠性
- 3.4 死信队列解决订单超时未支付
- 总结
前言
RabbitMQ是一个由erlang语言开发,实现了AMQP(Advanved Message Queue Protocol)高级消息队列协议的消息服务中间件。
工作流程-灵魂画手
1、生产者(Producer)和消费者(Consumer)都需要在与RabbitMQ建立长连接(Connection)的前提下,才能收发消息
2、客户端(生产者、消费者)和服务端(RabbitMQ)只能建立一条长连接,在长连接中开辟一条条的信道进行收发消息
3、生产者发送消息,消息到达Broker指定虚拟主机(服务会配置)的指定交换机(发送消息会指定),根据路由键和交换机与队列的绑定关系,把消息发送给对应的队列
3、消费者通过信道监听队列,消息进入队列就可以被消费者实时拿到
名词解释
1、Broker (message broker) 消息代理:消息队列服务器实体,简单理解为邮局,寄收件都要通过它。
2、JMS(Java Message Service)JAVA消息服务。是基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
3、AMQP(Advanced Message Queuing Protocol)
高级消息队列协议,也是一个消息代理的规范,兼容JMS
RabbitMQ是AMQP的实现
4、Message 消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
5、Producer消息的生产者,也是一个向交换器发布消息的客户端应用程序。
6、Exchange 交换机 ,用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。
Exchange常用有3种类型:direct、fanout、 topic、不同类型的Exchange转发消息的策略有所区别
7、Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
8、Binding绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和 Queue 的绑定可以是多对多的关系。
9、Connection 网络连接,比如一个TCP连接。
10、Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
11、Consumer 消费者,表示一个从消息队列中取得消息的客户端应用程序。
12、Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
交换机类型
一、安装
1.1 RabbitMQ官网安装
1.2 Docker安装并启动
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management# 开机自启
docker update rabbitmq --restart=always
● 5672 (AMQP端口)
● 15672 (web管理后台端口)
本地安装可通过:http://127.0.0.1:15672/访问,用户名密码默认guest
二、食用教程
2.1.导入依赖
<!--RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 添加配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: root #用户名 默认guestpassword: root #密码 默认guestvirtual-host: springboot-test #虚拟主机 默认/
2.3 代码实现
2.3.1 直连(Direct)类型
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
直连类型初始化配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 1、直连交换机配置*/
@Configuration
public class DirectRabbitConfig {public static final String DIRECT_QUEUE = "===DirectQueue===";public static final String DIRECT_EXCHANGE = "===DirectExchange===";public static final String DIRECT_ROUTING = "===DirectRouting===";/*** durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。* return new Queue("TestDirectQueue",true,true,false);*/@Beanpublic Queue directQueue() {return new Queue(DIRECT_QUEUE,false);}@BeanDirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE,false,false);}@BeanBinding binding() {return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING);}
}
该配置主要把队列、交换机、绑定都交由spring管理,记得声明队列、交换机、建立绑定关系。消息指定交换机发送后,交换机就可以根据路由键把消息发送到匹配的队列上。
消费者
import com.chendi.springboot_rabbitmq.config.DirectRabbitConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;@Slf4j
@Component
public class DirectReceiver {@RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)public void receiver(String dataMsg) {log.info("接收者A dataMsg:{} ",dataMsg);}@RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)public void receiver(String dataMsg) {log.info("接收者B dataMsg:{} ",dataMsg);}
}
生产者
@RestController
@RequiredArgsConstructor
public class RabbitMQTestController {final RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")public String sendDirectMessage() {for (int i = 0; i < 10; i++) {String messageData = "Hello World!" + i;//可自定义消息体类型rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, DirectRabbitConfig.DIRECT_ROUTING, messageData);}return "发送完成";}
}
运行发现:默认情况下,RabbitMQ轮询分发将按顺序将每个消息发送给下一个使用者。有如下缺点:
1、无法保证消息已被消费
2、处理消息快的服务得到的消息和处理消息慢的服务是一样多的(公平分发、能者多劳)。
2.3.2 引入消息手动确认机制
配置文件
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 设置消费端手动 ack#表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条prefetch: 1 # 预加载消息数量--QOS
消费者应答
@Slf4j
@Component
public class DirectReceiver {@RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)public void receiver(String dataMsg, Channel channel, Message message) throws IOException, InterruptedException {long deliveryTag = message.getMessageProperties().getDeliveryTag();Thread.sleep(1000);log.info("接收者A deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);channel.basicAck(deliveryTag,true);}@RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)public void receiver2(String dataMsg, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收者B deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);channel.basicAck(deliveryTag,true);}
}
回执方法(
1、channel.basicAck表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
2、channel.basicNack表示失败确认,一般在消费消息业务异常时用到此方法、可决定消息是否重新入列
3、channel.basicReject 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
2.3.2 广播(Fanout)类型
扇型交换机,这个交换机没有路由键概念,这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
广播类型配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** 2、广播、扇出交换机*/
@Configuration
public class FanoutRabbitConfig {public final static String FANOUT_EXCHANGE = "fanoutExchange";public static final String FANOUT_QUEUE_A = "fanoutQueueA";public static final String FANOUT_QUEUE_B = "fanoutQueueB";public static final String FANOUT_QUEUE_C = "fanoutQueueC";/*** 创建三个队列* 将三个队列都绑定在交换机 fanoutExchange 上* 因为是扇型交换机, 路由键无需配置,配置也不起作用*/@Beanpublic Queue queueA() {return new Queue(FANOUT_QUEUE_A);}@Beanpublic Queue queueB() {return new Queue(FANOUT_QUEUE_B);}@Beanpublic Queue queueC() {return new Queue(FANOUT_QUEUE_C);}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}
}
消费者
import com.chendi.springboot_rabbitmq.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;//如果开启了消息手动确认机制,一定要记得应答消息噢
//不然消息会一直堆积在mq里
@Slf4j
@Component
public class FanoutReceiver {@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_A)public void fanout_A(String message) {log.info("fanout_A {}" , message);}@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_B)public void fanout_B(String message) {log.info("fanout_B {}" , message);}@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)public void fanout_C(String message) {log.info("fanout_C {}" , message);}
}
测试生产者 Controller加上
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {String messageData = "这是一条广播消息";rabbitTemplate.convertAndSend(FanoutRabbitConfig.FANOUT_EXCHANGE, "", messageData);return "发送完成";
}
2.3.3 主题(Topic)类型
主题交换机,特点就是在它的路由键和绑定键之间是有规则的。
「*」 (星号) 用来表示一个单词 (必须出现的)
「#」 (井号) 用来表示任意数量(零个或多个)单词
主题交换机不绑定路由键时是直连交换机,绑定「#」号时是扇形交换机。
主题模式配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** 主题交换机* 转发规则:* #:匹配一个或者多个词* *:匹配一个或者0个词* 比如 有msg.# 、msg.* 匹配规则* msg.# 会匹配 msg.email、msg.email.b、msg.email.a* msg.* 只会匹配 msg.email 和 msg ,*/
@Configuration
public class TopicRabbitConfig {//绑定键public final static String MSG_EMAIL = "msg.email";public final static String MSG_EMAIL_A = "msg.email.a";public final static String MSG_SMS = "msg.sms";public final static String TOPIC_EXCHANGE = "topicExchange";@Beanpublic Queue firstQueue() {return new Queue(TopicRabbitConfig.MSG_EMAIL);}@Beanpublic Queue secondQueue() {return new Queue(TopicRabbitConfig.MSG_EMAIL_A);}@Beanpublic Queue thirdQueue() {return new Queue(TopicRabbitConfig.MSG_SMS);}@BeanTopicExchange exchange() {return new TopicExchange(TOPIC_EXCHANGE);}@BeanBinding bindingExchangeMessage() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(MSG_EMAIL);}@BeanBinding bindingExchangeMessage2() {return BindingBuilder.bind(secondQueue()).to(exchange()).with("msg.#");}@BeanBinding bindingExchangeMessage3() {return BindingBuilder.bind(thirdQueue()).to(exchange()).with("msg.*");}
}
消费者
import com.chendi.springboot_rabbitmq.config.TopicRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TopicReceiver {@RabbitListener(queues = TopicRabbitConfig.MSG_EMAIL)public void topic_man(String message) {log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_EMAIL, message);}@RabbitListener(queues = TopicRabbitConfig.MSG_SMS)public void topic_woman(String message) {log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_SMS, message);}@RabbitListener(queues = TopicRabbitConfig.MSG_EMAIL_A)public void xxx(String message) {log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_EMAIL_A, message);}
}
测试生产者 Controller加上
@GetMapping("/sendTopicMessage")
public String sendTopicMessage() {rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_EMAIL, "Hello Topic!所有队列都可以收到这条信息");rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_EMAIL_A, "只有 msg.email.a可以收到这条信息");rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_SMS, "msg.email.a 和 msg.sms可以收到这条信息");return "发送完成";
}
如果开启了消息手动确认机制,一定要记得应答消息噢!!!
以上整合就完成了。
三、实战应用场景
3.1 如何控制消息有序
1、当只有一个消费者可以保证消息有序,但是效率低。
2、生产者顺序发送消息到队列但是多个消费者监听一个队列时会轮询分发导致乱序。修改为一个消费者只监听一个队列,生产者自定义投放策略,1、2、3投放到A队列,4、5、6投放到B队列(顺序的消息为一个整体)投放至一个队列。
3.2 保证消息不被重复消费(幂等性)
在消费者消费结束后,正常情况下会发送回执给消息队列,证明该消息已被消费。但是此时消费者网络传输故障或者宕机了,消息队列收不到消息被消费的回执会将消息再分发给其他消费者,进而导致消息被消费多次。
·······
解决方法:(具体问题具体分析)
1、在redis中维护一个set,生产者在发送消息前,加上全局唯一的id,消费者消费之前,去redis中查一下,看是否消费过,如果没有消费过则继续执行。
//生产者
public void sendMessageIde() {MessageProperties properties = new MessageProperties();properties.setMessageId(UUID.randomUUID().toString());Message message = new Message("消息".getBytes(), properties);rabbitTemplate.convertAndSend("exchange", "", message);
}//消费者
@RabbitListener(queues = "queue")
@RabbitHandler
public void processIde(Message message, Channel channel) throws IOException {if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){// 业务操作...System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));// 手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
3.3 保证消息的可靠性
消息发送流程
可以看出,生产者发送的消息准确抵达消费者分为两部分
1、发送端 :消息投递到Broker成功时回调confirmCallback,交换机投递到队列失败时回调returnCallback
2、消费端的ack
配置文件
spring:rabbitmq:publisher-returns: true # 开启消息抵达队列的确认 # 低版本 publisher-confirms: truepublisher-confirm-type: correlated # 开启发送端确认
配置类
/*** 常用的三个配置如下* 1---设置手动应答(acknowledge-mode: manual)* 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调* publisher-confirm-type: correlated* #保证交换机能把消息推送到队列中* publisher-returns: true* template:* #以下是rabbitmqTemplate配置* mandatory: true)* 3---设置重试*/
@Slf4j
@Configuration
public class RabbitConfig {@Autowiredprivate ConnectionFactory rabbitConnectionFactory;// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称@Beanpublic RabbitTemplate rabbitTemplate(){RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory);//默认是用jdk序列化//数据转换为json存入消息队列,方便可视化界面查看消息数据rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);//此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(!ack){System.out.println("ConfirmCallback "+"相关数据:"+ correlationData);System.out.println("ConfirmCallback "+"确认情况:"+ ack);System.out.println("ConfirmCallback "+"原因:"+ cause);}});rabbitTemplate.setReturnsCallback((ReturnedMessage returned) -> {System.out.println("ReturnsCallback: "+"消息:"+ returned.getMessage());System.out.println("ReturnsCallback: "+"回应码:"+ returned.getReplyCode());System.out.println("ReturnsCallback: "+"回应消息:"+ returned.getReplyText());System.out.println("ReturnsCallback: "+"交换机:"+ returned.getExchange());System.out.println("ReturnsCallback: "+"路由键:"+ returned.getRoutingKey());});return rabbitTemplate;}//重试的Template@Beanpublic RetryTemplate rabbitRetryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();// 设置监听 调用重试处理过程retryTemplate.registerListener(new RetryListener() {@Overridepublic <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {// 执行之前调用 (返回false时会终止执行)//log.info("执行之前调用 (返回false时会终止执行)");return true;}@Overridepublic <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {// 方法结束的时候调用if(retryContext.getRetryCount() > 0){log.info("最后一次调用");}}@Overridepublic <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {// 方法异常时会调用log.info("第{}次调用", retryContext.getRetryCount());}});return retryTemplate;}
}
发送端测试
import com.chendi.springboot_rabbitmq.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;@RestController
public class SendCallbackMessageController {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法@ResponseBody@GetMapping("/sendMessageToExchangeFail")public Object sendMessageToExchangeFail() {String messageData = "这条消息不会到达交换机";rabbitTemplate.convertAndSend("不存在的交换机", "", messageData, new CorrelationData(UUID.randomUUID().toString()));return messageData;}@ResponseBody@GetMapping("/sendMessageToQueueFail")public Object sendMessageToQueueFail() {String messageData = "这条消息不会到达队列";rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, "不存在的路由键", messageData, new CorrelationData(UUID.randomUUID().toString()));return messageData;}
}
请求结果:
3.4 死信队列解决订单超时未支付
场景:当顾客购买一件商品存在的操作
生成订单 =》 扣减库存 =》 完成支付
当库存只剩1件时,A用户下单但是迟迟未支付,会导致B用户下单时,判断库存不足导致生成订单失败。
此时,就需要解决订单超时未支付的问题。
流程 :
初始化两组正常队列和交换机A、B,A组的初始化参数x-dead-letter-exchange、x-dead-letter-routing-key指向B组的交换机和路由键。意在,A中删除或过期的数据,可以放入指定交换机指定路由键的队列中。
-这样如果设置了订单超过5min未支付
发送方在发送消息时,指定过期时间为5 * 60 * 1000
时间过期后此消息会投递到队列B(死信队列)中,队列B根据订单id去判断是否支付,去做加库存等相应的操作。
死信队列配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 解决订单超时未支付的问题** 创建两个队列* 1、队列A(正常的队列只是设置了某些参数):设置队列中的超时未消费信息指定丢到对应的队列B* 2、队列B(也是一个正常的队列),只是把超时的信息丢给它所以称呼为死信队列*/@Configuration
public class DeadLetterExchangeConfig {/*** x-message-tti(Time-To-Live)发送到队列的消息在丟弃之前可以存活多长时间(毫秒)* x-max-length限制队列最大长度(新增后挤出最早的),单位个数* x-expires队列没有访问超时时,自动删除(包含没有消费的消息),单位毫秒* x-max-length-bytes限制队列最大容量* x-dead-letter-exchange死信交换机,将删除/过期的数据,放入指定交换机* x-dead-letter-routing-key死信路由,将删除/过期的数据,放入指定routingKey* x-max-priority队列优先级* x-queue-mode对列模式,默认lazy(将数据放入磁盘,消费时放入内存)* x-queue-master-locator镜像队列*/@Beanpublic Queue orderQueue(){Map<String, Object> args = new HashMap<>(2);// 绑定我们的死信交换机args.put("x-dead-letter-exchange", "orderDeadExChange");// 绑定我们的路由keyargs.put("x-dead-letter-routing-key", "orderDeadRoutingKey");return new Queue("orderQueue", true, false, false, args);}@Beanpublic Queue orderDeadQueue(){return new Queue("orderDeadQueue");}@Beanpublic DirectExchange orderExchange(){return new DirectExchange("orderExchange");}@Beanpublic DirectExchange orderDeadExchange(){return new DirectExchange("orderDeadExChange");}//绑定正常队列到交换机@Beanpublic Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {return BindingBuilder.bind(orderQueue).to(orderExchange).with("orderRoutingKey");}//绑定死信队列到死信交换机@Beanpublic Binding deadBindingExchange(Queue orderDeadQueue, DirectExchange orderDeadExchange) {return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with("orderDeadRoutingKey");}
}
消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*** 死信队列的消费者*/
@Slf4j
@Component
public class DeadLetterReceiver {@RabbitListener(queues = "orderDeadQueue")public void orderDeadQueueReceiver(String dataMsg, Channel channel, Message message) {try{long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("死信队列接收者A收到消息,根据订单id查询订单是否支付,未支付解锁库存 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);channel.basicAck(deliveryTag,false);} catch (Exception e){log.info("如果报错了,执行补偿机制");}}
}
生产者
@GetMapping("/createOrder")
public String createOrder() {rabbitTemplate.convertAndSend("orderExchange", "orderRoutingKey", "我是订单json", message -> {//设置过期时间10smessage.getMessageProperties().setExpiration("10000");return message;});return "发送完成";
}
总结
MQ的应用场景:
- 异步处理(注册发邮件发短消息)
- 应用解耦(用户下单后,订单系统需要通知库存系统扣减库存,就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.)
- 流量削峰(秒杀活动,一般会因为流量过大,导致应用挂掉,设置消息队列参数,如果长度超过最大值,则直接抛弃用户请求或跳转到错误页面)
相关文章:
手把手搭建springboot项目06-springboot整合RabbitMQ及其原理和应用场景
目录前言工作流程-灵魂画手名词解释交换机类型一、安装1.1 [RabbitMQ官网安装](https://www.rabbitmq.com/download.html)1.2 Docker安装并启动二、食用教程2.1.导入依赖2.2 添加配置2.3 代码实现2.3.1 直连(Direct)类型2.3.2 引入消息手动确认机制2.3.2…...
如何根据IP地址判断是IPv4还是IPv6
IPv4地址的书写形式为:“192.168.0.1” IPv6地址的书写形式为:“2001:DB8:85A3:8D3:1319:8A2E:370:7344” 给你一个IP地址,它有三种可能:IPv4、IPv6、既不是IPv4也不是IPv6的无效地址。所以,如果用函数ipGetAddressAsNumber,只能判断是不是ipv4,编写如下函数: int R…...
山地车和公路车怎么选
公路车: 只能适应平坦的路面,骑行阻力小,速度快比较适合新手 山地车: 能适应所有路面,更注重操控性和舒适性 怎么选? 1、先决定用途 旅游:旅行车、山地车、 通勤:公路车 2、预…...
Zotero设置毕业论文/中文期刊参考文献格式
大家在使用zotero时很容易遇到的问题: 英文参考文献中有多个作者时出现“等”,而不是用"et al"引文最后面有不需要的DOI号,或者论文链接对于一些期刊分类上会出现OL字样,即[J/OL]作者名为全大写 本文主要解决以上几个…...
【人工智能与深度学习】自动编码器的简介
【人工智能与深度学习】自动编码器的简介 自动编码器的应用图片生成像素空间和潜在空间插值的差异图像超级分辨率图像修补由文字说明转成图片什么是自动编码器?为什么我们用自动编码器?重建损失完成过度降噪自动编码器:Denoising autoencoder压缩式自动编码器定义自动编码器…...
Isaac-gym(9):项目更新、benchmarks框架梳理
一、项目更新 近期重新git clone isaac gym的强化部分(具体见系列第5篇)时发现官方的github库有跟新,git clone下来后发现多了若干个task,在环境配置上也有一定区别。 例如新旧两版工程项目的setup.py区别如下: git …...
Linux 学习笔记(一):终端 和 Shell 的区别和联系
一、Linux 介绍 1、什么是 Linux Linux 就是一个操作系统,全称 GNU/Linux,是一种类 Unix 操作系统Linux 一开始是没有图形界面的,所有操作都靠 命令 完成。如 磁盘操作、文件存取、目录操作、进程管理、文件权限 等等,可以说 Li…...
cycleGAN算法解读
本文参考:https://blog.csdn.net/Mr_health/article/details/112545671 1 CycleGAN概述 CycleGAN:循环生成对抗神经网络,是一种非监督学习模型。 Pix2pix方法适用于成对数据的风格迁移,而大多数情况下对于A风格的图像…...
解读“方差”
其实,从这个标题就可以看出来,方差,这个问题不简单, 先给出定义: 方差其实应该叫,差方差,(差方)差,差的平方的差,与差的平方之间的误差࿰…...
记录面试问题
以下问题不分先后,按照印象深浅排序,可能一次记录不完成,后面想起来会及时补充,如有不对,恳请各位围观大佬多多指教🙏 印象最深的是一道很简单很简单的题目,我结束面试之后赶紧代码敲敲发现答错…...
(六十四)设计索引的时候,我们一般要考虑哪些因素呢?(上)
本周我们将要讲解一下设计索引的时候,我们通常应该考虑哪些因素,给哪些字段建立索引,如何建立索引,建立好索引之后应该如何使用才是最合适的。 可能有的朋友会希望尽快更新后面的内容,但是因为工作的原因的确非常忙&a…...
【蓝桥杯嵌入式】LCD屏的原理图解析与代码实现(第十三届省赛为例)——STM32
🎊【蓝桥杯嵌入式】专题正在持续更新中,原理图解析✨,各模块分析✨以及历年真题讲解✨都在这儿哦,欢迎大家前往订阅本专题,获取更多详细信息哦🎏🎏🎏 🪔本系列专栏 - 蓝…...
论文学习——Reproducing Activation Function for Deep Learning
论文学习——Reproducing Activation Function Abstract RAFs将集中基础激活函数进行线性组合,构建出神经元级的、数据驱动的激活函数。使用RAFs为激活函数的神经网络可以重现传统的近似工具,也能相对于传统网络以更少的参数量拟合目标函数。训练过程中,RAFs可以以更好的条…...
【趣味学Python】Python基础语法讲解
目录 编码 标识符 python保留字 注释 实例(Python 3.0) 实例(Python 3.0) 行与缩进 实例(Python 3.0) 实例 多行语句 数字(Number)类型 字符串(String) 实例(Python 3.0) 空行 等待用户输入 实例(Python 3.0) 同一行显示多条语句 实例(Python 3.0) 多个语句构…...
虚拟局域网VLAN的实现机制
虚拟局域网VLAN的实现机制1.IEEE 802.1Q帧2.交换的端口类型AccessTrunkHybrid(华为特有)1.IEEE 802.1Q帧 IEEE802.1Q帧(也称Dot One Q帧)对以太网的MAC帧格式进行了扩展,插入了4字节的VLAN标记。 2.交换的端口类型 A…...
Mask R-CNN 算法学习总结
Mask R-CNN 相关知识点整体框架1.Resnet 深度残差学习1.1 目的1.2 深度学习深度增加带来的问题1.3 Resnet实现思想【添加恒等映射】2.线性插值2.1 目的2.2 线性插值原理2.3 为什么使用线性插值?3.FPN 特征金字塔3.1 FPN介绍3.2 为什么使用FPN?3.3 自下而上层【提取特征】3.4 …...
Gorm -- 添加记录
文章目录添加单条记录直接添加模型对象赋予默认值方法一: gorm 标签赋予默认值方法二: 设置钩子方法(Hooks)指定字段插入插入时忽略某些字段插入时禁止使用钩子方法添加多条记录通过对象列表插入通过字典列表插入在字典中使用SQL内…...
go提高升阶(四) I/O流学习
I/O 官网课程 购买课程找博主推荐 文章目录I/O文件信息创建文件、目录IO读IO写(权限)文件复制Seeker接口断点续传遍历文件夹bufio电脑中一切,都是以 二进制流的形式存在的。jpg:010100000010010101001010101010010101010 编码格式,还原为一个…...
【代码随想录训练营】【Day28】第七章|回溯算法|93.复原IP地址|78.子集|90.子集II
复原IP地址 题目详细:LeetCode.93 这道题与上一道练习题分割回文字符串十分详细,一样是涉及到分割字符串、判断字符串、递归与回溯的问题,所以这道题要解决的难点在于: 如何分割IP地址字符串如何判断分割的IP地址是否合法递归的…...
Get请求和Post请求区别
前后端交互请求数据的方式有很多种。 例如:Get Post Put Patch Delete Copy 等等很多请求方式 但是用的最多的就是Get和Post Get请求方式 1. get多用于从服务器请求获取数据 2.get传送的数据量较小,不能大于2KB 3.get安全性非常低 Post请求方式 1.…...
static关键字
static的基本基本用法可以分为下面几种: (1)static修饰全局变量 (2) 修饰局部变量 (3)修饰普通函数 (4)修饰类的成员变量 一、static修饰全局变量 当同时编译多个文件时…...
A Comprehensive Tool for Modeling CMOS Image-Sensor-Noise Performance论文总结及翻译
A Comprehensive Tool for Modeling CMOS Image-Sensor-Noise Performance Author: Ryan D. Gow Link: https://ieeexplore.ieee.org/document/4215175/metrics#metrics Select: ⭐️⭐️⭐️⭐️ Type: Academic Journal 备注: CMOS图像传感器噪声性能建模的综合工具 总结 …...
嘀嗒出行再闯IPO:千军万马我无懈
羽扇纶巾笑谈间,千军万马我无懈。 在激烈竞争中再度冲刺港交所IPO的嘀嗒出行,闪露出一丝歌词里的气魄。交通运输部下属网约车监管信息交互系统的数据显示,截至2023年1月31日,全国共有300家网约车平台公司取得网约车平台经营许可。…...
MATLAB算法实战应用案例精讲-【优化算法】增强型鲸鱼优化算法(EWOA)(附matlab代码实现)
前言 增强型鲸鱼优化算法(Enhanced Whale Optimization Algorithm,EWOA)是Mohammad H. Nadimi-Shahraki等人于2022年提出的一种改进算法。由于标准的鲸鱼优化算法及其它的改进算法都存在种群多样性低和搜索策略差的问题,因此引入有效的策略来缓解鲸鱼优化算法的这些核心缺点…...
登录Oracle数据库遇到ORA-01017密码错误的解决办法
文章目录症状分析解决办法欢迎加下方我的微信👇,拉你入学习群我们在登录Oracle数据库时可能会遇到ORA-01017错误,这里分析原因并提供解决办法。点击试看博主的专著《MySQL 8.0运维与优化》(清华大学出版社) 症状 图像…...
10个黑客基础教程!简单有效
如果你的电脑运行缓慢,请使用下面介绍的方法来帮助加速、优化和提高电脑的性能。 1.关闭启动时自动运行的应用程序 计算机上安装的许多应用程序都可以将自己配置为在启动期间自动启动并继续在后台运行,但是,如果不是每天都使用这些应用程序…...
JPA之实体之间的关系
JPA之实体之间的关系 10.1.1实体类创建 注解的应用 Table,Entity IdGeneratedValue指定主键,Column P174 实体类编写规范 Table(name "t_user") Entity(name "User") public class User implements Serializable {IdGeneratedVa…...
如何在 C++ 中调用 python 解析器来执行 python 代码(三)?
本文在 C 中调用 multi.py 脚本,并向它传入参数并执行,然后获得返回值并在 C 中打印结果。 目录 如何在 C 中调用 python 解析器来执行 python 代码(一)?如何在 C 中调用 python 解析器来执行 python 代码࿰…...
【Linux】gcc/g++/gdb的使用
🔥🔥 欢迎来到小林的博客!! 🛰️博客主页:✈️小林爱敲代码 🛰️社区 : 进步学堂 🛰️欢迎关注:👍点赞🙌收…...
浅浅谈一谈B树和B+树
目录: 🚀1.B树 🚀2.B树 索引背后的数据结构是啥呢,是B树,是为了数据库索引设计的,我们可以先了解B树,再说B树 1.什么是B树 B树也叫B-树,这里的-不读减,是一个符号 我们已经学过了二叉搜素树,B树其实就是N叉搜素树,二叉搜索树只能在每一个结点放一个…...
做网站 带宽 多少/免费的舆情网站app
在今天这个充斥着各种激烈竞争的重压时代,每个人都在忙碌的生活着。小孩子要上各种辅导班,培养很多特长的同时还要完成学校布置的作业,大孩子学习任务更重,努力考上好学校,在我们的忙碌琐碎日常生活中,会有…...
网络游戏中心/站长之家seo综合
先看牛逼的草图 知乎上刚看到类似的需求 Python Web导出有排版要求的PDF文件 关键技术 转载于:https://www.cnblogs.com/wancy86/p/PDFPY.html...
卓越亚马逊网站建设目的/做网站的网络公司
概述 Intellij IDEA真是越用越觉得它强大,它总是在我们写代码的时候,不时给我们来个小惊喜。出于对Intellij IDEA的喜爱,我决定写一个与其相关的专栏或者系列,把一些好用的Intellij IDEA技巧分享给大家。本文是这个系列的第一篇&a…...
珠海哪个公司做网站好/网站seo诊断分析
早些时候,有个客户14块盘的磁盘阵列出现故障,需要恢复的数据是oracle数据库,客户在寻求数据恢复技术支持,要求我提供详细的数据恢复方案,以下是提供给客户的详细数据恢复解决方案,本方案包含Raid数据恢复和…...
张家港网站建设培训学校/it培训
1、逐步回归法,班级:研1614,学生:秦培歌,认为社会学家犯罪和收入低,与失业和人口规模有关,20个城市的犯罪率(每10万人的犯罪人数)和年收入在5000美元以下的家庭的百分比1,失业率2和人…...
wordpress媒体默认链接/灰色行业推广平台网站
使用 <script setup>组合式 API 的语法糖的时候,defineProps报错: 代码如下: 第一次写vue3的项目,真的是到处都是坑啊,我就不断的百度百度再百度,发现再 module.exports {root: true,env: {node: …...