Kafkaパイプラインにおけるコンシューマレイテンシのトラブルシューティング
Apache Kafkaパイプラインにおけるコンシューマレイテンシの診断と解決方法を解説します。この実践ガイドでは、コンシューマラグの発生メカニズムを詳述し、フェッチタイミング(`fetch.min.bytes`、`fetch.max.wait.ms`)、バッチサイズ(`max.poll.records`)、オフセットコミット戦略など、Kafkaコンシューマプロパティの調整可能な設定を提供します。コンシューマの並列性を効果的に拡張し、低レイテンシでリアルタイムなイベント処理を維持する方法を学びます。
Kafkaパイプラインにおけるコンシューマレイテンシのトラブルシューティング
コンシューマレイテンシが高いとは、アプリケーションがレコードを処理し終える前に、Kafka内でレコードが利用可能になっている状態を指します。この遅延は、コンシューマラグ、ダッシュボードの古い表示、アラートの遅延、または下流ジョブが期待されるウィンドウを逃すという形で現れることがあります。厄介なのは、Kafka自体は正常でもパイプラインが遅い場合があることです。コンシューマがデータベースを待っていたり、ポーリングごとに処理が多すぎたり、オフセットをコミットしすぎたり、処理の一時停止が長引くことでリバランスが発生している可能性があります。
このガイドでは、まずコンシューマ側から説明します。なぜなら、レイテンシの問題のほとんどはコンシューマ側で顕在化するからです。目標は、設定を変更する前に、遅いセグメントを見つけることです。
コンシューマラグとレイテンシの理解
コンシューマラグは、レイテンシ問題を示す主要なメトリクスです。これは、パーティションにプロデュースされた最新のオフセットと、コンシューマグループが正常に読み取り、コミットしたオフセットとの差を表します。ラグが大きいということは、コンシューマが遅れていることを意味します。
監視すべき主要メトリクス:
- コンシューマラグ: パーティションごとの未読メッセージの総数。
- フェッチレート vs プロデュースレート: コンシューマのフェッチレートがプロデューサのレートに一貫して追いつかない場合、ラグは増加します。
- コミットレイテンシ: コンシューマが進捗をチェックポイントするのにかかる時間。
フェーズ1: コンシューマのフェッチ動作の分析
レイテンシが高い最も一般的な原因は、非効率なデータ取得です。コンシューマはブローカからデータをプルする必要があり、設定が最適でない場合、待機に時間がかかりすぎたり、取得するデータが少なすぎたりする可能性があります。
fetch.min.bytes と fetch.max.wait.ms の調整
これらの2つの設定は、コンシューマがフェッチを要求する前にデータが蓄積されるのを待つ量に直接影響し、レイテンシとスループットのバランスを取ります。
fetch.min.bytes: ブローカが返すべき最小データ量(バイト単位)。値を大きくするとバッチ処理が促進され、スループットは向上しますが、必要なサイズがすぐに利用できない場合、レイテンシがわずかに増加する可能性があります。- ベストプラクティス: 高スループット、低レイテンシのパイプラインでは、即座に返却されるように比較的低く(例:1バイト)設定するか、スループットのボトルネックが観察される場合に調整します。
fetch.max.wait.ms: ブローカがfetch.min.bytesを蓄積するために応答を待つ最大時間。待機時間が長いとバッチサイズが最大化されますが、必要なボリュームが存在しない場合、レイテンシに直接追加されます。- トレードオフ: この時間を短くする(例:デフォルトの500msから50msに)と、レイテンシは大幅に低下しますが、フェッチが小さく非効率になる可能性があります。
max.poll.records の調整
この設定は、1回のConsumer.poll()呼び出しで返されるレコード数を制御します。
max.poll.records=500
max.poll.recordsが低すぎると、コンシューマは大量のデータを処理せずにpoll()呼び出しをループするのに過剰な時間を費やし、オーバーヘッドが増加します。高すぎると、大きなバッチの処理にセッションタイムアウトよりも時間がかかり、不要なリバランスを引き起こす可能性があります。
実用的なヒント: 100から500程度の中程度の値から始め、各ポーリングの実際の処理時間を監視してください。推測で調整しないでください。各レコードが遅いAPIに書き込むため、500レコードのバッチ処理に4分かかる場合、max.poll.recordsを増やすとコンシューマの安定性が低下し、高速化にはなりません。
フェーズ2: 処理時間とコミットの調査
データが迅速にフェッチされても、フェッチされたバッチの処理にかかる時間がフェッチ間の時間を超えると、レイテンシが高くなります。
処理ロジックのボトルネック
コンシューマアプリケーションのロジックに、消費ループ内で並列化されていない*重い外部呼び出し(例:データベース書き込み、APIルックアップ)が含まれている場合、処理時間が膨れ上がります。
トラブルシューティング手順:
- 処理時間の測定: メトリクスを使用して、バッチを受信してからコミット前にすべての下流操作を完了するまでの経過時間を追跡します。
- 並列化: 処理が遅い場合は、コンシューマアプリケーション内で内部スレッドプールを使用して、ポーリング後、オフセットをコミットする前にレコードを並行して処理することを検討してください。
コミット戦略の見直し
オフセットコミットが頻繁すぎると、各コミットにKafkaとの調整が必要なため、レイテンシが発生する可能性があります。ただし、より大きなリスクは通常、正確性です。早すぎるコミットはクラッシュ後に作業を失う可能性があります。遅すぎるコミットはクラッシュ後に作業を再実行する可能性があります。
enable.auto.commit: 単純なリーダー、実験、重要でないパイプラインには問題ありません。データベースを更新したり、APIを呼び出したり、派生イベントを公開したりする本番コンシューマの場合、手動コミットの方が通常は理解しやすいです。auto.commit.interval.ms: オフセットがコミットされる頻度を指定します(デフォルトは5秒)。
処理が高速で安定している場合、より長い間隔(例:10〜30秒)はコミットのオーバーヘッドを削減します。ただし、アプリケーションが頻繁にクラッシュする場合、より短い間隔は進行中の作業をより多く保持しますが、ネットワークトラフィックと潜在的なレイテンシが増加します。
手動コミットに関する警告: 手動コミット(
enable.auto.commit=false)を使用する場合、commitSync()は控えめに使用してください。commitSync()はコミットが確認されるまでコンシューマスレッドをブロックするため、すべてのメッセージまたは小さなバッチの後に呼び出すと、レイテンシに深刻な影響を与えます。
フェーズ3: スケーリングとリソース割り当て
設定が最適化されているように見える場合、根本的な問題は並列性の不足またはリソースの飽和である可能性があります。
コンシューマスレッドのスケーリング
Kafkaコンシューマは、消費するパーティションの数まで、グループ内のコンシューマインスタンスの数を増やすことでスケーリングします。20のパーティションと5つのコンシューマインスタンスがある場合、Kafkaは通常、各コンシューマに複数のパーティションを割り当てます。これは完全に正常です。制限は、1つのコンシューマグループ内の1つのパーティションは、一度に1つのコンシューマによってのみ処理されるため、グループメンバーを追加するだけでは、1つのホットパーティションを修正できないことです。
経験則: コンシューマインスタンスの数は、通常、サブスクライブするすべてのトピックのパーティション数を超えてはなりません。インスタンスがパーティションよりも多いと、アイドル状態のスレッドが発生します。
ブローカとネットワークの健全性
レイテンシはコンシューマコードの外部で発生する可能性があります。
- ブローカのCPU/メモリ: ブローカが過負荷の場合、フェッチリクエストへの応答時間が増加し、コンシューマのタイムアウトと遅延が発生します。
- ネットワークの飽和: コンシューマとブローカ間のネットワークトラフィックが多いと、特に大きなバッチをフェッチする場合にTCP転送が遅くなる可能性があります。
監視ツールを使用して、ラグが大きい期間中のブローカのCPU使用率とネットワークI/Oを確認してください。
ラグの形状を読む
ラグの形状は、どこを調べるべきかを示します。単一のパーティションが遅れている場合、通常は問題が狭い範囲に限定されています。キーが1つのパーティションに過剰なトラフィックをルーティングしている可能性があります。1つのレコードが遅いコードパスをトリガーしている可能性があります。そのパーティション割り当てを実行しているホストが正常でない可能性があります。そのような状況では、コンシューマを追加しても効果がない場合があります。なぜなら、Kafkaは同じグループ内の複数のコンシューマにその1つのパーティションを分割できないからです。
すべてのパーティションにわたって均等にラグがある場合は、共有された制限を示しています。サービスにより多くのインスタンスが必要か、下流のデータベースが飽和しているか、ブローカがフェッチの提供に時間がかかっている可能性があります。ラグが毎時同じ時間に急増する場合は、スケジュールされたジョブ、バッチプロデューサ、コンパクション圧力、バックアップ、またはオートスケーリングイベントを探してください。Kafkaのレイテンシは、多くの場合、Kafka外部の何かの副作用です。
また、「遅れているレコード数」と「遅れている時間」を区別してください。イベントが小さいトピックでは、レコード数は恐ろしく見えても、数秒で追いつく場合があります。レコードが大きいか処理にコストがかかるトピックでは、ラグカウントは小さくても、ビジネス上の遅延が数分を表す場合があります。監視スタックがレコードのタイムスタンプからラグ時間を推定できる場合は、オフセットラグの横にグラフ化してください。できない場合は、一時的なグループでkafka-console-consumer.shを使用していくつかのレコードをサンプリングし、イベントタイムスタンプと壁時計時間を比較してください。
逆効果になる一般的な修正
最初の悪い修正は、リバランスが止まるまでmax.poll.interval.msを上げることです。これは、処理が本質的に長い場合には有効ですが、停止したコンシューマをより長く隠す可能性もあります。コンシューマが下流の呼び出しで20分間スタックしている場合、間隔を大きくすると回復が遅れます。
2番目の悪い修正は、キーイングモデルを確認せずにインシデント中にパーティションを増やすことです。より多くのパーティションは将来の並列性を向上させる可能性がありますが、新しいレコードのパーティション割り当てを変更し、順序の前提に影響を与える可能性があります。また、既存のパーティションにすでにあるレコードを分割することはありません。
3番目の悪い修正は、ダッシュボードを正常にするために--to-latestオフセットリセットに切り替えることです。これにより作業がスキップされます。ビジネスがそれを受け入れる場合もあります。例えば、障害中の使い捨て分析イベントなどです。請求、フルフィルメント、セキュリティアラート、またはユーザーに見える状態変更の場合、遅延レコードをスキップすると、レイテンシ自体よりもはるかに大きなインシデントが発生する可能性があります。
コンシューマのスケーリングが有効な場合
スケーリングが有効なのは、グループがアクティブなコンシューマよりも多くのパーティションを持ち、作業がそれらのパーティション間で適切に分散されている場合です。トピックに24のパーティションと6つのコンシューマがある場合、12のコンシューマに移行すると、各インスタンスが処理するパーティションが少なくなるため、レイテンシが低下する可能性があります。24のコンシューマから40のコンシューマに移行しても、同じグループには役立ちません。割り当て可能なパーティションが24しかないため、追加のコンシューマはアイドル状態になります。
すべてのコンシューマが同じ飽和した依存関係を待機している場合、スケーリングはあまり役立ちません。すべてのコンシューマがすでにロックバインドされている1つのデータベーステーブルに書き込む場合、コンシューマを増やすと競合が増加し、レイテンシが悪化する可能性があります。その場合、書き込みのバッチ処理、インデックスの変更、バックプレッシャーの追加、またはホットなワークロードの分離が、Kafkaの設定よりも重要になる場合があります。
スケーリング中のリバランスに注意してください。コンシューマの起動と停止を過度に積極的に行うローリングデプロイは、最終的なレプリカ数が正しい場合でも、レイテンシのスパイクを引き起こす可能性があります。group.instance.idを使用した静的メンバーシップは、一部の長時間実行サービスで不要なパーティション移動を減らすことができますが、注意深いインスタンスID管理が必要です。協調リバランスは、クライアントとアサイナの設定に応じて、 eagerリバランスと比較して中断を減らすこともできます。
レイテンシが実際に保持期間のリスクになる場合
ラグがトピックの保持ウィンドウに近づくと、高レイテンシは緊急の問題になります。Kafkaは、すべてのコンシューマが読み取ったかどうかではなく、保持ポリシーに基づいて古いセグメントを削除します。コンシューマが7日間データを保持するトピックで6時間遅れている場合、アプリケーションを修復する時間はあります。同じトピックで6日遅れている場合、最も古い未読レコードが期限切れになる前に回復計画が必要です。
そのようなインシデントの間は、キャッチアップ率を推定してください。グループが1分あたり50,000レコードのラグを減らし、500万レコード遅れている場合、実行可能なウィンドウ内で追いつく可能性があります。ラグがまだ増加している場合、グループは回復していません。プロデューサを一時停止したり、一時的なコンシューマ容量を追加したり、ホットパスから遅い下流の依存関係を削除したり、どのデータをスキップできるかについて意識的な決定を下す必要があるかもしれません。
最良のコンシューマレイテンシ監視は、運用遅延と保持余力の両方を示します。「このグループは20分遅れている」は有用です。「このグループは未読データの期限切れまで18時間ある」は、適切な人材を招集する数字です。
実用的なレイテンシランブック
まず、合計ラグだけでなく、パーティションレベルのラグから始めてください。
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group realtime-enricher
ラグが1つのパーティションに集中している場合は、キーの偏りや、他のインスタンスよりも遅いコンシューマインスタンスを探してください。ラグが均等に分散している場合は、共有ボトルネック(コンシューマが少なすぎる、下流の呼び出しが遅い、ブローカのフェッチレイテンシ、または通常の容量を超えたプロデューサのスパイク)を探してください。コマンドを1〜2分間隔で2回実行して、グループが追いついているのか、さらに遅れているのかを確認してください。
次に、アプリケーション内の4つのタイミングを測定します。poll()での待機時間、返されたレコードの処理時間、下流システムへの書き込み時間、オフセットのコミット時間です。これらの数値は、どの設定が重要かを示します。トラフィックが少ないときにpoll()の待機時間が長すぎる場合は、fetch.max.wait.msを減らすか、fetch.min.bytesを低く保ちます。処理が支配的な場合、Kafkaのフェッチ設定は無関係です。コミットが支配的な場合、同期コミットですべてのレコードをコミットするのをやめてください。
低レイテンシサービスの場合、私は通常、控えめなフェッチバッチから始め、ブローカまたはネットワークのオーバーヘッドが明らかに問題である場合にのみ増やします。
fetch.min.bytes=1
fetch.max.wait.ms=50
max.poll.records=100
enable.auto.commit=false
これは普遍的な最適設定ではありません。これは読みやすい出発点です。バッチETLコンシューマは、より大きなフェッチとより大きなmax.poll.recordsを好む場合があります。不正スコアリングサービスは、1つの遅いAPI呼び出しがバッチ全体を保留にする可能性があるため、より小さなバッチを好む場合があります。
poll()の後にワーカースレッドを追加する場合は特に注意してください。並列処理は役立ちますが、オフセットは、関連するパーティションの以前のすべてのレコードが安全に処理された後にのみコミットする必要があります。ワーカースレッドが順不同で終了し、最も高いオフセットを早すぎるタイミングでコミットすると、クラッシュ時に処理中のレコードが静かにスキップされる可能性があります。一般的なパターンは、パーティションごとの完了を追跡し、連続して完了した最も高いオフセットのみをコミットすることです。
チェックリストはシンプルです。パーティションごとにラグを検査し、アプリケーションフェーズを測定し、フェッチ動作が問題である場合にのみフェッチ動作を調整し、追加のインスタンスを使用するのに十分なパーティションがある場合にのみコンシューマをスケーリングします。この順序により、ほとんどの無駄な調整作業を防ぐことができます。