Direct vs. Topic vs. Fanout:选择正确的交换机类型
根据精确路由、通配符路由或广播投递,选择 RabbitMQ 的 direct、topic 或 fanout 交换机。
Direct vs. Topic vs. Fanout:选择正确的交换机类型
RabbitMQ 交换机类型控制着生产者发布消息后消息的去向。选择错误的交换机可能导致广播私有工作、丢弃路由键不匹配的消息,或硬编码路由规则,这些都会给后续维护带来麻烦。
实际选择很简单:使用 direct 进行精确路由,使用 fanout 进行广播,使用 topic 在命名方案中进行通配符路由。
RabbitMQ 路由工作原理
生产者将消息发布到交换机,而不是直接发布到队列。交换机将消息路由键与队列绑定进行比较,然后根据交换机类型路由消息。
三个关键术语:
- 交换机: 接收发布的消息。
- 队列: 存储消息,直到消费者接收。
- 绑定: 将队列连接到交换机,有时带有绑定键。
如果没有匹配的队列且消息未标记为 mandatory,RabbitMQ 可能会丢弃无法路由的消息。对于重要流程,请使用发布者确认并处理返回的消息。
Direct 交换机
Direct 交换机将消息路由到绑定键与消息路由键完全匹配的队列。
当路由选择已知且精确时使用它。好的例子包括日志严重级别、任务类型或租户特定队列,其中精确名称就足够了。
例如,将 error_queue 绑定到 error,将 info_queue 绑定到 info。使用路由键 error 发布的消息会进入 error_queue;使用 info 的消息会进入 info_queue。如果两个队列都绑定到 error,则两个队列都会收到一份副本。同一队列上的消费者仍然会竞争该队列中的消息。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
channel.basic_publish(
exchange='logs_direct',
routing_key='error',
body=b'Database connection failed'
)
connection.close()
当路由键稳定且不需要通配符匹配时,选择 direct。
Fanout 交换机
Fanout 交换机将每条消息路由到绑定到该交换机的每个队列。它忽略路由键。
用于广播事件,每个订阅者都应收到相同的消息。常见情况包括缓存失效、用户资料更新事件、部署通知或实时状态更新。
例如,一个 user_updates fanout 交换机可能提供三个队列:email_queue、search_index_queue 和 analytics_queue。一次发布的资料更新会到达所有三个服务。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='user_updates', exchange_type='fanout')
channel.basic_publish(
exchange='user_updates',
routing_key='',
body=b'User 123 changed their display name'
)
connection.close()
当路由逻辑不重要且每个绑定的队列都应收到一份副本时,选择 fanout。
Topic 交换机
Topic 交换机通过将点分隔的路由键与通配符绑定键进行匹配来路由。它提供了灵活的路由,而无需为每种事件类型创建不同的交换机。
Topic 绑定使用两个通配符:
*精确匹配一个单词。#匹配零个或多个单词。
例如,对于路由键 order.created.us 和 order.cancelled.eu:
order.*.us匹配美国订单的一个操作。order.created.#匹配所有区域和可选额外单词的已创建订单。#.eu匹配以eu结尾的事件。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='events_topic', exchange_type='topic')
channel.basic_publish(
exchange='events_topic',
routing_key='order.created.us',
body=b'Order 9001 created'
)
connection.close()
当消费者需要较大事件流的过滤子集时,选择 topic,例如 service.environment.severity、entity.action.region 或 tenant.event.priority。
快速选择指南
| 需求 | 交换机类型 | 示例 |
|---|---|---|
| 精确匹配路由 | Direct | error、invoice.created |
| 广播给每个订阅者 | Fanout | 缓存清除事件 |
| 通配符路由 | Topic | orders.*.us、logs.# |
避免将 topic 交换机用作不一致路由键的垃圾场。尽早选择命名约定,记录下来,并根据消费者的过滤方式,将单词从宽泛到具体或从实体到操作进行排序。
要点
使用 direct 交换机进行精确目标路由,使用 fanout 交换机进行广播,使用 topic 交换机处理消费者需要通配符过滤的情况。如果可以将路由描述为一个精确标签,请使用 direct。如果每个人都需要该消息,请使用 fanout。如果订阅者需要模式匹配,请使用 topic。