当前位置: 首页 > news >正文

Springboot整合RabbitMQ消息中间件

spring-boot-rabbitmq–消息中间件整合

前言:RabbitMQ的各种交换机说明

1、直连交换机
  • 生产者发布消息时必须带着routing-key,队列绑定到交换机时必须指定binding-key ,且routing-key和binding-key必须完全相同,如此才能将消息路由到队列中
  • 直连交换机通常用来循环分发任务给多个workers,例如在一个日志处理系统中,一个worker处理error级别日志,另外一个worker用来处理info级别的日志,此时生产者只需要在发送时指定特定的routing-key即可,绑定队列时binding-key只需要和routing-key保持一致即可接收到特定的消息。
2、扇形交换机
  • 相对于直连交换机,扇形交换机没有路由设置
3、主题交换机
  • routing-key必须由多个单词或者通配符组成,单词或者通配符之间使用.隔开,上限为255个字节;
  • 通配符只能匹配一个单词;
  • 通配符可以匹配零个或者多个单词;
    队列绑定交换机时的binding-key要能够匹配发送消息时的routing-key才能将消息路由到对应的队列;
  • 根据routing-key和binding-key的匹配情况,消息可能进入单个队列,也可能进入多个队列,也可能丢失
  • 主题队列的routing-key设置为#时,表示所有所有的队列都可以接收到消息,相当于fanout交换机;
  • 主题队列的routing-key中不包含#或者*时,表示指定队列可以接收到消息,相当于direct交换机;
4、RabbitMQ有五种消息模式
  • 无交换机模式
    • 简单模式【点对点模式】
    • 工作模式【一对多,资源争抢模式】
  • 有交换机模式
    • 直连交换机【通过路由Key进行分配到不同队列】
    • 扇形交换机【发布订阅模式,即生产者将消息发布到订阅的队列上】
    • 主题交换机【通过主题标识分配,属于直连交换机的升级】
5、消息确认机制
  • 不确认
  • 自动确认
  • 手动确认:
6、RabbitMQ可以作为RPC异步调用

一、基础快速入门

  • 一个系统有消息发送也有消息接收,本示例采用发送和接收放到一个项目中
  • 本示例采用简易配置,附录中有详细配置参数
1、添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、公共配置项整理
  • listener.simple的concurrency和max-concurrency 为并发线程处理,
  • rabbitmq.template.retry下的是生产者重试配置,listener.retry 为消费者重试配置
  • publisher-confirms 是生产消息到达exchange后回调,publisher-returns 是exhcange路由到queue的回调
spring:application:name: rabbitmq-provider# 配置rabbitmqrabbitmq:# 连接地址,多个地址之间用都好隔开address: 192.168.1.82:5762, 192.168.1.82:5763,192.168.1.82:5764connection-timeout: 15000 # 连接超时时间virtual-host: /	# 虚拟主机的方式username: guestpassword: guest# 生产者配置publisher-confirm-type: simple  # 设置回调方式为simple,correlatedpublisher-confirms: true #开启消息到达exchange的回调,发送成功失败都会触发回调publisher-returns: true #开启消息从exhcange路由到queue的回调,只有路由失败时才会触发回调template:#为true时,如果exchange根据routingKey将消息路由到queue时找不到匹配的queue,# 触发return回调,为false时,exchange直接丢弃消息。mandatory: true# 配置重试机制retry:enabled: true # 开启发送机制max-attempts: 3 # 最大重试次数。默认为 3 。initial-interval: 1000 # 重试间隔,单位为毫秒。默认为 1000 。# 消费者监听器配置listener:simple:concurrency: 2 # 每个 @ListenerContainer 的并发消费的线程数max-concurrency: 10 # 每个 @ListenerCon 允许的并发消费的线程数acknowledge-mode: manual   #auto 自动确认,manual 手动确认,none 没有消息确认prefetch: 5# 重试机制retry:  # 配置重试机制enabled: true # 开启消费重试机制max-attempts: 3 # 最大重试次数。默认为 3 。initial-interval: 1000 # 重试间隔,单位为毫秒。默认为 1000 。
3、公用配置类整理

队列初始化工作可以在RabbitMQ的界面上创建,也可以采用代码的方式初始化,一般建议在管理平台上创建

一般需要主机交换机名称,队列名称,路由名称的使用

  • DirectExchange(String name, boolean durable, boolean autoDelete) 直连交换机
  • FanoutExchange(String name, boolean durable, boolean autoDelete) 扇形交换机
  • TopicExchange(String name, boolean durable, boolean autoDelete) 主题交换机
  • Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) 队列
  • Binding 绑定
@Configuration
public class RabbitConfig {//JSON序列化@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}/*** 直连交换机配置*/public static class DirectExchangeDemoConfiguration {// 创建 Queue@Beanpublic Queue demo01Queue() {return new Queue(Demo01Message.QUEUE, // Queue 名字true, // durable: 是否持久化false, // exclusive: 是否排它false); // autoDelete: 是否自动删除}.build();}// 创建 Direct Exchange@Beanpublic DirectExchange demo01Exchange() {return new DirectExchange(Demo01Message.EXCHANGE,true,  // durable: 是否持久化false);  // exclusive: 是否排它}// 创建 Binding// Exchange:Demo01Message.EXCHANGE// Routing key:Demo01Message.ROUTING_KEY// Queue:Demo01Message.QUEUE@Beanpublic Binding demo01Binding() {return BindingBuilder.bind(demo01Queue()).to(demo01Exchange()).with(Demo01Message.ROUTING_KEY);}}/*** 主题交换机 */public static class TopicExchangeDemoConfiguration {// 创建 Queue@Beanpublic Queue demo02Queue() {return new Queue(Demo02Message.QUEUE, // Queue 名字true, // durable: 是否持久化false, // exclusive: 是否排它false); // autoDelete: 是否自动删除}// 创建 Topic Exchange@Beanpublic TopicExchange demo02Exchange() {return new TopicExchange(Demo02Message.EXCHANGE,true,  // durable: 是否持久化false);  // exclusive: 是否排它}// 创建 Binding// Exchange:Demo02Message.EXCHANGE// Routing key:Demo02Message.ROUTING_KEY// Queue:Demo02Message.QUEUE@Beanpublic Binding demo02Binding() {return BindingBuilder.bind(demo02Queue()).to(demo02Exchange()).with(Demo02Message.ROUTING_KEY);}}/*** 扇形交换机*/public static class FanoutExchangeDemoConfiguration {// 创建 Queue A@Beanpublic Queue demo03QueueA() {return new Queue(Demo03Message.QUEUE_A, // Queue 名字true, // durable: 是否持久化false, // exclusive: 是否排它false); // autoDelete: 是否自动删除}// 创建 Queue B@Beanpublic Queue demo03QueueB() {return new Queue(Demo03Message.QUEUE_B, // Queue 名字true, // durable: 是否持久化false, // exclusive: 是否排它false); // autoDelete: 是否自动删除}// 创建 Fanout Exchange@Beanpublic FanoutExchange demo03Exchange() {return new FanoutExchange(Demo03Message.EXCHANGE,true,  // durable: 是否持久化false);  // exclusive: 是否排它}// 创建 Binding A// Exchange:Demo03Message.EXCHANGE// Queue:Demo03Message.QUEUE_A@Beanpublic Binding demo03BindingA() {return BindingBuilder.bind(demo03QueueA()).to(demo03Exchange());}// 创建 Binding B// Exchange:Demo03Message.EXCHANGE// Queue:Demo03Message.QUEUE_B@Beanpublic Binding demo03BindingB() {return BindingBuilder.bind(demo03QueueB()).to(demo03Exchange());}}/*** HeadersExchange 示例的配置类*/public static class HeadersExchangeDemoConfiguration {// 创建 Queue@Beanpublic Queue demo04Queue() {return new Queue(Demo04Message.QUEUE, // Queue 名字true, // durable: 是否持久化false, // exclusive: 是否排它false); // autoDelete: 是否自动删除}// 创建 Headers Exchange@Beanpublic HeadersExchange demo04Exchange() {return new HeadersExchange(Demo04Message.EXCHANGE,true,  // durable: 是否持久化false);  // exclusive: 是否排它}// 创建 Binding// Exchange:Demo04Message.EXCHANGE// Queue:Demo04Message.QUEUE// Headers: Demo04Message.HEADER_KEY + Demo04Message.HEADER_VALUE@Beanpublic Binding demo4Binding() {return BindingBuilder.bind(demo04Queue()).to(demo04Exchange()).where(Demo04Message.HEADER_KEY).matches(Demo04Message.HEADER_VALUE); // 配置 Headers 匹配}}}
4、消息生产者和消费者类

一般建议,一类消息生产采用一个类,做到职责单一。

交换机,路由,和主题的名称最好采用枚举或者常量的方式定义

注意下面的消息生产者和消息消费者的交换机方式是一一对应的

二、消息生产者

  • rabbitTemplate#convertAndSend 方法实现所有形式的消息发放
  • rabbitTemplate.setConfirmCallback 设置消息回调
  • 直连交换机是常用的方式
RabbitMq消息生产者实例
public class rabbitMqProducer{//点对点模式public void send(String msg){amqpTemplate.convertAndSend(SIMPLE_QUEUE_NAME, msg );}//工作模式public void work() throws InterruptedException {String msg = "这是一个work模式";for (int i = 0; i < 10; i++) {amqpTemplate.convertAndSend(WORKER_QUEUE_NAME, msg + i);Thread.sleep(5000);}}//发送同步消息@Overridepublic void sendMessage(Object message) {rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME,Constants.SAVE_USER_QUEUE_ROUTE_KEY,message, correlationData);log.info("发送消息到RabbitMQ, 消息内容: " + message);}//发送异步消息@Asyncpublic ListenableFuture<Void> asyncSend(Integer id) {try {//设置消息属性MessageProperties messageProperties = new MessageProperties();messageProperties.setMessageId(UUID.randomUUID().toString());messageProperties.setContentType("text/plain");messageProperties.setContentEncoding("utf-8");// 发送消息Message message = new Message(messageStr.getBytes(), messageProperties);message.setId(id);// 同步发送消息rabbitTemplate.convertAndSend(Demo01Message.EXCHANGE, Demo01Message.ROUTING_KEY, message);// 返回成功的 Futurereturn AsyncResult.forValue(null);} catch (Throwable ex) {// 返回异常的 Futurereturn AsyncResult.forExecutionException(ex);}}//扇形交换机public void sendMessage(Object message) {log.info("【消息发送者】发送消息到fanout交换机,消息内容为: {}", message);rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE_NAME, "", message);}//带消息序列的消息public void sendMessage() {//采用内部类方式for (int i = 1; i <= 3; i++) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info("【Producer】发送的消费ID = {}", correlationData.getId());String msg = "hello confirm message" + i;logger.info("【Producer】发送的消息 = {}", msg);rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg, correlationData);}}//主题交换机//确认回调接口final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {/*** 	@param ack broker 是否落盘成功* 	@param cause 失败的一些异常信息*/@Overridepublic void confirm(CorrelationData correlationData,boolean ack, String cause) {System.err.println("消息ACK结果:" + ack+ ", correlationData: " + correlationData.getId());}};//  简单的主题消息public void sendMessage1(Object message,Map<String, Object> properties) throws Exception {//创建消息MessageHeaders mhs = new MessageHeaders(properties);Message<?> msg = MessageBuilder.createMessage(message, mhs);rabbitTemplate.setConfirmCallback(confirmCallback);// 	指定业务唯一的iDCorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());//消息处理器MessagePostProcessor mpp = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {System.err.println("---> post to do: " + message);return message;}};//消息发送,交换机,路由rabbitTemplate.convertAndSend("exchange-1",   "route.01",  msg,mpp,  correlationData);}//消息发送public void syncSend(Integer id, String headerValue) {// 创建 MessageProperties 属性MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader(Demo04Message.HEADER_KEY, headerValue); // 设置 header// 创建 Message 消息Message message = rabbitTemplate.getMessageConverter().toMessage(new Demo04Message().setId(id), messageProperties);// 同步发送消息rabbitTemplate.send(Demo04Message.EXCHANGE, null, message);}
}
@Data
public class Demo07Message implements Serializable {public static final String QUEUE = "QUEUE_DEMO_07"; // 正常队列public static final String DEAD_QUEUE = "DEAD_QUEUE_DEMO_07"; // 死信队列public static final String EXCHANGE = "EXCHANGE_DEMO_07";public static final String ROUTING_KEY = "ROUTING_KEY_07"; // 正常路由键public static final String DEAD_ROUTING_KEY = "DEAD_ROUTING_KEY_07"; // 死信路由键private Integer id;private String body;
}
6、Headers交换机

Headers Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配

//消息体重定义HEADER_KEY
public class Demo04Message implements Serializable {public static final String QUEUE = "QUEUE_DEMO_04_A";public static final String EXCHANGE = "EXCHANGE_DEMO_04";public static final String HEADER_KEY = "color";public static final String HEADER_VALUE = "red";/*** 编号*/private Integer id;// ... 省略 set/get/toString 方法}

三、消息消费者

  • @RabbitHandler 只能放在方法上,表明这是一个消息处理方法
  • @RabbitListener 可以放在方法上和类上,一般使用在方法上
    • queues 指定队列名称
    • concurrency 并发消费
消息消费者示例
public class RabbitMQConsumerDemo{// 通过注解自动创建 spring.simple.queue 队列@RabbitListener(queuesToDeclare = @Queue("spring.simple.queue"))public void listen(String msg) {System.out.println("简单队列 接收到消息:" + msg);}// 通过注解自动创建 spring.work.queue 队列@RabbitListener(queuesToDeclare = @Queue("spring.work.queue"))public void listen(String msg) {System.out.println("work模式 接收到消息:" + msg);}// 创建两个队列共同消费@RabbitListener(queuesToDeclare = @Queue("spring.work.queue"))public void listen2(String msg) {System.out.println("work模式二 接收到消息:" + msg);}//直连交换机示例@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "customer.code.wechatCallback",autoDelete = "false"),exchange = @Exchange(value = "code.wechatCallback",type = ExchangeTypes.FANOUT)),concurrency = "2")public void receiveMessage01(String msg, Channel channel, Message message) throws IOException {try {// 这里模拟一个空指针异常,String string = null;string.length();//消费端限流channel.basicQos(3,false);log.info("【Consumer01成功接收到消息】>>> {}", msg);// 确认收到消息,只确认当前消费者的一个消息收到channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.info("【Consumer01】消息已经回滚过,拒绝接收消息 : {}", msg);// 拒绝消息,并且不再重新进入队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.info("【Consumer01】消息即将返回队列重新处理 :{}", msg);//设置消息重新回到队列处理// requeue表示是否重新回到队列,true重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}//扇形交换机示例@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "customer.code.wechatCallback",autoDelete = "false"),exchange = @Exchange(value = "code.wechatCallback",type = ExchangeTypes.FANOUT)),concurrency = "1")public void receiveMessage(Object message) {logger.info("消息接收者接收到来自【队列一】的消息,消息内容: {}", message);}//主题交换机@RabbitHandler@RabbitListener(bindings =  @QueueBinding(value= @Queue(value = "queue-1",durable = "true",autoDelete = "false"),exchange =  @Exchange(name = "exchange-1",durable = "true",type = ExchangeTypes.TOPICignoreDeclarationExceptions = "true"),key = "route.*"))public void onMessage(Message message, Channel channel,String body) throws Exception {//	1. 收到消息以后进行业务端消费处理System.err.println("消费消息:" + message.getPayload());//设置手工牵手Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//手动确认消息channel.basicAck(deliveryTag, false);}//header交换机@Component@RabbitListener(queues = Demo04Message.QUEUE)public class Demo04Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@RabbitHandlerpublic void onMessage(Demo04Message message) {logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);}}
}
2、重试机制【消息消费者】
  • 方式一:通过配置项的方式,详细看公用配置项的设置,改方式的优点是简单,缺点是不容易控制重试的时间
  • 方式二:结合MySQL或Redis等持久化方式控制重试次数,该方式的优点是可以自由控制重试的间隔时间,缺点是比较复杂

五、回调消息【消息生产者】

(1)方式一:自定义消息回调处理类
@Slf4j
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.*/@PostConstructpublic void init() {//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}/*** 消息从交换机成功到达队列,则returnedMessage方法不会执行;* 消息从交换机未能成功到达队列,则returnedMessage方法会执行;*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);}/*** 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;* 如果消息正确到达交换机,则该方法中isSendSuccess = true;*/@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {log.info("confirm回调方法>>>回调消息ID为: " + correlationData.getId());if (isSendSuccess) {logger.info("confirm回调方法>>>消息发送到交换机成功!");} else {logger.info("confirm回调方法>>>消息发送到交换机失败!,原因 : [{}]", error);}}}
public class callbackConfirmDemo{@AutoWiredprivate CustomConfirmAndReturnCallback  confirmCallback;//  简单的主题消息public void sendMessage1(Object message,Map<String, Object> properties) throws Exception {//创建消息MessageHeaders mhs = new MessageHeaders(properties);Message<?> msg = MessageBuilder.createMessage(message, mhs);// 使用自定义的确认回调rabbitTemplate.setConfirmCallback(confirmCallback);// 	指定业务唯一的iDCorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());//消息处理器MessagePostProcessor mpp = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {System.err.println("---> post to do: " + message);return message;}};//消息发送rabbitTemplate.convertAndSend("exchange-1",   "springboot.rabbit",  msg,mpp,  correlationData);}//设置回调public void syncSend(Integer id) {// 创建 Demo13Message 消息Demo13Message message = new Demo13Message();message.setId(id);// 同步发送消息rabbitTemplate.invoke(new RabbitOperations.OperationsCallback<Object>() {@Overridepublic Object doInRabbit(RabbitOperations operations) {// 同步发送消息operations.convertAndSend(Demo13Message.EXCHANGE, Demo13Message.ROUTING_KEY, message);logger.info("[doInRabbit][发送消息完成]");// 等待确认operations.waitForConfirms(0); // timeout 参数,如果传递 0 ,表示无限等待logger.info("[doInRabbit][等待 Confirm 完成]");return null;}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {logger.info("[handle][Confirm 成功]");}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {logger.info("[handle][Confirm 失败]");}});}//设置消息回调public void sendMessage(){//设置消息消费端确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{if(ack){log.info("消息{}接收成功",correlationData.getId());}else{log.info("消息{}接收失败,原因{}",correlationData.getId(),cause);}});//设置消费端返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{log.info("消息{}发送失败,应答码{},原因{},交换机{},路由键{}",message.toString(),replyCode,replyText,exchange,routingKey);});rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE_NAME, "", message);}
}
(2)方式二:匿名内部类方式处理

六、批量消息生产和消费

方式一:通过application,yaml配置项中添加批量生产和消费相关配置可以实现

方式二:通过手动的方式,设置BatchingRabbitTemplate和SimpleRabbitListenerContainerFactory实现

方式三: 批量消费中,通过注解配置 concurrency = "2"形式类配置并发批量消费

注意,批量消费和批量生产不一定需要严格搭配,按照各自需求来即可

1、在配置类中添加
  • BatchingRabbitTemplate
  • SimpleRabbitListenerContainerFactory
@Configuration
public class RabbitConfig {//手动设置批量处理@Beanpublic BatchingRabbitTemplate batchRabbitTemplate(ConnectionFactory connectionFactory) {// 创建 BatchingStrategy 对象,代表批量策略int batchSize = 16384; // 超过收集的消息数量的最大条数。int bufferLimit = 33554432; // 每次批量发送消息的最大内存int timeout = 30000; // 超过收集的时间的最大等待时长,单位:毫秒BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(batchSize, bufferLimit, timeout);// 创建 TaskScheduler 对象,用于实现超时发送的定时器TaskScheduler taskScheduler = new ConcurrentTaskScheduler();// 创建 BatchingRabbitTemplate 对象BatchingRabbitTemplate batchTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);batchTemplate.setConnectionFactory(connectionFactory);return batchTemplate;}// 批量消息监听@Bean(name = "consumerBatchContainerFactory")public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {// 创建 SimpleRabbitListenerContainerFactory 对象SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);// 额外添加批量消费的属性factory.setBatchListener(true);factory.setBatchSize(10);factory.setReceiveTimeout(30 * 1000L);factory.setConsumerBatchEnabled(true);return factory;}
}
2、批量生产
  • BatchingRabbitTemplate
@Component
public class Demo06Producer {@Autowiredprivate BatchingRabbitTemplate batchingRabbitTemplate;public void syncSend(Integer id) {// 创建 Demo05Message 消息Demo05Message message = new Demo05Message();message.setId(id);// 同步发送消息batchingRabbitTemplate.convertAndSend(Demo05Message.EXCHANGE, Demo05Message.ROUTING_KEY, message);}}
2、批量消费
  • containerFactory
@Component
@RabbitListener(queues = Demo05Message.QUEUE, concurrency = "2",containerFactory = "consumerBatchContainerFactory")
public class Demo05Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@RabbitHandlerpublic void onMessage(List<Demo05Message> messages) {logger.info("[onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getId(), messages.size());}}

七、延迟队列

方式一:采用RabbitMQ的控制台方式添加延迟队列

方式二:采用程序的方式设置延迟队列

1、配置延迟队列
// 创建 Queue
public class delayQueueDemo{@Beanpublic Queue demo08Queue() {return QueueBuilder.durable(Demo08Message.QUEUE) // durable: 是否持久化.exclusive() // exclusive: 是否排它.autoDelete() // autoDelete: 是否自动删除.ttl(10 * 1000) // 设置队列里的默认过期时间为 10 秒.deadLetterExchange(Demo08Message.EXCHANGE).deadLetterRoutingKey(Demo08Message.DELAY_ROUTING_KEY).build();}
}public class delayProducerDemo{public void syncSend(Integer id, Integer delay) {// 创建 Demo07Message 消息Demo08Message message = new Demo08Message();message.setId(id);// 同步发送消息rabbitTemplate.convertAndSend(Demo08Message.EXCHANGE, Demo08Message.ROUTING_KEY, message,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息的 TTL 过期时间if (delay != null && delay > 0) {message.getMessageProperties().setExpiration(String.valueOf(delay)); // Spring-AMQP API 设计有问题,所以传入了 String = =}return message;}});}
}

八、消费异常处理

1、自定义异常处理
@Component("rabbitListenerErrorHandler")
public class RabbitListenerErrorHandlerImpl implements RabbitListenerErrorHandler {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,ListenerExecutionFailedException exception) {// 打印异常日志logger.error("[handleError][amqpMessage:[{}] message:[{}]]", amqpMessage, message, exception);// 直接继续抛出异常throw exception;}}
@Component
public class RabbitLoggingErrorHandler implements ErrorHandler {private Logger logger = LoggerFactory.getLogger(getClass());public RabbitLoggingErrorHandler(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) {rabbitListenerContainerFactory.setErrorHandler(this);}@Overridepublic void handleError(Throwable t) {logger.error("[handleError][发生异常]]", t);}}
2、消费者添加监听处理异常
@Component
@RabbitListener(queues = Demo16Message.QUEUE,errorHandler = "rabbitListenerErrorHandler")
//@RabbitListener(queues = Demo15Message.QUEUE)
public class Demo16Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@RabbitHandlerpublic void onMessage(Demo16Message message) {logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);// 模拟消费异常throw new RuntimeException("你猜");}}

九、广播消费和集群消费

 在 RabbitMQ 中,如果多个 Consumer 订阅相同的 Queue ,那么每一条消息有且仅会被一个 Consumer 所消费。这个特性,就为我们实现集群消费提供了基础。在广播消费概念中,如果多个 Consumer 订阅相同的 Queue ,我们可以通过给每个 Consumer 创建一个其独有 Queue ,从而保证都能接收到全量的消息。同时,RabbitMQ 支持队列的自动删除,所以我们可以在 Consumer 关闭的时候,通过该功能删除其独有的 Queue如何实现广播消费,
@Component
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = BroadcastMessage.QUEUE + "-" + "#{T(java.util.UUID).randomUUID()}",autoDelete = "true"),exchange = @Exchange(name = BroadcastMessage.EXCHANGE,type = ExchangeTypes.TOPIC,declare = "false"))
)
public class BroadcastConsumer {private Logger logger = LoggerFactory.getLogger(getClass());@RabbitHandlerpublic void onMessage(BroadcastMessage message) {logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);}} 

十、关于死信队列的处理方式

  • 首先,死信队列的绑定,参考公用配置类

  • 方式一:通过配置类自动创建死信交换机,队列并绑定,同时在正常的队列中通过Map的形式添加死信队列的配置

  • 方式二:在RabbitMQ的工作台上手动给队列添加死信队列,

  • 如果死信队列积累较多,可以通过死信队列监听器来处理死信队列数据

1、定义死信队列的绑定关系

public  class DirectExchangeDemoConfiguration {@Beanpublic DirectExchange dlxExchange(){//死信交换机return new DirectExchange(dlxExchangeName);}@Beanpublic Queue dlxQueue(){//死信队列return new Queue(dlxQueueName);}@Beanpublic Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){//死信队列绑定交换机return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);}//正常队列@Beanpublic Queue queue(){// 正常队列中通过参数绑定死信队列Map<String,Object> params = new HashMap<>();params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键return QueueBuilder.durable(queueName).withArguments(params).build();}// 创建 Direct Exchange@Beanpublic DirectExchange demo01Exchange() {return new DirectExchange(Demo01Message.EXCHANGE,true,  // durable: 是否持久化false);  // exclusive: 是否排它}// 创建 Binding// Exchange:Demo01Message.EXCHANGE// Routing key:Demo01Message.ROUTING_KEY// Queue:Demo01Message.QUEUE@Beanpublic Binding demo01Binding() {return BindingBuilder.bind(demo01Queue()).to(demo01Exchange()).with(Demo01Message.ROUTING_KEY);}
}
2、定义死信队列的消费者
@Component
@RabbitListener(queues = Demo07Message.DEAD_QUEUE)
public class Demo07DeadConsumer {private Logger logger = LoggerFactory.getLogger(getClass());@RabbitHandlerpublic void onMessage(Demo07Message message) {logger.info("[onMessage][【死信队列】线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);}}

十一、顺序队列

  • 普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。
  • 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。
1、定义多个队列
// Demo10Message.javapublic class Demo10Message implements Serializable {private static final String QUEUE_BASE = "QUEUE_DEMO_10-";public static final String QUEUE_0 = QUEUE_BASE + "0";public static final String QUEUE_1 = QUEUE_BASE + "1";public static final String QUEUE_2 = QUEUE_BASE + "2";public static final String QUEUE_3 = QUEUE_BASE + "3";public static final int QUEUE_COUNT = 4;public static final String EXCHANGE = "EXCHANGE_DEMO_10";/*** 编号*/private Integer id;// ... 省略 set/get/toString 方法}
2、实现顺序队列的绑定关系
   
public  class DirectExchangeDemoConfiguration {// 创建 Queue@Beanpublic Queue demo10Queue0() {return new Queue(Demo10Message.QUEUE_0);}@Beanpublic Queue demo10Queue1() {return new Queue(Demo10Message.QUEUE_1);}@Beanpublic Queue demo10Queue2() {return new Queue(Demo10Message.QUEUE_2);}@Beanpublic Queue demo10Queue3() {return new Queue(Demo10Message.QUEUE_3);}// 创建 Direct Exchange@Beanpublic DirectExchange demo10Exchange() {return new DirectExchange(Demo10Message.EXCHANGE,true,  // durable: 是否持久化false);  // exclusive: 是否排它}// 创建 Binding@Beanpublic Binding demo10Binding0() {return BindingBuilder.bind(demo10Queue0()).to(demo10Exchange()).with("0");}@Beanpublic Binding demo10Binding1() {return BindingBuilder.bind(demo10Queue1()).to(demo10Exchange()).with("1");}@Beanpublic Binding demo10Binding2() {return BindingBuilder.bind(demo10Queue2()).to(demo10Exchange()).with("2");}@Beanpublic Binding demo10Binding3() {return BindingBuilder.bind(demo10Queue3()).to(demo10Exchange()).with("3");}}
2、发送顺序队列消息
// Demo10Producer.java@Component
public class Demo10Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void syncSend(Integer id) {// 创建 Demo10Message 消息Demo10Message message = new Demo10Message();message.setId(id);// 同步发送消息rabbitTemplate.convertAndSend(Demo10Message.EXCHANGE, this.getRoutingKey(id), message);}private String getRoutingKey(Integer id) {return String.valueOf(id % Demo10Message.QUEUE_COUNT);}}
3、消费顺序队列消息
// Demo10Consumer.java@Component
@RabbitListener(queues = Demo10Message.QUEUE_0)
@RabbitListener(queues = Demo10Message.QUEUE_1)
@RabbitListener(queues = Demo10Message.QUEUE_2)
@RabbitListener(queues = Demo10Message.QUEUE_3)
public class Demo10Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@RabbitHandler(isDefault = true)public void onMessage(Message<Demo10Message> message) {logger.info("[onMessage][线程编号:{} Queue:{} 消息编号:{}]", Thread.currentThread().getId(), getQueue(message),message.getPayload().getId());}private static String getQueue(Message<Demo10Message> message) {return message.getHeaders().get("amqp_consumerQueue", String.class);}}

十二、事务消息

1、创建事务消息配置
// RabbitConfig.java@Configuration
@EnableTransactionManagement
public class RabbitConfig {/*** Direct Exchange 示例的配置类*/public static class DirectExchangeDemoConfiguration {// 创建 Queue@Beanpublic Queue demo11Queue() {return new Queue(Demo11Message.QUEUE, // Queue 名字true, // durable: 是否持久化false, // exclusive: 是否排它false); // autoDelete: 是否自动删除}// 创建 Direct Exchange@Beanpublic DirectExchange demo11Exchange() {return new DirectExchange(Demo11Message.EXCHANGE,true,  // durable: 是否持久化false);  // exclusive: 是否排它}// 创建 Binding// Exchange:Demo11Message.EXCHANGE// Routing key:Demo11Message.ROUTING_KEY// Queue:Demo11Message.QUEUE@Beanpublic Binding demo11Binding() {return BindingBuilder.bind(demo11Queue()).to(demo11Exchange()).with(Demo11Message.ROUTING_KEY);}}@Beanpublic RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate) {// <Y> 设置 RabbitTemplate 支持事务rabbitTemplate.setChannelTransacted(true);// 创建 RabbitTransactionManager 对象return new RabbitTransactionManager(connectionFactory);}}
2、生产事务消息
  • 注意@Transactional注解
@Component
public class Demo11Producer {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate RabbitTemplate rabbitTemplate;@Transactionalpublic void syncSend(Integer id) throws InterruptedException {// 创建 Demo11Message 消息Demo11Message message = new Demo11Message();message.setId(id);// 同步发送消息rabbitTemplate.convertAndSend(Demo11Message.EXCHANGE, Demo11Message.ROUTING_KEY, message);logger.info("[syncSend][发送编号:[{}] 发送成功]", id);// <X> 等待Thread.sleep(10 * 1000L);}}
3、消费事务消息
// Demo11ProducerTest.java@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo11ProducerTest {@Autowiredprivate Demo11Producer producer;@Testpublic void testSyncSend() throws InterruptedException {int id = (int) (System.currentTimeMillis() / 1000);producer.syncSend(id);// 阻塞等待,保证消费new CountDownLatch(1).await();}}

十三、消息确认

1、修改配置文件
# 同步确认
spring.rabbitmq.listener.simple. acknowledge-mode=manual
spring.rabbitmq.publisher-confirm-type=simple# 异步确认
spring.rabbitmq.publisher-confirm-type=correlated
2、消费确认
  • channel.basicAck
@Component
@RabbitListener(queues = Demo12Message.QUEUE)
public class Demo12Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@RabbitHandlerpublic void onMessage(Demo12Message message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);// 提交消费进度if (message.getId() % 2 == 1) {// ack 确认消息// 第二个参数 multiple ,用于批量确认消息,为了减少网络流量,手动确认可以被批处。// 1. 当 multiple 为 true 时,则可以一次性确认 deliveryTag 小于等于传入值的所有消息// 2. 当 multiple 为 false 时,则只确认当前 deliveryTag 对应的消息channel.basicAck(deliveryTag, false);}}}
3、发送确认
// Demo13Producer.java@Component
public class Demo13Producer {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate RabbitTemplate rabbitTemplate;public void syncSend(Integer id) {// 创建 Demo13Message 消息Demo13Message message = new Demo13Message();message.setId(id);// 同步发送消息rabbitTemplate.invoke(new RabbitOperations.OperationsCallback<Object>() {@Overridepublic Object doInRabbit(RabbitOperations operations) {// 同步发送消息operations.convertAndSend(Demo13Message.EXCHANGE, Demo13Message.ROUTING_KEY, message);logger.info("[doInRabbit][发送消息完成]");// 等待确认operations.waitForConfirms(0); // timeout 参数,如果传递 0 ,表示无限等待logger.info("[doInRabbit][等待 Confirm 完成]");return null;}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {logger.info("[handle][Confirm 成功]");}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {logger.info("[handle][Confirm 失败]");}});}}
4、定义异步确认的回调类setConfirmCallback
  • rabbitTemplate.setConfirmCallback 配置了全局的异步确认
@Component
public class RabbitProducerConfirmCallback implements RabbitTemplate.ConfirmCallback {private Logger logger = LoggerFactory.getLogger(getClass());public RabbitProducerConfirmCallback(RabbitTemplate rabbitTemplate) {rabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {logger.info("[confirm][Confirm 成功 correlationData: {}]", correlationData);} else {logger.error("[confirm][Confirm 失败 correlationData: {} cause: {}]", correlationData, cause);}}}
5、返回确认ReturnCallback
// RabbitProducerReturnCallback.java@Component
public class RabbitProducerReturnCallback implements RabbitTemplate.ReturnCallback {private Logger logger = LoggerFactory.getLogger(getClass());public RabbitProducerReturnCallback(RabbitTemplate rabbitTemplate) {rabbitTemplate.setReturnCallback(this);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.error("[returnedMessage][message: [{}] replyCode: [{}] replyText: [{}] exchange: [{}] routingKey: [{}]]",message, replyCode, replyText, exchange, routingKey);}}

RabbitMQ的消息高可用+重试方案落地

  • 1、通过MQ消息失败记录表记录失败记录
  • 2、通过xxl-job定时扫描数据表,然后对于超过重试次数的进行重试。

一、消息消费

1、创建一个抽象的模板类
@Slf4j
public abstract class AbstractConsumer {/*** 定义静态内部类指定监听队列名称*/static class queueName{public static  final  String ORDER_CANCEL_QUEUE = "queue.assets_center.unification.order_cancel";public static  final  String HOUR_EXCHANGE_REFUND_QUEUE = "queue.assets_center.unification.convert_in_wandou_packet_revert";}@Resourceprivate IMqConsumeService mqConsumeService;@Autowiredprivate RetryMessageMapper retryMessageMapper;@Autowiredprivate DingDingHelper dingDingHelper;private static  final String MQ_HEADER_REF = "spring_returned_message_correlation";/*** 消息订阅入口方法*/@SneakyThrowsprotected void doConsume(Channel channel, Message message, String body) {String messageId = (String) message.getMessageProperties().getHeaders().get(MQ_HEADER_REF);try {//1、添加traceIdMDC.put("X-TRACE-ID", messageId);log.info("[{}] 消费消息: messageId:{}, mq参数:{}", getConsumeType().getMessage(), messageId, body);//2、保存消息日志以及幂等性判断boolean check = this.saveConsumeLogAndJudgeIdempotent(channel, message, body);if(check){//3、业务处理this.bizHandle(body);//4、手工ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception exception) {log.error("消息消费异常, messageId = " + messageId, exception);//1、异常通知dingDingHelper.sendAlert(getConsumeType().getMessage() + "- 消息消费异常","消息内容:" + body);//2、异常记录,并启动重试机制RetryMessage retryMessage = new RetryMessage(getQueueEnum(),getTimedMaxRetryTimes(),getTimedRecompenseBizMethod(),messageId,body,exception);retryMessageMapper.insert(retryMessage);//3、响应消费失败channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} finally {MDC.remove("X-TRACE-ID");}}/*** 保存消费日志 & 消息幂等判断*/@SneakyThrowsprotected boolean saveConsumeLogAndJudgeIdempotent(Channel channel, Message message, String body) {String messageId = (String) message.getMessageProperties().getHeaders().get(MQ_HEADER_REF);//幂等校验,是否重复MqConsume mqConsume = new MqConsume(getConsumeType(),getQueueEnum().getQueueName(),messageId,body);Boolean repeat = mqConsumeService.insert(mqConsume);//重复消费控制if (repeat) {// 手工ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return false;}return true;}//定义的处理业务代码,由子类实现处理protected abstract void bizHandle(String body);/*** 获取订阅的队列key*/protected abstract ListennedQueueEnum getQueueEnum();/*** 获取消费业务类型*/protected abstract MqConsumeTypeEnums getConsumeType();/*** 定时补偿方法*/public void timedRecompense(String body) {bizHandle(body);}/*** 获取本地定时补偿次数 默认3次*/protected Integer getTimedMaxRetryTimes() {return 3;}/*** 定时补偿业务方法* 格式:springbean的实例+需要执行补偿的方法* @return*/protected String getTimedRecompenseBizMethod() {Component component = AnnotationUtils.findAnnotation(this.getClass(), Component.class);String beanName = StrUtil.isBlank(component.value())? StrUtil.lowerFirst(this.getClass().getSimpleName()) : component.value();return beanName.concat(".").concat("timedRecompense");}
}
2、消息监听和消费过程
  • 继承AbstractConsumer 并实现其中的抽象方法
@Slf4j
@Component
public class AssertsConvertOrderCancelConsumer extends AbstractConsumer {@Value("${klzz.internal.config.appId}")private String localAppId;/*** 监听订单取消*/@RabbitListener(queues = queueName.ORDER_CANCEL_QUEUE)public void messageListener(Channel channel, Message message, String body) {//调用父类的doConsume入口进行处理super.doConsume(channel, message, body);}/*** 子类实现的业务处理方法* @param body*/@Overridepublic void bizHandle(String body) {//  TODO 真正的消息处理}@Overrideprotected ListennedQueueEnum getQueueEnum() {return ListennedQueueEnum.ORDER_CANCEL;}@Overrideprotected MqConsumeTypeEnums getConsumeType() {return MqConsumeTypeEnums.ASSETS_CONVERT_ORDER_CANCEL;}
}
3、定义消息的队列配置枚举
@Getter
@AllArgsConstructor
public enum ListennedQueueEnum {ORDER_CANCEL("order_cancel","exchange.trade_order","queue.assets_center.unification.order_cancel"),HOUR_EXCHANGE_REFUND("hour_exchange_refund","exchange.order","queue.assets_center.unification.convert_in_wandou_packet_revert");private  String routeKey;private  String exchangeKey;private  String queueName;}

二、消息消息持久化

  • 接收消息持久化,是为了保证MQ消息的高可用,
  • 同时在处理MQ消息的过程中一定要确保mq_body的长度能够保存,否则容易出现因为保存数据失败导致MQ消息处理问题
1、消息日志表
  • 注意:msg_id设置为唯一索引,进行唯一性校验
CREATE TABLE `mq_consume` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',`type` int(11) NOT NULL DEFAULT '0' COMMENT '消费类型',`msg_id` varchar(64) NOT NULL DEFAULT '' COMMENT '消息id',`title` varchar(32) NOT NULL DEFAULT '' COMMENT '标题',`queue_name` varchar(64) NOT NULL DEFAULT '' COMMENT '队列名称',`mq_body` varchar(3000) NOT NULL DEFAULT '' COMMENT '消息内容',`remark` varchar(256) NOT NULL DEFAULT '' COMMENT '备注',`app_id` varchar(64) NOT NULL DEFAULT '' COMMENT 'appId',`is_deleted` tinyint(1) NOT NULL DEFAULT '1' COMMENT '删除状态 1:正常 2:已删除',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',`version` int(5) NOT NULL DEFAULT '1' COMMENT '版本号',`status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '1:成功,2=失败',PRIMARY KEY (`id`),UNIQUE KEY `unix_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='mq消息消费表';
2、消息持久化Bean类
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MqConsume implements Serializable {private static final long serialVersionUID = 1L;@TableId(value = "id", type = IdType.AUTO)private Long id;private Integer type;private String msgId;private String title;private String queueName;private String mqBody;private String remark;private String appId;private Integer isDeleted; //删除状态 1:正常 2:已删除private LocalDateTime createTime;private LocalDateTime updateTime;private Integer version;private Integer status; // 1:成功,2=失败public MqConsume(MqConsumeTypeEnums consumeType,String queueName, String messageId, String body){this.type = consumeType.getCode();this.title = consumeType.getMessage();this.msgId = messageId;this.mqBody = body;this.queueName = queueName;}
}
3、MQ消息的Mapper
//MQ消息接口
public interface IMqConsumeService extends IService<MqConsume> {Boolean insert(MqConsume mqConsume);
}
//MQ业务实现类
@Slf4j
@Service("mqConsumeService")
public class MqConsumeServiceImpl extends ServiceImpl<MqConsumeMapper, MqConsume> implements IMqConsumeService {@Resourceprivate MqConsumeMapper mqConsumeMapper;@Overridepublic Boolean insert(MqConsume mqConsume) {Boolean repeat=Boolean.FALSE;try {mqConsumeMapper.insert(mqConsume);} catch (DuplicateKeyException e) {repeat = Boolean.TRUE;log.error("重复key插入失败={}",e.getMessage());}return repeat;}public MqConsume getByMessageId(String messageId) {LambdaQueryWrapper<MqConsume> queryWrapper = new QueryWrapper<MqConsume>().lambda().eq(MqConsume::getMsgId, messageId).eq(MqConsume::getIsDeleted, 1);return baseMapper.selectOne(queryWrapper);}
}
//DAO类
public interface MqConsumeMapper extends BaseMapper<MqConsume> {}

三、消息的重试机制

1、消息重试持久化Bean
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ApiModel(value="retryMessage对象", description="用户消息重试表")
public class RetryMessage implements Serializable {private static final long serialVersionUID = 1L;private Long id;@ApiModelProperty(value = "消息id")private String messageId;@ApiModelProperty(value = "1-消费失败,2-消费成功,3-重试次数达到上限,4:执行中")private Integer consumerStatus;@ApiModelProperty(value = "交换机名字")private String exchange;@ApiModelProperty(value = "路由key")private String routingKey;@ApiModelProperty(value = "队列名字")private String queue;@ApiModelProperty(value = "消息实体,json格式")private String msgBody;@ApiModelProperty(value = "已重试次数")private Integer retryNum;@ApiModelProperty(value = "最大重试次数")private Integer maxRetryNum;@ApiModelProperty(value = "处理器名称")private String handleName;@ApiModelProperty(value = "错误信息")private String errorMsg;private Date createTime;private Date updateTime;/*** 构造方法*/public RetryMessage(ListennedQueueEnum queueEnum,Integer maxRetryNum,String handleName,String messageId,String body,Exception e){this.messageId = messageId;this.consumerStatus = 1;this.routingKey = queueEnum.getRouteKey();this.exchange = queueEnum.getExchangeKey();this.queue = queueEnum.getQueueName();this.msgBody = body;this.retryNum = 0;this.maxRetryNum = maxRetryNum;this.handleName = handleName;String errorMessage = Optional.ofNullable(e.getMessage()).orElse("");errorMessage = errorMessage.length() > 800 ? errorMessage.substring(0, 800) + "..." : errorMessage;this.errorMsg = errorMessage;this.createTime = new Date();this.updateTime = new Date();}}
2、消息重试Mapper
public interface RetryMessageMapper extends BaseMapper<RetryMessage> {@Select("SELECT * from retry_message WHERE consumer_status = #{consumerStatus} order by id asc limit  #{beginSize}, #{size}")List<RetryMessage> getConsumerList(@Param("consumerStatus") Integer consumerStatus, @Param("beginSize") Integer beginSize, @Param("size") Integer size);}
3、创建消息重试定时任务
@Slf4j
@Component
@JobHandler(value = "retryMessageJobHandler")
public class RetryMessageJobHandler extends IJobHandler {@Resourceprivate RetryMessageMapper retryMessageMapper;@Resourceprivate SpringReflectionUtil springReflectionUtil;@Resourceprivate DingDingHelper dingDingHelper;@Trace@Overridepublic ReturnT<String> execute(String s) throws Exception {String[] split = s.split(",");//起始值Integer beginSize = Integer.valueOf(split[0]);//页大小Integer size = Integer.valueOf(split[1]);List<RetryMessage> retryMessages = retryMessageMapper.getConsumerList(1, beginSize, size);if (!CollectionUtils.isEmpty(retryMessages)) {for (RetryMessage retryMessage : retryMessages) {try {//更新状态,改为执行中,保证幂等UpdateWrapper updateWrapper = new UpdateWrapper<>();updateWrapper.eq("id", retryMessage.getId());updateWrapper.eq("consumer_status", ConsumerSatusEnums.CONSUME_FAIL.getCode());retryMessage.setConsumerStatus(ConsumerSatusEnums.CONSUME_EXEC.getCode());int updateResult = retryMessageMapper.update(retryMessage, updateWrapper);//没更新成功,证明已经在执行中,直接返回if (updateResult < 1) {continue;}//超过最大次数,不处理if (retryMessage.getRetryNum()>=retryMessage.getMaxRetryNum()) {continue;}//处理器为dealUserPayStatusMsg的业务String handleName = retryMessage.getHandleName();//.号分开String[] handleNameSplit = handleName.split("\\.");String serviceName = handleNameSplit[0];String methodName = handleNameSplit[1];Object[] params = {retryMessage.getMsgBody()};springReflectionUtil.springInvokeMethod(serviceName, methodName, params);//成功更新重试次数sucessUpdate(retryMessage);} catch (Exception e) {//失败更新重试次数failUpdate(retryMessage);log.error("RetryMessageJobHandler处理异常={},retryMessage实体= {}", e.getMessage(),JSONUtil.toJsonStr(retryMessage));}}}return ReturnT.SUCCESS;}/*** 失败更新* @param retryMessage*/private void failUpdate(RetryMessage retryMessage) {RetryMessage retryMessageUpadte = new RetryMessage();retryMessageUpadte.setRetryNum(retryMessage.getRetryNum()+1);retryMessageUpadte.setId(retryMessage.getId());//由消费中,改回消费失败retryMessageUpadte.setConsumerStatus(ConsumerSatusEnums.CONSUME_FAIL.getCode());//判断是否达到最大重试次数if (retryMessageUpadte.getRetryNum()>=retryMessage.getMaxRetryNum()) {retryMessageUpadte.setConsumerStatus(ConsumerSatusEnums.CONSUME_MAX_LIMIT.getCode());//钉钉通知人工处理String title = "补偿最大次数用完,转人工干涉处理";String context = "消息实体=" + JSONUtil.toJsonStr(retryMessage);dingDingHelper.sendAlert(title,context);}retryMessageMapper.updateById(retryMessageUpadte);}/*** 成功更新* @param retryMessage*/private void sucessUpdate(RetryMessage retryMessage) {RetryMessage retryMessageUpadte = new RetryMessage();retryMessageUpadte.setRetryNum(retryMessage.getRetryNum()+1);retryMessageUpadte.setId(retryMessage.getId());//处理消费成功retryMessageUpadte.setConsumerStatus(ConsumerSatusEnums.CONSUME_SUCCESS.getCode());retryMessageMapper.updateById(retryMessageUpadte);}}
Spring反射工具类
@Component
public class SpringReflectionUtil {@Resourceprivate ApplicationContext applicationContext;public Object springInvokeMethod(String serviceName, String methodName, Object[] params){Object service = applicationContext.getBean(serviceName);Class<? extends Object>[] paramClass = null;if (params != null) {int paramsLength = params.length;paramClass = new Class[paramsLength];for (int index = 0; index < paramsLength; index++) {paramClass[index] = params[index].getClass();}}// 找到需要执行的方法Method method = ReflectionUtils.findMethod(service.getClass(), methodName, paramClass);// 执行方法return ReflectionUtils.invokeMethod(method, service, params);}}

四、RabbitMQ消息发送

1、MQ消息管理器
@Slf4j
@Component
public class RabbitMQManager {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(QueueEnum queueEnum, String jsonStr) {String exchange = queueEnum.getExchange();String routeKey = queueEnum.getRouteKey();String error = "success";long timeMillis = System.currentTimeMillis();try {rabbitTemplate.convertAndSend(exchange, routeKey, jsonStr);} catch (Exception e) {log.error(e.getMessage(), e);error = e.getMessage();} finally {String stringBuilder = "Rabbit MQ 发送消息:\n" +"交换机\t: " + exchange + "\n" +"路由键\t: " + routeKey + "\n" +"消息内容\t: " + jsonStr + "\n" +"错误信息\t: " + error + "\n" +"消耗时间\t: " + (System.currentTimeMillis() - timeMillis) + "ms\n";log.info(stringBuilder);}}/*** 发送消息,用object,cn.vipthink.assets.config.MqConfig,已经做了json的配置,直接传object就行,* 如果传string,等同于序列化了两次,序列化出来的body会多一些转义字符,* MqConfig中有配置消费接收body去除转义,所以我们自身的项目可以正常接收* 但是业务方,没有一般不会配置消费接收body去除转义* @param queueEnum* @param object*/public void sendMessage(QueueEnum queueEnum, Object object) {String exchange = queueEnum.getExchange();String routeKey = queueEnum.getRouteKey();String error = "success";long timeMillis = System.currentTimeMillis();try {rabbitTemplate.convertAndSend(exchange, routeKey, object);} catch (Exception e) {log.error(e.getMessage(), e);error = e.getMessage();} finally {String stringBuilder = "Rabbit MQ 发送消息:\n" +"交换机\t: " + exchange + "\n" +"路由键\t: " + routeKey + "\n" +"消息内容\t: " + JSONUtil.toJsonStr(object) + "\n" +"错误信息\t: " + error + "\n" +"消耗时间\t: " + (System.currentTimeMillis() - timeMillis) + "ms\n";log.info(stringBuilder);}}
}
2、使用消息发送消息
//生命rabbitMQManager
@Resource
private RabbitMQManager rabbitMQManager;//在方法中调用sendMessage实现发送消息rabbitMQManager.sendMessage(QueueEnum.OPEN_HOUR_NOTIFY, messageNotifyRequestDTO.getMessageBody().toString());
3、定制Queue的枚举,注意需要区别于监听的枚举
@Getter
@AllArgsConstructor
public enum QueueEnum {/*** 开包通知mq*/OPEN_HOUR_NOTIFY("exchange.unification_assets", "open_hour_notify","queue.yx-poster.unification_assets.direct.open_hour_notify"),;private final String exchange;private final String routeKey;private final String name;}

五、可靠性消息的投递的通用方案–starter

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SALisY9a-1692633241334)(…/…/bak/md_img/image-20220316140719460.png)]

1、添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.vipthink.infra</groupId><artifactId>mq-client-starter</artifactId><version>1.1.3</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><spring.boot.version>2.1.17.RELEASE</spring.boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId><version>2.5.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.5.2</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jdbc</artifactId><version>5.3.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>2.1.20.RELEASE</version></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>3.8.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId><version>2.3.12.RELEASE</version></dependency></dependencies><distributionManagement><repository><id>nexus-releases</id><name>Nexus Nexus Repository</name><url>http://nexus-op.vipthink.cn/repository/maven-releases</url></repository><snapshotRepository><id>nexus-snapshots</id><name>local Nexus Repository</name><url>http://nexus-op.vipthink.cn/repository/maven-snapshots</url></snapshotRepository></distributionManagement></project>
2、配置项管理
(1)项目的配置项
spring.rabbitmq.addresses=xxx
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
spring.rabbitmq.port=5672
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
(2)配置项类
@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "tx.message")
public class AppProperties {private Integer taskInterval = 60;private Integer backoffDelay = 30;private Integer backoffMultiplier = 2;private Integer maxRetry = 3;private Integer querySize = 100;private String dingTalkTitle;private String dingTalkWebhook;
}    
(3)设置配置管理类
@Configuration
@EnableConfigurationProperties({AppProperties.class})
public class AppConfiguration {@Autowiredprivate AppProperties appProperties;public AppConfiguration() { }@Beanpublic MessageService messageService(JdbcTemplate jdbcTemplate) {MessageService messageService = new MessageService();messageService.setJdbcTemplate(jdbcTemplate);return messageService;}@Beanpublic MessageCallback messageCallback(MessageService messageService, RabbitTemplate rabbitTemplate) {MessageCallback messageCallback = new MessageCallback();messageCallback.setMessageService(messageService);messageCallback.setDingTalkUtil(this.dingTalkUtil());rabbitTemplate.setConfirmCallback(messageCallback);return messageCallback;}@Beanpublic MessageSender messageSender(MessageService messageService, RabbitTemplate rabbitTemplate) {MessageSenderImpl messageSenderImpl = new MessageSenderImpl();messageSenderImpl.setMessageService(messageService);messageSenderImpl.setRabbitTemplate(rabbitTemplate);messageSenderImpl.setBackoffDelay(this.appProperties.getBackoffDelay());messageSenderImpl.setBackoffMultiplier(this.appProperties.getBackoffMultiplier());messageSenderImpl.setMaxRetry(this.appProperties.getMaxRetry());return messageSenderImpl;}@Bean(initMethod = "execute")public MessageTask messageTask(MessageService messageService,RabbitTemplate rabbitTemplate, RedisTemplate redisTemplate) {MessageTask messageTask = new MessageTask();messageTask.setMessageService(messageService);messageTask.setRabbitTemplate(rabbitTemplate);messageTask.setRedisTemplate(redisTemplate);messageTask.setTaskInterval(this.appProperties.getTaskInterval());messageTask.setQuerySize(this.appProperties.getQuerySize());return messageTask;}@Beanpublic OkHttpClient okHttpClient() {OkHttpClient client = (new Builder()).build();return client;}//加载DingTalkUtil@Beanpublic DingTalkUtil dingTalkUtil() {DingTalkUtil dingTalkUtil = new DingTalkUtil();dingTalkUtil.setClient(this.okHttpClient());dingTalkUtil.setUrl(this.appProperties.getDingTalkWebhook());dingTalkUtil.setTitle(this.appProperties.getDingTalkTitle());return dingTalkUtil;}
}
(4)在META-INF/spring.factories文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.vipthink.infra.mq.AppConfiguration
3、持久化消息部分
(1)TxMessageDO持久化类以及SQL
CREATE TABLE `tx_message`(id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,message_id BIGINT UNSIGNED NOT NULL COMMENT '消息ID',send_times TINYINT NOT NULL DEFAULT 0 COMMENT '已发送次数',max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT '最大重试次数',exchange_name VARCHAR(255) NOT NULL COMMENT '交换器名',routing_key VARCHAR(255) COMMENT '路由键',content TEXT COMMENT '消息内容'next_schedule_time DATETIME NOT NULL COMMENT '下一次调度时间',message_status TINYINT NOT NULL DEFAULT 'INIT' COMMENT '消息状态 INIT:处理中 SUCCESS:成功 ,FAIL:失败 ',backoff_delay BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,单位为秒',backoff_multiplier TINYINT NOT NULL DEFAULT 2 COMMENT '退避因子',create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,deleted TINYINT NOT NULL DEFAULT 0,INDEX idx_create_time (create_time),INDEX idx_next_schedule_time (next_schedule_time),unique INDEX idx_message_id(message_id)
) COMMENT '本地消息表';
@Data
@Accessors(chain=true)
@NoArgsConstructor
public class TxMessageDO {private Long id;private String messageId;private Integer sendTimes;private Integer maxRetryTimes;private String exchangeName;private String routingKey;private String content;private LocalDateTime nextScheduleTime;private String status;private Integer backoffDelay;private Integer backoffMultiplier;private LocalDateTime createTime;private LocalDateTime updateTime;private Integer deleted;
}    
(2)DAO和Service类
public class MessageService {public static final String TABLE_NAME = "tx_message";private JdbcTemplate jdbcTemplate;public MessageService() {}@Autowiredpublic void setJdbcTemplate(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}//插入到消息表@Transactional( rollbackFor = {Exception.class})public int insert(final TxMessageDO txMessageDO) {String insertSql = "INSERT INTO tx_message(message_id,max_retry_times,exchange_name,routing_key,content,next_schedule_time,backoff_delay,backoff_multiplier) VALUES (?,?,?,?,?,?,?,?)";int i = this.jdbcTemplate.update(insertSql, new PreparedStatementSetter() {public void setValues(PreparedStatement pstmt) throws SQLException {pstmt.setString(1, txMessageDO.getMessageId());pstmt.setInt(2, txMessageDO.getMaxRetryTimes());pstmt.setString(3, txMessageDO.getExchangeName());pstmt.setString(4, txMessageDO.getRoutingKey());pstmt.setString(5, txMessageDO.getContent());pstmt.setObject(6, txMessageDO.getNextScheduleTime());pstmt.setInt(7, txMessageDO.getBackoffDelay());pstmt.setInt(8, txMessageDO.getBackoffMultiplier());}});return i;}//通过消息ID查询TxMessageDOpublic TxMessageDO getByMessageId(String messageId) {String sql = "SELECT id, message_id, send_times, max_retry_times, exchange_name, routing_key, next_schedule_time, status, backoff_delay, backoff_multiplier FROM tx_message WHERE message_id = ?";RowMapper<TxMessageDO> rowMapper = new BeanPropertyRowMapper(TxMessageDO.class);TxMessageDO txMessageDO = (TxMessageDO)this.jdbcTemplate.queryForObject(sql, rowMapper, new Object[]{messageId});return txMessageDO;}//查询所有的失败的TxMessageDOpublic List<TxMessageDO> listFail(String startTime, String endTime, Integer size) {String sql = "SELECT id, message_id, send_times, max_retry_times, exchange_name, routing_key, next_schedule_time, status, backoff_delay, backoff_multiplier,content FROM tx_message WHERE send_times<= max_retry_times  AND status IN( 'INIT','FAIL') AND  next_schedule_time BETWEEN  ? AND ? ORDER BY ID DESC LIMIT ? ";RowMapper<TxMessageDO> rowMapper = new BeanPropertyRowMapper(TxMessageDO.class);List<TxMessageDO> txMessageList = this.jdbcTemplate.query(sql, rowMapper, new Object[]{startTime, endTime, size});return txMessageList;}//更新消息IDpublic int updateById(final TxMessageDO txMessageDO) {String sql = "UPDATE tx_message SET send_times=? ,next_schedule_time=? , status =? WHERE id =? ";int i = this.jdbcTemplate.update(sql, new PreparedStatementSetter() {public void setValues(PreparedStatement pstmt) throws SQLException {pstmt.setInt(1, txMessageDO.getSendTimes());pstmt.setObject(2, txMessageDO.getNextScheduleTime());pstmt.setString(3, txMessageDO.getStatus());pstmt.setLong(4, txMessageDO.getId());}});return i;}
}
(3)消息状态枚举
public enum MessageStatus {INIT,SUCCESS,FAIL;private MessageStatus() {}
}
4、消息回调
public class MessageCallback implements ConfirmCallback {private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);private MessageService messageService;private DingTalkUtil dingTalkUtil;public MessageCallback() {}public void setMessageService(MessageService messageService) {this.messageService = messageService;}public void setDingTalkUtil(DingTalkUtil dingTalkUtil) {this.dingTalkUtil = dingTalkUtil;}//消息确认public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (correlationData != null && StringUtils.hasText(correlationData.getId())) {log.info("MessageId:{},ack:{},cause:{}", new Object[]{correlationData.getId(), ack, cause});TxMessageDO messageDO = this.messageService.getByMessageId(correlationData.getId());if (messageDO != null) {TxMessageDO txMessageDO = TxMessageDO.builder().build();txMessageDO.setId(messageDO.getId());txMessageDO.setStatus(ack ? MessageStatus.SUCCESS.name() : MessageStatus.FAIL.name());txMessageDO.setSendTimes(messageDO.getSendTimes() + 1);txMessageDO.setNextScheduleTime(messageDO.getNextScheduleTime().plusSeconds((long)(txMessageDO.getSendTimes()* messageDO.getBackoffDelay() * messageDO.getBackoffDelay())));this.messageService.updateById(txMessageDO);if (txMessageDO.getSendTimes() > messageDO.getMaxRetryTimes() && !ack) {this.dingTalkUtil.send("消息ID:" + txMessageDO.getMessageId() + " 达到最大发送次数:" + messageDO.getMaxRetryTimes());}}}}
}
5、消息发送
(1)消息发送接口
public interface MessageSender {//消息发送void send(Message message);//发送延迟消息void sendDelay(Message message, LocalDateTime delayTime);
}
(2)消息发送实现类
public class MessageSenderImpl implements MessageSender {private RabbitTemplate rabbitTemplate;private MessageService messageService;//最大重试次数private Integer maxRetry;//延迟消息private Integer backoffDelay;private Integer backoffMultiplier;public MessageSenderImpl() {}public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void setMessageService(MessageService messageService) {this.messageService = messageService;}public void setMaxRetry(Integer maxRetry) {this.maxRetry = maxRetry;}public void setBackoffDelay(Integer backoffDelay) {this.backoffDelay = backoffDelay;}public void setBackoffMultiplier(Integer backoffMultiplier) {this.backoffMultiplier = backoffMultiplier;}//消息发送public void send(final Message message) {this.check(message);TxMessageDO txMessageDO = this.convert(message);this.messageService.insert(txMessageDO);boolean isSynchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();if (!isSynchronizationActive) {this.sendMsg(message);} else {TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {public void afterCommit() {MessageSenderImpl.this.sendMsg(message);}});}}//发送消息private void sendMsg(Message message) {CorrelationData correlationData = new CorrelationData();correlationData.setId(message.getMessageId());this.rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(),message.getMessage(),correlationData);}//发送延迟消息public void sendDelay(Message message, LocalDateTime delayTime) {this.check(message);Assert.notNull(delayTime, "messageId can not be null");TxMessageDO txMessageDO = this.convert(message);txMessageDO.setNextScheduleTime(delayTime);this.messageService.insert(txMessageDO);}//消息转换成DOprivate TxMessageDO convert(Message message) {TxMessageDO txMessageDO = TxMessageDO.builder().build().setMessageId(message.getMessageId()).setContent(message.getMessage()).setExchangeName(message.getExchange()).setRoutingKey(message.getRoutingKey()).setBackoffDelay(this.backoffDelay).setBackoffMultiplier(this.backoffMultiplier).setMaxRetryTimes(this.maxRetry).setNextScheduleTime(LocalDateTime.now());return txMessageDO;}//检查private void check(Message message) {Assert.notNull(message, "message can not be null");Assert.notNull(message.getMessageId(), "messageId can not be null");Assert.notNull(message.getMessage(), "message can not be null");Assert.notNull(message.getExchange(), "exchange can not be null");Assert.notNull(message.getRoutingKey(), "routingKey can not be null");}
}
6、消息监听AOP类
@Aspect
public class MessageListnerAspct {public MessageListnerAspct() {}@Pointcut("execution(* cn.vipthink.orderfeign.*Feign*.*(..)) ")public void oldOrderFeignAspect() {}@Around("oldOrderFeignAspect()")public Object oldOrderFeignAspect(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {//获取渠道ObjObject channelObj = Arrays.stream(proceedingJoinPoint.getArgs()).filter((s) -> {return s instanceof Channel;}).findAny().orElse((Object)null);//获取消息ObjectObject messageObj = Arrays.stream(proceedingJoinPoint.getArgs()).filter((s) -> {return s instanceof Message;}).findAny().orElse((Object)null);if (channelObj != null) {Channel var4 = (Channel)channelObj;}if (channelObj != null && messageObj != null) {Message message = (Message)messageObj;String envIdHeader = (String)message.getMessageProperties().getHeader("envId");String envId = (String)System.getenv().get("envId");if (StringUtils.hasText(envIdHeader) && envIdHeader.equals(envId)) {return proceedingJoinPoint.proceed();} else if (message.getMessageProperties().getRedelivered()) {return proceedingJoinPoint.proceed();} else {Channel channel = (Channel)channelObj;channel.basicRecover(true);return null;}} else {return proceedingJoinPoint.proceed();}}
}
7、引入将starter打包成为POM
(1)引入POM包
<dependency><groupId>cn.vipthink.infra</groupId><artifactId>mq-client-starter</artifactId><version>1.1.3</version>
</dependency>
(2)添加配置
spring.rabbitmq.addresses=xxx
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
spring.rabbitmq.port=5672
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated

(3)使用MessageSender发送

@Autowired
private  MessageSender messageSender;Message message = new Message();
messageSender.send(message)

代码参考:https://www.iocoder.cn/Spring-Boot/RabbitMQ/?github

相关文章:

Springboot整合RabbitMQ消息中间件

spring-boot-rabbitmq–消息中间件整合 前言&#xff1a;RabbitMQ的各种交换机说明 1、直连交换机 生产者发布消息时必须带着routing-key&#xff0c;队列绑定到交换机时必须指定binding-key ,且routing-key和binding-key必须完全相同&#xff0c;如此才能将消息路由到队列中…...

基于springboot+vue的食材商城(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…...

Maven解析

目录 Maven的概念 Pom 项目坐标 仓库 Maven环境搭建 安装jdk 配置maven 配置本地仓库地址 配置阿里云 maven 镜像仓库&#xff0c;下载速度更快 在idea中配置maven ​编辑 pom中名词解释 Maven命令 Maven的概念 Maven 是 Apache 软件基金会的一个开源项目,是一个…...

如何使用数学将 NumPy 函数的性能提高 50%

一、说明 2D 傅里叶变换是本世纪最重要的计算机科学算法之一。它已在我们的日常生活中得到应用&#xff0c;从Instagram过滤器到MP3文件的处理。 普通用户最常用的实现&#xff0c;有时甚至是在不知不觉中&#xff0c;是 NumPy 的改编。然而&#xff0c;尽管它很受欢迎&#xf…...

群狼调研(长沙政策第三方评估)| 社情民意调查的内容

本文由群狼调研(长沙社会舆情调查)出品&#xff0c;欢迎转载&#xff0c;请注明出处。社情民意调查旨在捕捉公众对各种社会问题的态度、意见和看法&#xff0c;社情民意调查的内容通常包括以下几个方面&#xff1a; 1. 社会热点问题&#xff1a;针对当前社会热点问题进行调查&…...

【三维重建】【深度学习】NeuS代码Pytorch实现--测试阶段代码解析(上)

【三维重建】【深度学习】NeuS代码Pytorch实现–测试阶段代码解析(上) 论文提出了一种新颖的神经表面重建方法&#xff0c;称为NeuS&#xff0c;用于从2D图像输入以高保真度重建对象和场景。在NeuS中建议将曲面表示为有符号距离函数(SDF)的零级集&#xff0c;并开发一种新的体绘…...

day-24 代码随想录算法训练营(19)回溯part01

77.组合 思路一&#xff1a;回溯相当于枚举&#xff0c;所以我们遍历1-n的每一个数字&#xff0c;然后在遍历第i位的同时递归出第i1~n位的组合结果&#xff0c;跟树的形式相似。 如上图所示&#xff0c;当长度为k时&#xff0c;即退出递归可对遍历到第i位以及剩下位数与k进行比…...

Redis之SYNC与PSYNC命令

一、复制SYNC与PSYNC 在Redis主从架构中&#xff0c;主要有以下两种情形需要进行数据同步 &#xff08;1&#xff09;当新的服务器执行slave of 命令&#xff0c;成为主服务器的从服务器。这时候从服务器会向主服务器发送SYNC命令&#xff0c;请求全量同步数据&#xff0c;主服…...

共创无线物联网数字化新模式|协创数据×企企通采购与供应链管理平台项目成功上线

近日&#xff0c;全球无线物联网领先者『协创数据技术股份有限公司』&#xff08;以下简称“协创数据”&#xff09;SRM采购与供应链项目全面上线&#xff0c;并于近日与企企通召开成功召开项目上线总结会。 基于双方资源和优势&#xff0c;共同打造了物联网特色的数字化采购供…...

【深入理解jvm读书笔记】jvm如何进行内存分配

jvm如何进行内存分配 内存分配方式内存分配方式的选择并发场景下的内存分配内存空间的初始化构造函数 内存分配方式 指针碰撞空闲列表 指针碰撞法&#xff1a; 假设Java堆中内存是绝对规整的&#xff0c;所有被使用过的内存都被放在一边&#xff0c;空闲的内存被放在另一边&a…...

OpenCV使用CMake和MinGW-w64的编译安装

OpenCV使用CMake和MinGW-w64的编译安装中的问题 问题&#xff1a;gcc: error: long: No such file or directory** C:\PROGRA~2\Dev-Cpp\MinGW64\bin\windres.exe: preprocessing failed. modules\core\CMakeFiles\opencv_core.dir\build.make:1420: recipe for target ‘modul…...

亚马逊买家怎么留评

亚马逊买家可以按照以下步骤在购买后留下产品评价&#xff1a; 1、登录亚马逊账户&#xff1a;首先&#xff0c;在网页浏览器中打开亚马逊网站&#xff0c;登录你的亚马逊账户。 2、找到订单&#xff1a;在页面上找到并点击你购买过的商品的"我的订单"或"订单…...

并查集 size 的优化(并查集 size 的优化)

目录 并查集 size 的优化 Java 实例代码 UnionFind3.java 文件代码&#xff1a; 并查集 size 的优化 按照上一小节的思路&#xff0c;我们把如下图所示的并查集&#xff0c;进行 union(4,9) 操作。 合并操作后的结构为&#xff1a; 可以发现&#xff0c;这个结构的树的层相对…...

Qt关于hex转double,或者QByteArray转double

正常的00 ae 02 33这种类型的hex数据类型可以直接通过以下代码进行转换 double QDataConversion::hexToDouble(QByteArray p_buf) {double retValue 0;if(p_buf.size()>4){QString str1 byteArrayToHexStr(p_buf.mid(0,1));QString str2 byteArrayToHexStr(p_buf.mid(1,…...

Java“牵手”根据关键词搜索(分类搜索)拼多多商品列表页面数据获取方法,拼多多API实现批量商品数据抓取示例

拼多多商城是一个网上购物平台&#xff0c;售卖各类商品&#xff0c;包括服装、鞋类、家居用品、美妆产品、电子产品等。要获取拼多多商品列表和商品详情页面数据&#xff0c;您可以通过开放平台的接口或者直接访问拼多多商城的网页来获取商品列表和详情信息。以下是两种常用方…...

Linux相关知识点

Linux是什么&#xff1f; Linux是一套免费使用和自由传播的类Unix操作系统&#xff0c;是一个基于POSIX和UNIX的多用户、多任务、支持多线程和多CPU的操作系统。它能运行主要的UNIX工具软件、应用程序和网络协议。它支持32位和64位硬件。 Linux内核 是一个Linux系统的内核&…...

常见的的数据结构

数组&#xff08;Array&#xff09;&#xff1a;一组按顺序排列的元素的集合&#xff0c;可以通过索引访问和修改元素。 链表&#xff08;Linked List&#xff09;&#xff1a;由一系列节点组成的数据结构&#xff0c;每个节点包含数据和指向下一个节点的指针。 栈&#xff0…...

专业心理咨询师助你轻装上阵,向内耗说不!

引言 身为技术人&#xff0c;你是否经常感觉自己被掏空了精力&#xff0c;行动力不佳&#xff1f;又或者觉得自己的工作没有成就和意义&#xff0c;工作状态持续不佳&#xff1f;你是否总有一种无法消除的疲惫&#xff1f;即使没有学习、工作&#xff0c;而是选择看剧、刷短视频…...

Ubuntu安装mysql5.7

目录 1. 更新系统软件包2. 安装MySQL 5.73. 启动MySQL 服务4. 设置MySQL root 密码5. 验证MySQL 安装6. 启用远程访问7. 创建新用户8. 为新用户授予权限9. mysql命令 以Ubuntu 18.04系统为例&#xff0c;安装MySQL 5.7。操作步骤如下&#xff1a; 1. 更新系统软件包 sudo apt…...

vue2,使用element中的Upload 上传文件,自定义上传http-request上传,上传附件支持多选,多个文件只发送一次请求,代码里有注释

复制直接使用&#xff0c;组件根据multiple是否多选来返回附件内容&#xff0c;支持多选就返回数据附件&#xff0c;则返回一个附件对象。 //uploadFiles.vue<template><div><el-uploadclass"avatar-uploader"action"#":accept"accep…...

flutter定位简单工具类

import package:permission_handler/permission_handler.dart;class PermissionUtil {/// 获取用户定位权限static Future<bool> getLocationStatus() async {Map<Permission, PermissionStatus> statuses await [Permission.location,].request();return statuse…...

java请求SAP系统,发起soap的xml报文,实体类转换,idea自动生成教程

1、将接口的网页地址&#xff0c;右键保存&#xff0c;然后修改文件后缀为wsdl文件 2、idea全局搜索 wsdl&#xff0c;找到自动转换javabean插件&#xff1a; 3、点击后&#xff0c;选择下载改完后缀的文件(选择)&#xff1a; 4、将无用的class文件删除掉 5、请求sap的地址为…...

不同屏幕的触控技术

不同显示屏的触控技术原理有所不同。触摸屏的基本原理是&#xff0c;用手指或其他物体触摸安装在显示器前端的触摸屏时&#xff0c;所触摸的位置(以坐标形式)由触摸屏控制器检测&#xff0c;并通过接口(如RS-232串行口)送到CPU&#xff0c;从而确定输入的信息。 目前市场上常…...

深度解读thenable

在学习promise时&#xff0c;我们经常会遇到thenable一词。关于thenable&#xff0c;目前的资料解读不够通俗易懂&#xff0c;又或者脉络不够清晰&#xff0c;本文主要对thenable进行详细剖析&#xff0c;以便各位参考。笔者希望你能够仅凭这一篇文章&#xff0c;便能深度掌握该…...

原生无限极目录树详细讲解

原生无限级目录树 当涉及到原生的无限级目录树&#xff0c;我们可以使用递归算法来实现。以下是一个使用 JavaScript 实现原生无限级目录树的示例 介绍 原生无限级目录树是一种常见的数据结构&#xff0c;用于组织多层级的目录或分类数据。通过递归算法&#xff0c;我们可以…...

剑指offer(C++)-JZ64:求1+2+3+...+n(算法-位运算)

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 题目描述&#xff1a; 求123...n&#xff0c;要求不能使用乘除法、for、while、if、else、switch、case等关键字及条件判断语句&…...

“深入探究JVM内部机制:如何实现Java程序的运行环境?“

标题&#xff1a;深入探究JVM内部机制&#xff1a;如何实现Java程序的运行环境&#xff1f; 摘要&#xff1a;本文将深入探究Java虚拟机&#xff08;JVM&#xff09;的内部机制&#xff0c;重点讨论JVM如何实现Java程序的运行环境。我们将从JVM的结构、类加载、内存管理、垃圾…...

Mac更新homebrew时卡住的解决办法

Mac更新homebrew时卡住的解决办法 引起问题的原因brew命令安装软件跟这3个仓库地址有关1、brew2、homebrew-core3、homebrew-bottles4、若/bin/zsh&#xff0c;则输入5、若/bin/bash&#xff0c;则输入6、更新brew 引起问题的原因 知其然&#xff0c;还要知其所以然。brew的更…...

带你了解—在外远程群晖NAS-群晖Drive挂载电脑磁盘同步备份【无需公网IP】

文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…...

计算机网络第2章(物理层)

计算机网络第2章&#xff08;物理层&#xff09; 2.1 物理层的基本概念2.2 物理层下面的传输媒体2.2.1 导引型传输媒体2.2.2 非导引型传输媒体 2.3 传输方式2.3.1 串行传输和并行传输2.3.2 同步传输和异步传输2.3.3 单向通信&#xff08;单工&#xff09;、双向交替通信&#x…...

windows钩子保护自身进程不被破坏

代码来自于《windows核心编程》作者&#xff1a; APIHOOK.h头文件&#xff1a; #pragma once #include <Windows.h> class CAPIHOOK { public: CAPIHOOK(LPTSTR lpszModName, LPSTR pszFuncName, PROC pfnHook, BOOL bExcludeAPIHookMod TRUE); ~CAPIHOOK(void); p…...

Linux系统查看文件系统类型C代码

系统&#xff1a;VM Ubuntu 实现Linux系统下通过输入指定路径查看文件系统类型,MSDOS_SUPER_MAGIC&#xff0c;NTFS_SUPER_MAGIC和EXT4_SUPER_MAGIC这些宏定义并不是在sys/mount.h中定义的&#xff0c;它们实际上是在linux/magic.h头文件中定义的。不同系统下宏定义可能不一样&…...

Python中的正则表达式

大家好&#xff0c;今天我们将通过详细的解释和代码示例&#xff0c;探讨如何在Python中使用正则表达式。 介绍 正则表达式&#xff08;regex&#xff09;是一种用于操作文本和数据的强大工具&#xff0c;它们提供了一种简洁灵活的方式来“匹配”&#xff08;指定和识别&…...

第六章,创作文章

6.1添加创作页面 <template><div class="blog-container"><div class="blog-pages"><div class="col-md-12 panel"><div class="panel-body"><h2 class="text-center">创作文章&l…...

Win10c盘满了怎么清理?快速清理,5个方法!

“快救救孩子吧&#xff01;我的电脑是win10系统的&#xff0c;现在c盘满了&#xff0c;根本没法继续使用电脑了。怎么才能快速的释放内存呢&#xff1f;非常着急&#xff01;感谢大家&#xff01;” C盘是Windows系统中重要的分区&#xff0c;当其存储空间满了&#xff0c;可能…...

回归预测 | MATLAB实现GWO-BP灰狼算法优化BP神经网络多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现GWO-BP灰狼算法优化BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现GWO-BP灰狼算法优化BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本介绍程序…...

docker 06(docker compose)

一、服务编排 二、docker compose...

非阻塞重试与 Spring Kafka 的集成测试

如何为启用重试和死信发布的消费者的 Spring Kafka 实现编写集成测试。 Kafka 非阻塞重试 Kafka 中的非阻塞重试是通过为主主题配置重试主题来完成的。如果需要&#xff0c;还可以配置其他死信主题。如果所有重试均已用尽&#xff0c;事件将转发至 DLT。公共领域提供了大量资…...

基于 Debian 12 的MX Linux 23 正式发布!

导读MX Linux 是基于 Debian 稳定分支的面向桌面的 Linux 发行&#xff0c;它是 antiX 及早先的 MEPIS Linux 社区合作的产物。它采用 Xfce 作为默认桌面环境&#xff0c;是一份中量级操作系统&#xff0c;并被设计为优雅而高效的桌面与如下特性的结合&#xff1a;配置简单、高…...

Nginx代理功能与负载均衡详解

序言 Nginx的代理功能与负载均衡功能是最常被用到的&#xff0c;关于nginx的基本语法常识与配置已在上篇文章中有说明&#xff0c;这篇就开门见山&#xff0c;先描述一些关于代理功能的配置&#xff0c;再说明负载均衡详细。 Nginx代理服务的配置说明 1、上一篇中我们在http…...

部署问题集合(特辑)虚拟机常用命令

基础 查看ip&#xff1a;ip addr或ipconfig压缩&#xff1a;tar -zcvf redis-3.2.8.tar.gz redis-3.2.8/ 注意&#xff1a;-zcvf对应gz&#xff0c;-vcf对应tar 解压&#xff1a;tar -zxvf redis-3.2.8.tar.gz压缩zip&#xff1a;zip nginx.zip nginx.txt nginx2.txt解压zip&a…...

【Git】如何将本地文件进行Git仓库归档

Git 全局设置 git config --global user.name "mcihael" git config --global user.email "michael520.com"创建新版本库 git clone gitcode.xxxxxx.git cd branch-name touch README.md git add README.md git commit -m "add README" git pu…...

uniapp 使用腾讯视频 的 坑

1. 版本号的问题 注意 1.X.X不维护了 &#xff0c; 需要升级要 2.X.X 2. 官网的 组件事件 调用需要去掉bind 才能调用 官网地址&#xff1a;腾讯视频 | 小程序插件 | 微信公众平台...

LinkedList

LinkedList的模拟实现&#xff08;底层是一个双向链表&#xff09;LinkedList使用 LinkedList的模拟实现&#xff08;底层是一个双向链表&#xff09; 无头双向链表&#xff1a;有两个指针&#xff1b;一个指向前一个节点的地址&#xff1b;一个指向后一个节点的地址。 节点定…...

创作新纪元:知乎、阅文加码AI大模型,撬动创作者经济

输入几个关键词就能生成一篇文章、一篇新闻、一篇小说&#xff0c;ChatGPT自诞生以来文本生成能力一直备受赞誉&#xff0c;ChatGPT要替代记者、编辑、作家的言论愈演愈烈&#xff0c;甚至有一些互联网企业宣布砍掉记者、编辑、文案等岗位全面拥抱AIGC。 目前ChatGPT是否会全面…...

PAT(Advanced Level) Practice(with python)——1067 Sort with Swap(0, i)

Code # 输入有毒&#xff0c;需避坑 # N int(input()) L list(map(int,input().split())) N L[0] L L[1:] res 0 for i in range(1,N):while L[0]!0:# 把所有不在正常位置下的数换到正常t L[0]L[0],L[t] L[t],L[0]res1if L[i]!i:# 换完全后如果对应位置下的数不是目标…...

Python爬取斗罗大陆全集

打开网址http://www.luoxu.cc/dmplay/C888H-1-265.html F12打开Fetch/XHR&#xff0c;看到m3u8&#xff0c;ts&#xff0c;一眼顶真&#xff0c;打开index.m3u8 由第一个包含第二个index.m3u8的地址&#xff0c;ctrlf在源代码中一查index&#xff0c;果然有&#xff0c;不过/…...

前馈神经网络解密:深入理解人工智能的基石

目录 一、前馈神经网络概述什么是前馈神经网络前馈神经网络的工作原理应用场景及优缺点 二、前馈神经网络的基本结构输入层、隐藏层和输出层激活函数的选择与作用网络权重和偏置 三、前馈神经网络的训练方法损失函数与优化算法反向传播算法详解避免过拟合的策略 四、使用Python…...

顺序栈Sequential-stack

0、节点结构体定义 typedef struct SqStack{int *base;int *top; } SqStack; 1、初始化 bool InitStack(SqStack &S) {S.base new int[Maxsize]; //eg. #define Maxsize 100if(!S.base){return false;}S.top S.base;return true; } 2、入栈 bool Push(SqStack &…...

关于工牌(必须5-10个字)

今天蹲坑&#xff0c;低头看了下工牌觉得挺有意思&#xff1a;我从啥时候起也不排斥将工牌挂在脖子上了&#xff1f; 工牌&#xff0c;一个标识。不仅标识了你&#xff0c;也标识了你所在的群体。如果你认可这个群体&#xff0c;佩戴它那是一种荣誉、荣耀&#xff1b;如果你不…...