Kafkaコマンドラインツール完全ガイド:CLIリファレンス

Apache Kafkaのパワーを最大限に引き出す、包括的なコマンドラインインターフェース(CLI)リファレンスガイドです。トピック管理(`kafka-topics.sh`)、メッセージ送信(`kafka-console-producer.sh`)、データ消費(`kafka-console-consumer.sh`)、コンシューマグループの調査(`kafka-consumer-groups.sh`)など、Kafkaの必須コマンドを学びます。実践的なユースケース、引数、効果的なKafka管理とトラブルシューティングのベストプラクティスを詳しく解説します。

Kafkaコマンドラインツール完全ガイド:CLIリファレンス

Kafkaのコマンドラインツールは、基本的な運用上の疑問に最も迅速に答える方法です。このトピックは存在するか、どのブローカーがこのパーティションをリードしているか、トピックの中身は何か、なぜこのコンシューマグループが遅れているのか、このクライアントはクラスタで認証できるか。すべてのタスクにこれらが必要なわけではなく、ほとんどの本番環境の変更は依然として自動化を通じて行うべきですが、デプロイが壊れたときや深夜のデータに関する疑問が生じたとき、CLIは事実への最短経路となることがよくあります。

以下の例では、スクリプトがPATHに含まれていることを前提としています。多くのインストールでは、これらはKafkaのbin/ディレクトリにあり、同じコマンドをbin/kafka-topics.shとして実行することもできます。セキュリティ保護されたクラスタの場合、ほとんどのコマンドには--command-config client.propertiesも必要で、このファイルにはSASL、SSL、その他のクライアント設定が含まれています。

コアKafka CLIツール

Kafkaディストリビューションには通常、さまざまなスクリプトや実行可能ファイルを含むbin/ディレクトリが含まれています。ここでは、Kafkaを効果的に管理するために最も頻繁に使用されるものに焦点を当てます。

1. kafka-topics.sh

これは間違いなく最も頻繁に使用されるコマンドラインツールです。これにより、Kafkaトピックの作成、一覧表示、説明、削除、変更、管理が可能になります。トピック管理は、Kafka内のデータストリームを整理するための基本です。

一般的なサブコマンドと引数:

  • --create:新しいトピックを作成します。
  • --list:クラスタ内のすべてのトピックを一覧表示します。
  • --describe:特定のトピックに関する詳細情報を提供します。
  • --delete:1つ以上のトピックを削除します。
  • --alter:既存のトピックの設定を変更します。
  • --topic <topic_name>:トピック名を指定します。
  • --partitions <num_partitions>:トピックのパーティション数を設定します(--createと併用)。
  • --replication-factor <factor>:トピックのレプリケーションファクターを設定します(--createと併用)。
  • --bootstrap-server <host:port>:接続するKafkaブローカーを指定します。

例:

  • my_topicという名前のトピックを3つのパーティション、レプリケーションファクター2で作成する:

    kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server kafka-broker-1:9092,kafka-broker-2:9092
    
  • クラスタ内のすべてのトピックを一覧表示する:

    kafka-topics.sh --list --bootstrap-server kafka-broker-1:9092
    
  • my_topicという名前のトピックを説明する:

    kafka-topics.sh --describe --topic my_topic --bootstrap-server kafka-broker-1:9092
    

    これにより、パーティション、リーダー、レプリカ、ISR(同期レプリカ)などの詳細が表示されます。

  • old_topicという名前のトピックを削除する:

    kafka-topics.sh --delete --topic old_topic --bootstrap-server kafka-broker-1:9092
    

    注:トピックの削除は、Kafkaブローカーの設定で有効にする必要があります(delete.topic.enable=true)。

2. kafka-console-producer.sh

このツールを使用すると、標準入力からKafkaトピックにメッセージを送信できます。プロデューサーのテスト、サンプルデータの注入、手動でのメッセージ公開に非常に役立ちます。

一般的な引数:

  • --topic <topic_name>:ターゲットトピックを指定します。
  • --bootstrap-server <host:port>:接続するKafkaブローカーを指定します。
  • --property <key>=<value>:プロデューサープロパティを設定できます(例:key.serializervalue.serializer)。
  • --producer-property <key>=<value>--propertyと似ていますが、プロデューサー側の設定専用です。

例:

  • my_topicにメッセージを送信する:

    kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
    

    実行後、メッセージを1行ずつ入力できます。Ctrl+Cで終了します。

  • キー付きメッセージを送信する(JSON形式):

    kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property parse.key=true --property key.separator=':'
    

    これで、key:valueのペアを入力でき、Kafkaは指定されたキーで送信します。

3. kafka-console-consumer.sh

このツールは、1つ以上のKafkaトピックにサブスクライブし、受信したメッセージを標準出力に出力します。コンシューマーのテスト、トピック内のデータの検査、プロデューサー/コンシューマーアプリケーションのデバッグに不可欠です。

一般的な引数:

  • --topic <topic_name>:消費するトピックを指定します。
  • --bootstrap-server <host:port>:接続するKafkaブローカーを指定します。
  • --group-id <group_id>:コンシューマグループIDを指定します。これはオフセットの管理と、複数のコンシューマが消費負荷を共有できるようにするために重要です。
  • --from-beginning:トピックのログの最初からメッセージを読み取ります。
  • --offset <offset>:特定のオフセットから消費を開始します。
  • --partition <partition_id>:特定のパーティションから消費します。
  • --property <key>=<value>:コンシューマプロパティを設定できます(例:value.deserializer)。

例:

  • my_topicからすべてのメッセージを消費する:

    kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
    
  • コンシューマグループmy_groupmy_topicの最初からメッセージを消費する:

    kafka-console-consumer.sh --topic my_topic --group-id my_group --from-beginning --bootstrap-server kafka-broker-1:9092
    
  • オフセットとキーを表示してメッセージを消費する:

    kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property print.key=true --property key.separator="\t" --property print.offset=true --property print.headers=true
    

4. kafka-consumer-groups.sh

このツールは、コンシューマグループの管理と検査に使用されます。コンシューマラグの理解、パーティションの再割り当て、消費の問題のトラブルシューティングに不可欠です。

一般的なサブコマンドと引数:

  • --list:クラスタ内のすべてのコンシューマグループを一覧表示します。
  • --describe:特定のコンシューマグループに関する詳細(ラグを含む)を提供します。
  • --bootstrap-server <host:port>:接続するKafkaブローカーを指定します。
  • --group <group_id>:コンシューマグループIDを指定します。
  • --reset-offsets:コンシューマグループのオフセットをリセットします。
  • --topic <topic_name>:オフセットリセットのトピックを指定します。
  • --to-earliest:オフセットを最も古い利用可能なメッセージにリセットします。
  • --to-latest:オフセットを最新の利用可能なメッセージにリセットします。
  • --execute:オフセットリセット操作を実行します。

例:

  • すべてのコンシューマグループを一覧表示する:

    kafka-consumer-groups.sh --list --bootstrap-server kafka-broker-1:9092
    
  • コンシューマグループmy_groupを説明し、そのラグを表示する:

    kafka-consumer-groups.sh --describe --group my_group --bootstrap-server kafka-broker-1:9092
    

    出力には、トピック、パーティション、現在のオフセット、ログエンドオフセット、ラグが表示されます。

  • my_groupmy_topicのオフセットを最も古い利用可能なメッセージにリセットする:

    kafka-consumer-groups.sh --group my_group --topic my_topic --reset-offsets --to-earliest --execute --bootstrap-server kafka-broker-1:9092
    

    このコマンドは、コンシューマがどこから読み取りを開始するかに影響するため、注意して使用してください。

5. kafka-log-dirs.sh

このツールは、Kafkaブローカーのログディレクトリを検査するのに役立ちます。ディスク使用量の理解やトピックデータの特定に役立ちます。

一般的な引数:

  • --bootstrap-server <host:port>:接続するKafkaブローカーを指定します。
  • --topic <topic_name>:特定のトピックのディレクトリを表示するように出力をフィルタリングします。

例:

  • ブローカーのログディレクトリを一覧表示する:

    kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092
    
  • 特定のトピックのログディレクトリを表示する:

    kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092 --topic my_topic
    

6. kafka-preferred-replica-election.sh

このスクリプトは、トピックの優先レプリカ選出を開始します。優先レプリカとは、レプリケーションファクターに基づいてパーティションのリーダーとして選択されたブローカーです。ブローカーに障害が発生し、優先レプリカ以外がリーダーになった場合、このツールを使用してリーダーシップを優先レプリカに戻すことができます。

一般的な引数:

  • --topic <topic_name>:優先レプリカを選出するトピックを指定します。
  • --broker-list <broker_id1,broker_id2,...>:カンマ区切りのブローカーIDリストを指定します。
  • --bootstrap-server <host:port>:接続するKafkaブローカーを指定します。

例:

  • my_topicの優先レプリカを選出する:

    kafka-preferred-replica-election.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
    
  • 複数のトピックの優先レプリカを選出する:

    kafka-preferred-replica-election.sh --topic topic1,topic2 --bootstrap-server kafka-broker-1:9092
    

重要な考慮事項とベストプラクティス

  • --bootstrap-serverが鍵: Kafkaクラスタに接続するには、常に正しい--bootstrap-server引数を指定してください。これは通常、ブローカーのhost:portのカンマ区切りリストです。
  • 環境: これらのコマンドは通常、Kafkaインストールのbin/ディレクトリにあります。このディレクトリに移動するか、KafkaのbinディレクトリがシステムのPATHに含まれていることを確認する必要があります。
  • 権限: これらのコマンドを実行するユーザーがKafkaブローカーに到達するために必要なネットワークアクセス権を持っていることを確認してください。
  • 設定: 多くのCLIツールは、--property--producer-property/--consumer-property引数を介してKafkaクライアント設定を受け入れることができます。これは、デフォルトのシリアライザー/デシリアライザーをオーバーライドしたり、他の特定のクライアント設定を設定したりする場合に便利です。
  • セキュリティ: セキュリティ保護されたKafkaクラスタ(SSL/TLSやSASL認証など)の場合、これらのツールに追加のセキュリティ関連の引数(クライアントプロパティファイルを指す--command-configなど)を渡す必要があります。
  • トピックの削除: トピックの削除は機密性の高い操作であり、Kafkaブローカーのserver.propertiesファイルでdelete.topic.enable=trueを使用して明示的に有効にする必要があることに注意してください。

本番環境でCLIを安全に使用する方法

CLIは、まず検査ツールとして、次に変更ツールとして使用してください。--list--describe、および短いコンソール読み取りはリスクが低いです。--delete--alter、パーティションの増加、オフセットのリセットはクラスタの動作を変更するため、可能な限りアプリケーションの変更と同じレビューパスを経由する必要があります。

実用的な本番セッションは、通常、クライアント設定ファイルから始まります:

cat client.properties
# security.protocol=SASL_SSL
# sasl.mechanism=SCRAM-SHA-512
# sasl.jaas.config=...

次に、すべてのコマンドにそれを含めます:

kafka-topics.sh --bootstrap-server kafka-1:9093 --command-config client.properties --describe --topic orders

コンソールコンシューマの場合、誤って実際のアプリケーショングループに参加しないようにしてください。データを検査するときは一時的なグループIDを使用し、コマンドが終了するように--max-messagesを使用します:

kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9093 \
  --command-config client.properties \
  --topic orders \
  --group debug-orders-$(date +%s) \
  --from-beginning \
  --max-messages 5 \
  --property print.key=true \
  --property print.offset=true

この小さな習慣により、デバッグコマンドがライブサービスからパーティションを奪うのを防ぎます。また、グループ名によって意図が明確になるため、よりクリーンな監査証跡を残します。

CLIは、退屈であるときに最適です。つまり、1つのコマンドで検査し、1つのコマンドで確認し、状態を変更するコマンドの明確な記録を残すことです。

日常的なトラブルシューティングレシピ

プロデューサーが正常に書き込んでいると言っているのに、コンシューマチームが何も見えない場合は、トピックから始めます:

kafka-topics.sh --bootstrap-server kafka-1:9092 --describe --topic orders

トピック名、パーティション数、リーダーの可用性、同期レプリカを確認します。トピック名のタイプミスは、開発クラスタで自動トピック作成が有効になっている場合、壊れたパイプラインとまったく同じように見えることがあります。本番環境では、オフラインパーティションやISRの縮小があるトピックは、アプリケーションコードの問題よりも先にブローカーまたはレプリケーションの問題を示しています。

次に、一時的なグループで小さなサンプルを読み取ります:

kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic orders \
  --group debug-orders-$(date +%s) \
  --max-messages 10 \
  --property print.key=true \
  --property print.timestamp=true \
  --property print.offset=true

そこにレコードが表示される場合、Kafkaにはデータがあり、問題はおそらく実際のコンシューマグループ、そのオフセット、そのサブスクリプション、またはその処理ロジックにあります。レコードが表示されない場合は、プロデューサートピック、シリアライザー、認証、およびプロデューサーが別のクラスタに書き込んでいないかを確認します。

ラグに関する質問については、グループに直接アクセスします:

kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group orders-writer

合計ラグだけで止まらないでください。パーティションを比較してください。ラグが大きい単一のパーティションは、すべてのパーティションに中程度のラグがある場合とは異なる問題を意味します。単一パーティションのラグは、多くの場合、キーの偏りまたは1つの不良なコンシューマ割り当てを意味します。均等なラグは通常、アプリケーション全体が入力レートよりも遅いことを意味します。

「何が変わったのか?」という質問については、トピック設定を検査します:

kafka-configs.sh \
  --bootstrap-server kafka-1:9092 \
  --entity-type topics \
  --entity-name orders \
  --describe

ここで、保持期間の変更、クリーンアップポリシーの驚き、圧縮のオーバーライド、およびサービスの想定と異なるメッセージサイズ設定を発見できます。

CLIの出力は監視の代わりにはなりませんが、不確実性を減らすのに優れています。実際のインシデントでは、いくつかのコマンド出力をチケットに貼り付けることで、トピックが存在するかどうか、レコードが存在するかどうか、グループが実際に動いているかどうかについて全員が議論する手間を省くことができます。

慎重に扱う価値のあるコマンド

一部のKafka CLIコマンドは短いため無害に見えますが、無害ではありません。

kafka-topics.sh --alter --partitionsはパーティション数を増やすだけで、後で変更を後悔しても減らすことはできません。パーティションを増やすとコンシューマの並列処理に役立つことがありますが、新しいレコードのキー分布を変更したり、キー範囲のすべてのイベントがより少ないパーティションセットに収まることを期待していたシステムの前提を壊したりする可能性があります。

kafka-consumer-groups.sh --reset-offsets --executeは、グループが次に読み取る場所を変更します。最初に--dry-runを使用し、影響を受けるコンシューマを停止し、古いオフセットを記録してください。最も古いオフセットにリセットすると、べき等でないシステムにデータが再送信される可能性があります。最新のオフセットにリセットすると、ビジネスがまだ処理することを期待しているデータがスキップされる可能性があります。

kafka-topics.sh --deleteはクラスタの設定とポリシーに依存しますが、削除が許可されている場合、それはデータベーステーブルを削除するように扱う必要があります。クラスタ、トピックを確認し、別の環境が同じ命名規則を使用していないか確認してください。orders-testという本番トピックは、実際のサービスがそれに依存している場合、依然として本番環境です。

繰り返し可能な操作については、コマンドをRunbookまたはスクリプトに、クラスタ、トピック、グループ、コマンド設定の明示的な変数を指定して配置してください。CLIは調査には最適ですが、本番環境での変更は退屈で、レビューされ、監査が容易であるべきです。