ダイレクト vs トピック vs ファンアウト:適切な交換タイプの選び方

正確なルーティング、ワイルドカードルーティング、ブロードキャスト配信に基づいてRabbitMQのダイレクト、トピック、ファンアウト交換を選択します。

ダイレクト vs トピック vs ファンアウト:適切な交換タイプの選び方

RabbitMQの交換タイプは、プロデューサーがメッセージを公開した後にどこへ送られるかを制御します。間違った交換を選ぶと、プライベートな作業をブロードキャストしたり、ルーティングキーが一致しないメッセージをドロップしたり、後で修正が困難なルーティングルールをハードコーディングすることになります。

実用的な選択はシンプルです:正確なルーティングにはdirect、ブロードキャストにはfanout、命名スキームにわたるワイルドカードルーティングにはtopicを使用します。

RabbitMQのルーティングの仕組み

プロデューサーはキューに直接ではなく、交換にメッセージを公開します。交換はメッセージのルーティングキーをキューのバインディングと比較し、交換タイプに従ってメッセージをルーティングします。

重要な3つの用語:

  • 交換: 公開されたメッセージを受信します。
  • キュー: コンシューマーが受信するまでメッセージを保存します。
  • バインディング: キューを交換に接続し、場合によってはバインディングキーを使用します。

一致するキューがなく、メッセージがmandatoryとマークされていない場合、RabbitMQはルーティング不可能なメッセージをドロップすることがあります。重要なフローでは、パブリッシャー確認を使用し、返されたメッセージを処理します。

ダイレクト交換

ダイレクト交換は、バインディングキーがメッセージのルーティングキーと完全に一致するキューにメッセージをルーティングします。

ルーティングの選択肢が既知で正確な場合に使用します。良い例としては、ログの重大度、タスクタイプ、テナント固有のキューなど、正確な名前で十分な場合があります。

例えば、error_queueerrorで、info_queueinfoでバインドします。ルーティングキーerrorで公開されたメッセージはerror_queueに送られ、infoのメッセージはinfo_queueに送られます。2つのキューがerrorでバインドされている場合、両方のキューがコピーを受信します。同じキューのコンシューマーは、そのキューからのメッセージを競い合います。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct', exchange_type='direct')

channel.basic_publish(
    exchange='logs_direct',
    routing_key='error',
    body=b'データベース接続に失敗しました'
)

connection.close()

ルーティングキーが安定しており、ワイルドカードマッチングが必要ない場合はダイレクトを選択します。

ファンアウト交換

ファンアウト交換は、交換にバインドされたすべてのキューにすべてのメッセージをルーティングします。ルーティングキーは無視されます。

すべてのサブスクライバーが同じメッセージを受信する必要があるブロードキャストイベントに使用します。一般的なケースとしては、キャッシュ無効化、ユーザープロファイル更新イベント、デプロイメント通知、リアルタイムステータス更新などがあります。

例えば、user_updatesファンアウト交換は3つのキュー(email_queuesearch_index_queueanalytics_queue)にフィードできます。1つの公開されたプロファイル更新が3つのサービスすべてに届きます。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='user_updates', exchange_type='fanout')

channel.basic_publish(
    exchange='user_updates',
    routing_key='',
    body=b'ユーザー123が表示名を変更しました'
)

connection.close()

ルーティングロジックが重要でなく、バインドされたすべてのキューがコピーを受信する必要がある場合はファンアウトを選択します。

トピック交換

トピック交換は、ドット区切りのルーティングキーをワイルドカードバインディングキーと照合してルーティングします。イベントタイプごとに異なる交換を作成することなく、柔軟なルーティングを提供します。

トピックバインディングは2つのワイルドカードを使用します:

  • * は正確に1つの単語に一致します。
  • # は0個以上の単語に一致します。

例えば、order.created.usorder.cancelled.euのようなルーティングキーの場合:

  • order.*.us は米国の注文に対する1つのアクションに一致します。
  • order.created.# はすべての地域とオプションの追加単語にわたる作成された注文に一致します。
  • #.eueuで終わるイベントに一致します。
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='events_topic', exchange_type='topic')

channel.basic_publish(
    exchange='events_topic',
    routing_key='order.created.us',
    body=b'注文9001が作成されました'
)

connection.close()

コンシューマーがより大きなイベントストリームのフィルタリングされたサブセットを必要とする場合、例えばservice.environment.severityentity.action.regiontenant.event.priorityなどにトピックを選択します。

クイック選択ガイド

ニーズ 交換タイプ
完全一致ルーティング ダイレクト errorinvoice.created
すべてのサブスクライバーにブロードキャスト ファンアウト キャッシュパージイベント
ワイルドカードルーティング トピック orders.*.uslogs.#

一貫性のないルーティングキーのダンプ場としてトピック交換を使用しないでください。早期に命名規則を選び、文書化し、コンシューマーのフィルタリング方法に応じて、単語を広いものから具体的なものへ、またはエンティティからアクションへと順序付けしてください。

まとめ

正確な宛先にはダイレクト交換、ブロードキャストにはファンアウト交換、コンシューマーがワイルドカードフィルターを必要とする場合はトピック交換を使用します。ルートを1つの正確なラベルとして説明できる場合はダイレクトを使用します。全員がメッセージを必要とする場合はファンアウトを使用します。サブスクライバーがパターンを必要とする場合はトピックを使用します。