一般的なRabbitMQのメッセージパターンとその使用時期は?
不可欠なメッセージングパターンを習得して、RabbitMQの可能性を解き放ちましょう。このガイドでは、ワークキュー(タスクの分配と負荷分散のため)、Publish/Subscribe(システムイベントのブロードキャストのため)、Request/Reply(同期呼び出しをシミュレートするため)の構造、ユースケース、および実装のヒントを詳述します。メッセージ確認応答、フェアディスパッチ(QOS)、そして特殊なエクスチェンジ(Fanout、Direct、Topic)といった重要な概念を学び、RabbitMQを使用して、高い拡張性、疎結合性、信頼性を持つアプリケーションを設計しましょう。
一般的なRabbitMQメッセージパターンとその使用タイミングとは?
RabbitMQのメッセージパターンは、1つのワーカーがジョブを処理するか、すべてのサブスクライバーがイベントを受信するか、1つのサービスが応答を待つかを決定します。間違ったパターンを選択すると、システムが作業を重複させたり、有用なイベントを失ったり、本来非同期であるべき呼び出しでブロックしたりする可能性があります。
RabbitMQは、交換機、キュー、バインディング、確認応答、メッセージプロパティを提供します。設計上の有用な作業は、これらの要素をアプリケーションにどのように適合させるかを選択することです。一般的なパターンは、ワークキュー、パブリッシュ/サブスクライブ、DirectまたはTopicルーティング、リクエスト/リプライです。
ワークキュー:ワーカー間でのジョブ分散
ワークキューパターンは、タスクキューとも呼ばれ、複数のワーカープロセス(コンシューマー)間で時間のかかるタスクを分散するために使用される、最もシンプルで一般的なメッセージパターンです。
仕組み
- プロデューサーがタスク(メッセージ)を1つのキューに送信します。
- 複数のコンシューマー(ワーカー)が同じキューをリッスンします。
- RabbitMQは各メッセージを1つのコンシューマーに配信するため、ワーカー間でバックログが共有されます。
デフォルトでは、RabbitMQはアクティブなコンシューマー間でメッセージをラウンドロビンでディスパッチします。1つのワーカーが遅いジョブを処理し、別のワーカーが速いジョブを処理する場合、この分散は必ずしも公平ではありません。そのため、通常はこのパターンを確認応答とプリフェッチ制限と組み合わせます。
確認応答を使用する
手動確認応答を使用すると、ワーカーはタスクが完了したことをRabbitMQに通知します。basic_ackを送信する前にワーカーがダウンした場合、RabbitMQはそのメッセージを再キューに入れて別のコンシューマーに再配信できます。これにより、レポート生成、画像処理、課金ジョブ、またはサイレントにドロップしたくないタスクに対してワークキューが有用になります。
プリフェッチ数を設定する
basic.qosは、コンシューマーが一度に保持できる未確認メッセージの数を制御します。遅くて不均一なジョブの場合、prefetch_countを1に設定するのが安全な開始点です。RabbitMQは、最初のジョブが確認されるまで、そのコンシューマーに2番目のジョブを送信しないためです。より高速なジョブの場合は、スループットとメモリ使用量を測定した後、値を上げても構いません。
実装例(概念)
# フェアディスパッチのためのコンシューマー設定
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=worker_function)
# ワーカーロジックは、正常に処理された後に確認応答を送信する必要があります
worker_function(ch, method, properties, body):
# タスクを処理...
ch.basic_ack(delivery_tag=method.delivery_tag)
パブリッシュ/サブスクライブ:イベントのブロードキャスト
Pub/Subパターンは、複数の関心のあるコンシューマーに同時にメッセージをブロードキャストするように設計されています。ワークキューでは各メッセージが1つのワーカーによってのみ消費されるのに対し、Pub/Subでは接続されているすべてのサブスクライバーがメッセージのコピーを受信します。
Fanout交換機を使用する
プロデューサーはFanout交換機にパブリッシュします。交換機はルーティングキーを無視し、メッセージをバインドされているすべてのキューにコピーします。各サブスクライバーは通常、独自のキューを持っているため、1つのログサービス、1つのメトリクスサービス、1つの監査サービスが、競合することなく同じイベントを受信できます。
ユースケース
- リアルタイム通知: メンテナンスモードイベントをすべてのアプリインスタンスにブロードキャストします。
- ログ配信: 同じログイベントをアーカイブサービスとアラートサービスの両方に送信します。
- キャッシュ無効化: データベース変更後、すべてのサービスインスタンスにローカルキャッシュをクリアするよう通知します。
実装のヒント
短期間のサブスクライバーの場合は、排他的で自動削除されるキューを作成し、Fanout交換機にバインドします。永続的なサブスクライバーの場合は、永続キューを使用して、サブスクライバーがオフライン中に到着したメッセージを後で読み取れるようにします。
DirectおよびTopicルーティング:イベントの選択的送信
Fanout交換機が無差別なブロードキャストを提供するのに対し、AMQPは選択的なパブリッシュのための交換機を提供し、Pub/Subモデルを拡張します。
Direct交換機
メッセージは、メッセージのルーティングキーとキューのバインディングキーの完全一致に基づいてキューにルーティングされます。これは、異なるタイプのコンシューマーを具体的にターゲットにする必要がある場合に便利です。
たとえば、ログパブリッシャーは、error、warning、infoなどのルーティングキーを持つメッセージを送信できます。アラートキューはerrorのみにバインドし、アーカイブキューは3つの重大度すべてにバインドできます。
Topic交換機
これは最も柔軟な交換機タイプであり、バインディングキーとルーティングキーでワイルドカードを使用できます。ルーティングキーは区切り文字付きのリスト(例:ピリオド.を使用)として扱われます。
*は1つの単語に完全一致します。#は0個以上の単語に一致します。
orders.us.createdのようなルーティングキーは、orders.*.createdにバインドされた不正キューと、#.us.#にバインドされた米国運用キューに送信できます。ルーティングルールが単なる固定フィールドではなく、実際のビジネスカテゴリである場合は、Topic交換機を使用します。
リクエスト/リプライ:特定の応答を要求する
リクエスト/リプライパターンを使用すると、クライアントアプリケーションがリクエストメッセージを送信し、ワーカー(サーバー)からの応答を同期的に待機できます。メッセージングは本質的に非同期ですが、このパターンはメッセージバス上で従来のリモートプロシージャコール(RPC)をシミュレートします。
reply_toとcorrelation_idを使用する
- クライアントは
rpc_queueなどのキューにリクエストを送信します。 - クライアントは
reply_toを、自身が消費しているコールバックキューに設定します。 - クライアントは一意の
correlation_idを設定します。 - ワーカーはリクエストを処理し、応答を
reply_toキューにパブリッシュします。 - クライアントは
correlation_idをチェックして、応答を元のリクエストと照合します。
ユースケース
- サービスルックアップ: 別のサービスからユーザープロファイルやフィーチャーフラグを取得します。
- 短い判断: 注文を受け付ける前に在庫を確認します。
注意して使用する
リクエスト/リプライは便利ですが、メッセージングシステムに同期待ちを再び持ち込みます。クライアントのタイムアウトを設定し、重複する応答を処理し、長時間実行されるジョブにRPCを使用しないでください。低速な作業の場合は、コマンドをパブリッシュし、ジョブIDを返し、進行状況や完了イベントを個別に送信します。
概念的なRPCフロー
graph TD
A[クライアント(リクエスター)] -->|1. リクエストメッセージ(reply_to、correlation_idを含む)| B(RPCリクエストキュー);
B --> C[サーバー(ワーカー)];
C -->|2. リクエストを処理|
D[結果];
D -->|3. リプライメッセージ(reply_to経由、correlation_idを保持)| A;
一般的なRabbitMQパターン一覧
| パターン | 交換機タイプ | ルーティングメカニズム | 主な機能 | 主なユースケース |
|---|---|---|---|---|
| ワークキュー | デフォルトまたはDirect | ワーカー間で共有される1つのキュー | 1つのメッセージ、1つのコンシューマー | 長時間実行タスクの負荷分散 |
| パブリッシュ/サブスクライブ | Fanout | ルーティングキーを無視 | 1つのメッセージ、すべてのバインドされたキュー | システムブロードキャスト、ログ |
| Directルーティング | Direct | ルーティングキーの完全一致 | コンシューマーの選択的ターゲティング | 重大度やタイプに基づくルーティング |
| Topicルーティング | Topic | ワイルドカード一致(*、#) |
柔軟で複雑なルーティング | マイクロサービス通信、イベントストリーム |
| リクエスト/リプライ(RPC) | Direct(応答用) | reply_toとcorrelation_idを使用 |
同期的なAPI呼び出しをシミュレート | 即時サービスルックアップ、小規模トランザクション |
まとめ
通信の形状から始めましょう。正確に1つのワーカーがジョブを処理する必要がある場合はワークキュー、すべてのサブスクライバーがイベントを確認する必要がある場合はPub/Sub、一部のサブスクライバーのみがイベントを確認する必要がある場合はDirectまたはTopicルーティング、そして呼び出し元が本当に即時の応答を必要とする場合にのみリクエスト/リプライを使用します。