Redisリストをメッセージキューとして使う方法(LPUSH, RPOP)

Redisリストを強力なメッセージキューシステムに変換する方法を学びます。このチュートリアルでは、基本的なLPUSHおよびRPOPコマンドを解説し、タスクをキューに入れ、ワーカーが確実にデキューして処理する方法を実演します。実践的なPythonの例を探求し、Redisを使用した堅牢なFIFOベースのメッセージキューを非同期タスク処理のために構築するための重要な考慮事項を発見します。

Redisリストをメッセージキューとして使う方法(LPUSH, RPOP)

Redisリストは、RabbitMQ、Kafka、または本格的なバックグラウンドジョブフレームワークほど重厚なものが必要ない場合に、便利な小さなメッセージキューとして機能します。一般的なパターンはシンプルです。プロデューサーはLPUSHでジョブを追加し、ワーカーはRPOPまたはBRPOPでジョブを取得します。

このシンプルさこそが、人々がこれを利用する理由です。WebリクエストはメールジョブをRedisに投入し、素早く応答を返せます。ワーカーはそのジョブをすぐにピックアップできます。ブローカーのトポロジー、エクスチェンジ、トピック、新しい運用スタックは必要ありません。ただし、トレードオフについて正直になる必要があります。単純なRPOPは、ワーカーが作業を完了する前にメッセージを削除します。ワーカーが不適切なタイミングでクラッシュすると、確認応答パターンを構築しない限り、そのジョブは失われます。

キューとしてのRedisリストの理解

Redisリストは、文字列の順序付きコレクションです。これは要素のシーケンスと見なすことができ、Redisはリストの先頭または末尾から要素を追加または削除するコマンドを提供します。この両端性により、リストは本質的にキューの実装に適しています。

  • エンキュー(メッセージの追加): リストの一端にメッセージをプッシュすることで、キューに新しいメッセージを追加できます。LPUSHコマンドは、リストの先頭(左側)に要素をプッシュします。
  • デキュー(メッセージの処理): リストのもう一端からメッセージをポップすることで、キューからメッセージを取得して削除できます。RPOPコマンドは、リストの末尾(右側)から要素をポップします。

この特定の組み合わせ(エンキューにLPUSH、デキューにRPOP)により、先入れ先出し(FIFO)キューが作成されます。これはメッセージキューで最も一般的で期待される動作です。

コアコマンド:LPUSH と RPOP

Redisメッセージキューの中核をなす2つの主要なコマンドについて詳しく見ていきましょう。

LPUSH key value [value ...]

LPUSHコマンドは、keyに格納されているリストの先頭(左側)に1つ以上の文字列値を挿入します。keyが存在しない場合は、新しいリストが作成され、値が挿入されます。

例:

メール送信など、処理が必要なタスクがあるとします。このタスクをemail_tasksという名前のRedisリストにメッセージとしてプッシュできます。

# 単一のメールタスクをプッシュ
LPUSH email_tasks "{'to': '[email protected]', 'subject': 'Welcome!', 'body': 'Thanks for signing up!'}"

# 別のタスクをプッシュ。これは前のタスクの前に配置されます
LPUSH email_tasks "{'to': '[email protected]', 'subject': 'New User Registration', 'body': 'A new user has registered.'}"

これらのコマンドの後、email_tasksリストは次のようになります(先頭から末尾へ):

1) "{'to': '[email protected]', 'subject': 'New User Registration', 'body': 'A new user has registered.'}"
2) "{'to': '[email protected]', 'subject': 'Welcome!', 'body': 'Thanks for signing up!'}"

RPOP key

RPOPコマンドは、keyに格納されているリストの最後の要素(末尾、右側)を削除して返します。リストが空の場合はnilを返します。

例:

ワーカープロセスは、RPOPを使用してemail_tasksリストに新しいタスクがないか定期的にポーリングできます。

# ワーカーがタスクの取得を試みる
RPOP email_tasks

リストが空でない場合、RPOPはプッシュされた最後の要素(つまり、末尾から最初の要素)を返します。上記の例では、最初のRPOP呼び出しは以下を返します:

"{'to': '[email protected]', 'subject': 'Welcome!', 'body': 'Thanks for signing up!'}"

後続の呼び出しでは、末尾から次の利用可能なタスクが取得されます。

基本的なメッセージキューシステムの構築

LPUSHRPOPを使用したシンプルなメッセージキューの典型的なフローを概説しましょう。

1. プロデューサー(タスクのエンキュー)

作業をオフロードする必要があるアプリケーションの任意の部分が、プロデューサーとして機能します。メッセージ(多くの場合、タスクの詳細を表すJSON文字列)を構築し、LPUSHを使用してRedisリストにプッシュします。

プロデューサーのロジック(概念的なPythonの例):

import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def send_email_task(to_email, subject, body):
    task_message = {
        'type': 'send_email',
        'payload': {
            'to': to_email,
            'subject': subject,
            'body': body
        }
    }
    # LPUSHはリスト'email_queue'の先頭に追加します
    r.lpush('email_queue', json.dumps(task_message))
    print(f"Pushed email task to queue: {to_email}")

# 使用例:
send_email_task('[email protected]', 'Hello from Producer', 'This is a test message.')
send_email_task('[email protected]', 'Important Update', 'New features available.')

2. コンシューマー(タスクのデキューと処理)

独立して実行されるワーカープロセスは、新しいメッセージがないかRedisリストを継続的に監視します。RPOPを使用してキューからメッセージを取得し、削除します。

コンシューマーのロジック(概念的なPythonの例):

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, db=0)

def process_tasks():
    while True:
        # RPOPはリスト'email_queue'の末尾からメッセージを取得しようとします
        message_bytes = r.rpop('email_queue')
        if message_bytes:
            message_str = message_bytes.decode('utf-8')
            try:
                task = json.loads(message_str)
                print(f"Processing task: {task}")
                # タスク処理をシミュレート
                if task.get('type') == 'send_email':
                    print(f"  -> Sending email to {task['payload']['to']}...")
                    # 実際のメール送信ロジックに置き換えてください
                    time.sleep(1) # 作業をシミュレート
                    print(f"  -> Email sent to {task['payload']['to']}.")
                else:
                    print(f"  -> Unknown task type: {task.get('type')}")
            except json.JSONDecodeError:
                print(f"Error decoding JSON: {message_str}")
            except Exception as e:
                print(f"Error processing task {message_str}: {e}")
        else:
            # メッセージがない場合、再ポーリングする前に少し待機
            # print("No tasks available, waiting...")
            time.sleep(0.5)

if __name__ == "__main__":
    print("Worker started. Waiting for tasks...")
    process_tasks()

プロデューサーを実行すると、メッセージがプッシュされます。コンシューマーを実行すると、メッセージの取得と処理が開始されます。処理の順序は、プッシュされた順序(FIFO)に対応します。これは、LPUSHが先頭に追加し、RPOPが末尾から削除するためです。

信頼性に関する考慮事項

LPUSHRPOPは基本的なキューイングメカニズムを提供しますが、本番環境のキューを構築するには、ワーカーがクラッシュした場合、ジョブが失敗した場合、またはプロデューサーが不正なペイロードを送信した場合に何が起こるかを決定する必要があります。

1. 処理中のメッセージ損失

ワーカープロセスがRPOPでメッセージを削除した、処理を完了するにクラッシュした場合、そのメッセージは失われます。これを防ぐには:

  • タイトなポーリングの代わりにBRPOPを使用する: BRPOPは、リストに要素ができるか、タイムアウトが発生するまでブロックします。これ自体で処理が信頼できるようになるわけではありませんが、ワーカーが空のキューを見つけるためだけに数ミリ秒ごとに起動するのを防ぎます。
    # 右からのブロッキングポップ、タイムアウト0(無期限にブロック)
    BRPOP email_queue 0
    
  • 確認応答のために処理リストを使用する: 一般的なパターンは、メッセージをemail_queueからemail_processingにアトミックに移動し、処理し、作業が成功した場合にのみemail_processingから削除することです。ワーカーがダウンした場合、別のリーパープロセスが処理リスト内の古いアイテムを探し、メインキューに戻すことができます。RPOPLPUSHはこのパターンの古典的なコマンドであり、新しいバージョンのRedisではLMOVE/BLMOVEも提供されています。

2. 失敗したタスクの処理

処理中にタスクが失敗した場合(例:一時的なネットワーク問題や不正なデータが原因)、どうなるでしょうか?

  • 再試行メカニズム: ワーカー内に再試行ロジックを実装します。数回失敗した後、手動検査のためにタスクを'failed_tasks'リストに移動します。
  • デッドレターキュー(DLQ): 繰り返し処理に失敗したメッセージが送られる専用のRedisリスト(またはその他のストレージ)。これはデバッグとリカバリに不可欠です。

3. 複数のコンシューマー

同じキューを消費する複数のワーカーインスタンスがある場合、RPOP(およびBRPOP)は、各メッセージが1つのワーカーによってのみ処理されることを保証します。これは、RPOPがアトミックに要素を削除するためです。

4. メッセージの順序付け

LPUSHRPOPはFIFOキューを作成しますが、この保証は処理ロジックと同じくらい強力です。コンシューマーが適切に処理せずに失敗したメッセージを再キューイングしたり、他の操作を導入したりすると、厳密なFIFO順序が損なわれる可能性があります。

5. ペイロード形式と冪等性

メッセージ本文は小さな契約として扱います。JSONはredis-cliで簡単に検査できるため一般的ですが、Pythonスタイルのシングルクォート辞書ではなく、有効なJSONを使用してください:

{"type":"send_email","id":"email-1842","payload":{"to":"[email protected]","template":"welcome"}}

idフィールドは重要です。ワーカーがタイムアウト後に再試行したり、処理リストから古いジョブが再キューイングされたりすると、同じ論理ジョブが複数回実行される可能性があります。重複が無害になるようにハンドラーを設計してください。メールワーカーの場合、送信前にアプリケーションデータベースにemail-1842を記録し、再試行で別のメッセージが送信される前にそのレコードを確認することを意味するかもしれません。

6. キューの長さとバックプレッシャー

LLEN email_queueでキューの長さを監視します。増加するキューは自動的に悪いわけではありません。単にトラフィックの急増後にワーカーが追いついていることを意味する場合があります。何時間も増加し続けるキューは、通常、プロデューサーがコンシューマーより速い、ワーカーが失敗している、または1つの遅い依存関係がすべてを妨げていることを意味します。

実際には、長さだけでなく経過時間にもアラートを設定するのが好きです。Redisリストはエンキュー時間を個別に保存しないため、ジョブの経過時間が重要な場合はペイロードにタイムスタンプを入れます:

{"type":"resize_image","id":"img-991","created_at":"2026-05-24T08:15:00Z","payload":{"image_id":991}}

そうすれば、ワーカーログから、ジョブが数秒遅れて処理されているのか、数時間遅れて処理されているのかがわかります。これは、実際のインシデントをデバッグする際に、長さだけよりもはるかに有用です。

高度なテクニック(簡単に)

  • RPOPLPUSH あるリストからメッセージをアトミックにポップし、別のリスト(例:'processing'リスト)にプッシュします。これは、確認応答を使用して信頼性の高い処理を実装するための重要なコマンドです。
  • 複数キーでのBLPOP / BRPOP 空でなくなった最初のリストからブロックしてポップします。複数のキューから消費する場合に便利です。
  • Luaスクリプティング: RPOPLPUSHではカバーできない複雑なアトミック操作の場合、Luaスクリプトを使用して、重要なコマンドシーケンスが中断されずに実行されるようにできます。

より信頼性の高いワーカーの形

ベストエフォートの通知よりも重要なものには、単純な「ポップして期待する」ワーカーは避けてください。より安全な形は次のようになります:

RPOPLPUSH email_queue email_processing

ワーカーはメッセージを受信し、Redisはすでにそれをemail_processingに移動しています。メールが送信され、アプリケーションが成功を記録した後、ワーカーはその正確なペイロードを処理リストから削除します:

LREM email_processing 1 '{"type":"send_email","id":"email-1842"}'

それでも完璧なエンタープライズキューではありません。LREMはペイロードと一致する必要があり、大きな処理リストは扱いにくくなり、メッセージが再試行するのに十分古いかどうかを認識するリーパープロセスが必要です。しかし、障害モードを有用な方法で変更します。ワーカーのクラッシュがジョブの唯一のコピーを削除することはなくなります。

このアプローチを使用する場合は、再試行メタデータをメッセージに入れるか、メッセージIDの横にある別のキーに保存します。たとえば、リーパーは古いメッセージを最初の数回はemail_queueに戻し、再試行制限に達した後はemail_failedに移動できます。これにより、同じ不良ペイロードが永久に失敗し続けるのを見る代わりに、ポイズンメッセージを検査する場所が得られます。

Redisリストが間違ったキューである場合

Redisリストは理解しやすいですが、常に適切なツールであるとは限りません。遅延ジョブ、ジョブの優先順位、スケジュールされた再試行、ワークフローの可視性、または長期的な監査履歴が必要な場合、ジョブライブラリまたは専用ブローカーの方が最終的に手間がかからない可能性があります。Redisストリームも検討に値します。これは、データ型に組み込まれたコンシューマーグループと確認応答セマンティクスを備えているためです。

私は、サムネイル生成、キャッシュのウォームアップ、ソースシステムが再試行できるWebhookファンアウト、または1つのアプリケーションが所有する単純なバックグラウンドジョブなど、小さな内部キューにはまだリストが好きです。複数のチームがキュー契約に依存するようになったら、配信期待値を文書化してください。「少なくとも1回」、「最大1回」、「ベストエフォート」は、インシデント中の学術用語ではありません。これらは、重複が許容されるかどうか、メッセージ損失が許容されるかどうか、そしてどれだけのリカバリメカニズムが必要かを決定します。

Redisリストは、小さくて理解しやすいキューが必要で、すでにRedisを運用している場合に適しています。単純なバックグラウンド作業にはLPUSHBRPOPから始めてください。ジョブを失うことが問題になる場合は、処理リスト、再試行回数、デッドレターリストを追加してください。遅延スケジューリング、優先順位、ファンアウト、長期保存、または多くのサービスにわたる強力な配信保証が必要な場合、通常は、増え続けるRedisの慣習に対処するよりも、専用のキューの方が扱いやすくなります。