コマンドラインを使った Kafka トピックの作成と管理方法
Apache Kafka は、高スループットのデータパイプライン、リアルタイム分析、マイクロサービス間の通信によく利用される分散イベントストリーミングプラットフォームです。Kafka における基本的な組織単位はトピックであり、レコードが発行されるカテゴリまたはフィード名です。
グラフィカルツールも存在しますが、Kafka インフラストラクチャと対話し、管理するための最も堅牢で信頼性が高く、一般的な方法は、コマンドラインインターフェース (CLI) を直接使用することです。これらの必須コマンドを習得することは、健全で効率的な Kafka クラスターを維持する責任を負う管理者や開発者にとって不可欠です。このガイドでは、kafka-topics.sh スクリプトを使用して最も一般的なトピック管理タスクを実行するためのステップバイステップのチュートリアルを提供します。
前提条件とセットアップ
このガイドのコマンドを実行するには、Kafka バイナリがインストールされているマシンにアクセスできる必要があります。すべてのトピック管理操作は、kafka-topics.sh ユーティリティを使用して実行されます。これは通常、Kafka インストールディレクトリの bin ディレクトリにあります。
すべてのコマンドには、少なくとも1つの Kafka ブローカーのアドレスが必要で、これは --bootstrap-server フラグを使用して指定します。古い Kafka バージョン (2.2より前) を使用している場合、--zookeeper フラグに依存することもありますが、--bootstrap-server が推奨される現代の標準です。
以下の例では、ブローカーがデフォルトポートでローカルに実行されていると仮定します。
# 標準ブローカーアドレスのプレースホルダー
BROKER_ADDRESS="localhost:9092"
1. 新しい Kafka トピックの作成
トピックを作成するには、その名前と、その動作およびフォールトトレランスを決定する2つの重要なパラメータ(パーティション数とレプリケーションファクター)を定義する必要があります。
必須パラメータ
--topic <名前>: トピックの名前。--partitions <N>: トピックが分割されるパーティションの数。パーティションは、トピック内の並列処理と順序付けの単位です。--replication-factor <N>: 異なるブローカー間で維持されるデータのコピー数。レプリケーションファクターが1の場合、冗長性はありません。
コマンド例: sales-data の作成
このコマンドは、sales-data という名前のトピックを、3つのパーティションとレプリケーションファクター2 (つまり、すべてのパーティションの2つのコピーがクラスター全体に存在します) で作成します。
kafka-topics.sh --create --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 3 \n --replication-factor 2
ヒント: N個のブローカーを持つ本番環境では、高可用性 (データ損失が発生する前に2つのブローカーの損失を許容) のためにレプリケーションファクター3が推奨されることが多く、パーティション数は予想されるスループットとコンシューマの並列処理ニーズに基づいて調整する必要があります。
2. すべてのトピックのリスト表示
Kafka クラスターで現在利用可能なすべてのトピックを表示するには、--list フラグを使用します。
コマンド例
kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS
出力例:
sales-data
logistics-stream
__consumer_offsets
3. トピック設定の記述
特定のトピックの既存の設定、パーティション数、およびブローカー割り当てを確認することは、トラブルシューティングと検証のために不可欠です。--describe フラグを使用します。
コマンド例: sales-data の記述
kafka-topics.sh --describe --topic sales-data \n --bootstrap-server $BROKER_ADDRESS
出力の解釈:
出力は、トピックレベルとパーティションレベルの両方での設定を示しています。
Topic: sales-data PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: sales-data Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sales-data Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: sales-data Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- Leader (リーダー): そのパーティションの読み書きを現在担当しているブローカー。
- Replicas (レプリカ): そのパーティションのコピーを保持しているブローカーのリスト。
- Isr (In-Sync Replicas、同期済みレプリカ): リーダーと完全に同期しているレプリカのサブセット。高可用性のためには、リーダーが ISR 内に存在する必要があります。
4. 既存トピックの変更
Kafka は、トピック作成後の変更に対して限られたメカニズムを提供します。最も一般的な2つの変更タスクは、パーティション数の増加と、デフォルトのブローカー設定の上書きです。
A. パーティションの増加
パーティションは増加のみ可能であり、減少させることはできません。パーティションを増やすことは、コンシューマの並列処理をスケールアウトするのに役立ちます。
警告: パーティションを増やすと、メッセージがパーティションにマッピング (ハッシュ化) される方法が変わります。プロデューサーがキーベースの順序保証に依存している場合、パーティションを増やすと、既存のキーに対する順序付けられた配信が損なわれる可能性があります。
sales-data が現在3つのパーティションを持っている場合、それを5に増やすことができます。
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 5
B. トピック固有の設定の変更
--config フラグを使用すると、個々のトピックに対してグローバルなブローカー設定 (メッセージ保持時間やクリーンアップポリシーなど) を上書きできます。
例: sales-data のメッセージ保持時間を24時間 (86400000ミリ秒) に設定する。
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --config retention.ms=86400000
特定の設定の上書きを削除し、デフォルトのブローカー設定に戻すには、--delete-config フラグを使用します。
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --delete-config retention.ms
5. Kafka トピックの削除
不要になったトピックは、ディスクスペースを解放し、クラスターの健全性を維持するために適切に削除する必要があります。
トピック削除の有効化
デフォルトでは、Kafka ブローカーは安全のためにトピック削除を無効にしている場合があります。トピックを削除する前に、すべてのブローカーの server.properties ファイルで以下の設定が有効になっていることを確認してください。
delete.topic.enable=true
コマンド例: old-stream の削除
--delete フラグを使用してトピック削除を開始します。トピック削除は非同期で行われることが多く、コマンドがリクエストを送信し、削除はバックグラウンドで実行されます。
kafka-topics.sh --delete --topic old-stream \n --bootstrap-server $BROKER_ADDRESS
確認出力:
トピック old-stream の削除が正常に開始されました。
トピック管理コマンドの要約
| アクション | フラグ | 目的 | 例のパラメータ |
|---|---|---|---|
| 作成 | --create |
新しいトピックを初期化します。 | --partitions 5 --replication-factor 3 |
| リスト表示 | --list |
クラスター内のすべてのトピックを表示します。 | N/A |
| 記述 | --describe |
現在の設定とレイアウトを表示します。 | --topic my-topic |
| 変更 (パーティション) | --alter |
パーティション数を増やします。 | --partitions N (N > 現在の数) |
| 変更 (設定) | --alter --config |
特定のトピックのブローカーデフォルトを上書きします。 | --config retention.ms=... |
| 削除 | --delete |
トピックを完全に削除します。 | --topic my-topic |
まとめと次のステップ
コマンドラインは、Kafka クラスターを管理するための最も強力で柔軟なインターフェースであり続けています。kafka-topics.sh ユーティリティを習得することで、トピック作成パラメータ、設定の上書き、および削除や記述のような必要な管理アクションを詳細に制御できるようになります。
次のステップ:
- これらのコマンドを開発環境またはステージング環境で練習してください。
--describeコマンドを使用して、設定可能なプロパティの全リスト (cleanup.policy、max.message.bytesなど) を確認し、高度な設定オプションを探索してください。- プロデューサーおよびコンシューマーテストに対応する CLI コマンド (
kafka-console-producer.shおよびkafka-console-consumer.sh) を学習してください。