RabbitMQ消息可靠性保证机制3--消费端ACK机制
消费端ACK机制
在这之前已经完成了发送端的确认机制。可以保证数据成功的发送到RabbitMQ,以及持久化机制,然尔这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了,但是消息却已经被标记为消费了,如果又没有任何重度机制,那结果基本等于丢消息。在消费端如何保证消息不丢呢?
在rabbitMQ的消费端会有ACK机制。即消费端消费消息后需要发送ACK确认报文给Broker端,告知自己是否已经消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。同时这个也是最终一致性、可恢复性的基础。一般有如下手段:
- 采用NONE模式,消费的过程中自行捕捉异常,引发异常后直接记录日志并落到异常处理表,再通过后台定时任务扫描异常恢复表做重度动作。如果业务不自行处理则有丢失数据的风险。
- 采用AUTO(自动ACK)模式,不主动捕获异常,当消费过程中出现异常时,会将消息放回Queue中,然后消息会被重新分配到其他消费节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回ACK或者一直到过期。
- 采用MANUAL(手动ACK)模式,消费者自行控制流程并手动调用channel相关的方法返回ACK。
7.6.1 手动ACK机制-Reject
maven导入
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class Product {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel(); ) {// 定义交换器队列channel.exchangeDeclare("ack.ex", BuiltinExchangeType.DIRECT, false, false, false, null);// 定义队列channel.queueDeclare("ack.qu", false, false, false, null);// 队列绑定channel.queueBind("ack.qu", "ack.ex", "ack.rk");for (int i = 0; i < 5; i++) {byte[] sendBytes = ("hello-" + i).getBytes(StandardCharsets.UTF_8);channel.basicPublish("ack.ex", "ack.rk", null, sendBytes);}} catch (Exception e) {e.printStackTrace();}}
}
消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ack.qu", false, false, false, null);DefaultConsumer consumer =new DefaultConsumer(channel) {@Overridepublic void handleDelivery(// 消费者标签String consumerTag,// 消费者的封装Envelope envelope,// 消息属性AMQP.BasicProperties properties,// 消息体byte[] body)throws IOException {System.out.println("确认的消息内容:" + new String(body));// 找收消息// Nack与Reject的区别在于,nack可以对多条消息进行拒收,而reject只能拒收一条。// requeue为true表示不确认的消息会重新放回队列。channel.basicReject(envelope.getDeliveryTag(), true);}};channel.basicConsume("ack.qu",// 非自动确认false,// 消费者的标签"ack.consumer",// 回调函数consumer);}
}
发送测试
首先执行生产者向队列中发送数据。然后执行消费者,检查拒收的处理。
在消费者的控制台,将持续不断的输出消息信息:
确认的消息内容:hello-0
确认的消息内容:hello-1
确认的消息内容:hello-2
确认的消息内容:hello-3
确认的消息内容:hello-4
确认的消息内容:hello-0
确认的消息内容:hello-1
......
确认的消息内容:hello-0
按照发送的顺序将不断的被打印。
那此时消息是什么状态呢?查看下消息队列中的信息
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 5 │ 5 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
可以看到当前的消息处于unack的状态。由于消息被不断的重新放回队列,而消费者又只有当前这一个,所以,在不断拒收中被放回。
那如果将消息拒绝改为不重新放回队列,会如何呢?来验证下。
首先修改消费者的代码:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ack.qu", false, false, false, null);DefaultConsumer consumer =new DefaultConsumer(channel) {@Overridepublic void handleDelivery(// 消费者标签String consumerTag,// 消费者的封装Envelope envelope,// 消息属性AMQP.BasicProperties properties,// 消息体byte[] body)throws IOException {System.out.println("确认的消息内容:" + new String(body));// 找收消息// Nack与Reject的区别在于,nack可以对多条消息进行拒收,而reject只能拒收一条。// requeue为false表示不确认的消息不会重新放回队列。//channel.basicReject(envelope.getDeliveryTag(), true);channel.basicReject(envelope.getDeliveryTag(), false);}};channel.basicConsume("ack.qu",// 非自动确认false,// 消费者的标签"ack.consumer",// 回调函数consumer);}
}
再次执行消费者。
确认的消息内容:hello-0
确认的消息内容:hello-1
确认的消息内容:hello-2
确认的消息内容:hello-3
确认的消息内容:hello-4
而这一次消息没有再循环打印。只输出一遍,再检查下消息在队列中的状态:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
通过观察发现,消息已经没有在队列中了,那就是消息已经被丢弃了。
7.6.2 手动ACK机制-ack
消费者修改为ACK确认处理
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ack.qu", false, false, false, null);DefaultConsumer consumer =new DefaultConsumer(channel) {@Overridepublic void handleDelivery(// 消费者标签String consumerTag,// 消费者的封装Envelope envelope,// 消息属性AMQP.BasicProperties properties,// 消息体byte[] body)throws IOException {System.out.println("确认的消息内容:" + new String(body));// 消息确认,并且非批量确认,multiple为false,表示只确认了单条channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume("ack.qu",// 非自动确认false,// 消费者的标签"ack.consumer",// 回调函数consumer);}
}
此时可以先运行消息者。等待消息推送。然后运行生产者将消息推送,此时便可以看到消费者的控制台输出:
确认的消息内容:hello-0
确认的消息内容:hello-1
确认的消息内容:hello-2
确认的消息内容:hello-3
确认的消息内容:hello-4
观察队列中的信息
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
在队列中,消息已经被成功的消费了。
7.6.3 手动ACK机制-nack
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("ack.qu", false, false, false, null);DefaultConsumer consumer =new DefaultConsumer(channel) {@Overridepublic void handleDelivery(// 消费者标签String consumerTag,// 消费者的封装Envelope envelope,// 消息属性AMQP.BasicProperties properties,// 消息体byte[] body)throws IOException {System.out.println("确认的消息内容:" + new String(body));// 消息批量不确认,即批量丢弃,每5个做一次批量消费// 参数1,消息的标签// multiple为false 表示不确认当前是一个消息。true就是多个消息。// requeue为true表示不确认的消息会重新放回队列。// 每5条做一次批量确认,_deliveryTag从1开始if (envelope.getDeliveryTag() % 5 == 0) {System.out.println("批量确认执行");channel.basicNack(envelope.getDeliveryTag(), true, false);}}};channel.basicConsume("ack.qu",// 非自动确认false,// 消费者的标签"ack.consumer",// 回调函数consumer);}
}
执行消费者程序,然后再执行生产者。查看消费端的控制台:
确认的消息内容:hello-0
确认的消息内容:hello-1
确认的消息内容:hello-2
确认的消息内容:hello-3
确认的消息内容:hello-4
批量确认执行
由于此处采用的是不重新放回队列,所以,数据接收到之后被丢弃了。
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 0 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
队列中的数据也已经被处理掉了。
7.6.4 手动ACK机制-SpringBoot
首先是Maven导入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.8.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.2.8.RELEASE</version></dependency>
配制文件application.yml
spring:application:name: consumer-ackrabbitmq:host: node1port: 5672virtual-host: /username: rootpassword: 123456# 配制消费端ack信息。listener:simple:acknowledge-mode: manual# 重试超过最大次数后是否拒绝default-requeue-rejected: falseretry:# 开启消费者重度(false时关闭消费者重试,false不是不重试,而是一直收到消息直到ack确认或者一直到超时)enable: true# 最大重度次数max-attempts: 5# 重试间隔时间(单位毫秒)initial-interval: 1000
主启动类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import java.nio.charset.StandardCharsets;@SpringBootApplication
public class Main {@Autowired private RabbitTemplate rabbitTemplate;public static void main(String[] args) {SpringApplication.run(Main.class, args);}/*** 在启动后就开始向MQ中发送消息** @return*/@Beanpublic ApplicationRunner runner() {return args -> {Thread.sleep(5000);for (int i = 0; i < 10; i++) {MessageProperties props = new MessageProperties();props.setDeliveryTag(i);Message message = new Message(("消息:" + i).getBytes(StandardCharsets.UTF_8), props);rabbitTemplate.convertAndSend("ack.ex", "ack.rk", message);}};}
}
当主类启动后,会延迟5秒,向MQ中发送10条记录。
队列配制
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue queue() {return new Queue("ack.qu", false, false, false, null);}@Beanpublic Exchange exchange(){return new DirectExchange("ack.ex",false,false,null);}@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(exchange()).with("ack.rk").noargs();}
}
使用推送模式来查确认消息
监听器,MQ队列推送消息至listener
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;@Component
public class MessageListener {/*** NONE模式,则只要收到消息后就立即确认(消息出列,标识已消费),有丢数据风险** <p>AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回队列中** <p>WANUAL模式,需要显示的调用当前channel的basicAck方法** @param channel* @param deliveryTag* @param msg*/// @RabbitListener(queues = "ack.qu", ackMode = "AUTO")// @RabbitListener(queues = "ack.qu", ackMode = "NONE")@RabbitListener(queues = "ack.qu", ackMode = "MANUAL")public void handMessageTopic(Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, @Payload String msg) {System.out.println("消息内容:" + msg);ThreadLocalRandom current = ThreadLocalRandom.current();try {if (current.nextInt(10) % 3 != 0) {// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列// channel.basicNack(deliveryTag, false, true);// 手动拒绝消息,第二个参数表示是否重新入列channel.basicReject(deliveryTag, true);} else {// 手动ACK,deliveryTag表示消息的唯一标志,multiple表示是否批量确认channel.basicAck(deliveryTag, false);System.out.println("已经确认的消息" + msg);}Thread.sleep(ThreadLocalRandom.current().nextInt(500, 3000));} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {}}
}
消息有33%的概率被拒绝,这样又会被重新放回队列,等待下次推送。
启动测试
运行main方法
【确认】消息内容:消息:0
【拒绝】消息内容:消息:1
【拒绝】消息内容:消息:2
【拒绝】消息内容:消息:3
【确认】消息内容:消息:4
【确认】消息内容:消息:5
【拒绝】消息内容:消息:6
【拒绝】消息内容:消息:7
【拒绝】消息内容:消息:8
【拒绝】消息内容:消息:9
【确认】消息内容:消息:1
【拒绝】消息内容:消息:2
【拒绝】消息内容:消息:3
【拒绝】消息内容:消息:6
【确认】消息内容:消息:7
【确认】消息内容:消息:8
【拒绝】消息内容:消息:9
【拒绝】消息内容:消息:2
【拒绝】消息内容:消息:3
【拒绝】消息内容:消息:6
【确认】消息内容:消息:9
【确认】消息内容:消息:2
【拒绝】消息内容:消息:3
【拒绝】消息内容:消息:6
【确认】消息内容:消息:3
【拒绝】消息内容:消息:6
【确认】消息内容:消息:6
从观察到的结果也印证了,反复的被推送,接收的一个过程中,使用命令查看队列的一个消费的情况
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 6 │ 6 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 1 │ 1 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 1 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
使用拉确认消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;@RestController
public class MsgController {@Autowired private RabbitTemplate rabbitTemplate;@RequestMapping("/msg")public String getMessage() {String message =rabbitTemplate.execute(new ChannelCallback<String>() {@Overridepublic String doInRabbit(Channel channel) throws Exception {GetResponse getResponse = channel.basicGet("ack.qu", false);if (null == getResponse) {return "你已经消费完所有的消息";}String message = new String(getResponse.getBody(), StandardCharsets.UTF_8);if (ThreadLocalRandom.current().nextInt(10) % 3 == 0) {// 执行消息确认操作channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);return "已确认的消息:" + message;} else {// 拒收一条消息并重新放回队列channel.basicReject(getResponse.getEnvelope().getDeliveryTag(), true);return "拒绝的消息:" + message;}}});return message;}
}
在浏览器中访问,同样有66%的概率会被拒绝,仅33%会被确认。
注:如果与监听在同一个工程,需将监听器给注释。
启动main函数,在浏览器中访问。http://127.0.0.1:8080/msg
可以看到返回:
拒绝的消息:消息:0
已确认的消息:消息:1
拒绝的消息:消息:2
......
已确认的消息:消息:9
你已经消费完所有的消息
同样的观察队列的一个消费情况:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 8 │ 0 │ 8 │ 0 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 3 │ 0 │ 3 │ 0 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ ack.qu │ 0 │ 0 │ 0 │ 0 │
├───────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ persistent.qu │ 1 │ 0 │ 1 │ 0 │
└───────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
使用拉模式进行消息ACK确认也已经完成。
相关文章:
RabbitMQ消息可靠性保证机制3--消费端ACK机制
消费端ACK机制 在这之前已经完成了发送端的确认机制。可以保证数据成功的发送到RabbitMQ,以及持久化机制,然尔这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了,但是消息却已经被标记为消费了&…...
Copilot在Pycharm的应用和示例
Copilot 是 Github 在 2021 年发布的 AI 代码助手工具,它可以根据你提供的上下文信息,自动生成代码建议,帮助提高代码编写效率和准确性。在 Pycharm 中使用 Copilot,可以进一步提升 Python 开发效率,本文将分享如何在 …...

搜维尔科技:【简报】第九届元宇宙数字人设计大赛,报名已经进入白热化阶段!
随着元宇宙时代的来临,数字人设计成为了创新前沿领域之一。为了提高大学生元宇宙虚拟人角色策划与美术设计的专业核心能力,我们特别举办了这场元宇宙数字人设计赛道,赛道主题为「AI人工智能科技」 ,只要与「AI人工智能科技」相关的…...
性能检测自动化(含内存泄露检测)
一、平台侧实现方案 1、UI case重复执行N次:进入页面,sleep 5s,记录start_time,sleep 30s,记录end_time,性能采集工具全程采集性能数据 2、根据采集到的性能数据,按照N次卡点性能数据:去掉最大的10%、最小的10%,求取平均值,作为单次性能数据结果f(n)…...

iec104和iec61850
iec104和iec61850 IEC104 规约详细解读(一) 协议结构 IEC104 规约详细解读(二)交互流程以及协议解析 61850开发知识总结与分享【1】 Get the necesarry projects next to each other in the same directory; $ git clone https://github.com/robidev/iec61850_open_server.g…...

redis 面试问题 (更新中 ing)
目录 reids 是做什么的为什么那么快有哪些使用场景redis有哪些 数据结构redis 有哪些底层数据结构为什么设计 sds一个 字符串 存储多大容量 stream为什么设计 streamstream 消费者消息丢失stream 消息私信问题 持久化机制redis 持久化机制,优缺点,怎么用…...
力扣(leetcode)第389题找不同(Python)
389.找不同 题目链接:389.找不同 给定两个字符串 s 和 t ,它们只包含小写字母。 字符串 t 由字符串 s 随机重排,然后在随机位置添加一个字母。 请找出在 t 中被添加的字母。 示例 1: 输入:s “abcd”, t “abcde…...

Linux_源码编译安装LAMP
1. 安装httpd服务 在配置 Apache 网站服务之前,需要正确安装好 httpd 服务器软件。httpd 服务器的安装可以选用 RPM 安装、源码编译安装这两种方式,前者相对比较简单、快速,但是在功能上存在一定的局限性。在实际的生产环境中,使…...

静态网页设计——清雅古筝网(HTML+CSS+JavaScript)
前言 声明:该文章只是做技术分享,若侵权请联系我删除。!! 感谢大佬的视频: https://www.bilibili.com/video/BV1T64y1K7Zn/?vd_source5f425e0074a7f92921f53ab87712357b 使用技术:HTMLCSSJS(…...

实战Flink Java api消费kafka实时数据落盘HDFS
文章目录 1 需求分析2 实验过程2.1 启动服务程序2.2 启动kafka生产 3 Java API 开发3.1 依赖3.2 代码部分 4 实验验证STEP1STEP2STEP3 5 时间窗口 1 需求分析 在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flin…...

爬虫与反爬-localStorage指纹(某易某盾滑块指纹检测)(Hook案例)
概述:本文将用于了解爬虫中localStorage的检测原理以及讲述一个用于检测localStorage的反爬虫案例,最后对该参数进行Hook断点定位 目录: 一、LocalStorage 二、爬虫中localStorage的案例(以某盾滑块为例) 三、如何…...
聊一聊 webpack 和 vite 的开发服务代理的问题
webpack 和 vite webpackVite重新编辑的问题 changOrigin: true如何定义 /api ? webPack And Vite 都是两个比较好用的打包工具,尤其是 Vite, 几几年流行忘记了,特色就是服务启动极快,实现预加载,感觉 webPack 要比 Vite 要复杂一…...

【鸿蒙4.0】安装DevEcoStudio
1.下载安装包 HUAWEI DevEco Studio和SDK下载和升级 | HarmonyOS开发者华为鸿蒙DevEco Studio是面向全场景的一站式集成开发环境,,在鸿蒙官网下载或升级操作系统开发工具DevEco Studio最新版本,SDK配置和下载,2.1支持Mac、Windows操作系统。…...

[概率论]四小时不挂猴博士
贝叶斯公式是什么 贝叶斯公式是概率论中的一个重要定理,用于计算在已知一些先验信息的情况下,更新对事件发生概率的估计。贝叶斯公式的表达式如下: P(A|B) P(B|A) * P(A) / P(B) 其中,P(A|B)表示在事件B发生的条件下事件A发生的概…...

算法通关村第二十关-黄金挑战图的常见算法
大家好我是苏麟 , 今天聊聊图的常见算法 . 图里的算法是很多的,这里我们介绍一些常见的图算法。这些算法一般都比较复杂,我们这里介绍这些算法的基本含义,适合面试的时候装*,如果手写,那就不用啦。 图分析算法…...

服务器内存不足怎么办?会有什么影响?
服务器内存,也被称为RAM(Random Access Memory),是一种临时存储设备,用于临时存放正在运行的程序和数据。它是服务器上的超高速存储介质,可以快速读取和写入数据,提供给CPU进行实时计算和操作。…...

GPT实战系列-简单聊聊LangChain
GPT实战系列-简单聊聊LangChain LLM大模型相关文章: GPT实战系列-ChatGLM3本地部署CUDA111080Ti显卡24G实战方案 GPT实战系列-Baichuan2本地化部署实战方案 GPT实战系列-大话LLM大模型训练 GPT实战系列-探究GPT等大模型的文本生成 GPT实战系列-Baichuan2等大模…...

【读书笔记】《白帽子讲web安全》浏览器安全
目录 第二篇 客户端脚本安全 第2章 浏览器安全 2.1同源策略 2.2浏览器沙箱 2.3恶意网址拦截 2.4高速发展的浏览器安全 第二篇 客户端脚本安全 第2章 浏览器安全 近年来随着互联网的发展,人们发现浏览器才是互联网最大的入口,绝大多数用户使用互联…...

海外服务器2核2G/4G/8G和4核8G配置16M公网带宽优惠价格表
腾讯云海外服务器租用优惠价格表,2核2G10M带宽、2核4G12M、2核8G14M、4核8G16M配置可选,可以选择Linux操作系统或Linux系统,相比较Linux服务器价格要更优惠一些,腾讯云服务器网txyfwq.com分享腾讯云国外服务器租用配置报价&#x…...

Linux 编译安装 Nginx
目录 一、前言二、四种安装方式介绍三、本文安装方式:源码安装3.1、安装依赖库3.2、开始安装 Nginx3.3、Nginx 相关操作3.4、把 Nginx 注册成系统服务 四、结尾 一、前言 Nginx 是一款轻量级的 Web 服务器、[反向代理]服务器,由于它的内存占用少…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...

linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

嵌入式学习笔记DAY33(网络编程——TCP)
一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...

基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...

一些实用的chrome扩展0x01
简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序,无论是测试应用程序、搜寻漏洞还是收集情报,它们都能提升工作流程。 FoxyProxy 代理管理工具,此扩展简化了使用代理(如 Burp…...
跨平台商品数据接口的标准化与规范化发展路径:淘宝京东拼多多的最新实践
在电商行业蓬勃发展的当下,多平台运营已成为众多商家的必然选择。然而,不同电商平台在商品数据接口方面存在差异,导致商家在跨平台运营时面临诸多挑战,如数据对接困难、运营效率低下、用户体验不一致等。跨平台商品数据接口的标准…...