コマンドラインを使用したKafkaトピックの作成と管理方法

kafka-topics.shを使用して、コマンドラインからKafkaトピックを作成、一覧表示、説明、変更、削除する方法を学びます。

コマンドラインを使用したKafkaトピックの作成と管理方法

Apache Kafkaのトピックは、プロデューサーがレコードを書き込み、コンシューマーが読み取る場所です。ターミナルからトピックを作成、検査、サイズ変更、または削除する必要がある場合、主要なツールはkafka-topics.shです。

グラフィカルツールも便利ですが、インシデントやデプロイメント中にクラスターの状態を確認する最も速い方法は、依然としてCLIコマンドです。このガイドでは、最も頻繁に使用するトピックコマンドを紹介し、Kafkaで驚く可能性がある点についても説明します。

前提条件とセットアップ

このガイドのコマンドを実行するには、Kafkaバイナリがインストールされているマシンにアクセスできる必要があります。すべてのトピック管理操作は、通常Kafkaインストールのbinディレクトリにあるkafka-topics.shユーティリティを使用して実行されます。

すべてのコマンドには、--bootstrap-serverで指定された少なくとも1つのKafkaブローカーのアドレスが必要です。古いKafkaクラスターでは--zookeeperを使用した例がまだ表示される場合がありますが、ブローカーベースの管理が現在のパターンです。

以下の例では、ブローカーがデフォルトポートでローカルに実行されていると仮定します。

# 標準ブローカーアドレスプレースホルダー
BROKER_ADDRESS="localhost:9092"

1. 新しいKafkaトピックの作成

トピックを作成するには、その名前と、その動作とフォールトトレランスを決定する2つの重要なパラメーター(パーティション数とレプリケーションファクター)を定義する必要があります。

必須パラメーター

  • --topic <名前>: トピックの名前。
  • --partitions <N>: トピックが分割されるパーティションの数。パーティションは、トピック内の並列処理と順序付けの単位です。
  • --replication-factor <N>: 異なるブローカー間で維持されるデータのコピー数。レプリケーションファクターが1の場合、冗長性はありません。

コマンド例: sales-dataの作成

このコマンドは、3つのパーティションとレプリケーションファクター2(つまり、すべてのパーティションの2つのコピーがクラスター全体に存在する)を持つsales-dataという名前のトピックを作成します。

kafka-topics.sh --create --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 3 \
  --replication-factor 2

ヒント: 本番環境では、レプリケーションファクター3が一般的です。これは、各パーティションの複数のコピーを異なるブローカーに保持するためです。実際のフォールトトレランスは、ブローカーの配置、min.insync.replicas、およびプロデューサーの確認応答設定にも依存します。

2. すべてのトピックの一覧表示

Kafkaクラスターで現在利用可能なすべてのトピックを表示するには、--listフラグを使用します。

コマンド例

kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS

出力例:

sales-data
logistics-stream
__consumer_offsets

3. トピック構成の説明

特定のトピックの既存の構成、パーティション数、およびブローカー割り当てを確認することは、トラブルシューティングと検証に不可欠です。--describeフラグを使用します。

コマンド例: sales-dataの説明

kafka-topics.sh --describe --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS

出力の解釈:

出力は、トピックレベルとパーティションレベルの両方の構成を示します。

Topic: sales-data  PartitionCount: 3 ReplicationFactor: 2 Configs:
  Topic: sales-data  Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: sales-data  Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
  Topic: sales-data  Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  • Leader: 現在そのパーティションの読み取り/書き込みを処理しているブローカー。
  • Replicas: そのパーティションのコピーを保持しているブローカーのリスト。
  • Isr (In-Sync Replicas): 安全なフェイルオーバーの対象となるために十分に追いついているレプリカ。ISRが縮小した場合は、ブローカーの健全性、ディスクレイテンシ、およびネットワーク遅延を確認してください。

4. 既存のトピックの変更

Kafkaは、作成後にトピックを変更するための限られたメカニズムを提供します。最も一般的な2つの変更タスクは、パーティション数の増加とデフォルトのブローカー構成設定のオーバーライドです。

A. パーティションの増加

パーティションは増やすことのみ可能で、減らすことはできません。パーティションを増やすと、コンシューマーの並列処理をスケールアウトするのに役立ちます。

警告: パーティションを増やすと、メッセージがパーティションにマッピング(ハッシュ)される方法が変わります。プロデューサーがキーベースの順序付け保証に依存している場合、パーティションを増やすと、既存のキーの順序付き配信が壊れる可能性があります。

sales-dataに現在3つのパーティションがある場合、5つに増やすことができます。

kafka-topics.sh --alter --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 5

B. トピック固有の構成の変更

kafka-topics.sh --alter --configを使用してトピック構成を変更する古い例を見かけるかもしれません。現在のKafkaクラスターでは、トピック構成の変更にはkafka-configs.shを使用することをお勧めします。これは動的構成専用の管理ツールだからです。

例: sales-dataのメッセージ保持時間を24時間(86400000ミリ秒)に設定します。

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --add-config retention.ms=86400000

特定の構成オーバーライドを削除してブローカーのデフォルトに戻すには、--delete-configを使用します。

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --delete-config retention.ms

5. Kafkaトピックの削除

使用されなくなったトピックは、ディスク領域を解放し、クラスターの衛生状態を維持するために適切に削除する必要があります。

トピック削除の有効化

このコマンドでデータを削除できるようにするには、Kafkaブローカーが削除を許可している必要があります。最新のデプロイメントの多くでは有効になっていますが、想定しないでください。ブローカー構成で以下を確認してください。

delete.topic.enable=true

コマンド例: old-streamの削除

--deleteフラグを使用して、トピックの削除を開始します。トピックの削除は多くの場合非同期であり、コマンドがリクエストを送信し、削除がバックグラウンドで実行されることを意味します。

kafka-topics.sh --delete --topic old-stream \
  --bootstrap-server $BROKER_ADDRESS

確認出力:

Deletion of topic old-stream initiated successfully.

トピック管理コマンドのまとめ

アクション フラグ 目的 パラメーター例
作成 --create 新しいトピックを初期化する。 --partitions 5 --replication-factor 3
一覧表示 --list クラスター内のすべてのトピックを表示する。 N/A
説明 --describe 現在の構成とレイアウトを表示する。 --topic my-topic
変更(パーティション) --alter パーティション数を増やす。 --partitions N (N > 現在の数)
変更(構成) kafka-configs.sh --alter 特定のトピックのブローカーデフォルトをオーバーライドする。 --add-config retention.ms=...
削除 --delete トピックを完全に削除する。 --topic my-topic

次のステップ

  1. 開発環境またはステージング環境でこれらのコマンドを練習します。
  2. 構成変更の前後にkafka-configs.sh --describeを使用して、どの値がトピックオーバーライドであるかを把握します。
  3. プロデューサーとコンシューマーのテストに対応するCLIコマンド(kafka-console-producer.shおよびkafka-console-consumer.sh)を学びます。