Kafkaコマンドラインツールの理解:CLIリファレンスガイド
Apache Kafkaは、高スループット、耐障害性、スケーラブルなリアルタイムデータパイプラインを可能にする、強力な分散イベントストリーミングプラットフォームです。KafkaはAPIを通じてプログラムで管理および操作できますが、そのコマンドラインインターフェース(CLI)ツールは、不可欠な管理タスクの実行、トピックの管理、コンシューマーとの対話、クラスターの健全性の監視を行うための直接的かつ効率的な方法を提供します。このガイドでは、最も一般的に使用されるKafka CLIツールについて、その目的、必須の引数、および実践的な使用例を詳述した包括的なリファレンスを提供します。
これらのツールを理解することは、Kafka管理者、開発者、およびKafkaクラスターの管理またはトラブルシューティングに関わるすべての人にとって非常に重要です。これらを使用すると、すべての単純な操作のためにカスタムスクリプトやアプリケーションを作成することなく、迅速な検査、操作、および診断が可能になります。
コアKafka CLIツール
Kafkaディストリビューションには通常、さまざまなスクリプトと実行可能ファイルを含む bin/ ディレクトリが含まれています。ここでは、Kafkaを効果的に管理するために最も頻繁に使用されるツールに焦点を当てます。
1. kafka-topics.sh
これは、おそらく最も頻繁に使用されるコマンドラインツールです。Kafkaトピックの作成、一覧表示、詳細表示、削除、変更、および管理を行うことができます。トピック管理は、Kafka内のデータストリームを整理するための基本です。
一般的なサブコマンドと引数:
--create: 新しいトピックを作成します。--list: クラスター内のすべてのトピックを一覧表示します。--describe: 特定のトピックに関する詳細情報を提供します。--delete: 1つ以上のトピックを削除します。--alter: 既存のトピックの設定を変更します。--topic <topic_name>: トピック名を指定します。--partitions <num_partitions>: トピックのパーティション数を設定します(--createと併用)。--replication-factor <factor>: トピックのレプリケーションファクターを設定します(--createと併用)。--bootstrap-server <host:port>: 接続するKafkaブローカーを指定します。
例:
-
トピック名
my_topicを3つのパーティションとレプリケーションファクター2で作成する例:
bash kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server kafka-broker-1:9092,kafka-broker-2:9092 -
クラスター内のすべてのトピックを一覧表示する例:
bash kafka-topics.sh --list --bootstrap-server kafka-broker-1:9092 -
トピック名
my_topicを詳細表示する例:
bash kafka-topics.sh --describe --topic my_topic --bootstrap-server kafka-broker-1:9092
これにより、パーティション、リーダー、レプリカ、およびISRs(同期済みレプリカ)などの詳細が表示されます。 -
トピック名
old_topicを削除する例:
bash kafka-topics.sh --delete --topic old_topic --bootstrap-server kafka-broker-1:9092
注:トピックの削除を有効にするには、Kafkaブローカーの設定で (delete.topic.enable=true) を設定する必要があります。
2. kafka-console-producer.sh
このツールを使用すると、標準入力からKafkaトピックにメッセージを送信できます。プロデューサーのテスト、サンプルデータの注入、またはメッセージの手動公開に非常に役立ちます。
一般的な引数:
--topic <topic_name>: ターゲットのトピックを指定します。--bootstrap-server <host:port>: 接続するKafkaブローカーを指定します。--property <key>=<value>: プロデューサーのプロパティ(例:key.serializer、value.serializer)を設定できます。--producer-property <key>=<value>:--propertyに似ていますが、特にプロデューサー側の設定に使用されます。
例:
-
my_topicにメッセージを送信する例:
bash kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
これを実行した後、メッセージを1行ずつ入力できます。終了するにはCtrl+Cを押します。 -
キー付きでメッセージを送信する例(JSON形式):
bash kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property parse.key=true --property key.separator=':'
これで、key:valueペアを入力でき、Kafkaは指定されたキーとともにそれらを送信します。
3. kafka-console-consumer.sh
このツールは、1つ以上のKafkaトピックを購読し、受信したメッセージを標準出力に出力します。コンシューマーのテスト、トピック内のデータの検査、プロデューサー/コンシューマーアプリケーションのデバッグに不可欠です。
一般的な引数:
--topic <topic_name>: 消費するトピックを指定します。--bootstrap-server <host:port>: 接続するKafkaブローカーを指定します。--group-id <group_id>: コンシューマーグループIDを指定します。これは、オフセットの管理や、複数のコンシューマーが消費負荷を共有できるようにするために重要です。--from-beginning: トピックのログの先頭からメッセージを読み取ります。--offset <offset>: 特定のオフセットから消費を開始します。--partition <partition_id>: 特定のパーティションから消費します。--property <key>=<value>: コンシューマーのプロパティ(例:value.deserializer)を設定できます。
例:
-
my_topicからすべてのメッセージを消費する例:
bash kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 -
コンシューマーグループ
my_groupのためにmy_topicの先頭からメッセージを消費する例:
bash kafka-console-consumer.sh --topic my_topic --group-id my_group --from-beginning --bootstrap-server kafka-broker-1:9092 -
オフセットとキーを出力してメッセージを消費する例:
bash kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property print.key=true --property key.separator="\t" --property print.offset=true --property print.headers=true
4. kafka-consumer-groups.sh
このツールは、コンシューマーグループの管理と検査に使用されます。コンシューマーの遅延(ラグ)の把握、パーティションの再割り当て、および消費に関する問題のトラブルシューティングに不可欠です。
一般的なサブコマンドと引数:
--list: クラスター内のすべてのコンシューマーグループを一覧表示します。--describe: 遅延(ラグ)を含む、特定のコンシューマーグループに関する詳細を提供します。--bootstrap-server <host:port>: 接続するKafkaブローカーを指定します。--group <group_id>: コンシューマーグループIDを指定します。--reset-offsets: コンシューマーグループのオフセットをリセットします。--topic <topic_name>: オフセットリセットの対象となるトピックを指定します。--to-earliest: オフセットを最も古い利用可能なメッセージにリセットします。--to-latest: オフセットを最も新しい利用可能なメッセージにリセットします。--execute: オフセットリセット操作を実行します。
例:
-
すべてのコンシューマーグループを一覧表示する例:
bash kafka-consumer-groups.sh --list --bootstrap-server kafka-broker-1:9092 -
コンシューマーグループ
my_groupを詳細表示し、その遅延(ラグ)を示す例:
bash kafka-consumer-groups.sh --describe --group my_group --bootstrap-server kafka-broker-1:9092
出力には、トピック、パーティション、現在のオフセット、ログの最終オフセット、および遅延(ラグ)が表示されます。 -
my_groupのオフセットをmy_topic上の最も古い利用可能なメッセージにリセットする例:
bash kafka-consumer-groups.sh --group my_group --topic my_topic --reset-offsets --to-earliest --execute --bootstrap-server kafka-broker-1:9092
このコマンドは、コンシューマーがどこから読み取りを開始するかに影響するため、注意して使用してください。
5. kafka-log-dirs.sh
このツールは、Kafkaブローカー上のログディレクトリを検査するのに役立ちます。ディスク使用量を把握したり、トピックデータの場所を特定したりするのに役立ちます。
一般的な引数:
--bootstrap-server <host:port>: 接続するKafkaブローカーを指定します。--topic <topic_name>: 特定のトピックのディレクトリを表示するように出力をフィルタリングします。
例:
-
ブローカー上のログディレクトリを一覧表示する例:
bash kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092 -
特定のトピックのログディレクトリを表示する例:
bash kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092 --topic my_topic
6. kafka-preferred-replica-election.sh
このスクリプトは、トピックのプリファードレプリカ選出を開始します。プリファードレプリカとは、そのレプリケーションファクターに基づいてパーティションのリーダーとして選ばれるブローカーです。ブローカーがダウンし、非プリファードレプリカがリーダーになった場合、このツールを使用してリーダーシップをプリファードレプリカに戻すことができます。
一般的な引数:
--topic <topic_name>: プリファードレプリカを選出するトピックを指定します。--broker-list <broker_id1,broker_id2,...>: コンマ区切りのブローカーIDリストを指定します。--bootstrap-server <host:port>: 接続するKafkaブローカーを指定します。
例:
-
my_topicのプリファードレプリカを選出する例:
bash kafka-preferred-replica-election.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 -
複数のトピックのプリファードレプリカを選出する例:
bash kafka-preferred-replica-election.sh --topic topic1,topic2 --bootstrap-server kafka-broker-1:9092
重要な考慮事項とベストプラクティス
--bootstrap-serverが鍵: Kafkaクラスターに接続するために、常に正しい--bootstrap-server引数を指定していることを確認してください。これは通常、ブローカーのhost:portをコンマで区切ったリストです。- 環境: これらのコマンドは通常、Kafkaインストールの
bin/ディレクトリにあります。このディレクトリに移動するか、KafkaのbinディレクトリがシステムのPATHに含まれていることを確認する必要があります。 - パーミッション: これらのコマンドを実行しているユーザーがKafkaブローカーに到達するために必要なネットワークアクセス権を持っていることを確認してください。
- 設定: 多くのCLIツールは、
--propertyまたは--producer-property/--consumer-property引数を通じてKafkaクライアント設定を受け入れることができます。これは、デフォルトのシリアライザー/デシリアライザーをオーバーライドしたり、その他の特定のクライアント設定を行ったりするのに役立ちます。 - セキュリティ: 安全なKafkaクラスター(例:SSL/TLSまたはSASL認証を使用)の場合、これらのツールに追加のセキュリティ関連の引数(クライアントプロパティファイルを指す
--command-configなど)を渡す必要があります。 - トピックの削除: トピックの削除は機密性の高い操作であり、Kafkaブローカーの
server.propertiesファイルでdelete.topic.enable=trueを使用して明示的に有効にする必要があることに注意してください。
まとめ
Kafkaのコマンドラインツールは、Kafkaクラスターを管理および操作するための堅牢でアクセスしやすいインターフェースを提供します。kafka-topics.sh、kafka-console-producer.sh、kafka-console-consumer.sh、および kafka-consumer-groups.sh のようなツールを習得することは、効率的なKafka運用、トラブルシューティング、および開発に不可欠です。これらの引数と使用例を理解することで、ワークフローを大幅に合理化し、イベントストリーミングインフラストラクチャに関するより深い洞察を得ることができます。
これらのコマンドを定期的に参照することは、日常の管理タスクを実行するのに役立つだけでなく、問題をより効果的に診断および解決する能力を高めます。Kafkaに慣れるにつれて、より高度な操作のために bin/ ディレクトリで利用可能な他のユーティリティスクリプトを探索できます。