RabbitMQ消息可靠性保证机制4--消费端限流

7.7 消费端限流

在类似如秒杀活动中,一开始会有大量并发写请求到达服务端,城机对消息进行削峰处理,如何做?

当消息投递的速度远快于消费的速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应可能会引起系统大范围的宕机,这就很悲剧了

7.7.1 资源限制限流

在RabbitMQ中可对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接的客户端的套接字读取数据。连接心中监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以或者被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止将收到通知。

/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:

# 磁盘限制阈值设置
# 设置磁盘的可用空间大小,单位字节。当磁盘可用空间低于这个值的时候,发生磁盘告警,触发限流。
# 如果设置了相对大小,则忽略此绝对大小。
disk_free_limit.absolute = 2000# 使用计量单位,从RabbitMQ 3.6.0开始有效,对vm_memory_high_watemark同样有效。
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB# Alternatively, we can set a limit relative to total avaliable RAM.
# Values lower than 1.0 can be dangerous and should be used carefully.
# 还可以使用相对于总可用内存来设置。注意,此值不要低于1.0!
# 当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流
# disk_free_limit.relative = 2.0# 内存限流阈值设置
# 0.4表示阈值总可用内存的比值。 总可用内存表示操作系统给每个进程分配的大小,或者实际内存大小。
# 如32位的Windows,系统给每个进程最大2GB的内存,则此比值表示阈值为820MB
vm_memory_high_watermark.relative = 0.4# 还可以直接通过绝对值限制可用内存大小,单位字节
vm_memory_high_watermark.absolute = 1073741824# 从RabbitMQ 3.6.0开始,绝对值支持计量单位。如果设置了相对值,则忽略此设置值
vm_memory_high_watermark.absolute = 1024MiBk, kiB : kibibytes(2^10 - 1024 bytes)
M, MiB : mibibytes(2^20 - 1024576 bytes)
G, GiB : gibibytes(2^30 - 1073741824 bytes)KB: kilobytes (10^3 - 1000 bytes)
MB: megabytes (10^6 - 1000000 bytes)
GB: gigabytes (10^9 - 1000000000 bytes)

可以通过两种来设置生效

  1. 临时生效

    此配制仅当前生效在重启后将失效。

# 硬盘资源限制
rabbitmqctl set_disk_free_limit 68996808704
# 内存资源限制
rabbitmqctl set_vm_memory_high_watermark 0.4

样例:

[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
  1. 长期生效

在rabbitmq.conf的配制文件中加入

# 硬盘限制 
disk_free_limit.absolute=68455178240# 内存限制
vm_memory_high_watermark.relative = 0.4

样例:

[root@nullnull-os rabbitmq]# vi  /etc/rabbitmq/rabbitmq.conf 
# 加入以下内容,注意单位到字节
disk_free_limit.absolute=68455178240[root@nullnull-os rabbitmq]# cat /etc/rabbitmq/rabbitmq.conf 
disk_free_limit.absolute=68455178240[root@nullnull-os rabbitmq]# systemctl restart rabbitmq-server
[root@nullnull-os rabbitmq]# 

注意,此需要重启rabbitMQ才能生效。

磁盘限制配制参考

Configuring Disk Free Space Limit

The disk free space limit is configured with the disk_free_limit setting. By default 50MB is required to be free on the database partition (see the description of file locations for the default database location). This configuration file sets the disk free space limit to 1GB:

disk_free_limit.absolute = 1000000000

Or you can use memory units (KB, MB GB etc.) like this:

disk_free_limit.absolute = 1GB

It is also possible to set a free space limit relative to the RAM in the machine. This configuration file sets the disk free space limit to the same as the amount of RAM on the machine:

disk_free_limit.relative = 1.0

The limit can be changed while the broker is running using the rabbitmqctl set_disk_free_limit command or rabbitmqctl set_disk_free_limit mem_relative command. This command will take effect until next node restart.

The corresponding configuration setting should also be changed when the effects should survive a node restart.

来自:https://www.rabbitmq.com/disk-alarms.html

内存配制限制参考

https://www.rabbitmq.com/memory.html

Configuring the Memory Threshold

The memory threshold at which the flow control is triggered can be adjusted by editing the configuration file.

The example below sets the threshold to the default value of 0.4:

\# new style config format, recommended
vm_memory_high_watermark.relative = 0.4

The default value of 0.4 stands for 40% of available (detected) RAM or 40% of available virtual address space, whichever is smaller. E.g. on a 32-bit platform with 4 GiB of RAM installed, 40% of 4 GiB is 1.6 GiB, but 32-bit Windows normally limits processes to 2 GiB, so the threshold is actually to 40% of 2 GiB (which is 820 MiB).

Alternatively, the memory threshold can be adjusted by setting an absolute limit of RAM used by the node. The example below sets the threshold to 1073741824 bytes (1024 MiB):

vm_memory_high_watermark.absolute = 1073741824

Same example, but using memory units:

vm_memory_high_watermark.absolute = 1024MiB

If the absolute limit is larger than the installed RAM or available virtual address space, the threshold is set to whichever limit is smaller.

The memory limit is appended to the log file when the RabbitMQ node starts:

2019-06-10 23:17:05.976 [info] <0.308.0> Memory high watermark set to 1024 MiB (1073741824 bytes) of 8192 MiB (8589934592 bytes) total

The memory limit may also be queried using the rabbitmq-diagnostics memory_breakdown and rabbitmq-diagnostics status commands.

The threshold can be changed while the broker is running using the

rabbitmqctl set_vm_memory_high_watermark <fraction>

command or

rabbitmqctl set_vm_memory_high_watermark absolute <memory_limit>

For example:

rabbitmqctl set_vm_memory_high_watermark 0.6

and

rabbitmqctl set_vm_memory_high_watermark absolute "4G"

When using the absolute mode, it is possible to use one of the following memory units:

  • M, MiB for mebibytes (2^20 bytes)
  • MB for megabytes (10^6 bytes)
  • G, GiB for gibibytes (2^30 bytes)
  • GB for gigabytes (10^9 bytes)

中文配制可参考:https://www.cnblogs.com/kaishirenshi/p/12132703.html

更多配制可参见:https://www.rabbitmq.com/configure.html#config-file

样例程序:

maven导入

            <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>

生产程序:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;public class ResourceLimitProduct {public static void main(String[] args) throws Exception {// 资源限制ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel(); ) {// 定义交换器、队列和绑定channel.exchangeDeclare("res.limit.ex", BuiltinExchangeType.DIRECT, true, false, null);channel.queueDeclare("res.limit.qu", true, false, false, null);channel.queueBind("res.limit.qu", "res.limit.ex", "res.limit.rk");// 开启发送方确认机制AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();ConfirmCallback confirm =new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("【批量确认】:小于" + deliveryTag + "已经确认");} else {System.out.println("【单条确认】:等于" + deliveryTag + "已经确认");}}};ConfirmCallback nackConfirm =new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("【批量不确认】:小于" + deliveryTag + "已经确认");} else {System.out.println("【单条不确认】:等于" + deliveryTag + "已经确认");}}};channel.addConfirmListener(confirm, nackConfirm);for (int i = 0; i < 100000000; i++) {String msg = getKbMessage(i);long sequence = channel.getNextPublishSeqNo();System.out.println("【发送】成功了序列消息:" + sequence);AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.contentType("text/plain");// 发送的消息持久化builder.deliveryMode(2);AMQP.BasicProperties properties = builder.build();channel.basicPublish("res.limit.ex", "res.limit.rk", properties, msg.getBytes(StandardCharsets.UTF_8));Thread.sleep(ThreadLocalRandom.current().nextInt(5, 100));}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}private static String getKbMessage(int i) {StringBuilder msg = new StringBuilder("发送确认消息:" + i + "--");for (int j = 0; j < 102400; j++) {msg.append(j);}return msg.toString();}
}

设置硬盘资源限制

[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...

运行生产者的应用程序,查看控制台的输出

【发送】成功了序列消息:1
【单条确认】:等于1已经确认
【发送】成功了序列消息:2
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【发送】成功了序列消息:4
【单条确认】:等于3已经确认
【发送】成功了序列消息:5
......
【单条确认】:等于702已经确认
【单条确认】:等于703已经确认
【发送】成功了序列消息:704
【发送】成功了序列消息:705
【发送】成功了序列消息:706
【发送】成功了序列消息:707
【发送】成功了序列消息:708
【发送】成功了序列消息:709
【发送】成功了序列消息:710
【发送】成功了序列消息:711

到此使用硬盘空间限制的测试完成。

内存资源限制

编辑配制文件rabbitmq.conf

vi /etc/rabbitmqrabbitmq.conf # 添加配制
vm_memory_high_watermark.absolute=120M

重启让其生效

systemctl restart rabbitmq-server

检查配制生效情况

[root@nullnull-os rabbitmq]# rabbitmqctl environment......{trace_vhosts,[]},{vhost_restart_strategy,continue},{vm_memory_calculation_strategy,rss},{vm_memory_high_watermark,{absolute,"120MB"}},{vm_memory_high_watermark_paging_ratio,0.5},{writer_gc_threshold,1000000000}]},{rabbit_common,[]},
......

查看到如下配制说明生效。

运行生产者

观察客户端输出

【发送】成功了序列消息:1
【发送】成功了序列消息:2
【单条确认】:等于1已经确认
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【单条确认】:等于3已经确认
【发送】成功了序列消息:4
【发送】成功了序列消息:5
【发送】成功了序列消息:6
【单条确认】:等于4已经确认
【单条确认】:等于5已经确认
【单条确认】:等于6已经确认
【发送】成功了序列消息:7
【单条确认】:等于7已经确认
......
【发送】成功了序列消息:174
【单条确认】:等于174已经确认
【发送】成功了序列消息:175
【单条确认】:等于175已经确认
【发送】成功了序列消息:176
【单条确认】:等于176已经确认
【发送】成功了序列消息:177
【发送】成功了序列消息:178
【发送】成功了序列消息:179
【发送】成功了序列消息:180
【发送】成功了序列消息:181
【发送】成功了序列消息:182
【发送】成功了序列消息:183
【发送】成功了序列消息:184
【发送】成功了序列消息:185
【发送】成功了序列消息:186
【发送】成功了序列消息:187

观察网页端的情况

在这里插入图片描述

到此内存资源限制而导致的限流测试完成。

7.7.2 默认的credit flow流控

RabbitMQ Credit Flow Mechanism (信用流控制机制) 是 RabbitMQ 使用的一种流量控制机制,旨在确保生产者(publishers)不会发送太多的消息给消费者(consumers),从而导致系统超载或资源耗尽。这个机制主要是为了保护消费者免受生产者发送太多消息的影响。

以下是 RabbitMQ Credit Flow 机制的基本工作原理:

  1. 信用计数器(Credit Counter):对于每个消费者,RabbitMQ 维护一个称为信用计数器的值。这个计数器表示消费者当前可以接收多少条消息。
  2. 初始信用额度(Initial Credit):当一个消费者连接到队列并开始消费消息时,RabbitMQ 为该消费者分配一个初始信用额度。这个额度通常与队列中的未确认消息数量有关。
  3. 消费者确认(Consumer Acknowledgments):当消费者成功处理一条消息并确认它时,它将会恢复一定数量的信用,这允许 RabbitMQ 将更多的消息发送给消费者。
  4. 信用降低(Decreasing Credit):当消费者未确认消息超出其信用额度时,其信用额度将降低。这会导致生产者无法继续发送消息给该消费者,直到其信用额度恢复。
  5. 自动降低的消费者(Auto-decrease Consumers):RabbitMQ 还可以配置为自动降低某些消费者的信用,以避免某个消费者占用太多资源。这通常用于处理慢速或长时间处理的消费者。

这个机制有助于平衡生产者和消费者之间的消息流量,防止生产者发送大量消息导致队列爆满,从而提高系统的稳定性和可靠性。

要注意的是,RabbitMQ 的信用流控制机制是可配置的,您可以根据您的需求来调整信用额度和其他参数,以满足特定的应用场景。此外,RabbitMQ 还提供了一些工具和插件,用于监控和管理流量控制,以确保系统的正常运行。

可以通过查看队列的状态信息来了解 Credit Flow 机制的当前状态。以下是一些常见的方式来查看 Credit Flow 状态:

  1. RabbitMQ Management UI:RabbitMQ 提供了一个基于 Web 的管理界面,您可以通过该界面查看队列的状态和统计信息,包括队列的消息数量、未确认消息数量以及消费者的状态。要访问管理界面,请确保已启用 RabbitMQ Management 插件。默认情况下,它通常在 http://localhost:15672/ 上运行。

    在管理界面中,您可以选择特定的队列,然后查看其状态和相关的统计信息,包括未确认消息数量。这可以帮助您了解 Credit Flow 是否生效,是否有消费者的信用已用尽。

  2. 命令行工具:您还可以使用 RabbitMQ 的命令行工具来查看队列的状态。以下是一个示例命令,用于查看队列的状态:

    rabbitmqctl list_queues name messages consumers messages_unacknowledged
    

    这将显示队列的名称、消息数量、消费者数量以及未确认消息数量。未确认消息数量表示消费者尚未确认的消息数量,这可以用于判断 Credit Flow 是否生效。

  3. 监控工具:您可以使用监控工具(如Prometheus和Grafana)来设置自定义监控和警报,以便实时跟踪队列的状态和信用流控制情况。通过这些工具,您可以创建仪表板来显示队列的各种指标,包括未确认消息数量和消费者的信用。

通过以上方法,您可以监视 RabbitMQ 中队列的状态和 Credit Flow 机制的工作情况,以确保系统的稳定性和可靠性。

在这里插入图片描述

7.7.3 Qos机制

RabbitMQ中有一种Qos保证机制,可以限制channel上接收到的未被Ack的消息数量,如果过这个数量限制RabbitMQ将不会再往消费端推送消息。是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)需要注意的是Qos机制仅对消费端推模式有效,对拉模式无效。而且不支持NONE-ACK模式。

执行channel.basicConsume方法之前通过channel.basicQos方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。在消费慢的时候可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个.broker就发送一个,确认两个就发送两个,换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。

如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。

生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快县城超过了下游的消费速度时就容易出现消息积压、堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等手段,避免超过broker端的极限承载能力或者压垮下游消费者。

再讲消费者,我们期望消费者能够尽快的消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端能够处理速度是最快、最稳定而且还相对均匀(比较理想化)

提供应用吞吐量和缩短消费过程的耗时,主要以下几种方式:

  1. 优化应用程序的性能,缩短响应时间
  2. 增加消费节点实例。
  3. 调整并发消费的线程数。

测试

maven导入:

            <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class QosProduct {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("qos.ex",BuiltinExchangeType.DIRECT,// 持久化标识false,// 是否自动删除false,// 属性信息null);for (int i = 0; i < 100; i++) {String msg = "这是发送的消息:" + i;channel.basicPublish("qos.ex", "qos.rk", null, msg.getBytes(StandardCharsets.UTF_8));}}
}

消费者 :

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;public class QosConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换器、队列和绑定channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);channel.queueDeclare("qos.qu", false, false, false, null);channel.queueBind("qos.qu", "qos.ex", "qos.rk");// 设置Qos为5,未被确认ACK的为5,还有一个参数,即是否为全局,true为全局channel.basicQos(5);channel.basicConsume("qos.qu",false,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {LocalDateTime time = LocalDateTime.now();System.out.println("[消费]" + time + "+收到的消息:" + new String(body, StandardCharsets.UTF_8));int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);try {Thread.sleep(randomSleep);} catch (InterruptedException e) {e.printStackTrace();}if (envelope.getDeliveryTag() % 3 == 0) {// 进行消息确认channel.basicAck(envelope.getDeliveryTag(), true);}}});}
}

测试

先启动消费都,再启动生产者,查看控制台输出

[消费]2023-08-25T12:08:13.143+收到的消息:这是发送的消息:0
[消费]2023-08-25T12:08:13.765+收到的消息:这是发送的消息:1
[消费]2023-08-25T12:08:14.127+收到的消息:这是发送的消息:2
[消费]2023-08-25T12:08:14.892+收到的消息:这是发送的消息:3
......
[消费]2023-08-25T12:08:57.437+收到的消息:这是发送的消息:96
[消费]2023-08-25T12:08:57.530+收到的消息:这是发送的消息:97
[消费]2023-08-25T12:08:57.566+收到的消息:这是发送的消息:98
[消费]2023-08-25T12:08:57.649+收到的消息:这是发送的消息:99

查看队列的情况:

[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name                                      │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:59116 -> 10.0.4.16:5672 (1) │ 5              │ 0                     │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]# 

网页端查看

在这里插入图片描述

并行消费者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;public class QosThreadConsumer {public static void main(String[] args) throws Exception {// 资源限制ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");// 设置channel并发请求最大数factory.setRequestedChannelMax(5);// 自定义线程池工厂ThreadFactory thsFactory = Executors.privilegedThreadFactory();factory.setThreadFactory(thsFactory);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换器、队列和绑定channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);channel.queueDeclare("qos.qu", false, false, false, null);channel.queueBind("qos.qu", "qos.ex", "qos.rk");// 设置每秒处理2个channel.basicQos(5, true);channel.basicConsume("qos.qu",false,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {LocalDateTime time = LocalDateTime.now();long threadId = Thread.currentThread().getId();System.out.println("[消费]"+ time+ ",线程:"+ threadId+ ",收到的消息:"+ new String(body, StandardCharsets.UTF_8));int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);try {Thread.sleep(randomSleep);} catch (InterruptedException e) {e.printStackTrace();}if (envelope.getDeliveryTag() % 3 == 0) {// 进行消息确认channel.basicAck(envelope.getDeliveryTag(), true);}}});}
}

控制台输出:

[消费]2023-08-26T09:37:21.430,线程:24,收到的消息:这是发送的消息:0
[消费]2023-08-26T09:37:21.866,线程:25,收到的消息:这是发送的消息:1
[消费]2023-08-26T09:37:22.434,线程:25,收到的消息:这是发送的消息:2
[消费]2023-08-26T09:37:22.847,线程:25,收到的消息:这是发送的消息:3
[消费]2023-08-26T09:37:23.685,线程:25,收到的消息:这是发送的消息:4
[消费]2023-08-26T09:37:23.847,线程:26,收到的消息:这是发送的消息:5
......
[消费]2023-08-26T09:39:10.684,线程:28,收到的消息:这是发送的消息:526
[消费]2023-08-26T09:39:10.695,线程:32,收到的消息:这是发送的消息:527
[消费]2023-08-26T09:39:10.767,线程:32,收到的消息:这是发送的消息:528
......
[消费]2023-08-26T09:39:58.270,线程:27,收到的消息:这是发送的消息:996
[消费]2023-08-26T09:39:58.405,线程:27,收到的消息:这是发送的消息:997
[消费]2023-08-26T09:39:58.575,线程:27,收到的消息:这是发送的消息:998
[消费]2023-08-26T09:39:58.671,线程:27,收到的消息:这是发送的消息:999

如果Qos设置为全局,则可以看到到

[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name                                      │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60591 -> 10.0.4.16:5672 (1) │ 0              │ 5                     │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60610 -> 10.0.4.16:5672 (1) │ 0              │ 0                     │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]# 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/475039.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

爬虫开发工具与环境搭建——使用Postman和浏览器开发者工具

第三节&#xff1a;使用Postman和浏览器开发者工具 在网络爬虫开发过程中&#xff0c;我们经常需要对HTTP请求进行测试、分析和调试。Postman和浏览器开发者工具&#xff08;特别是Network面板和Console面板&#xff09;是两种最常用的工具&#xff0c;能够帮助开发者有效地捕…

单片机实验记录3

定时计数实验 【实验目的】 1)学习使用单片机定时/计数器 2)在程序中添加定时/计数功能&#xff0c;将相关程序部署在仿真环境中&#xff0c;观察运行的情况. 【实验内容】 必做&#xff1a;应用定时器中断和数码管&#xff0c;实现10秒倒计时功能 【实验代码】 必做&am…

(计算机毕设)基于SpringBoot+Vue的房屋租赁系统的设计与实现

博主可接毕设设计&#xff01;&#xff01;&#xff01; 各种毕业设计源码只要是你有的题目我这里都有源码 摘 要 社会的发展和科学技术的进步&#xff0c;互联网技术越来越受欢迎。网络计算机的生活方式逐渐受到广大人民群众的喜爱&#xff0c;也逐渐进入了每个用户的使用。互…

创新租赁APP开发提升用户体验与业务效率

内容概要 在这个互联网飞速发展的时代&#xff0c;租赁APP的开发成为了提升市场竞争力的重要一环。用户对租赁服务的需求与日俱增&#xff0c;而传统的方式已显得不够高效。这时候&#xff0c;创新的租赁APP就像是一道光&#xff0c;照亮了用户体验和业务效率的双重需求。通过…

【Java SE】数据库连接池

数据库连接池是一个管理数据库连接的容器。它的主要作用是分配和管理数据库连接&#xff0c;允许应用程序重复使用现有的连接&#xff0c;而不是每次都重新建立新的连接。此外&#xff0c;连接池会释放那些空闲时间超过最大限制的连接&#xff0c;从而避免因未及时释放连接而造…

SpringBoot 集成 Sharding-JDBC(一):数据分片

在深入探讨 Sharding-JDBC 之前&#xff0c;建议读者先了解数据库分库分表的基本概念和应用场景。如果您还没有阅读过相关的内容&#xff0c;可以先阅读我们之前的文章&#xff1a; 关系型数据库海量数据存储策略-CSDN博客 这篇文章将帮助您更好地理解分库分表的基本原理和实现…

多线程--常见锁策略--Java

目录 一、悲观锁VS乐观锁 1.悲观锁 2.乐观锁 二、重量级锁VS轻量级锁 1.重量级锁 2.轻量级锁 三、自旋锁 1.自旋锁概念 四、公平锁VS非公平锁 1.公平锁 2.非公平锁 3.注意 五、可重入锁和不可重入锁 六、读写锁 1.线程对于数据的访问方式 注意&#xff1a;以下讲…

基于SSM的农家乐管理系统+论文示例参考

1.项目介绍 功能模块&#xff1a;管理员&#xff08;农家乐管理、美食信息管理、住宿信息管理、活动信息、用户管理、活动报名、论坛等&#xff09;&#xff0c;普通用户&#xff08;注册登录、活动报名、客房预订、用户评价、收藏管理、模拟支付等&#xff09;技术选型&#…

jmeter--CSV数据文件设置--请求体设置变量

目录 一、示例 1、准备组织列表的TXT文件&#xff0c;如下&#xff1a; 2、添加 CSV数据文件设置 &#xff0c;如下&#xff1a; 3、接口请求体设置变量&#xff0c;如下&#xff1a; 二、CSV数据文件设置 1、CSV Data Set Config 配置选项说明 2、示例 CSV 文件内容 3、…

Redis环境部署(主从模式、哨兵模式、集群模式)

一、概述 REmote DIctionary Server(Redis) 是一个由 Salvatore Sanfilippo 写的 key-value 存储系统&#xff0c;是跨平台的非关系型数据库。Redis 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选持久性的键值对(Key-Value)存储数据库…

【大数据学习 | flume】flume之常见的sink组件

Flume Sink取出Channel中的数据&#xff0c;进行相应的存储文件系统&#xff0c;数据库&#xff0c;或者提交到远程服务器。Flume也提供了各种sink的实现&#xff0c;包括HDFS sink、Logger sink、Avro sink、File Roll sink、HBase sink&#xff0c;。 ​ Flume Sink在设置存…

【ArcGIS微课1000例】0127:计算城市之间的距离

本文讲述,在ArcGIS中,计算城市(以地级城市为例)之间的距离,效果如下图所示: 一、数据准备 加载配套实验数据包中的地级市和行政区划矢量数据(订阅专栏后,从私信查收数据),如下图所示: 二、计算距离 1. 计算邻近表 ArcGIS提供了计算点和另外点之间距离的工具:分析…

(Linux 入门) 基本指令、基本权限

目录 一、什么是操作系统 二、基础指令 01. ls 指令 02. pwd命令 03.mkdir 04. touch指令 05.rmdir指令 && rm 指令 06.man指令&#xff08;重要&#xff09; 07 cat 08.cp指令 09 mv指令 10 alias 指令 11.more指令 12.head指令 13.less指令 14.时间相…

云原生之运维监控实践-使用Prometheus与Grafana实现对Nginx和Nacos服务的监测

背景 如果你要为应用程序构建规范或用户故事&#xff0c;那么务必先把应用程序每个组件的监控指标考虑进来&#xff0c;千万不要等到项目结束或部署之前再做这件事情。——《Prometheus监控实战》 去年写了一篇在Docker环境下部署若依微服务ruoyi-cloud项目的文章&#xff0c;当…

QT基础 窗体 对话框 文件 QT5.12.3环境 C++实现

一、堆栈窗体 1. 概念 是一种界面设计思路&#xff0c; 多个窗体重叠在一起&#xff0c;通过点击对应的按钮&#xff0c;显示对应的界面。 2. 相关方法 Public FunctionsQStackedWidget(QWidget * parent 0)//stack如果单纯指定父窗口&#xff0c;但是没有指定大小&#xf…

【NOIP提高组】潜伏者

【NOIP提高组】潜伏者 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; R国和S国正陷入战火之中&#xff0c;双方都互派间谍&#xff0c;潜入对方内部&#xff0c;伺机行动。 历尽艰险后&#xff0c;潜伏于 S 国的R 国间谍小C 终于摸清了S 国…

共享门店模式:创新零售的新篇章

​在消费升级和数字化转型的双重浪潮下&#xff0c;传统零售业正面临前所未有的挑战与机遇。其中&#xff0c;共享门店模式作为一种创新的商业模式&#xff0c;正逐渐成为实体店铺应对电商冲击、提升运营效率和市场竞争力的重要途径。本文将深入解析共享门店模式的内涵、优势、…

除了电商平台,还有哪些网站适合进行数据爬取?

在数字化时代&#xff0c;数据的价值日益凸显&#xff0c;而网络爬虫技术成为获取数据的重要手段。除了电商平台&#xff0c;还有许多其他类型的网站适合进行数据爬取&#xff0c;以支持市场研究、数据分析、内容聚合等多种应用场景。本文将探讨除了电商平台外&#xff0c;还有…

STM32G4的数模转换器(DAC)的应用

目录 概述 1 DAC模块介绍 2 STM32Cube配置参数 2.1 参数配置 2.2 项目架构 3 代码实现 3.1 接口函数 3.2 功能函数 3.3 波形源代码 4 DAC功能测试 4.1 测试方法介绍 4.2 波形测试 概述 本文主要介绍如何使用STM32G4的DAC模块功能&#xff0c;笔者使用STM32Cube工具…

Linux-Apache

文章目录 Apache基础配置 &#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;Linux专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年11月19日12点20分 Apache Web服务器用来实现HTTP和相关TCP连接的处理&#xff0c;同时负责所提供资源的管理…