RabbitMQ消息可靠性保证机制4--消费端限流
7.7 消费端限流
在类似如秒杀活动中,一开始会有大量并发写请求到达服务端,城机对消息进行削峰处理,如何做?
当消息投递的速度远快于消费的速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应可能会引起系统大范围的宕机,这就很悲剧了
7.7.1 资源限制限流
在RabbitMQ中可对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接的客户端的套接字读取数据。连接心中监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以或者被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止将收到通知。
在/etc/rabbitmq/rabbitmq.conf
中配置磁盘可用空间大小:
# 磁盘限制阈值设置
# 设置磁盘的可用空间大小,单位字节。当磁盘可用空间低于这个值的时候,发生磁盘告警,触发限流。
# 如果设置了相对大小,则忽略此绝对大小。
disk_free_limit.absolute = 2000# 使用计量单位,从RabbitMQ 3.6.0开始有效,对vm_memory_high_watemark同样有效。
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB# Alternatively, we can set a limit relative to total avaliable RAM.
# Values lower than 1.0 can be dangerous and should be used carefully.
# 还可以使用相对于总可用内存来设置。注意,此值不要低于1.0!
# 当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流
# disk_free_limit.relative = 2.0# 内存限流阈值设置
# 0.4表示阈值总可用内存的比值。 总可用内存表示操作系统给每个进程分配的大小,或者实际内存大小。
# 如32位的Windows,系统给每个进程最大2GB的内存,则此比值表示阈值为820MB
vm_memory_high_watermark.relative = 0.4# 还可以直接通过绝对值限制可用内存大小,单位字节
vm_memory_high_watermark.absolute = 1073741824# 从RabbitMQ 3.6.0开始,绝对值支持计量单位。如果设置了相对值,则忽略此设置值
vm_memory_high_watermark.absolute = 1024MiBk, kiB : kibibytes(2^10 - 1024 bytes)
M, MiB : mibibytes(2^20 - 1024576 bytes)
G, GiB : gibibytes(2^30 - 1073741824 bytes)KB: kilobytes (10^3 - 1000 bytes)
MB: megabytes (10^6 - 1000000 bytes)
GB: gigabytes (10^9 - 1000000000 bytes)
可以通过两种来设置生效
-
临时生效
此配制仅当前生效在重启后将失效。
# 硬盘资源限制
rabbitmqctl set_disk_free_limit 68996808704
# 内存资源限制
rabbitmqctl set_vm_memory_high_watermark 0.4
样例:
[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
- 长期生效
在rabbitmq.conf的配制文件中加入
# 硬盘限制
disk_free_limit.absolute=68455178240# 内存限制
vm_memory_high_watermark.relative = 0.4
样例:
[root@nullnull-os rabbitmq]# vi /etc/rabbitmq/rabbitmq.conf
# 加入以下内容,注意单位到字节
disk_free_limit.absolute=68455178240[root@nullnull-os rabbitmq]# cat /etc/rabbitmq/rabbitmq.conf
disk_free_limit.absolute=68455178240[root@nullnull-os rabbitmq]# systemctl restart rabbitmq-server
[root@nullnull-os rabbitmq]#
注意,此需要重启rabbitMQ才能生效。
磁盘限制配制参考
Configuring Disk Free Space Limit
The disk free space limit is configured with the disk_free_limit setting. By default 50MB is required to be free on the database partition (see the description of file locations for the default database location). This configuration file sets the disk free space limit to 1GB:
disk_free_limit.absolute = 1000000000
Or you can use memory units (KB, MB GB etc.) like this:
disk_free_limit.absolute = 1GB
It is also possible to set a free space limit relative to the RAM in the machine. This configuration file sets the disk free space limit to the same as the amount of RAM on the machine:
disk_free_limit.relative = 1.0
The limit can be changed while the broker is running using the rabbitmqctl set_disk_free_limit command or rabbitmqctl set_disk_free_limit mem_relative command. This command will take effect until next node restart.
The corresponding configuration setting should also be changed when the effects should survive a node restart.
来自:https://www.rabbitmq.com/disk-alarms.html
内存配制限制参考
https://www.rabbitmq.com/memory.html
Configuring the Memory Threshold
The memory threshold at which the flow control is triggered can be adjusted by editing the configuration file.
The example below sets the threshold to the default value of 0.4:
\# new style config format, recommended vm_memory_high_watermark.relative = 0.4
The default value of 0.4 stands for 40% of available (detected) RAM or 40% of available virtual address space, whichever is smaller. E.g. on a 32-bit platform with 4 GiB of RAM installed, 40% of 4 GiB is 1.6 GiB, but 32-bit Windows normally limits processes to 2 GiB, so the threshold is actually to 40% of 2 GiB (which is 820 MiB).
Alternatively, the memory threshold can be adjusted by setting an absolute limit of RAM used by the node. The example below sets the threshold to 1073741824 bytes (1024 MiB):
vm_memory_high_watermark.absolute = 1073741824
Same example, but using memory units:
vm_memory_high_watermark.absolute = 1024MiB
If the absolute limit is larger than the installed RAM or available virtual address space, the threshold is set to whichever limit is smaller.
The memory limit is appended to the log file when the RabbitMQ node starts:
2019-06-10 23:17:05.976 [info] <0.308.0> Memory high watermark set to 1024 MiB (1073741824 bytes) of 8192 MiB (8589934592 bytes) total
The memory limit may also be queried using the rabbitmq-diagnostics memory_breakdown and rabbitmq-diagnostics status commands.
The threshold can be changed while the broker is running using the
rabbitmqctl set_vm_memory_high_watermark <fraction>
command or
rabbitmqctl set_vm_memory_high_watermark absolute <memory_limit>
For example:
rabbitmqctl set_vm_memory_high_watermark 0.6
and
rabbitmqctl set_vm_memory_high_watermark absolute "4G"
When using the absolute mode, it is possible to use one of the following memory units:
- M, MiB for mebibytes (2^20 bytes)
- MB for megabytes (10^6 bytes)
- G, GiB for gibibytes (2^30 bytes)
- GB for gigabytes (10^9 bytes)
中文配制可参考:https://www.cnblogs.com/kaishirenshi/p/12132703.html
更多配制可参见:https://www.rabbitmq.com/configure.html#config-file
样例程序:
maven导入
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>
生产程序:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;public class ResourceLimitProduct {public static void main(String[] args) throws Exception {// 资源限制ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel(); ) {// 定义交换器、队列和绑定channel.exchangeDeclare("res.limit.ex", BuiltinExchangeType.DIRECT, true, false, null);channel.queueDeclare("res.limit.qu", true, false, false, null);channel.queueBind("res.limit.qu", "res.limit.ex", "res.limit.rk");// 开启发送方确认机制AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();ConfirmCallback confirm =new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("【批量确认】:小于" + deliveryTag + "已经确认");} else {System.out.println("【单条确认】:等于" + deliveryTag + "已经确认");}}};ConfirmCallback nackConfirm =new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("【批量不确认】:小于" + deliveryTag + "已经确认");} else {System.out.println("【单条不确认】:等于" + deliveryTag + "已经确认");}}};channel.addConfirmListener(confirm, nackConfirm);for (int i = 0; i < 100000000; i++) {String msg = getKbMessage(i);long sequence = channel.getNextPublishSeqNo();System.out.println("【发送】成功了序列消息:" + sequence);AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.contentType("text/plain");// 发送的消息持久化builder.deliveryMode(2);AMQP.BasicProperties properties = builder.build();channel.basicPublish("res.limit.ex", "res.limit.rk", properties, msg.getBytes(StandardCharsets.UTF_8));Thread.sleep(ThreadLocalRandom.current().nextInt(5, 100));}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}private static String getKbMessage(int i) {StringBuilder msg = new StringBuilder("发送确认消息:" + i + "--");for (int j = 0; j < 102400; j++) {msg.append(j);}return msg.toString();}
}
设置硬盘资源限制
[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
运行生产者的应用程序,查看控制台的输出
【发送】成功了序列消息:1
【单条确认】:等于1已经确认
【发送】成功了序列消息:2
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【发送】成功了序列消息:4
【单条确认】:等于3已经确认
【发送】成功了序列消息:5
......
【单条确认】:等于702已经确认
【单条确认】:等于703已经确认
【发送】成功了序列消息:704
【发送】成功了序列消息:705
【发送】成功了序列消息:706
【发送】成功了序列消息:707
【发送】成功了序列消息:708
【发送】成功了序列消息:709
【发送】成功了序列消息:710
【发送】成功了序列消息:711
到此使用硬盘空间限制的测试完成。
内存资源限制
编辑配制文件rabbitmq.conf
vi /etc/rabbitmqrabbitmq.conf # 添加配制
vm_memory_high_watermark.absolute=120M
重启让其生效
systemctl restart rabbitmq-server
检查配制生效情况
[root@nullnull-os rabbitmq]# rabbitmqctl environment......{trace_vhosts,[]},{vhost_restart_strategy,continue},{vm_memory_calculation_strategy,rss},{vm_memory_high_watermark,{absolute,"120MB"}},{vm_memory_high_watermark_paging_ratio,0.5},{writer_gc_threshold,1000000000}]},{rabbit_common,[]},
......
查看到如下配制说明生效。
运行生产者
观察客户端输出
【发送】成功了序列消息:1
【发送】成功了序列消息:2
【单条确认】:等于1已经确认
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【单条确认】:等于3已经确认
【发送】成功了序列消息:4
【发送】成功了序列消息:5
【发送】成功了序列消息:6
【单条确认】:等于4已经确认
【单条确认】:等于5已经确认
【单条确认】:等于6已经确认
【发送】成功了序列消息:7
【单条确认】:等于7已经确认
......
【发送】成功了序列消息:174
【单条确认】:等于174已经确认
【发送】成功了序列消息:175
【单条确认】:等于175已经确认
【发送】成功了序列消息:176
【单条确认】:等于176已经确认
【发送】成功了序列消息:177
【发送】成功了序列消息:178
【发送】成功了序列消息:179
【发送】成功了序列消息:180
【发送】成功了序列消息:181
【发送】成功了序列消息:182
【发送】成功了序列消息:183
【发送】成功了序列消息:184
【发送】成功了序列消息:185
【发送】成功了序列消息:186
【发送】成功了序列消息:187
观察网页端的情况
到此内存资源限制而导致的限流测试完成。
7.7.2 默认的credit flow流控
RabbitMQ Credit Flow Mechanism (信用流控制机制) 是 RabbitMQ 使用的一种流量控制机制,旨在确保生产者(publishers)不会发送太多的消息给消费者(consumers),从而导致系统超载或资源耗尽。这个机制主要是为了保护消费者免受生产者发送太多消息的影响。
以下是 RabbitMQ Credit Flow 机制的基本工作原理:
- 信用计数器(Credit Counter):对于每个消费者,RabbitMQ 维护一个称为信用计数器的值。这个计数器表示消费者当前可以接收多少条消息。
- 初始信用额度(Initial Credit):当一个消费者连接到队列并开始消费消息时,RabbitMQ 为该消费者分配一个初始信用额度。这个额度通常与队列中的未确认消息数量有关。
- 消费者确认(Consumer Acknowledgments):当消费者成功处理一条消息并确认它时,它将会恢复一定数量的信用,这允许 RabbitMQ 将更多的消息发送给消费者。
- 信用降低(Decreasing Credit):当消费者未确认消息超出其信用额度时,其信用额度将降低。这会导致生产者无法继续发送消息给该消费者,直到其信用额度恢复。
- 自动降低的消费者(Auto-decrease Consumers):RabbitMQ 还可以配置为自动降低某些消费者的信用,以避免某个消费者占用太多资源。这通常用于处理慢速或长时间处理的消费者。
这个机制有助于平衡生产者和消费者之间的消息流量,防止生产者发送大量消息导致队列爆满,从而提高系统的稳定性和可靠性。
要注意的是,RabbitMQ 的信用流控制机制是可配置的,您可以根据您的需求来调整信用额度和其他参数,以满足特定的应用场景。此外,RabbitMQ 还提供了一些工具和插件,用于监控和管理流量控制,以确保系统的正常运行。
可以通过查看队列的状态信息来了解 Credit Flow 机制的当前状态。以下是一些常见的方式来查看 Credit Flow 状态:
-
RabbitMQ Management UI:RabbitMQ 提供了一个基于 Web 的管理界面,您可以通过该界面查看队列的状态和统计信息,包括队列的消息数量、未确认消息数量以及消费者的状态。要访问管理界面,请确保已启用 RabbitMQ Management 插件。默认情况下,它通常在 http://localhost:15672/ 上运行。
在管理界面中,您可以选择特定的队列,然后查看其状态和相关的统计信息,包括未确认消息数量。这可以帮助您了解 Credit Flow 是否生效,是否有消费者的信用已用尽。
-
命令行工具:您还可以使用 RabbitMQ 的命令行工具来查看队列的状态。以下是一个示例命令,用于查看队列的状态:
rabbitmqctl list_queues name messages consumers messages_unacknowledged
这将显示队列的名称、消息数量、消费者数量以及未确认消息数量。未确认消息数量表示消费者尚未确认的消息数量,这可以用于判断 Credit Flow 是否生效。
-
监控工具:您可以使用监控工具(如Prometheus和Grafana)来设置自定义监控和警报,以便实时跟踪队列的状态和信用流控制情况。通过这些工具,您可以创建仪表板来显示队列的各种指标,包括未确认消息数量和消费者的信用。
通过以上方法,您可以监视 RabbitMQ 中队列的状态和 Credit Flow 机制的工作情况,以确保系统的稳定性和可靠性。
7.7.3 Qos机制
RabbitMQ中有一种Qos保证机制,可以限制channel上接收到的未被Ack的消息数量,如果过这个数量限制RabbitMQ将不会再往消费端推送消息。是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)需要注意的是Qos机制仅对消费端推模式有效,对拉模式无效。而且不支持NONE-ACK模式。
执行
channel.basicConsume
方法之前通过channel.basicQos方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。在消费慢的时候可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个.broker就发送一个,确认两个就发送两个,换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。
生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快县城超过了下游的消费速度时就容易出现消息积压、堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等手段,避免超过broker端的极限承载能力或者压垮下游消费者。
再讲消费者,我们期望消费者能够尽快的消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端能够处理速度是最快、最稳定而且还相对均匀(比较理想化)
提供应用吞吐量和缩短消费过程的耗时,主要以下几种方式:
- 优化应用程序的性能,缩短响应时间
- 增加消费节点实例。
- 调整并发消费的线程数。
测试
maven导入:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class QosProduct {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("qos.ex",BuiltinExchangeType.DIRECT,// 持久化标识false,// 是否自动删除false,// 属性信息null);for (int i = 0; i < 100; i++) {String msg = "这是发送的消息:" + i;channel.basicPublish("qos.ex", "qos.rk", null, msg.getBytes(StandardCharsets.UTF_8));}}
}
消费者 :
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;public class QosConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换器、队列和绑定channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);channel.queueDeclare("qos.qu", false, false, false, null);channel.queueBind("qos.qu", "qos.ex", "qos.rk");// 设置Qos为5,未被确认ACK的为5,还有一个参数,即是否为全局,true为全局channel.basicQos(5);channel.basicConsume("qos.qu",false,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {LocalDateTime time = LocalDateTime.now();System.out.println("[消费]" + time + "+收到的消息:" + new String(body, StandardCharsets.UTF_8));int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);try {Thread.sleep(randomSleep);} catch (InterruptedException e) {e.printStackTrace();}if (envelope.getDeliveryTag() % 3 == 0) {// 进行消息确认channel.basicAck(envelope.getDeliveryTag(), true);}}});}
}
测试
先启动消费都,再启动生产者,查看控制台输出
[消费]2023-08-25T12:08:13.143+收到的消息:这是发送的消息:0
[消费]2023-08-25T12:08:13.765+收到的消息:这是发送的消息:1
[消费]2023-08-25T12:08:14.127+收到的消息:这是发送的消息:2
[消费]2023-08-25T12:08:14.892+收到的消息:这是发送的消息:3
......
[消费]2023-08-25T12:08:57.437+收到的消息:这是发送的消息:96
[消费]2023-08-25T12:08:57.530+收到的消息:这是发送的消息:97
[消费]2023-08-25T12:08:57.566+收到的消息:这是发送的消息:98
[消费]2023-08-25T12:08:57.649+收到的消息:这是发送的消息:99
查看队列的情况:
[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:59116 -> 10.0.4.16:5672 (1) │ 5 │ 0 │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]#
网页端查看
并行消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;public class QosThreadConsumer {public static void main(String[] args) throws Exception {// 资源限制ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");// 设置channel并发请求最大数factory.setRequestedChannelMax(5);// 自定义线程池工厂ThreadFactory thsFactory = Executors.privilegedThreadFactory();factory.setThreadFactory(thsFactory);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换器、队列和绑定channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);channel.queueDeclare("qos.qu", false, false, false, null);channel.queueBind("qos.qu", "qos.ex", "qos.rk");// 设置每秒处理2个channel.basicQos(5, true);channel.basicConsume("qos.qu",false,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {LocalDateTime time = LocalDateTime.now();long threadId = Thread.currentThread().getId();System.out.println("[消费]"+ time+ ",线程:"+ threadId+ ",收到的消息:"+ new String(body, StandardCharsets.UTF_8));int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);try {Thread.sleep(randomSleep);} catch (InterruptedException e) {e.printStackTrace();}if (envelope.getDeliveryTag() % 3 == 0) {// 进行消息确认channel.basicAck(envelope.getDeliveryTag(), true);}}});}
}
控制台输出:
[消费]2023-08-26T09:37:21.430,线程:24,收到的消息:这是发送的消息:0
[消费]2023-08-26T09:37:21.866,线程:25,收到的消息:这是发送的消息:1
[消费]2023-08-26T09:37:22.434,线程:25,收到的消息:这是发送的消息:2
[消费]2023-08-26T09:37:22.847,线程:25,收到的消息:这是发送的消息:3
[消费]2023-08-26T09:37:23.685,线程:25,收到的消息:这是发送的消息:4
[消费]2023-08-26T09:37:23.847,线程:26,收到的消息:这是发送的消息:5
......
[消费]2023-08-26T09:39:10.684,线程:28,收到的消息:这是发送的消息:526
[消费]2023-08-26T09:39:10.695,线程:32,收到的消息:这是发送的消息:527
[消费]2023-08-26T09:39:10.767,线程:32,收到的消息:这是发送的消息:528
......
[消费]2023-08-26T09:39:58.270,线程:27,收到的消息:这是发送的消息:996
[消费]2023-08-26T09:39:58.405,线程:27,收到的消息:这是发送的消息:997
[消费]2023-08-26T09:39:58.575,线程:27,收到的消息:这是发送的消息:998
[消费]2023-08-26T09:39:58.671,线程:27,收到的消息:这是发送的消息:999
如果Qos设置为全局,则可以看到到
[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60591 -> 10.0.4.16:5672 (1) │ 0 │ 5 │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60610 -> 10.0.4.16:5672 (1) │ 0 │ 0 │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]#
相关文章:
RabbitMQ消息可靠性保证机制4--消费端限流
7.7 消费端限流 在类似如秒杀活动中,一开始会有大量并发写请求到达服务端,城机对消息进行削峰处理,如何做? 当消息投递的速度远快于消费的速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲…...
查找萤石云IOS Sdk中的编解码接口
2021/1/20 以前的时候,碰到的问题,想把萤石云视频介入到TRTC,但是... 萤石云的IOS接口中没有相应的解码播放库,也就是找不到PlayerSDK对应部分,怎么做呢? 一个是坐等萤石云开放这部分接口,可能…...
erchas
#include <iostream> #include <vector> https://gitee.com/tongchaowei/front-native-page-template/tree/main/image-display/template-01 using namespace std; class BinaryTree { private: vector<char> tree; // 存储二叉树的数组 int size;…...
【网络安全】SSL(一):为什么需要 Keyless SSL?
未经许可,不得转载。 文章目录 背景正文背景 随着网站和应用程序向云端迁移,使用 HTTPS(SSL/TLS)加密流量已成为行业标准。然而,传统的 HTTPS 配置要求服务器持有网站的私钥,这在云计算环境中引发了一系列安全性和合规性问题。一旦云服务器遭到攻击,私钥泄露可能带来不…...
ggplot2 分面图等添加注释文字,相加哪里加哪里: 自定义函数 AddText()
如果分面图上还想再添加文字,只能使用底层的grid包了。 函数定义 # Add text to ggplot2 figures # # param label text you want to put on figure # param x position x, left is 0, right 1 # param y position y, bottom is 0, up 1 # param color text color…...
解读缓存问题的技术旅程
目录 前言1. 问题的突发与初步猜测2. 缓存的“隐身术”3. 缓存策略的深层优化4. 反思与感悟结语 前言 那是一个普通的工作日,团队例行的早会刚刚结束,我正准备继续优化手头的模块时,突然收到了用户反馈。反馈的内容是部分数据显示异常&#…...
洛谷P1597
语句解析 - 洛谷 语句解析 题目背景 木有背景…… 题目描述 一串长度不超过255的 PASCAL 语言代码,只有 a,b,c 三个变量,而且只有赋值语句,赋值只能是一个一位的数字或一个变量,每条赋值语句的格式是 [变量]:[变量或一位整数…...
2411rust,76~79
1.76.0稳定版 此版本较小 ABI兼容更新 函数指针文档中新增的ABI兼容部分介绍了函数签名与ABI兼容的意义.大部分是参数类型和返回类型的兼容,及在当前Rust中兼容的列表.文档仅描述现有兼容的状态. 一个新增功能是,现在保证符和u32是ABI兼容的.它们一直有相同大小和对齐方式,…...
vue2.0前端管理系统界面布局设置
前言 后台管理系统的核心就是用户管理、角色管理(含权限分配)、菜单管理,以及一些业务管理。业务管理通常以及根据不同的角色进行了权限分配。本次任务完成用户管理页面。 一 界面设计 1.引用Element 的Container 布局容器。 以上次博客中…...
4. SQL视图
MySQL中的视图(View)是一种虚拟表,本质是存储了一条SELECT语句。视图并不直接存储数据,而是动态生成结果集,帮助开发者简化查询逻辑和增强数据安全性。本文将从视图的基础概念到实际应用,逐步深入地探讨如何…...
Simulink学习笔记【PID UG联动仿真】
Simulink进行PID控制及调参: 建立系统动力学框图(把状态方程翻译出来),设置成subsystem建立PID反馈回路。示波器叫scope,多变量输出用demux和mux。可以用自动调参Tune模块,调整响应速度和稳定性࿰…...
【Python】30个Python爬虫的实战项目!!!(附源码)
Python爬虫是数据采集自动化的利器。本文精选了30个实用的Python爬虫项目,从基础到进阶,每个项目都配有完整源码和详细讲解。通过这些项目的实战,可以全面掌握网页数据抓取、反爬处理、并发下载等核心技能。 一、环境准备 在开始爬虫项目前…...
uni-app 界面TabBar中间大图标设置的两种方法
一、前言 最近写基于uni-app 写app项目的时候,底部导航栏 中间有一个固定的大图标,并且没有激活状态。这里记录下实现方案。效果如下(党组织这个图标): 方法一:midButton的使用 官方文档:ta…...
什么是Sass,有什么特点
Sass 概述 什么是 Sass? Sass(Syntactically Awesome Style Sheets)是一种 CSS 预处理器,它扩展了 CSS 的功能,使其更加强大和灵活。Sass 允许开发者使用变量、嵌套规则、混合宏、继承等高级特性,从而编写…...
服务器端渲染 (SSR) 与客户端渲染 (CSR)
嘿程序员!我们都知道,新时代的 Javascript 已经彻底改变了现代网站的结构和用户体验。如今,网站的构建更像是一个应用程序,伪装成一个能够发送电子邮件、通知、聊天、购物、支付等的网站。今天的网站是如此先进、互动,…...
数据结构(Java版)第一期:时间复杂度和空间复杂度
目录 一、数据结构的概念 1.1. 什么是数据结构 1.2. 算法与数据结构的关系 二、算法效率 三、时间复杂度 3.1. 大O的渐进表⽰法 3.2. 计算冒泡排序的时间复杂度 3.3. 计算二分查找的时间复杂度 四、空间复杂度 4.1. 空间复杂度 4.2. 冒泡排序的空间复杂度 4.3.…...
基于web的音乐网站(Java+SpringBoot+Mysql)
目录 1系统概述 1.1 研究背景 1.2研究目的 1.3系统设计思想 2相关技术 2.1 MYSQL数据库 2.2 B/S结构 2.3 Spring Boot框架简介 3系统分析 3.1可行性分析 3.1.1技术可行性 3.1.2经济可行性 3.1.3操作可行性 3.2系统性能分析 3.2.1 系统安全性 3.2.2 数据完整性 …...
用go语言后端开发速查
文章目录 一、发送请求和接收请求示例1.1 发送请求1.2 接收请求 二、发送form-data格式的数据示例 用go语言发送请求和接收请求的快速参考 一、发送请求和接收请求示例 1.1 发送请求 package mainimport ("bytes""encoding/json""fmt""ne…...
GeekChallenge 2024 第十五届极客大挑战 pwn AK
GeekChallenge 2024 第十五届极客大挑战 pwn AK 🍀前言☘️ez_shellcode(shellcode,栈溢出)🌿分析🌿解题🌿exp ☘️买黑吗喽了吗(整数溢出,栈溢出)dz…...
禅道是什么,nas是什么,ssh是什么,finalshell是什么,git命令feat 、fix分别什么意思
禅道(Zentao)是一款开源的项目管理软件,专为软件开发团队设计。它集成了项目管理、产品管理、质量管理、文档管理和事务管理等多种功能,旨在帮助团队提高工作效率和项目交付质量。禅道支持敏捷开发方法,同时也适用于传…...
点云-半径搜索法-Radius Search
核心作用 在于通过设定一个空间范围(半径)寻找点的邻域点集合,从而支持对局部区域的分析和操作。 因为空间半径不会随着密度变化而改变点云输出的结果,处理密度变化大的点云时很重要。 应用场景 稀疏点检测:当点云密度…...
P11290 【MX-S6-T2】「KDOI-11」飞船
题目大意:有i种加油站,最开始速度为1,每次加油可以使速度*v,每次加油有一个时间代价,求到达终点所需最小时间。 思路:不妨考虑dp,贪心是错误的。 对于速度而言,,所以速…...
WebGIS地图框架有哪些?
地理信息系统(GIS)已经成为现代应用开发中不可或缺的一部分,尤其在前端开发中。随着Web技术的快速发展,许多强大而灵活的GIS框架涌现出来,为开发人员提供了丰富的工具和功能,使他们能够创建交互式、高性能的…...
量化加速知识点(整理中。。。)
量化的基本概念 通过减少模型中计算精度,从而减少模型计算所需要的访存量。 参考...
BLIP-2模型的详解与思考
大模型学习笔记------BLIP-2模型的详解与思考 1、BLIP-2框架概述2、BLIP-2网络结构详解3、BLIP-2的几点思考 上一篇文章上文中讲解了 BLIP(Bootstrapping Language-Image Pretraining)模型的一些思考,本文将讲述一个BLIP的升级版 BLIP-2&am…...
2024年11月22日 十二生肖 今日运势
小运播报:2024年11月22日,星期五,农历十月廿二 (甲辰年乙亥月庚寅日),法定工作日。 红榜生肖:马、猪、狗 需要注意:牛、蛇、猴 喜神方位:西北方 财神方位:…...
小米C++ 面试题及参考答案上(120道面试题覆盖各种类型八股文)
进程和线程的联系和区别 进程是资源分配的基本单位,它拥有自己独立的地址空间、代码段、数据段和堆栈等。线程是进程中的一个执行单元,是 CPU 调度的基本单位。 联系方面,线程是进程的一部分,一个进程可以包含多个线程。它们都用于…...
SQL SELECT 语句:基础与进阶应用
SQL SELECT 语句:基础与进阶应用 SQL(Structured Query Language)是一种用于管理关系数据库的编程语言。在SQL中,SELECT语句是最常用的命令之一,用于从数据库表中检索数据。本文将详细介绍SELECT语句的基础用法&#…...
微服务即时通讯系统的实现(服务端)----(1)
目录 1. 项目介绍和服务器功能设计2. 基础工具安装3. gflags的安装与使用3.1 gflags的介绍3.2 gflags的安装3.3 gflags的认识3.4 gflags的使用 4. gtest的安装与使用4.1 gtest的介绍4.2 gtest的安装4.3 gtest的使用 5 Spdlog日志组件的安装与使用5.1 Spdlog的介绍5.2 Spdlog的安…...
《Spring 依赖注入方式全解析》
一、Spring 依赖注入概述 Spring 依赖注入(Dependency Injection,DI)是一种重要的设计模式,它在 Spring 框架中扮演着关键角色。依赖注入的核心概念是将对象所需的依赖关系由外部容器(通常是 Spring 容器)进…...
杭州有哪些做网站的公司/南京市网站
以下是zen cart 首页程序的修改。根据各个文件修改不同的功能。 首页界面://include/templates/zccn/common/tpl_main_page.php首页主样式表://include/templates/zccn/css/schinese_stylesheet.css 首页左边栏目:/includes/templates/templa…...
网站logo设计教程/八上数学优化设计答案
Zabbix-2.4.3监控系统安装配置 环境 操作系统:Red Hat Enterprise Linux Server release 6.4 (Santiago) 32位 zabbix版本:zabbix-2.4.3.tar.gz 一、zabbix WEB环境搭建 zabbix的安装需要LAMP或者LNMP环境。 #yum install httpd mysql mysql-server mysq…...
网站 什么语言开发/建设网站制作公司
作业二:编写登陆接口 输入用户名密码认证成功后显示欢迎信息输错三次后锁定http://www.cnblogs.com/alex3714/articles/5465198.html 自己写的第一个长一点的Python程序,当作纪念啦~ 要在e盘里面有四个文件才可以运行哦~ 1 #! /usr/bin/env python2 # -*…...
网站推广互联网推广/火星时代教育培训机构官网
boostrap依赖jquery,需要在引入boostrap之前引入jquery库...
做视频比较好的理财网站有哪些/免费的seo教程
Oracle 11gR2 : ACFS 文件系统 呼之欲出在ITPUB上看到Kamus透露的一些关于Oracle Database 11gR2的消息,学习记录一下。1. Oracle Cluster Repository (OCR) and Voting Disk stored in ASMThis feature enables ASM to provide a unified storage solution, storin…...
做女装代理需要自建网站么/怎么做好网络营销
用matrix[0][j] 记录第j列是否为0 用matrix[i][0]记录第i行是否为0 因为在0 0 位置有重复,所以新开个变量记录第0行或者第0列 更新的时候先更新从第1行开始和从第1列开始 妙啊 哭了 class Solution {public void setZeroes(int[][] matrix) {int m matrix.len…...