効率的なKafkaバッチ戦略のベストプラクティス

batch.size、linger.ms、fetch.min.bytes、max.poll.recordsを使用してKafkaプロデューサーとコンシューマーのバッチ処理を調整します。

効率的なKafkaバッチ戦略のベストプラクティス

Kafkaのバッチ処理は、クライアントがリクエストごとに送信またはフェッチするレコード数を制御します。バッチが小さすぎるとCPUとネットワークのラウンドトリップを浪費し、大きすぎるとレイテンシが増加し、障害時の再試行コストが高くなります。

主な調整項目は、プロデューサーのbatch.sizelinger.ms、コンシューマーのfetch.min.bytesfetch.max.wait.msmax.poll.recordsです。

Kafkaバッチ処理とオーバーヘッドの理解

Kafkaでは、データ転送はTCP/IPを介して行われます。レコードを1つずつ送信すると、TCP確認応答、各リクエストのネットワークレイテンシ、シリアライゼーションとリクエストフレーミングのためのCPU使用率の増加に伴う大きなオーバーヘッドが発生します。バッチ処理は、レコードをローカルに蓄積してから大きな連続した単位として送信することで、この問題を軽減します。これにより、ネットワークの利用効率が大幅に向上し、同じ量のデータを処理するために必要なネットワークトリップの数が劇的に減少します。

プロデューサーバッチ:送信効率の最大化

プロデューサーバッチは、パフォーマンスチューニングにおいて最も影響力のある領域です。目標は、バッチサイズがネットワークコストを償却するのに十分大きく、かつエンドツーエンドのレイテンシに許容できない影響を与えない最適なポイントを見つけることです。

主要なプロデューサー設定パラメータ

プロデューサーがバッチを作成および送信する方法を決定するいくつかの重要な設定があります。

  1. batch.size: これは、保留中のレコードのためのプロデューサーのメモリ内バッファの最大サイズをバイト単位で定義します。このしきい値に達すると、バッチが送信されます。

    • ベストプラクティス: クライアントのデフォルトから始め、64KBや128KBなどのより大きな値をテストします。非常に大きなバッチはスループットに役立ちますが、レコード、パーティション、レイテンシ目標がそれをサポートしている場合に限ります。
  2. linger.ms: この設定は、新しいレコードが到着した後、不完全なバッチを送信する前に、プロデューサーがバッファを満たすためにより多くのレコードを待機する時間(ミリ秒)を指定します。

    • トレードオフ: linger.msが高いとバッチサイズが大きくなり(スループット向上)、個々のメッセージのレイテンシが増加します。
    • ベストプラクティス: スループット重視のワークロードでは、5〜20msの短い待機時間をテストします。低レイテンシのアプリケーションでは、この値を低く保ち、小さなバッチを受け入れます。
  3. buffer.memory: この設定は、単一のプロデューサーインスタンスのすべてのトピックとパーティションにわたって、未送信のレコードをバッファリングするために割り当てられる総メモリを設定します。バッファがいっぱいになると、後続のsend()呼び出しはブロックされます。

    • ベストプラクティス: アクティブなすべてのパーティションのピークバーストに対応できる十分な大きさに保ちます。いっぱいになると、send()max.block.msまでブロックされ、その後失敗する可能性があります。

プロデューサーバッチ設定例(Java)

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// パフォーマンスチューニングパラメータ
props.put("linger.ms", 10); // 最大10ms待機してレコードを収集
props.put("batch.size", 65536); // 目標64KBのバッチサイズ
props.put("buffer.memory", 33554432); // 32MBの総バッファスペース

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

コンシューマーバッチ:効率的なプルと処理

プロデューサーバッチが効率的な送信に焦点を当てるのに対し、コンシューマーバッチは受信と処理のワークロードを最適化します。コンシューマーはパーティションからデータをバッチでプルし、これを最適化することでブローカーへのネットワーク呼び出しの頻度を減らし、アプリケーションスレッドが必要とするコンテキストスイッチを制限します。

主要なコンシューマー設定パラメータ

  1. fetch.min.bytes: これは、単一のフェッチリクエストでブローカーが返すべきデータの最小量(バイト単位)です。ブローカーは、少なくともこの量のデータが利用可能になるか、fetch.max.wait.msタイムアウトに達するまで応答を遅延させます。

    • 利点: これにより、コンシューマーはプロデューサーバッチと同様に、より大きなデータのチャンクを要求するようになります。
    • ベストプラクティス: スループットがレイテンシよりも重要な場合に増やします。fetch.max.wait.msと組み合わせて、静かな期間にブローカーが長時間待機しないようにします。
  2. fetch.max.bytes: これは、コンシューマーが単一のフェッチリクエストで受け入れるデータの最大量(バイト単位)を設定します。これは、コンシューマーの内部バッファが圧倒されるのを防ぐための上限として機能します。

  3. max.poll.records: これはアプリケーションのスループットにとって重要です。consumer.poll()の1回の呼び出しで返されるレコードの最大数を制御します。

    • コンテキスト: コンシューマーアプリケーション内のループでレコードを処理する場合、この設定はポーリングループの1回の反復で処理される作業の範囲を制限します。
    • ベストプラクティス: 多くのパーティションと大量のデータがある場合、この値を増やすと(例:500から1000以上)、コンシューマースレッドはpoll()を再度呼び出す前にポーリングサイクルごとにより多くのデータを処理でき、ポーリングのオーバーヘッドが削減されます。

コンシューマーポーリングループの例

レコードを処理する際は、max.poll.recordsを尊重して、ポーリングごとの作業量とリバランスへの迅速な対応能力のバランスを維持します。

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    // max.poll.recordsが1000に設定されている場合、このループは最大1000回実行されます
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    // バッチ処理後にオフセットをコミット
    consumer.commitSync();
}

max.poll.recordsに関する警告: これを高く設定しすぎると、コンシューマーのリバランス中に問題が発生する可能性があります。リバランスが発生した場合、コンシューマーは現在のpoll()で取得したすべてのレコードを処理してからでないと、グループを正常に離脱できません。バッチが過度に大きいと、セッションタイムアウトが長くなり、不必要なグループの不安定性を引き起こす可能性があります。

高度なバッチ処理の考慮事項

バッチ処理の最適化は、特定のワークロード特性(レコードサイズ、スループット目標、許容レイテンシ)に依存する反復的なプロセスです。

1. レコードサイズの変動

メッセージのサイズが大きく異なる場合、固定のbatch.sizeでは不均一なバッチ処理が発生する可能性があります。いくつかの大きなレコードがバッチをすぐに満たす一方で、小さなレコードは効率的にグループ化するためにlinger.msが必要になる場合があります。

  • ヒント: メッセージが一貫して大きい場合は、低いlinger.msをテストし、リクエストレイテンシ、バッファの可用性、ブローカーのリクエストメトリクスを監視します。

2. 圧縮

バッチ処理と圧縮はうまく連携します。大きなバッチを圧縮すると、通常、小さなリクエストを圧縮するよりも良い圧縮率が得られます。snappylz4、またはzstdを検討し、クライアントとブローカーのCPUコストを測定します。

3. 冪等性と再試行

厳密にはバッチ処理ではありませんが、enable.idempotence=trueを確保することは重要です。大きなバッチを送信する場合、一時的なネットワークエラーがレコードのサブセットに影響を与える可能性が高まります。冪等性により、プロデューサーが一時的な障害のためにバッチの再送信を試みた場合、Kafkaがメッセージを重複排除し、正常な配信時の重複を防ぎます。

バッチ処理最適化の目標

設定 目標 スループットへの影響 レイテンシへの影響
プロデューサー batch.size リクエストあたりのデータ量を最大化 高い増加 中程度の増加
プロデューサー linger.ms 満杯になるまで短時間待機 高い増加 中程度の増加
コンシューマー fetch.min.bytes より大きなチャンクを要求 中程度の増加 中程度の増加
コンシューマー max.poll.records ポーリングオーバーヘッドを削減 中程度の増加 最小限の変化

まず1つのプロデューサーワークロードと1つのコンシューマーグループから始め、一度に1つのバッチ設定を変更し、スループット、p95レイテンシ、再試行、コンシューマーラグを比較します。効率的なKafkaバッチ処理は測定の演習であり、設定して終わりの設定ブロックではありません。