某某会员小程序后端性能优化
背景
某某会员小程序后台提供开放平台能力,为三方油站提供会员积分、优惠劵等api。当用户在油站加油,油站收银会调用我们系统为用户发放积分、优惠劵等。用户反馈慢,三方调用发放积分接口性能极低,耗时30s+;
接口情况
发放积分接口业务负责,且用存储过程写的业务,改动风险极大
数据库情况
优惠卷等表,数据量800w+,甚至存在单表3000w+
优化方案
数据库数据归档
归档交易、用户优惠劵等表历史数据,比如归档三个月前的数据(根据实际情况补充归档条件,如用户优惠劵没使用或没过期的数据不能归档)
优化效果:存储过程耗时从30s降低到7s,但是作为Toc用途接口性能远远不达标,优化数据库sql或许能进一步降低响应时间,但是存储过程复杂优化费时费力风险大
方案 | 描述 | 风险 | 工作量 | 难度 | 是否能解决性能问题 | 是否解决并发冲突 | 影响 | 使用技术 |
---|---|---|---|---|---|---|---|---|
方案1 | java重写存储过程业务 | 大 | 大 | 大 | 一定程度能解决 | yes | 改动点多,业务影响大 | java + orm |
方案2 | 保证存储过程全局串行执行 | 小 | 小 | 大 | no | yes | 接口性能会降低 | 分布式锁 |
方案3 | 异步下存储过程全局串行执行 | 中 | 中 | 中 | yes | yes | rabbitmq+分布式锁+自旋锁 |
线程池异步化分析
接口中存储串行调用改为异步调用,
使用线程池异步化存在问题
开始简单使用线程池异步化,但是出现锁表的情况(原因存储过程没有保证原子性,并且其中大量使用临时表,并发下出现竞争锁表),而SqlServer自带的死锁检查机制杀死事务导致发放积分失败
线程池+分布式锁
异步线程【不能保证分布式环境的全局顺序执行】,使用分布式锁能保证同一个时间只有一个存储过程执行
问题:但是并发情况会将存储过程执行堆积在线程池,并发过大存在OOM风险,或者处理丢失风险
rabbitmq异步改造
可行性验证报告结论
验证通过点如下:
- 测试rabbitmq发送/接收消息【通过】
- 测试并发下分布式锁+自旋锁保证业务串行执行【通过】
- 测试并发下分布式锁+自旋锁+mq保证业务串行执行【通过】
- 测试业务幂等性保证不重复消费【通过】
- 测试手动ack兼容原来配置保证可靠性【通过】
当前项目rabbitmq使用方式问题分析
配置发下
spring.rabbitmq.host=172.18.229.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=totaltest
spring.rabbitmq.password=totaltest
spring.rabbitmq.virtual-host=/totaltest/
spring.rabbitmq.publisher-confirms=false
该配置没有
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
若是不配默认为
spring.rabbitmq.listener.direct.acknowledge-mode=auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
rabbitmq消费者ack机制问题分析
spring.rabbitmq.listener.direct.acknowledge-mode是用于配置Spring Boot应用中RabbitMQ消息监听器的确认模式。确认模式决定了在消费者处理消息后如何通知RabbitMQ服务器来确认消息的接收情况。
该配置有以下几种可选的值:
- AUTO: 在这种模式下,消费者处理消息后,RabbitMQ会自动确认消息。这意味着消息一旦被消费者接收,就会立即从队列中删除。这是默认的确认模式。
- MANUAL: 在这种模式下,消费者需要显式地发送确认消息来告知RabbitMQ服务器消息已经被成功处理。这意味着消费者可以在处理消息后决定是否要确认消息。通常在需要进行消息处理的事务性操作时使用这种模式。
- NONE: 在这种模式下,消费者不会发送任何确认消息,也不会被要求发送确认消息。这意味着消息会在被传递给消费者之后立即被视为已经被确认。
问题:项目中该配置使用的模式配置,以为着没有手动ack,即消费者接收到消息,消息就会从mq中删除,若是消费者消费异常,则消息丢失不可追溯复原
rabbitmq生产者ack机制问题分析
项目中配置如下
spring.rabbitmq.publisher-confirms=false
spring.rabbitmq.publisher-confirms是Spring Boot中用于配置RabbitMQ生产者消息确认的属性。它用于控制是否启用RabbitMQ的发布确认机制,以确保消息成功发送到Exchange。
当spring.rabbitmq.publisher-confirms属性设置为true时,表示启用了RabbitMQ的发布确认机制。在这种情况下,当生产者发送消息到Exchange后,RabbitMQ会发送一个确认消息给生产者,告知消息是否成功到达Exchange。生产者可以根据收到的确认消息来判断消息是否成功发送,从而进行相应的处理。
当spring.rabbitmq.publisher-confirms属性设置为false时,表示禁用了RabbitMQ的发布确认机制。在这种情况下,生产者发送消息到Exchange后,不会收到确认消息,也无法得知消息是否成功到达Exchange。
通常情况下,建议将spring.rabbitmq.publisher-confirms属性设置为true,以确保消息的可靠发送。当然,具体是否启用发布确认机制,还取决于业务场景和对消息可靠性的要求。
rabbitmq消息可靠性问题分析
通过上诉【rabbitmq生产者ack机制问题分析】和【rabbitmq消费者ack机制问题分析】
可知当前项目中消息没有保证消息可靠性,rabbitmq宕机恢复、消费者消费异常都会导致消息丢失,导致业务完整性缺失
rabbitmq配置最小改动方案
上诉问题若想得到解决需项目中rabbitmq配置,会影响到原来所有使用mq的地方,避免影响范围较大
解决方案:新增消费者类似,通过设置不同的消费者来实现接收指定的消息需要手动 ack
测试rabbitmq配置发送接收消息【通过】
rabbitmq和springboot对应版本:3. Reference
创建虚拟host
创建测试 交换机和queues
Exchange:exchange-1
Queue:queue-1
key:springboot.*
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
发送消息
package com.bfxy.springboot.producer;import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import com.bfxy.springboot.entity.Order;@Component
public class RabbitSender {private static final Logger LOGGER = LoggerFactory.getLogger(RabbitSender.class);//自动注入RabbitTemplate模板类@Autowiredprivate RabbitTemplate rabbitTemplate; //回调函数: confirm确认final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.err.println("correlationData: " + correlationData);System.err.println("ack: " + ack);if(!ack){System.err.println("异常处理....");}}};//回调函数: return返回final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,String exchange, String routingKey) {System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);}};//发送消息方法调用: 构建Message消息public void send(Object message, Map<String, Object> properties) {LOGGER.info("消息内容:{}",message);LOGGER.info("properties:{}",properties);try {MessageHeaders mhs = new MessageHeaders(properties);Message msg = MessageBuilder.createMessage(message, mhs);rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id + 时间戳 全局唯一CorrelationData correlationData = new CorrelationData("1234567890");rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);}catch (Exception e){LOGGER.error("发送消息异常,message:{}",message);}}//发送消息方法调用: 构建自定义对象消息public void sendOrder(Order order) {LOGGER.info("订单消息内容:{}",order);try {rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id + 时间戳 全局唯一CorrelationData correlationData = new CorrelationData("0987654321");rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);}catch (Exception e){LOGGER.error("订单发送消息异常,message:{}",order);}}}
测试代码
package com.bfxy.springboot;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import com.bfxy.springboot.entity.Order;
import com.bfxy.springboot.producer.RabbitSender;@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {@Testpublic void contextLoads() {}@Autowiredprivate RabbitSender rabbitSender;private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Testpublic void testSender1() throws Exception {Map<String, Object> properties = new HashMap<>();properties.put("number", "12345");properties.put("send_time", simpleDateFormat.format(new Date()));rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);}@Testpublic void testSender2() throws Exception {Order order = new Order("001", "第一个订单");rabbitSender.sendOrder(order);}}
接收消息
package com.bfxy.springboot.conusmer;import java.util.Map;import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Component
public class RabbitReceiver {/*** 用于标识方法是一个RabbitMQ消息的监听方法,用于监听指定的队列,并在接收到消息时调用该方法进行处理。* 可以指定队列、交换机、路由键等属性,用于配置消息监听的相关信息。* 通常与@RabbitHandler一起使用,将消息监听和消息处理方法关联起来。*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1", durable="true"),exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"),key = "springboot.*" ))/*** 用于标识方法是一个RabbitMQ消息的处理方法。* 通常与@RabbitListener一起使用,用于指定具体的消息处理方法。* 通过@RabbitHandler注解标识的方法可以处理多个不同类型的消息,通过方法参数的类型来区分不同的消息类型。*/@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {System.err.println("--------------------------------------");System.err.println("消费端Payload: " + message.getPayload());Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}/*** * spring.rabbitmq.listener.order.queue.name=queue-2spring.rabbitmq.listener.order.queue.durable=truespring.rabbitmq.listener.order.exchange.name=exchange-1spring.rabbitmq.listener.order.exchange.durable=truespring.rabbitmq.listener.order.exchange.type=topicspring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=truespring.rabbitmq.listener.order.key=springboot.** @param order* @param channel* @param headers* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandlerpublic void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel,@Headers Map<String, Object> headers) throws Exception {System.err.println("--------------------------------------");System.err.println("消费端order: " + order.getId());Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}}
断点测试
测试分布式锁+自旋锁测试串行执行【通过】
测试并发分布式锁顺序执行业务代码
package com.bfxy.springboot;import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationTests.class);@AutowiredRedissonClient redissonClient;@Testpublic void contextLoads() {long startTime = System.currentTimeMillis();for (int i = 1;i<5;i++){int finalI = i;CompletableFuture.runAsync(()->{bizLock(String.valueOf(finalI));});}while (true){}}private void bizLock(String taskName) {RLock lock = redissonClient.getLock("my-lock");boolean locked = false;try {while (!locked) {locked = lock.tryLock();if (locked) {try {biz(3000, taskName);System.out.println("----------------");} finally {lock.unlock();}} else {// 未获取到锁,可以进行一些等待操作,比如休眠一段时间后再尝试获取锁Thread.sleep(100);}}} catch (Exception e) {e.printStackTrace();}}private void biz(Integer time,String taskName) throws Exception{long startTime = System.currentTimeMillis();LOGGER.info("任务序号={},任务执行开始时间={}",taskName,startTime);Thread.sleep(time);long endtime = System.currentTimeMillis();LOGGER.info("任务序号={},任务执行结束时间={}",taskName,startTime);LOGGER.info("任务序号={},任务执行消耗时间={}",taskName,(endtime-startTime));}
}
执行日志如下:【测试结果并发串行执行(同一时间只有一个任务执行)】
2024-07-10 13:46:49.587 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号=1,任务执行开始时间=1720590409587
2024-07-10 13:46:52.601 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号=1,任务执行结束时间=1720590409587
2024-07-10 13:46:52.601 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号=1,任务执行消耗时间=3014
----------------
2024-07-10 13:46:52.665 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号=4,任务执行开始时间=1720590412665
2024-07-10 13:46:55.678 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号=4,任务执行结束时间=1720590412665
2024-07-10 13:46:55.678 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号=4,任务执行消耗时间=3013
----------------
2024-07-10 13:46:55.759 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号=2,任务执行开始时间=1720590415759
2024-07-10 13:46:58.761 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号=2,任务执行结束时间=1720590415759
2024-07-10 13:46:58.761 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号=2,任务执行消耗时间=3002
压力测试对比资源使用情况
结论使用线程池比较消耗资源,特别是内存,一点并发上来可能oom
压测前
200并发
1000并发
2000并发
测试分布式锁+自旋锁+mq全局串行执行【通过】
使用线程池控制会导致请求积压到线程池消耗cpu和内存资源,使用mq能有效削峰限流(减小服务器资源消耗),线上部署了两个节点即并发为2
消费者代码
/*** * spring.rabbitmq.listener.order.queue.name=queue-2spring.rabbitmq.listener.order.queue.durable=truespring.rabbitmq.listener.order.exchange.name=exchange-1spring.rabbitmq.listener.order.exchange.durable=truespring.rabbitmq.listener.order.exchange.type=topicspring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=truespring.rabbitmq.listener.order.key=springboot.** @param order* @param channel* @param headers* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandlerpublic void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel,@Headers Map<String, Object> headers) throws Exception {//System.err.println("--------------------------------------");//System.err.println("消费端order: " + order.getId());Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);onLockOrderMessage(order);//手工ACKchannel.basicAck(deliveryTag, false);}@AutowiredRedissonClient redissonClient;private void onLockOrderMessage(com.bfxy.springboot.entity.Order order) {RLock lock = redissonClient.getLock("my-lock");boolean locked = false;try {while (!locked) {locked = lock.tryLock();if (locked) {try {long startTime = System.currentTimeMillis();String id = order.getId();LOGGER.info("订单序号={},订单执行开始时间={}",id,startTime);Thread.sleep(7000);long endtime = System.currentTimeMillis();LOGGER.info("订单序号={},订单执行结束时间={}",id,startTime);LOGGER.info("订单序号={},订单执行消耗时间={}",id,(endtime-startTime));System.out.println("----------------");} finally {lock.unlock();}} else {// 未获取到锁,可以进行一些等待操作,比如休眠一段时间后再尝试获取锁Thread.sleep(100);}}} catch (Exception e) {e.printStackTrace();}}
生产者代码
@Testpublic void testSender3() throws Exception {for (int i = 1;i<=50;i++){int finalI = i;CompletableFuture.runAsync(()->{Order order = new Order(String.valueOf(finalI), "第"+finalI+"个订单");rabbitSender.sendOrder(order);});System.err.println("发送消息订单:"+finalI);if (i%5==0){Thread.sleep(1000);}}}
启动两个消费者【验证全局串行:同一时间只有一个业务执行】
记录消费日志验证是否串行
- 通过日志可知:单个消费者消费顺序执行
- 验证消费者1和2直接业务串行
消费者2:15:18:22 到 15:18:50 之间没有接收到消息【串行执行】
验证消费1:15:18:22 到 15:18:50时间段消息情况【串行执行】
业务幂等性保障测试【通过】
mq接收到消息会将消息中的uid放入redis,当重复消费时会进行判断,保障业务幂等性
@Testpublic void time() {// 获取字符串对象String key = "myKey";String value = "Hello, Redis!";RBucket<String> bucket = redissonClient.getBucket(key);bucket.set(value, 30, TimeUnit.SECONDS); // 设置失效时间为10秒}
幂等逻辑
// 判断key是否存在
if(bucket.isExists()){LOGGER.error("重复消费,id={}",id);// 重复消息不执行业务逻辑跳出直接ackbreak;
}else {marker(id);
}
重复消费情况:进入断点表示重复执行break会跳过业务代码
rabbitmq配置生效测试
原项目配置【自动ack测试】-【通过】
测试自动ack是否生效
package com.bfxy.springboot.config;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;@Configuration
public class RabbitMQConfig {@Value("${spring.rabbitmq.host}")private String addresses;@Value("${spring.rabbitmq.port}")private String port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;@Value("${spring.rabbitmq.publisher-confirms}")private boolean publisherConfirms;@Bean/** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */// @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());return rabbitTemplate;}@Beanpublic RabbitTemplate manualAckRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 配置手动ACKrabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setAcknowledgeMode(AcknowledgeMode.MANUAL);return rabbitTemplate;}@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(addresses + ":" + port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);/** 如果要进行消息回调,则这里必须要设置为true */connectionFactory.setPublisherConfirms(publisherConfirms);return connectionFactory;}@Bean("mqContainerFactory")@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}}
注释手动ack
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=total-api
spring.rabbitmq.connection-timeout=15000
发送消息,没ack前控制台信息
等待一会,自动ack的消息从rabbitmq中删除了
新增配置【手动ack测试】-【通过】
rabbitmq如何实现接受指定的消息要手动ack,其他消息自动ack
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> manualAckListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> autoAckListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> manualAckListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> autoAckListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}
}
消费者代码指定手动ack,并注释手动ack
查看rabbitmq中消息是否被删除,预期消息不会删除
放开手动ack注释,再次测试
兜底保证方案
消息处理可能失败,处理失败的消息记录到broker_message_log表中
-- 表 broker_message_log 消息记录结构
CREATE TABLE IF NOT EXISTS `broker_message_log` (`message_id` varchar(128) NOT NULL, -- 消息唯一ID`message` varchar(4000) DEFAULT NULL, -- 消息内容`try_count` int(4) DEFAULT '0', -- 重试次数`status` varchar(10) DEFAULT '', -- 消息投递状态 0 投递中 1 投递成功 2 投递失败`next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --下一次重试时间 或 超时时间`create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --创建时间`update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --更新时间PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
通过定时任务重新执行失败的消息
执行点设计
- 定时任务重目标业务方法(该方式要将业务封装某个class的某个方法中,失败时会录入表中)
- 发送mq在消费中执行
相关文章:
![](https://img-blog.csdnimg.cn/img_convert/99bdfe7ee5618b62a6a2a04045e963d1.png)
某某会员小程序后端性能优化
背景 某某会员小程序后台提供开放平台能力,为三方油站提供会员积分、优惠劵等api。当用户在油站加油,油站收银会调用我们系统为用户发放积分、优惠劵等。用户反馈慢,三方调用发放积分接口性能极低,耗时30s; 接口情况…...
![](https://i-blog.csdnimg.cn/direct/784adbf51ea94efa9648c0cc0a668a26.png)
Docker:基础概念、架构与网络模式详解
1.Docker的基本概念 1.1.什么是docker Docker是一个用于开发,交付和运行应用程序的开放平台.docker使您能够将应用程序域基础框架分开,以便你可以快速开发交付软件.使用docker,你可以管理你的基础架构以管理应用程序相同的方式.通过利用docker用于交付,测试和部署代码的方法,你…...
![](https://i-blog.csdnimg.cn/direct/0b7e2eda63704d0fa24ef1bf0b9e9082.png)
全国大学生数据建模比赛c题——基于蔬菜类商品的自动定价与补货决策的研究分析
基于蔬菜类商品的自动定价与补货决策的研究分析 摘要 商超蔬菜不易保存,其质量会随着销售时间的增加而变差,影响商超收益,因此,基于各蔬菜品类的历史销售数据,制定合理的销售策略和补货决策对商超的营收十分关键。本文…...
![](https://i-blog.csdnimg.cn/direct/0bf905aaa84c445081872dd77ef56e49.png)
【漏洞复现】飞企互联-FE企业运营管理平台 uploadAttachmentServlet—文件上传漏洞
声明:本文档或演示材料仅用于教育和教学目的。如果任何个人或组织利用本文档中的信息进行非法活动,将与本文档的作者或发布者无关。 一、漏洞描述 企互联-FE企业运营管理平台是一个利用云计算、人工智能、大数据、物联网和移动互联网等现代技术构建的云…...
![](https://www.ngui.cc/images/no-images.jpg)
基于深度学习的语言生成
基于深度学习的语言生成(NLG, Natural Language Generation)是一种利用深度学习模型生成自然语言文本的技术。它在智能写作、自动摘要、对话系统、机器翻译等领域有广泛应用。以下是对这一领域的系统介绍: 1. 任务和目标 语言生成的主要任务…...
![](https://www.ngui.cc/images/no-images.jpg)
Kafka Rebalance详解
作者:耀灵 1.rebalance概览 rebalance中文含义为再平衡。它本质上是一组协议,规定了一个 consumer group 是如何达成一致来分配订阅 topic 的所有分区的。比方说Consumer Group A 有3个consumer 实例,它要消费一个拥有6个分区的topic&#…...
![](https://www.ngui.cc/images/no-images.jpg)
在 Markdown 编辑器中插入 空格 Space 和 空行 Enter
1. 空格 Space  2.空行 Enter <br/>...
![](https://i-blog.csdnimg.cn/direct/5ae41ee1642b467abb64c45bf589e133.png)
js逆向-webpack-python
网站(base64):aHR0cHM6Ly93d3cuY29pbmdsYXNzLmNvbS96aA 案例响应解密爬取(webpack) 1、找到目标url 2、进行入口定位(此案例使用 ‘decrypt(’ 关键字搜索 ) 3、找到位置进行分析 --t 为 dat…...
![](https://i-blog.csdnimg.cn/direct/2519b0dc15fd40938c35f1e206487dee.png#pic_center)
Python精神病算法和自我认知异类数学模型
🎯要点 🎯空间不确定性和动态相互作用自我认知异类模型 | 🎯精神病神经元算法推理 | 🎯集体信念催化个人行动力数学模型 | 🎯物种基因进化关系网络算法 | 🎯电路噪声低功耗容错解码算法 📜和-…...
![](https://i-blog.csdnimg.cn/direct/09c167d0d99f497294e04a767bef39cc.png)
npm install 报错:PhantomJS not found on PATH
npm install 报错:PhantomJS not found on PATH 整体报错内容 npm ERR! code 1 npm ERR! path G:\work-learn\open-coding\bruno\node_modules\phantomjs-prebuilt npm ERR! command failed npm ERR! command C:\Windows\system32\cmd.exe /d /s /c node install.…...
![](https://i-blog.csdnimg.cn/direct/3545a890b3e94fe0afb9315522263df4.jpeg)
【C++进阶学习】第六弹——set和map——体会用C++来构建二叉搜索树
set和map基础:【C进阶学习】第五弹——二叉搜索树——二叉树进阶及set和map的铺垫-CSDN博客 前言: 在上篇的学习中,我们已经学习了如何使用C语言来实现二叉搜索树,在C中,我们是有现成的封装好的类模板来实现二叉搜索树…...
![](https://i-blog.csdnimg.cn/direct/b2f6a5d6864f4e4595e276295179b71f.png)
sqlmap确定目标/实操
安装kali,kali自带sqlmap,在window系统中跟linux系统操作有区别 sqlmap是一款自动化SQL工具,打开kali终端,输入sqlmap,出现以下界面,就说明sqlmap可用。 sqlmap确定目标 一、sqlmap直连数据库 1、直连数据库…...
![](https://www.ngui.cc/images/no-images.jpg)
Java笔试|面试 —— 对多态性的理解
谈谈对多态性的理解: 一个事物的多种形态(编译和运行时状态不一致性) 实现机制:通过继承、重写和向上转型(Object obj new 子类())来实现。 1.广义上的理解 子类对象的多态性,方法的重写&am…...
从RL的专业角度解惑 instruct GPT的目标函数
作为早期chatGPT背后的核心技术,instruct GPT一直被业界奉为里程碑式的著作。但是这篇论文关于RL的部分确写的非常模糊,几乎一笔带过。当我们去仔细审查它的目标函数的时候,心中不免有诸多困惑。特别是作者提到用PPO来做强化学习,…...
![](https://i-blog.csdnimg.cn/direct/ed336fb4a41d47b194b1b168fd1210de.png#pic_center)
location匹配的优先级和重定向
nginx的重定向(rewrite) location 匹配 location匹配的就是后面的uri /wordpress 192.168.233.10/wordpress location匹配的分类和优先级 1.精确匹配 location / 对字符串进行完全匹配,必须完全符合 2.正则匹配 ^-前缀级别ÿ…...
![](https://i-blog.csdnimg.cn/direct/7b990667870a46f1abead337031c7ae7.png)
观察矩阵(View Matrix)、投影矩阵(Projection Matrix)、视口矩阵(Window Matrix)及VPM矩阵及它们之间的关系
V表示摄像机的观察矩阵(View Matrix),它的作用是把对象从世界坐标系变换到摄像机坐标系。因此,对于世界坐标系下的坐标值worldCoord(x0, y0, z0),如果希望使用观察矩阵VM将其变换为摄像机坐标系下的坐标值localCoord(x…...
![](https://i-blog.csdnimg.cn/direct/434c0d62fd6342279200a5dc36bd5e5b.png)
谷粒商城学习笔记-19-快速开发-逆向生成所有微服务基本CRUD代码
文章目录 一,使用逆向工程步骤梳理1,修改逆向工程的application.yml配置2,修改逆向工程的generator.properties配置3,以Debug模式启动逆向工程4,使用逆向工程生成代码5,整合生成的代码到对应的模块中 二&am…...
![](https://img-blog.csdnimg.cn/8c95ab5b58c84aada0dfae24f82dc48e.png)
时序预测 | Matlab实现TCN-Transformer的时间序列预测
时序预测 | Matlab实现TCN-Transformer的时间序列预测 目录 时序预测 | Matlab实现TCN-Transformer的时间序列预测效果一览基本介绍程序设计 效果一览 基本介绍 基于TCN-Transformer模型的时间序列预测,可以用于做光伏发电功率预测,风速预测,…...
![](https://img-blog.csdnimg.cn/img_convert/da5f0741e50090b4769a04dae9e134b5.png)
没想到MySQL 9.0这么拉胯
MySQL 7月1号发布了9.0版本,然而没想到并没有引起大家的狂欢,反而是来自DBA圈子的一篇吐槽,尤其是PG界吐槽更厉害。 难道MySQL现在真的这么拉胯了?本着好奇的态度,我也去下载了MySQL9.0的手册看了一下。确实有点让我大…...
![](https://www.ngui.cc/images/no-images.jpg)
开源 Wiki 系统 InfoSphere 2024.01.1 发布
推荐一套基于 SpringBoot 开发的简单、易用的开源权限管理平台,建议下载使用: https://github.com/devlive-community/authx 推荐一套为 Java 开发人员提供方便易用的 SDK 来与目前提供服务的的 Open AI 进行交互组件:https://github.com/devlive-commun…...
![](https://www.ngui.cc/images/no-images.jpg)
1.Introduction to Spring Web MVC framework
Web MVC framework 文档:22. Web MVC framework (spring.io) 概述 Web MVC框架(Web Model-View-Controller Framework)是一种用于构建Web应用程序的软件架构模式。MVC模式将应用程序分为三个主要组件:模型(Model&am…...
![](https://www.ngui.cc/images/no-images.jpg)
Onnx 1-深度学习-概述1
Onnx 1-深度学习-概述1 一: Onnx 概念1> Onnx 介绍2> Onnx 的作用3> Onnx 应用场景4> Onnx 文件格式1. Protobuf 特点2. onnx.proto3协议3> Onnx 模型基本操作二:Onnx API1> 算子详解2> Onnx 算子介绍三: Onnx 模型1> Onnx 函数功能1. np.random.rand…...
![](https://www.ngui.cc/images/no-images.jpg)
网络基础——udp协议
UDP协议(User Datagram Protocol,用户数据报协议)是OSI(Open System Interconnection,开放式系统互联)参考模型中一种无连接的传输层协议,它提供了一种简单的、不可靠的数据传输服务。以下是关于…...
![](https://i-blog.csdnimg.cn/direct/f1a8780c72404088aca4522051ee1906.png)
分布式锁理解
介绍分布式锁,我觉得从项目的背景入手把 在伙伴匹配系统中,我创建了一个定时任务,做为缓存预热的手段 这个具体原因在Redis-CSDN博客 接下来切入正题: 想象每个服务器都有一个定时任务,都要对数据库或者缓存进行操…...
![](https://www.ngui.cc/images/no-images.jpg)
Android Gradle 开发与应用 (十): Gradle 脚本最佳实践
目录 1. 使用Gradle Kotlin DSL 1.1 什么是Gradle Kotlin DSL 1.2 迁移到Kotlin DSL 1.3 优势分析 2. 优化依赖管理 2.1 使用依赖版本管理文件 2.2 使用依赖分组 3. 合理使用Gradle插件 3.1 官方插件和自定义插件 3.2 插件管理的最佳实践 4. 任务配置优化 4.1 使用…...
![](https://www.ngui.cc/images/no-images.jpg)
c#获取本机的MAC地址(附源码)
在前一次的项目中,突然用到了这个获取本机的MAC地址,然后就研究了一下,记录下来,防止以后再用到, 使用winfrom做的,界面一个button,一个textBox,点了button以后给textBox赋值显示mac地址 附上源…...
![](https://i-blog.csdnimg.cn/direct/11c915f1ca514b7c843be188325c0568.png)
sqlmap使用之-post注入、head注入(ua、cookie、referer)
1、post注入 1.1、方法一,通过保存数据包文件进行注入 bp抓包获取post数据 将数据保存到post.txt文件 加上-r指定数据文件 1.2、方法二、通过URL注入 D:\Python3.8.6\SQLmap>python sqlmap.py -u "http://localhost/login.php" --data "userna…...
![](https://i-blog.csdnimg.cn/direct/b933cbc12930451c8234daf4a12f597d.png)
XSS: 原理 反射型实例[入门]
原理 服务器未对用户输入进行严格校验,使攻击者将恶意的js代码,拼接到前端代码中,从而实现恶意利用 XSS攻击危害 窃取用户Cookie和其他敏感信息,进行会话劫持或身份冒充后台增删改文章进行XSS钓鱼攻击利用XSS漏洞进行网页代码的…...
![](https://i-blog.csdnimg.cn/direct/85bd36f375b1432b9b984f54af81db5c.png)
Idea新增Module报错:sdk ‘1.8‘ type ‘JavaSDK‘ is not registered in ProjectJdkTable
文章目录 一,创建Module报错二,原因分析三,解决方案1,点击上图的加号,把JDK8添加进来即可2,点击左侧[Project],直接设置SDK为JDK8 四,配置检查与验证 一,创建Module报错 …...
![](https://i-blog.csdnimg.cn/direct/37440478a39f4ccf910bb01d59308e3d.png)
基于RHCE基础搭建简单服务
目录 项目标题与需求一 配置IP地址server机node02机 二 配置web服务三 搭建dns服务器四 开启防火墙server firewalld 五 配置nfs服务器node02 nfsserver autofs 六 开启SELinux七 验证是否能访问www.rhce.com 项目标题与需求 项目标题: 项目需求: 现有…...
![](/images/no-images.jpg)
wordpress增加访问速度/网站排名
搬寝室 Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 65536/32768 K (Java/Others) Total Submission(s): 20642 Accepted Submission(s): 7013 Problem Description 搬寝室是非常累的,xhd深有体会.时间追述2006年7月9号,那天xhd迫于无奈要从27号楼搬到3号楼,…...
![](https://images2017.cnblogs.com/blog/1072930/201712/1072930-20171207194110519-1470236458.png)
北京网站建设设计公司/西安区seo搜索排名优化
一、启动流程的时候设置流程变量 1.1 案例 /*** 启动流程实例*/Testpublic void start() {Student studentnew Student();student.setId(1);student.setName("张三");Map<String, Object> variablesnew HashMap<String,Object>();variables.put("da…...
![](/images/no-images.jpg)
做动态网站的总结/恢复正常百度
Linux服务器具有低成本、性能卓越、代码开放等特性。越来越多的企业正在准备或已经采用Linux担起了企业应用服务器的重任。本文要介绍的是笔者在实际工作中,采用Linux和其它开放套件共同部署高可靠性LDAP认证服务的实例。系统所要用到的软件包括:◆ Red …...
![](/images/no-images.jpg)
成品图片的网站在哪里找/如何在百度发布信息
当你看到的时候,你是不是已经爱上了它,如果你真的只看外表,那你就错了,不要太相信自己的眼睛,往往真像并不是你所看到的那么简单!请跟我一起来看看吧! 这次在项目中,就遇到了这个问题…...
![](https://img-blog.csdnimg.cn/img_convert/849f255f40bed4e9b45812bf561f6814.png)
晋城企业网站建设价格/搜狐视频
大家好,我是老赵~我有一个朋友~做了一个小破站,现在要实现一个站内信web消息推送的功能,对,就是下图这个小红点,一个很常用的功能。不过他还没想好用什么方式做,这里我帮他整理了一下…...
![](/images/no-images.jpg)
罗湖草铺附近做网站/百度百度一下官网
1、增加新功能: ①同时支持本书使用的S3C2440和S3C2410开发板 ②支持串口xmodem协议 ③支持网卡芯片CS8900 ④支持NAND Flash 读写 ⑤支持烧写yaffs文件系统映象 2、通过读取GSTATUS1寄存器的值可以区分S3C2410和S3C2440 0x32410000表示S3C2410 0x32410002表示S3C24…...