RabbitMQ消息可靠性保证机制3--消费端ACK机制
消费端ACK机制
在这之前已经完成了发送端的确认机制。可以保证数据成功的发送到RabbitMQ,以及持久化机制,然尔这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了,但是消息却已经被标记为消费了,如果又没有任何重度机制,那结果基本等于丢消息。在消费端如何保证消息不丢呢?
在rabbitMQ的消费端会有ACK机制。即消费端消费消息后需要发送ACK确认报文给Broker端,告知自己是否已经消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。同时这个也是最终一致性、可恢复性的基础。一般有如下手段:
- 采用NONE模式,消费的过程中自行捕捉异常,引发异常后直接记录日志并落到异常处理表,再通过后台定时任务扫描异常恢复表做重度动作。如果业务不自行处理则有丢失数据的风险。
- 采用AUTO(自动ACK)模式,不主动捕获异常,当消费过程中出现异常时,会将消息放回Queue中,然后消息会被重新分配到其他消费节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回ACK或者一直到过期。
- 采用MANUAL(手动ACK)模式,消费者自行控制流程并手动调用channel相关的方法返回ACK。
7.6.1 手动ACK机制-Reject
maven导入
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class Product {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel(); ) {// 定义交换器队列channel.exchangeDeclare("ack.ex", BuiltinExchangeType.DIRECT, false, false, false, null);// 定义队列channel.queueDeclare("ack.qu", false, false, false, null);// 队列绑定channel.queueBind("ack.qu", "ack.ex", "ack.rk");for (int i = 0; i < 5; i++) {byte[] sendBytes = ("hello-" + i).getBytes(StandardCharsets.UTF_8);channel.basicPublish("ack.ex", "ack.rk", null, sendBytes);}} catch (Exception e) {e.printStackTrace();}}
}
消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ack.qu", false, false, false, null);DefaultConsumer consumer =new DefaultConsumer(channel) {@Overridepublic void handleDelivery(// 消费者标签String consumerTag,// 消费者的封装Envelope envelope,// 消息属性AMQP.BasicProperties properties,// 消息体byte[] body)throws IOException {System.out.println("确认的消息内容:" + new String(body));// 找收消息// Nack与Reject的区别在于,nack可以对多条消息进行拒收,而reject只能拒收一条。// requeue为true表示不确认的消息会重新放回队列。channel.basicReject(envelope.getDeliveryTag(), true);}};channel.basicConsume("ack.qu",// 非自动确认false,// 消费者的标签"ack.consumer",// 回调函数consumer);}
}
发送测试
首先执行生产者向队列中发送数据。然后执行消费者,检查拒收的处理。
在消费者的控制台,将持续不断的输出消息信息:
确认的消息内容:hello-0
确认的消息内容:hello-1
确认的消息内容:hello-2
确认的消息内容:hello-3
确认的消息内容:hello-4
确认的消息内容:hello-0
确认的消息内容:hello-1
......
确认的消息内容:hello-0
按照发送的顺序将不断的被打印。
那此时消息是什么状态呢?查看下消息队列中的信息
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 5 │ 5 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
可以看到当前的消息处于unack的状态。由于消息被不断的重新放回队列,而消费者又只有当前这一个,所以,在不断拒收中被放回。
那如果将消息拒绝改为不重新放回队列,会如何呢?来验证下。
首先修改消费者的代码:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ack.qu", false, false, false, null);DefaultConsumer consumer =new DefaultConsumer(channel) {@Overridepublic void handleDelivery(// 消费者标签String consumerTag,// 消费者的封装Envelope envelope,// 消息属性AMQP.BasicProperties properties,// 消息体byte[] body)throws IOException {System.out.println("确认的消息内容:" + new String(body));// 找收消息// Nack与Reject的区别在于,nack可以对多条消息进行拒收,而reject只能拒收一条。// requeue为false表示不确认的消息不会重新放回队列。//channel.basicReject(envelope.getDeliveryTag(), true);channel.basicReject(envelope.getDeliveryTag(), false);}};channel.basicConsume("ack.qu",// 非自动确认false,// 消费者的标签"ack.consumer",// 回调函数consumer);}
}
再次执行消费者。
确认的消息内容:hello-0
确认的消息内容:hello-1
确认的消息内容:hello-2
确认的消息内容:hello-3
确认的消息内容:hello-4
而这一次消息没有再循环打印。只输出一遍,再检查下消息在队列中的状态:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
通过观察发现,消息已经没有在队列中了,那就是消息已经被丢弃了。
7.6.2 手动ACK机制-ack
消费者修改为ACK确认处理
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ack.qu", false, false, false, null);DefaultConsumer consumer =new DefaultConsumer(channel) {@Overridepublic void handleDelivery(// 消费者标签String consumerTag,// 消费者的封装Envelope envelope,// 消息属性AMQP.BasicProperties properties,// 消息体byte[] body)throws IOException {System.out.println("确认的消息内容:" + new String(body));// 消息确认,并且非批量确认,multiple为false,表示只确认了单条channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume("ack.qu",// 非自动确认false,// 消费者的标签"ack.consumer",// 回调函数consumer);}
}
此时可以先运行消息者。等待消息推送。然后运行生产者将消息推送,此时便可以看到消费者的控制台输出:
确认的消息内容:hello-0
确认的消息内容:hello-1
确认的消息内容:hello-2
确认的消息内容:hello-3
确认的消息内容:hello-4
观察队列中的信息
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
在队列中,消息已经被成功的消费了。
7.6.3 手动ACK机制-nack
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ack.qu", false, false, false, null);DefaultConsumer consumer =new DefaultConsumer(channel) {@Overridepublic void handleDelivery(// 消费者标签String consumerTag,// 消费者的封装Envelope envelope,// 消息属性AMQP.BasicProperties properties,// 消息体byte[] body)throws IOException {System.out.println("确认的消息内容:" + new String(body));// 消息批量不确认,即批量丢弃,每5个做一次批量消费// 参数1,消息的标签// multiple为false 表示不确认当前是一个消息。true就是多个消息。// requeue为true表示不确认的消息会重新放回队列。// 每5条做一次批量确认,_deliveryTag从1开始if (envelope.getDeliveryTag() % 5 == 0) {System.out.println("批量确认执行");channel.basicNack(envelope.getDeliveryTag(), true, false);}}};channel.basicConsume("ack.qu",// 非自动确认false,// 消费者的标签"ack.consumer",// 回调函数consumer);}
}
执行消费者程序,然后再执行生产者。查看消费端的控制台:
确认的消息内容:hello-0
确认的消息内容:hello-1
确认的消息内容:hello-2
确认的消息内容:hello-3
确认的消息内容:hello-4
批量确认执行
由于此处采用的是不重新放回队列,所以,数据接收到之后被丢弃了。
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 0 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
队列中的数据也已经被处理掉了。
7.6.4 手动ACK机制-SpringBoot
首先是Maven导入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.8.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.2.8.RELEASE</version></dependency>
配制文件application.yml
spring:application:name: consumer-ackrabbitmq:host: node1port: 5672virtual-host: /username: rootpassword: 123456# 配制消费端ack信息。listener:simple:acknowledge-mode: manual# 重试超过最大次数后是否拒绝default-requeue-rejected: falseretry:# 开启消费者重度(false时关闭消费者重试,false不是不重试,而是一直收到消息直到ack确认或者一直到超时)enable: true# 最大重度次数max-attempts: 5# 重试间隔时间(单位毫秒)initial-interval: 1000
主启动类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import java.nio.charset.StandardCharsets;@SpringBootApplication
public class Main {@Autowired private RabbitTemplate rabbitTemplate;public static void main(String[] args) {SpringApplication.run(Main.class, args);}/*** 在启动后就开始向MQ中发送消息** @return*/@Beanpublic ApplicationRunner runner() {return args -> {Thread.sleep(5000);for (int i = 0; i < 10; i++) {MessageProperties props = new MessageProperties();props.setDeliveryTag(i);Message message = new Message(("消息:" + i).getBytes(StandardCharsets.UTF_8), props);rabbitTemplate.convertAndSend("ack.ex", "ack.rk", message);}};}
}
当主类启动后,会延迟5秒,向MQ中发送10条记录。
队列配制
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue queue() {return new Queue("ack.qu", false, false, false, null);}@Beanpublic Exchange exchange(){return new DirectExchange("ack.ex",false,false,null);}@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(exchange()).with("ack.rk").noargs();}
}
使用推送模式来查确认消息
监听器,MQ队列推送消息至listener
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;@Component
public class MessageListener {/*** NONE模式,则只要收到消息后就立即确认(消息出列,标识已消费),有丢数据风险** <p>AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回队列中** <p>WANUAL模式,需要显示的调用当前channel的basicAck方法** @param channel* @param deliveryTag* @param msg*/// @RabbitListener(queues = "ack.qu", ackMode = "AUTO")// @RabbitListener(queues = "ack.qu", ackMode = "NONE")@RabbitListener(queues = "ack.qu", ackMode = "MANUAL")public void handMessageTopic(Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, @Payload String msg) {System.out.println("消息内容:" + msg);ThreadLocalRandom current = ThreadLocalRandom.current();try {if (current.nextInt(10) % 3 != 0) {// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列// channel.basicNack(deliveryTag, false, true);// 手动拒绝消息,第二个参数表示是否重新入列channel.basicReject(deliveryTag, true);} else {// 手动ACK,deliveryTag表示消息的唯一标志,multiple表示是否批量确认channel.basicAck(deliveryTag, false);System.out.println("已经确认的消息" + msg);}Thread.sleep(ThreadLocalRandom.current().nextInt(500, 3000));} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {}}
}
消息有33%的概率被拒绝,这样又会被重新放回队列,等待下次推送。
启动测试
运行main方法
【确认】消息内容:消息:0
【拒绝】消息内容:消息:1
【拒绝】消息内容:消息:2
【拒绝】消息内容:消息:3
【确认】消息内容:消息:4
【确认】消息内容:消息:5
【拒绝】消息内容:消息:6
【拒绝】消息内容:消息:7
【拒绝】消息内容:消息:8
【拒绝】消息内容:消息:9
【确认】消息内容:消息:1
【拒绝】消息内容:消息:2
【拒绝】消息内容:消息:3
【拒绝】消息内容:消息:6
【确认】消息内容:消息:7
【确认】消息内容:消息:8
【拒绝】消息内容:消息:9
【拒绝】消息内容:消息:2
【拒绝】消息内容:消息:3
【拒绝】消息内容:消息:6
【确认】消息内容:消息:9
【确认】消息内容:消息:2
【拒绝】消息内容:消息:3
【拒绝】消息内容:消息:6
【确认】消息内容:消息:3
【拒绝】消息内容:消息:6
【确认】消息内容:消息:6
从观察到的结果也印证了,反复的被推送,接收的一个过程中,使用命令查看队列的一个消费的情况
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 6 │ 6 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 1 │ 1 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
使用拉确认消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;@RestController
public class MsgController {@Autowired private RabbitTemplate rabbitTemplate;@RequestMapping("/msg")public String getMessage() {String message =rabbitTemplate.execute(new ChannelCallback<String>() {@Overridepublic String doInRabbit(Channel channel) throws Exception {GetResponse getResponse = channel.basicGet("ack.qu", false);if (null == getResponse) {return "你已经消费完所有的消息";}String message = new String(getResponse.getBody(), StandardCharsets.UTF_8);if (ThreadLocalRandom.current().nextInt(10) % 3 == 0) {// 执行消息确认操作channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);return "已确认的消息:" + message;} else {// 拒收一条消息并重新放回队列channel.basicReject(getResponse.getEnvelope().getDeliveryTag(), true);return "拒绝的消息:" + message;}}});return message;}
}
在浏览器中访问,同样有66%的概率会被拒绝,仅33%会被确认。
注:如果与监听在同一个工程,需将监听器给注释。
启动main函数,在浏览器中访问。http://127.0.0.1:8080/msg
可以看到返回:
拒绝的消息:消息:0
已确认的消息:消息:1
拒绝的消息:消息:2
......
已确认的消息:消息:9
你已经消费完所有的消息
同样的观察队列的一个消费情况:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 8 │ 0 │ 8 │ 0 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 3 │ 0 │ 3 │ 0 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 0 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
使用拉模式进行消息ACK确认也已经完成。
相关文章:
RabbitMQ消息可靠性保证机制3--消费端ACK机制
消费端ACK机制 在这之前已经完成了发送端的确认机制。可以保证数据成功的发送到RabbitMQ,以及持久化机制,然尔这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了,但是消息却已经被标记为消费了&…...
Copilot在Pycharm的应用和示例
Copilot 是 Github 在 2021 年发布的 AI 代码助手工具,它可以根据你提供的上下文信息,自动生成代码建议,帮助提高代码编写效率和准确性。在 Pycharm 中使用 Copilot,可以进一步提升 Python 开发效率,本文将分享如何在 …...
搜维尔科技:【简报】第九届元宇宙数字人设计大赛,报名已经进入白热化阶段!
随着元宇宙时代的来临,数字人设计成为了创新前沿领域之一。为了提高大学生元宇宙虚拟人角色策划与美术设计的专业核心能力,我们特别举办了这场元宇宙数字人设计赛道,赛道主题为「AI人工智能科技」 ,只要与「AI人工智能科技」相关的…...
性能检测自动化(含内存泄露检测)
一、平台侧实现方案 1、UI case重复执行N次:进入页面,sleep 5s,记录start_time,sleep 30s,记录end_time,性能采集工具全程采集性能数据 2、根据采集到的性能数据,按照N次卡点性能数据:去掉最大的10%、最小的10%,求取平均值,作为单次性能数据结果f(n)…...
iec104和iec61850
iec104和iec61850 IEC104 规约详细解读(一) 协议结构 IEC104 规约详细解读(二)交互流程以及协议解析 61850开发知识总结与分享【1】 Get the necesarry projects next to each other in the same directory; $ git clone https://github.com/robidev/iec61850_open_server.g…...
redis 面试问题 (更新中 ing)
目录 reids 是做什么的为什么那么快有哪些使用场景redis有哪些 数据结构redis 有哪些底层数据结构为什么设计 sds一个 字符串 存储多大容量 stream为什么设计 streamstream 消费者消息丢失stream 消息私信问题 持久化机制redis 持久化机制,优缺点,怎么用…...
力扣(leetcode)第389题找不同(Python)
389.找不同 题目链接:389.找不同 给定两个字符串 s 和 t ,它们只包含小写字母。 字符串 t 由字符串 s 随机重排,然后在随机位置添加一个字母。 请找出在 t 中被添加的字母。 示例 1: 输入:s “abcd”, t “abcde…...
Linux_源码编译安装LAMP
1. 安装httpd服务 在配置 Apache 网站服务之前,需要正确安装好 httpd 服务器软件。httpd 服务器的安装可以选用 RPM 安装、源码编译安装这两种方式,前者相对比较简单、快速,但是在功能上存在一定的局限性。在实际的生产环境中,使…...
静态网页设计——清雅古筝网(HTML+CSS+JavaScript)
前言 声明:该文章只是做技术分享,若侵权请联系我删除。!! 感谢大佬的视频: https://www.bilibili.com/video/BV1T64y1K7Zn/?vd_source5f425e0074a7f92921f53ab87712357b 使用技术:HTMLCSSJS(…...
实战Flink Java api消费kafka实时数据落盘HDFS
文章目录 1 需求分析2 实验过程2.1 启动服务程序2.2 启动kafka生产 3 Java API 开发3.1 依赖3.2 代码部分 4 实验验证STEP1STEP2STEP3 5 时间窗口 1 需求分析 在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flin…...
爬虫与反爬-localStorage指纹(某易某盾滑块指纹检测)(Hook案例)
概述:本文将用于了解爬虫中localStorage的检测原理以及讲述一个用于检测localStorage的反爬虫案例,最后对该参数进行Hook断点定位 目录: 一、LocalStorage 二、爬虫中localStorage的案例(以某盾滑块为例) 三、如何…...
聊一聊 webpack 和 vite 的开发服务代理的问题
webpack 和 vite webpackVite重新编辑的问题 changOrigin: true如何定义 /api ? webPack And Vite 都是两个比较好用的打包工具,尤其是 Vite, 几几年流行忘记了,特色就是服务启动极快,实现预加载,感觉 webPack 要比 Vite 要复杂一…...
【鸿蒙4.0】安装DevEcoStudio
1.下载安装包 HUAWEI DevEco Studio和SDK下载和升级 | HarmonyOS开发者华为鸿蒙DevEco Studio是面向全场景的一站式集成开发环境,,在鸿蒙官网下载或升级操作系统开发工具DevEco Studio最新版本,SDK配置和下载,2.1支持Mac、Windows操作系统。…...
[概率论]四小时不挂猴博士
贝叶斯公式是什么 贝叶斯公式是概率论中的一个重要定理,用于计算在已知一些先验信息的情况下,更新对事件发生概率的估计。贝叶斯公式的表达式如下: P(A|B) P(B|A) * P(A) / P(B) 其中,P(A|B)表示在事件B发生的条件下事件A发生的概…...
算法通关村第二十关-黄金挑战图的常见算法
大家好我是苏麟 , 今天聊聊图的常见算法 . 图里的算法是很多的,这里我们介绍一些常见的图算法。这些算法一般都比较复杂,我们这里介绍这些算法的基本含义,适合面试的时候装*,如果手写,那就不用啦。 图分析算法…...
服务器内存不足怎么办?会有什么影响?
服务器内存,也被称为RAM(Random Access Memory),是一种临时存储设备,用于临时存放正在运行的程序和数据。它是服务器上的超高速存储介质,可以快速读取和写入数据,提供给CPU进行实时计算和操作。…...
GPT实战系列-简单聊聊LangChain
GPT实战系列-简单聊聊LangChain LLM大模型相关文章: GPT实战系列-ChatGLM3本地部署CUDA111080Ti显卡24G实战方案 GPT实战系列-Baichuan2本地化部署实战方案 GPT实战系列-大话LLM大模型训练 GPT实战系列-探究GPT等大模型的文本生成 GPT实战系列-Baichuan2等大模…...
【读书笔记】《白帽子讲web安全》浏览器安全
目录 第二篇 客户端脚本安全 第2章 浏览器安全 2.1同源策略 2.2浏览器沙箱 2.3恶意网址拦截 2.4高速发展的浏览器安全 第二篇 客户端脚本安全 第2章 浏览器安全 近年来随着互联网的发展,人们发现浏览器才是互联网最大的入口,绝大多数用户使用互联…...
海外服务器2核2G/4G/8G和4核8G配置16M公网带宽优惠价格表
腾讯云海外服务器租用优惠价格表,2核2G10M带宽、2核4G12M、2核8G14M、4核8G16M配置可选,可以选择Linux操作系统或Linux系统,相比较Linux服务器价格要更优惠一些,腾讯云服务器网txyfwq.com分享腾讯云国外服务器租用配置报价&#x…...
Linux 编译安装 Nginx
目录 一、前言二、四种安装方式介绍三、本文安装方式:源码安装3.1、安装依赖库3.2、开始安装 Nginx3.3、Nginx 相关操作3.4、把 Nginx 注册成系统服务 四、结尾 一、前言 Nginx 是一款轻量级的 Web 服务器、[反向代理]服务器,由于它的内存占用少…...
Oracle文件自动“减肥”记
📢📢📢📣📣📣 哈喽!大家好,我是【IT邦德】,江湖人称jeames007,10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】!😜&am…...
【csharp】抽象类与接口有哪些不同?什么时候应该使用抽象类?
抽象类与接口有哪些不同? 抽象类和接口是在面向对象编程中两个不同的概念,它们有一些重要的区别。以下是抽象类和接口的主要不同点: 抽象类(Abstract Class): 成员类型: 抽象类可以包含抽象方…...
最新-mybatis-plus 3.5分页插件配置
mybatis-plus 3.5分页插件配置 前提 1.项目不是springboot, 是以前的常规spring项目 2.mp 从3.2升级到3.5,升级后发现原本的分页竟然不起作用了,每次查询都是查出所有 前后配置对比 jar包对比 jsqlparser我这里单独引了包,因为版本太低…...
案例098:基于微信小程序的电子购物系统的设计与实现
文末获取源码 开发语言:Java 框架:SSM JDK版本:JDK1.8 数据库:mysql 5.7 开发软件:eclipse/myeclipse/idea Maven包:Maven3.5.4 小程序框架:uniapp 小程序开发软件:HBuilder X 小程序…...
亚信安慧AntDB数据库:数字化时代的数据库创新引领者
AntDB数据库以其卓越的创新能力,集中体现在融合统一与实时处理两大关键领域。作为一款服务全国超过10亿用户的分布式数据库,其独特之处在于长期积累的经验、多样性的支持能力、快速响应的数据处理速度以及卓越的系统稳定性。AntDB不仅仅是一个数据库系统…...
【MySQL】关于日期转换的方法
力扣题 1、题目地址 1853. 转换日期格式 2、模拟表 表: Days Column NameTypedaydate day 是这个表的主键。 3、要求 给定一个Days表,请你编写SQL查询语句,将Days表中的每一个日期转化为"day_name, month_name day, year"格式的字符串…...
Ubuntu 虚拟机挂接 Windows 目录
Windows 共享目录 首先 Windows 下共享目录 我这里偷懒直接直接 Everyone ,也可以指定用户啥的 Ubuntu 挂接 挂接命令,类似如下: sudo mount -o usernamefananchong,passwordxxxx,uid1000,gid1000,file_mode0644,dir_mode0755,dynperm //…...
机器学习模型可解释性的结果分析
模型的可解释性是机器学习领域的一个重要分支,随着 AI 应用范围的不断扩大,人们越来越不满足于模型的黑盒特性,与此同时,金融、自动驾驶等领域的法律法规也对模型的可解释性提出了更高的要求,在可解释 AI 一文中我们已…...
静态网页设计——环保网(HTML+CSS+JavaScript)(dw、sublime Text、webstorm、HBuilder X)
前言 声明:该文章只是做技术分享,若侵权请联系我删除。!! 感谢大佬的视频: https://www.bilibili.com/video/BV1BC4y1v7ZY/?vd_source5f425e0074a7f92921f53ab87712357b 使用技术:HTMLCSSJS(…...
【HarmonyOS】装饰器下的状态管理与页面路由跳转实现
从今天开始,博主将开设一门新的专栏用来讲解市面上比较热门的技术 “鸿蒙开发”,对于刚接触这项技术的小伙伴在学习鸿蒙开发之前,有必要先了解一下鸿蒙,从你的角度来讲,你认为什么是鸿蒙呢?它出现的意义又是…...
怀化网站建设/企业网站设计服务
近期安装oracle到时候出现的一些问题记录1.安装完成无法启动,检查初始化参数2.监听配置1521以及其它端口都不通,检查是否存在多网卡,检查hosts文件,手工配置listener.ora等;3.ORA-00119: invalid specification for system parameter LOCAL_LISTENER检查hosts文件,listener.ora…...
wordpress运行目录/seo搜索优化是什么意思
传送门 给出一个二维平面,给出若干根线段,求出xx坐标为x0x0时在最上方的线段的标号(若有多条输出最小的)。 线段树好题,这题是李超线段树板子题。 没学过的这道题可以让你很好的理解李超线段树,下面讲讲…...
国家企业官网查询系统/网站seo排名免费咨询
1、$ORACLE_BASE/admin/SID_NAME/pfile文件夹下的init文件中的SID;2、/etc/oratab中的最后一行第一个“:”前,如“oracl:/u01/app/oracle/product/11.2.0/dbhome_1:N”中的“oracl”;3、~/.bash_profile中的SID;三个的sid要保持一致。...
albedo wordpress/外贸网站建站和推广
原文地址:http://blog.csdn.net/zhangxs_3/article/details/4034811 与Queue不同的是,Topic实现的是发布/订阅模型,发布者发布的消息,可以被多个订阅者消费。现在我们建立两个订阅者,一个发布者,循环给这个…...
关键词优化是什么意思/独立站seo建站系统
2014年,HTML5页面作为营销界新宠儿,“多快好省”的杰出代表,其灵活性高、开发成本低且制作周期短的种种特性使其在移动营销领域大放异彩。 文/Teeya 此前,介绍了HTML5网页的基础知识、有哪些应用场景以及如何推广,反响…...
通州个人做网站/友情手机站
目录: (1)axios-响应格式 (2)axios-拦截器 (3)vue2-条件渲染 (4)vue2-列表渲染 (1)axios-响应格式 下面看axios的返回响应对象的内部组成 后…...