掌握RabbitMQ预取设置,实现消费者最佳性能

调整RabbitMQ预取,让消费者保持忙碌,同时避免囤积消息或隐藏缓慢处理。

掌握RabbitMQ预取设置,实现消费者最佳性能

RabbitMQ预取是一个看似微小却能改变一切的设置。它控制RabbitMQ允许一个消费者同时持有的未确认消息数量。设置得太低,快速消费者会花费太多时间等待下一条消息的投递。设置得太高,慢速消费者会悄悄囤积工作,增加延迟,并使队列深度图表失真。

思考预取的有用方式是将其视为未完成的工作。预取值为20意味着一个消费者可以拥有20条已投递但尚未确认的消息。这些消息在队列中不再是ready状态。它们处于unacked状态,停留在消费者处,直到消费者确认、否定确认、拒绝或断开连接。

这意味着预取不仅仅是一个吞吐量旋钮。它是一个公平性旋钮、一个内存旋钮和一个故障恢复旋钮。

basic.qos在RabbitMQ中的作用

消费者通过basic.qos设置预取。在大多数客户端库中,你设置prefetch_countprefetch_size很少使用,通常保持为零。

在Python中使用Pika:

channel.basic_qos(prefetch_count=10)
channel.basic_consume(
    queue="jobs",
    on_message_callback=handle_message,
    auto_ack=False,
)

在Node.js中使用amqplib

await channel.prefetch(10);
await channel.consume("jobs", async (msg) => {
  try {
    await handleMessage(msg.content);
    channel.ack(msg);
  } catch (err) {
    channel.nack(msg, false, false);
  }
}, { noAck: false });

手动确认很重要。如果你使用自动确认,RabbitMQ会在消息投递后立即认为消息已完成。预取不再以相同方式保护处理可靠性,因为没有未确认窗口需要管理。

在现代用法中,RabbitMQ默认按消费者应用预取,尽管AMQP的原始措辞是基于通道的。一些客户端暴露了一个global标志。要小心使用它。共享通道或连接范围的限制可能会在消费者之间造成混乱的交互。当每个消费者有自己的通道和自己的预取计数时,大多数服务更容易推理。

为什么预取会改变延迟

想象一个有两个消费者的队列。消费者A收到一批100条消息,然后遇到一个慢速的外部API。消费者B健康且快速,但那100条消息已经分配给A。RabbitMQ不会将它们交给B,除非A拒绝它们或它的通道关闭。

从队列的角度看,这些消息不是ready状态。从用户的角度看,它们被延迟了。这就是为什么高预取可以使系统在代理图表中看起来更好,同时使实际延迟更差。

低预取给RabbitMQ更多机会公平分配工作。高预取给消费者更多本地工作和更少的代理往返。两者都不是永远正确的。

合理的起始值

对于慢速作业,从小开始。如果每条消息调用第三方API、写入多个数据库行或进行CPU密集型转换,尝试prefetch_count=110。你希望一个失败或慢速的消费者只持有少量工作。

对于中等作业,需要几十或几百毫秒并在稳定工作者上运行,10、20或50等值是常见的起点。在提高之前先测量。

对于非常快速的处理程序,代理和消费者在低延迟网络上,较高的预取可以减少往返并提高吞吐量。即便如此,避免仅仅因为让基准测试在五分钟内看起来不错就选择一个巨大的数字。关注消费者内存和尾部延迟。

一个简单的经验法则是将预取大小设定为消费者在短时间内可以舒适持有的工作量。如果一个工作者每秒处理约20条消息,并且你愿意接受大约一秒的本地缓冲工作,那么预取接近20是一个合理的实验。

如何判断预取是否太高

预取可能太高的情况:

  • messages_unacknowledged相对于活跃消费者很大。
  • 一些消费者有许多未确认消息,而其他消费者空闲。
  • 即使messages_ready很低,消息延迟也很高。
  • 消费者内存在突发期间上升。
  • 消费者崩溃导致大量重新投递。

最后一点很容易被忽略。如果一个工作者持有1,000条未确认消息并崩溃,RabbitMQ可以重新投递这些消息。这是正确的行为,但如果处理程序不是幂等的,可能会对下游系统造成重复压力。

降低预取通常会改善公平性和恢复行为。它可能会稍微降低峰值吞吐量,但可以改善用户实际感受到的延迟。

如何判断预取是否太低

预取可能太低的情况:

  • 消费者CPU和内存使用率低,而messages_ready持续增长。
  • 处理时间非常短,但投递速率受限。
  • 消费者和RabbitMQ之间的网络延迟明显。
  • 增加预取提高了吞吐量,而没有增加尾部延迟或内存压力。

经典例子是一个快速工作者,进行小型内存计算并立即确认。使用prefetch_count=1,它可能花费太多时间等待下一条消息。提高预取给它一个小型本地缓冲区并保持忙碌。

不要隐藏下游瓶颈

预取调整不会修复慢速数据库。它只能改变工作的分配和缓冲方式。如果每条消息都等待同一个过载的API,较高的预取可能暂时使吞吐量看起来更好,同时增加超时和重试。

在消费者内部进行测量。记录或发出指标,包括解码消息、等待数据库、调用外部服务和确认所花费的时间。RabbitMQ可以显示ready和unacked计数,但不能告诉你为什么处理程序需要八秒。

当下游服务受到速率限制时,预取通常应该更低,而不是更高。让队列明显吸收积压,而不是在工作者内部隐藏数千个进行中的调用。

预取和并发不同

预取为50并不自动意味着你的消费者并行处理50条消息。它只意味着RabbitMQ可能在收到确认之前投递50条消息。它们是否并发运行取决于你的消费者代码。

一个单线程消费者,预取为50,可能一次处理一条消息,而49条在内存中等待。一个并发为10、预取为50的工作者池可能保持十个任务活跃,四十个缓冲。有时这个缓冲区有用。有时它只是延迟。

将预取与实际并发匹配。如果你的进程可以同时执行五个处理程序,预取为5到20比500更容易推理。

排序和公平性权衡

RabbitMQ队列在队列级别保持顺序,但消费者行为可以改变工作完成的顺序。使用多个消费者和大于1的预取,消息20可能在消息3之前完成,因为它去了更快的工作者或处理更简单的工作。

对于大多数工作队列,完成顺序无关紧要。对于账户更新、库存更改或必须按顺序处理的工作流,它可能非常重要。在这些情况下,使用每个排序键一个队列、按键分片或保持低预取可能比追求最大吞吐量更安全。

公平性也有类似的权衡。低预取让RabbitMQ更均匀地分配工作,因为消费者更频繁地回来获取消息。高预取奖励首先收到消息的消费者。如果消息的处理时间不均匀,可能导致一个工作者持有一堆慢速作业,而另一个工作者快速完成其批次。

当人们说“RabbitMQ负载均衡不均匀”时,预取是首先要检查的事情之一。代理只能平衡尚未投递的消息。

故障行为很重要

预取改变了消费者死亡时发生的情况。使用prefetch_count=1,当通道关闭时,一条未确认的投递会回来。使用prefetch_count=500,可能一次回来数百条。如果消费者在崩溃前执行了部分副作用,除非处理程序是幂等的,否则这些重新投递可能触发重复写入、重复电子邮件或重复API调用。

这并不意味着高预取是错误的。这意味着高预取适用于幂等处理程序、明确的重试规则以及重新投递率的监控。如果重复处理会有危险,保持未确认窗口小,直到应用程序构建为能够处理它。

在消费者中查看redelivered标志。它不是一个完整的重试计数器,但它是一个有用的信号,表明消息之前已被投递过。对于健壮的重试限制,在头部或应用程序状态中跟踪尝试次数,并将耗尽的消息路由到死信队列。

多个队列和混合工作负载

一个预取值很少适合所有队列。一个消费thumbnail.generateemail.send的服务可能需要为每个设置不同的值。缩略图生成可能是CPU密集型的,最好使用低并发。电子邮件发送可能是网络绑定的,可以容忍更多进行中的消息。

如果一个进程在一个通道上消费多个队列,QoS行为可能变得更难推理。对于意义不同的工作负载,优先使用单独的通道。这使得预取、监控和故障处理更明显。

混合消息大小是另一个警告信号。如果一个队列包含小事件和大负载,基于计数的预取不能很好地反映内存压力。十条小消息和十条大消息的成本不同。在这种情况下,拆分工作负载或将大负载移出RabbitMQ,传递引用代替。

关注每个消费者的未确认,而不仅仅是每个队列

队列级别的未确认计数告诉你存在未完成的工作,但可能隐藏偏差。一个消费者可能持有大部分未确认消息,而其他消费者几乎为空。这通常指向高预取、不均匀的消息成本或一个不健康的工作者。

在测试期间使用管理UI、Prometheus或rabbitmqctl list_consumers的消费者级别指标。如果分布不均匀,降低预取或拆分慢速消息类型可以改善实际延迟,即使总吞吐量变化不大。

部署后重新审视预取

预取值会过时。一个在处理程序只写入一个数据库行时有效的值,在下一个版本添加API调用、额外验证或更大负载后可能就不正确了。将预取视为性能配置的一部分,而不是一次设置就忘记的数字。

在消费者发布后,将处理延迟、未确认计数、重新投递和消费者内存与之前版本进行比较。如果延迟上升但CPU未饱和,处理程序可能正在等待外部事物,较低的预取可能保持系统更公平。如果CPU高且每条消息是CPU绑定的,增加工作者或减少每条消息的工作可能比更改预取更重要。

在消费者配置附近记录选择该值的原因。未来的维护者应该知道prefetch_count=5是为了公平性、内存、排序、下游速率限制还是仅仅作为临时默认值而选择的。

使用真实消息形状进行测试

如果生产消息是大型JSON负载或包含昂贵的数据库查找,不要使用微小的虚假消息来调整预取。消息大小和处理程序成本很重要。

一个有用的测试循环是:

  1. 选择一个预取值。
  2. 以现实发布速率运行足够长的时间以看到稳定行为。
  3. 观察messages_readymessages_unacknowledged、消费者CPU、消费者内存、处理延迟和错误率。
  4. 杀死一个消费者,看看有多少消息重新投递。
  5. 增加或减少预取并重复。

最佳值很少是短期基准测试吞吐量最高的那个。它是保持消费者忙碌、保持延迟可接受并以系统能够处理的方式失败的值。

一个实用的默认值

如果你还没有数据,从手动确认和普通工作队列的prefetch_count=10开始。对于慢速、昂贵或严格公平的处理,使用1。在测量后,尝试快速、稳定处理程序的2050。只有当指标显示投递往返是瓶颈且消费者有内存余量时,才进一步提高。

RabbitMQ预取调整不是一次性的设置。当消息大小变化、消费者代码变化、下游依赖变化或你添加更多工作者实例时,重新审视它。正确的预取值是匹配当前工作形状的那个。