Direct vs. Topic vs. Fanout:选择正确的交换机类型

根据精确路由、通配符路由或广播投递,选择 RabbitMQ 的 direct、topic 或 fanout 交换机。

Direct vs. Topic vs. Fanout:选择正确的交换机类型

RabbitMQ 交换机类型控制着生产者发布消息后消息的去向。选择错误的交换机可能导致广播私有工作、丢弃路由键不匹配的消息,或硬编码路由规则,这些都会给后续维护带来麻烦。

实际选择很简单:使用 direct 进行精确路由,使用 fanout 进行广播,使用 topic 在命名方案中进行通配符路由。

RabbitMQ 路由工作原理

生产者将消息发布到交换机,而不是直接发布到队列。交换机将消息路由键与队列绑定进行比较,然后根据交换机类型路由消息。

三个关键术语:

  • 交换机: 接收发布的消息。
  • 队列: 存储消息,直到消费者接收。
  • 绑定: 将队列连接到交换机,有时带有绑定键。

如果没有匹配的队列且消息未标记为 mandatory,RabbitMQ 可能会丢弃无法路由的消息。对于重要流程,请使用发布者确认并处理返回的消息。

Direct 交换机

Direct 交换机将消息路由到绑定键与消息路由键完全匹配的队列。

当路由选择已知且精确时使用它。好的例子包括日志严重级别、任务类型或租户特定队列,其中精确名称就足够了。

例如,将 error_queue 绑定到 error,将 info_queue 绑定到 info。使用路由键 error 发布的消息会进入 error_queue;使用 info 的消息会进入 info_queue。如果两个队列都绑定到 error,则两个队列都会收到一份副本。同一队列上的消费者仍然会竞争该队列中的消息。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct', exchange_type='direct')

channel.basic_publish(
    exchange='logs_direct',
    routing_key='error',
    body=b'Database connection failed'
)

connection.close()

当路由键稳定且不需要通配符匹配时,选择 direct。

Fanout 交换机

Fanout 交换机将每条消息路由到绑定到该交换机的每个队列。它忽略路由键。

用于广播事件,每个订阅者都应收到相同的消息。常见情况包括缓存失效、用户资料更新事件、部署通知或实时状态更新。

例如,一个 user_updates fanout 交换机可能提供三个队列:email_queuesearch_index_queueanalytics_queue。一次发布的资料更新会到达所有三个服务。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='user_updates', exchange_type='fanout')

channel.basic_publish(
    exchange='user_updates',
    routing_key='',
    body=b'User 123 changed their display name'
)

connection.close()

当路由逻辑不重要且每个绑定的队列都应收到一份副本时,选择 fanout。

Topic 交换机

Topic 交换机通过将点分隔的路由键与通配符绑定键进行匹配来路由。它提供了灵活的路由,而无需为每种事件类型创建不同的交换机。

Topic 绑定使用两个通配符:

  • * 精确匹配一个单词。
  • # 匹配零个或多个单词。

例如,对于路由键 order.created.usorder.cancelled.eu

  • order.*.us 匹配美国订单的一个操作。
  • order.created.# 匹配所有区域和可选额外单词的已创建订单。
  • #.eu 匹配以 eu 结尾的事件。
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='events_topic', exchange_type='topic')

channel.basic_publish(
    exchange='events_topic',
    routing_key='order.created.us',
    body=b'Order 9001 created'
)

connection.close()

当消费者需要较大事件流的过滤子集时,选择 topic,例如 service.environment.severityentity.action.regiontenant.event.priority

快速选择指南

需求 交换机类型 示例
精确匹配路由 Direct errorinvoice.created
广播给每个订阅者 Fanout 缓存清除事件
通配符路由 Topic orders.*.uslogs.#

避免将 topic 交换机用作不一致路由键的垃圾场。尽早选择命名约定,记录下来,并根据消费者的过滤方式,将单词从宽泛到具体或从实体到操作进行排序。

要点

使用 direct 交换机进行精确目标路由,使用 fanout 交换机进行广播,使用 topic 交换机处理消费者需要通配符过滤的情况。如果可以将路由描述为一个精确标签,请使用 direct。如果每个人都需要该消息,请使用 fanout。如果订阅者需要模式匹配,请使用 topic。