Kafka の Exactly-Once セマンティクスを解き明かす:包括的なガイド
Apache Kafka は、分散イベントストリーミングプラットフォームとして、その耐久性とスケーラビリティで知られています。しかし、分散システムにおいて、メッセージがExactly Once(ちょうど1回)処理されることを保証することは、ネットワークパーティション、ブローカーの障害、アプリケーションの再起動などによって複雑化される、重大な課題です。この包括的なガイドでは、Kafka の Exactly-Once Semantics (EOS) を解き明かし、この重要な信頼性レベルを達成するためにプロデューサーとコンシューマーの両方に必要な基盤となるメカニズムを説明します。
EOS を理解することは、金融取引や在庫更新などのクリティカルな状態変更を扱うアプリケーションにとって不可欠です。これらのアプリケーションでは、重複や欠落したデータは許容されません。冪等な書き込みと正確な消費を保証するために必要な設定とアーキテクチャパターンを探求します。
分散システムにおけるデータ保証の課題
Kafka セットアップにおいて、データ保証を達成するには、主に3つのコンポーネント間の連携が必要です:プロデューサー、ブローカー(Kafka クラスター)、そしてコンシューマー。
データ処理において、通常、3つのレベルの配信セマンティクスが議論されます。
- At-Most-Once(最大1回):メッセージが失われる可能性はありますが、重複することはありません。これは、プロデューサーが障害後にメッセージの送信をリトライしたが、ブローカーが最初の試行を既に正常にログに記録していた場合に発生します。
- At-Least-Once(最低1回):メッセージが失われることはありませんが、重複する可能性があります。これは、プロデューサーが信頼性(つまり、障害時にリトライする)のために設定されている場合のデフォルトの動作です。
- Exactly-Once (EOS)(ちょうど1回):メッセージは失われることも重複することもなく処理されます。これは最も強力な保証です。
EOS の達成には、プロデュース段階とコンサンプション段階の両方における問題の軽減が必要です。
1. Kafka プロデューサーにおける Exactly-Once セマンティクス
EOS の最初の柱は、プロデューサーが Kafka クラスターにデータを Exactly Once で書き込むことを保証することです。これは、冪等プロデューサーとトランザクションの2つの主要なメカニズムによって実現されます。
A. 冪等プロデューサー
冪等プロデューサーは、ネットワークエラーによりプロデューサーが同じバッチの送信をリトライした場合でも、パーティションに送信された単一のレコードバッチが1回だけ書き込まれることを保証します。
これは、ブローカーによってプロデューサーインスタンスに一意の Producer ID (PID) とエポック番号を割り当てることによって有効になります。ブローカーは、各プロデューサー・パーティションのペアに対して正常に確認された最後のシーケンス番号を追跡します。後続のリクエストが、最後に確認された番号以下のシーケンス番号で到着した場合、ブローカーは重複バッチをサイレントに破棄します。
冪等プロデューサーの設定:
この機能を有効にするには、以下のプロパティを設定する必要があります。
acks=all
enable.idempotence=true
acks=all(または-1):書き込みが成功したと見なされる前に、プロデューサーがリーダーおよびすべての同期レプリカ(ISR)が書き込みを確認するまで待機し、耐久性を最大化します。enable.idempotence=true:必要な内部設定(retriesを高い値に設定するなど)を自動的に行い、単一パーティションへの書き込み時にトランザクション保証が暗黙的に有効になることを保証します。
制限事項: 冪等プロデューサーは、単一セッション内、単一パーティションへの Exactly-Once デリバリーのみを保証します。クロスパーティションまたは複数ステップの操作は処理しません。
B. マルチパーティション/マルチトピック書き込みのためのプロデューサートランザクション
複数のパーティション、あるいは複数の Kafka トピックにわたる EOS(例:Topic A から読み込み、処理し、Topic B と Topic C にアトミックに書き込む)のためには、トランザクションを使用する必要があります。トランザクションは、複数の send() 呼び出しをアトミックな単位にグループ化します。グループ全体が成功するか、グループ全体が失敗してアボートされます。
主要なトランザクション設定:
| プロパティ | 値 | 説明 |
|---|---|---|
transactional.id |
ユニークな文字列 | トランザクションに必要な識別子。アプリケーション全体で一意である必要があります。 |
isolation.level |
read_committed |
後述するコンシューマー設定。コミットされたトランザクションデータを読み取るために必要です。 |
トランザクションフロー:
- Init Transactions(トランザクションの初期化): プロデューサーが
transactional.idを使用してトランザクションコンテキストを初期化します。 - Begin Transaction(トランザクションの開始): アトミック操作の開始をマークします。
- Send Messages(メッセージの送信): プロデューサーがさまざまなトピック/パーティションにレコードを送信します。
- Commit/Abort(コミット/アボート): 成功した場合、プロデューサーは
commitTransaction()を発行します。失敗した場合はabortTransaction()を発行します。
プロデューサーがトランザクションの途中でクラッシュした場合、ブローカーはトランザクションが決してコミットされないことを保証し、部分的な書き込みを防ぎます。
2. Kafka コンシューマーにおける Exactly-Once セマンティクス(トランザクショナルコンサンプション)
プロデューサーが Exactly Once で書き込んだとしても、コンシューマーは、そのレコードを Exactly Once で読み込み、処理する必要があります。これは、オフセットコミットと下流の処理ロジックの連携が必要となるため、EOS 実装において伝統的に最も複雑な部分です。
Kafka は、オフセットコミットをプロデューサーのトランザクション境界に統合することによって、トランザクショナルコンサンプションを実現します。これにより、コンシューマーは、同じトランザクション内で結果レコード(もしあれば)を正常に生成した後でのみ、レコードバッチの読み込みをコミットすることを保証します。
コンシューマーの分離レベル
トランザクション出力 を正しく読み取るには、コンシューマーがトランザクション境界を尊重するように設定する必要があります。これは、コンシューマーの isolation.level 設定によって制御されます。
| 分離レベル | 動作 |
|---|---|
read_uncommitted (デフォルト) |
コンシューマーは、アボートされたトランザクションからのレコードを含む、すべてのレコードを読み取ります(下流処理における At-Least-Once 動作)。 |
read_committed |
コンシューマーは、プロデューサートランザクションによって正常にコミットされたレコードのみを読み取ります。コンシューマーが進行中のトランザクションに遭遇した場合、待機するかスキップします。これは、エンドツーエンド EOS に必要です。 |
コンシューマーの設定例:
isolation.level=read_committed
auto.commit.enable=false
auto.commit.enable=false の重要な役割
EOS を目指す場合、手動オフセット管理は必須です。auto.commit.enable=false を必ず設定してください。自動コミットが有効になっていると、コンシューマーは処理が完了する前にオフセットをコミットしてしまう可能性があり、その直後に障害が発生した場合、データ損失や重複につながる可能性があります。
ストリームプロセッサー(読み込み-処理-書き込みループ)
真のエンドツーエンド EOS パイプライン(一般的な Kafka Streams パターン)では、コンシューマーはトランザクションを使用して、入力オフセットコミットと出力生成を連携させる必要があります。
- Start Transaction(トランザクションの開始)(コンシューマーの
transactional.idを使用)。 - Read Batch(バッチの読み込み): 入力トピックからレコードを消費します。
- Process Data(データの処理): データを変換します。
- Write Results(結果の書き込み): 出力トピックに、同じトランザクション内で出力レコードを生成します。
- Commit Offsets(オフセットのコミット): 同じトランザクション内で、入力トピックの読み込みオフセットをコミットします。
- Commit Transaction(トランザクションのコミット)。
いずれかのステップが失敗した場合(例:処理で例外が発生した、または出力書き込みが失敗した)、トランザクション全体がアボートされます。再起動時、コンシューマーは同じ未コミットバッチを再読み込みし、レコードがスキップされたり重複したりしないことを保証します。
EOS 実装のためのベストプラクティス
Exactly-Once Semantics を使用した Kafka アプリケーションを正常にデプロイするには、これらの重要なベストプラクティスに従ってください。
- プロデューサー出力には常にトランザクションを使用する: アプリケーションが Kafka に書き込む場合、EOS が必要な場合は、たとえ1つのパーティションにのみ書き込む場合でも、トランザクションを使用してください。1つのトピック/パーティションにのみ書き込む場合は
enable.idempotence=trueを使用してください。 read_committedコンシューマーを使用する: EOS プロデューサーの出力を読み取るコンシューマーは、isolation.level=read_committedに設定されていることを確認してください。- 自動コミットを無効にする: トランザクションを介した手動オフセット管理は、EOS にとって交渉の余地がありません。
- 安定した
transactional.idを選択する:transactional.idはアプリケーションの再起動をまたいで永続する必要があります。アプリケーションが再起動した場合、ブローカーとのトランザクション状態を復旧するために、同じ ID を使用して再開する必要があります。 - アプリケーションの回復力: 可能であれば、処理ロジック自体が冪等になるように設計してください。Kafka はブローカーの耐久性を処理しますが、外部データベースやサービスも、潜在的なリトライを適切に処理できるように設計する必要があります。
まとめ
Kafka の Exactly-Once Semantics は、メカニズムの慎重なレイヤリングによって達成されます。単一バッチの信頼性のためのプロデューサーの冪等性、アトミックな複数ステップ操作のためのトランザクション API、そしてプロデューサーのトランザクション境界に統合された調整されたオフセットコミットです。プロデューサーで enable.idempotence=true(簡単なケース)を設定するか、トランザクション ID(複雑なフローの場合)を設定し、コンシューマーで isolation.level=read_committed を設定して自動コミットを無効にすることにより、開発者はデータ整合性の最高保証を持つ、堅牢でステートフルなストリーミングアプリケーションを構築できます。