Redis Pub/Sub 消息丢失:原因与可靠替代方案
了解 Redis Pub/Sub 在网络断开或消费者处理缓慢时为何会丢失消息,并探索 Redis Streams 和基于列表的队列等模式,以实现可靠的消息投递。
Redis Pub/Sub 消息丢失:原因与可靠替代方案
我还记得第一次被 Redis Pub/Sub 坑的经历。那是在深夜,大约晚上 11 点,我们的通知系统开始丢失消息。不是全部丢失——只是足够让用户在发现之前就察觉到。值班工程师(很不幸,就是我)花了两个小时翻查应用日志,才意识到那个显而易见的事实:Redis Pub/Sub 不会对任何消息进行排队。它不是一个消息代理。它是一个消防水管,如果你没有直接站在它前面张开嘴,你就会错过一些东西。
这就是当你第一次接触 Redis Pub/Sub 时,没人会告诉你的事情。虽然文档里确实有说明,但当你为 API 的简洁性感到兴奋时,很容易忽略这一点。你在一边发布,在另一边订阅,一切正常。直到它出问题。
即发即弃的现实
Redis Pub/Sub 遵循一个极其简单的原则:当你发布一条消息时,Redis 会在那一瞬间将其推送给该频道上所有已连接的订阅者。如果某个订阅者没有连接,或者虽然连接了但无法跟上,消息就会消失。没有持久化层,没有确认机制,没有死信队列。消息只存在于传输过程中。
让我给你一个具体的例子。假设你有一个发布订单状态更新的服务,以及另一个订阅该更新以发送确认邮件的服务。在正常负载下,一切运行顺畅。然后你的邮件服务出现故障——可能是 SMTP 中继变慢,或者发生了垃圾回收暂停。在那次故障期间,Redis 持续推送消息。订阅者的 TCP 缓冲区被填满。最终,连接断开。当订阅者重新连接时,它会从现在开始接收消息,而不是从它断开的地方继续。在断开连接窗口期间发布的所有消息都丢失了。
我通过一个简单的测试设置实际测量过:一个发布者以每秒 10,000 条消息的速度发送,而一个订阅者偶尔会阻塞 50 毫秒。即使只有一次短暂的暂停,你也会丢失几十条消息。订阅者永远不会知道它们被发送过。发布者永远不会知道它们丢失了。Redis 完全没问题——它完美地完成了设计目标。
实际导致消息丢失的原因
Pub/Sub 在三种主要情况下会丢失消息,理解这些情况很重要,因为它们会以不同的方式显现。
网络不稳定是最明显的原因。订阅者和 Redis 之间的任何临时网络分区都会切断连接。Redis 通过客户端超时(默认 60 秒,但你可能设置得更低)检测到这一点。在此期间,该订阅者会丢失所有发布的消息。其他订阅者可能正常收到,这让调试变得特别有趣——你会看到服务之间的状态不一致,并怀疑自己是不是疯了。
消费者处理缓慢则更隐蔽,因为连接保持打开状态。Redis 使用推送模型,意味着它会以发布者生产的速度向订阅者套接字写入数据。如果订阅者处理消息的速度不够快,内核的 TCP 接收缓冲区就会被填满。一旦缓冲区满了,Redis 就无法写入更多数据,连接最终会失败。订阅者甚至可能直到断开连接时才发现自己落后了。
我曾见过订阅者为每条消息执行同步数据库写入的情况。在低流量时,一切正常。在高峰期,数据库成为瓶颈,订阅者落后,消息在 TCP 缓冲区中堆积。当缓冲区溢出时,连接重置,订阅者丢失了所有尚未从套接字读取的消息。
部署或重启期间的客户端断开连接是第三大类原因。如果你正在进行滚动部署,并且某个订阅者实例宕机,它就会错过其离线期间发布的所有消息。没有“追赶”机制。当它重新上线时,它会从头开始。
有一件事让我很惊讶:即使是干净的关闭也无济于事。如果你的订阅者在退出前优雅地取消订阅,它仍然会错过取消订阅和重新上线之间发布的消息。取消订阅是瞬间完成的——没有“帮我保留消息一分钟”的选项。
Pub/Sub 何时适用
我不想让人觉得 Redis Pub/Sub 毫无用处。它对于特定的用例非常出色,我仍然经常使用它。关键在于理解这些用例是什么。
对于偶尔丢失消息可以接受的实时通知,它工作得很好。想想实时体育比分、股票行情或聊天应用中的输入指示器。如果用户错过了一个比分更新,几秒钟后下一个就会到来。这些数据保质期短,没有持久性要求。
服务发现和配置广播是另一个理想场景。当你更改一个功能标志并向所有应用实例发布时,某个正在重启的实例错过更新是可以接受的——它会在重新上线时或下一次定期刷新时获取当前状态。
我还成功地将 Pub/Sub 用于跨多个应用服务器的缓存失效。发布一个要失效的缓存键,每个服务器都会清除其本地缓存。如果某个服务器错过了消息,最坏的情况是它提供过期的数据,直到缓存条目自然过期。不理想,但也不是灾难性的。
这里的共同点是:当消息本质上是短暂的、丢失可以通过其他机制恢复、并且你不需要顺序保证或精确一次投递时,Pub/Sub 是可行的。
Redis Streams:内置的替代方案
Redis 5.0 中引入的 Redis Streams 是我现在在需要可靠消息投递时会使用的方案。它不是简单地为 Pub/Sub 加上持久化——它是一种根本不同的模型,更接近于像 Kafka 这样的分布式日志,而不是广播机制。
使用 Streams,消息会被追加到日志中,并一直保留,直到被显式确认。消费者可以断开连接、重启、落后,然后仍然能赶上。流会根据最大长度或保留期限来保留消息,因此你可以控制保留多少历史记录。
以下是思维模型的不同之处。在 Pub/Sub 中,你订阅一个频道,消息流向你。在 Streams 中,你按照自己的节奏拉取消息。消费者组会跟踪每个消费者已确认的消息,因此你可以让多个消费者从同一个流中读取数据而不会重复(或者如果你想要扇出,也可以有意重复)。
一个基本的 Streams 设置看起来像这样:
XADD orders * status confirmed order_id 12345
这会将一条消息追加到 orders 流中。* 告诉 Redis 自动生成一个 ID。然后你的消费者通过以下方式读取:
XREADGROUP GROUP email-processor worker-1 COUNT 10 STREAMS orders >
> 表示“给我那些尚未投递给该组中任何消费者的消息。”处理完成后,消费者进行确认:
XACK orders email-processor <message-id>
如果消费者在确认前崩溃,消息会保持待处理状态。组内的另一个消费者可以在超时后通过 XCLAIM 认领它。这就是 Pub/Sub 完全缺乏的确认和重新投递机制。
消费者组模型的实际应用
消费者组是使 Streams 对可靠处理真正有用的关键。每个组在流中维护自己的位置,因此你可以有一个组用于邮件通知,另一个用于分析,还有一个用于审计日志——所有组都独立地从同一个流中读取数据。
在一个组内,消息会分发给各个消费者。这提供了水平可扩展性:添加更多消费者实例,它们会分担负载。如果一个实例宕机,其待处理的消息可以被其他实例认领。
我发现待处理条目列表对于监控来说非常宝贵。你可以运行 XPENDING 来查看哪些消息尚未确认以及它们已待处理了多长时间。这能立即暴露出处理缓慢的消费者——这比通过用户投诉几天后才发现消息丢失要好得多。
Streams 的一个注意事项:消息 ID 是单调递增的时间戳,这意味着你不能轻易地按顺序插入消息。如果你需要在流内保持严格的顺序,这实际上是一个特性。如果你需要优先处理某些消息,则需要多个流或不同的方法。
用于简单需求的基于列表的队列
在 Streams 出现之前,使用 Redis 进行可靠消息传递的标准模式是基于列表的队列和阻塞弹出。这种模式仍然完全可行,特别是如果你使用的是较旧版本的 Redis,或者想要一个极其简单的实现。
思路很简单:生产者使用 LPUSH 或 RPUSH 将消息推送到列表上,消费者使用 BLPOP 或 BRPOP 阻塞直到消息到达。阻塞弹出至关重要——没有它,你就需要轮询,这会浪费 CPU 并增加延迟。
可靠性来自于一个辅助的“处理中”列表。消费者使用 BRPOPLPUSH(或在 Redis 6.2+ 中使用 LMOVE)原子性地将消息从待处理队列移动到处理中队列。处理完成后,它会从处理中队列中移除该消息。如果消费者崩溃,处理中队列会保留该消息,并且一个监控进程可以将过期的项目移回待处理队列。
我多次构建过这种模式,它确实有效,但代码量比你预期的要多。你需要处理超时、决定一条消息在处理中队列中可以停留多久才被视为放弃,以及处理重复处理等边缘情况。Streams 基本上将这些都形式化了,这就是为什么我大多放弃了手动实现的列表队列。
我仍然使用基于列表的队列的一个场景是工作队列,其中处理顺序无关紧要,并且我想要尽可能简单的实现。有时,一个列表和一个 BLPOP 循环就足够了,添加 Streams 会显得过度设计。
Redis 7 中的分片 Pub/Sub
Redis 7 引入了分片 Pub/Sub,这值得一提,因为它解决了一个与消息丢失不同的问题。使用常规 Pub/Sub,每条消息都会广播到集群中的每个节点,即使某个节点上没有订阅者关心该频道。这会浪费集群互连带宽。
分片 Pub/Sub 将频道绑定到特定的集群槽,因此消息只传播到实际有该频道订阅者的节点。这是一个性能优化,而不是可靠性特性。在断开连接时,你仍然会丢失消息。但是,如果你在集群环境中大规模运行 Pub/Sub,了解这一点是值得的。
做出选择:Pub/Sub vs Streams vs 列表
在与这些模式打交道多年后,我的决策过程简化为几个问题。
首先:你能容忍消息丢失吗?如果可以,并且数据是短暂的,那么 Pub/Sub 可能就足够了。你将获得最低的延迟和最简单的操作模型。
其次:你需要消息持久化和重放吗?如果需要,Streams 就是答案。在消费者修复 bug 后重新处理消息的能力不止一次救了我。使用 Pub/Sub,如果你的消费者有一个 bug 导致它在一小时内错误处理了消息,那些消息就永远丢失了。使用 Streams,你可以重置消费者组的位置并重放它们。
第三:你需要多个独立的消费者组读取相同的数据吗?Streams 原生支持这一点。使用 Pub/Sub,每个订阅者都会收到每条消息,这可能是你想要的,但无法让不同的订阅者组维护独立的位置。
第四:你的 Redis 版本是什么?如果你使用的是 5.0 之前的版本,Streams 不可用,你需要考虑基于列表的队列或外部消息代理。我曾遇到过这种情况,老实说,如果你需要可靠的消息传递但不能使用 Streams,我会考虑 Redis 是否是正确的工具。RabbitMQ 或 NATS 可能更合适。
没人谈论的操作层面
这是我通过艰难的方式学到的东西:监控 Pub/Sub 实际上非常困难。你可以使用 PUBSUB NUMSUB 监控连接数和频道订阅数,但你无法看到有多少消息正在丢失。没有“已发布但未接收的消息”这个指标,因为 Redis 不跟踪这个。
使用 Streams,你可以获得可见性。XINFO GROUPS 显示消费者延迟。XPENDING 显示未确认的消息。你可以设置当延迟超过阈值时发出警报。仅凭这种操作可见性,就足以让我切换到 Streams。
内存管理是另一个考虑因素。Pub/Sub 消息只存在于内存中,并且只在传输过程中存在,因此内存使用量受限于你的发布速率和消费者速度。Streams 会存储消息直到被修剪,因此你需要考虑保留策略。我通常根据预期的吞吐量和可用内存设置最大流长度(MAXLEN),并监控流长度以发现意外的积压。
我现在实际的做法
如今,对于任何需要可靠性的新消息传递用例,我默认使用 Redis Streams。API 比 Pub/Sub 稍微复杂一点,但差别不大,而且可靠性保证是值得的。我将 Pub/Sub 保留用于短暂的数据——缓存失效、实时在线状态等。
对于特别关键的消息传递(支付处理、订单履行),我已经完全放弃了 Redis,转而使用专用的消息代理。Redis 在很多方面都很出色,但它并未针对基于磁盘的高容量消息队列持久化进行优化。如果你需要消息在 Redis 完全重启后零丢失地存活,你需要配置 AOF 持久化并设置 appendfsync always,这会严重影响写入性能。在这一点上,像 Kafka 或 Pulsar 这样的工具更有意义。
但对于广阔的中间地带——消息丢失会令人烦恼或代价高昂,但并非灾难性,并且你希望留在已经熟悉的 Redis 生态系统中——Streams 找到了一个最佳点。对我来说,它在生产环境中足够可靠,而且不引入新基础设施组件的操作简单性具有真正的价值。
我最初在使用 Pub/Sub 时犯的错误实际上与技术无关。而是没有仔细阅读细则,假设“消息传递”意味着“消息投递保证”。Redis Pub/Sub 不做这样的保证,它也不假装做。一旦你理解了这一点,你就可以恰当地使用它,并在需要更多功能时转向 Streams。