Erstellen und Verwalten von Kafka-Themen über die Befehlszeile

Erfahren Sie, wie Sie Kafka-Themen mit kafka-topics.sh über die Befehlszeile erstellen, auflisten, beschreiben, ändern und löschen.

Erstellen und Verwalten von Kafka-Themen über die Befehlszeile

Apache Kafka-Themen sind der Ort, an den Ihre Produzenten Datensätze schreiben und Ihre Konsumenten sie lesen. Wenn Sie ein Thema über ein Terminal erstellen, überprüfen, vergrößern oder löschen müssen, ist das Hauptwerkzeug kafka-topics.sh.

Grafische Werkzeuge sind nützlich, aber CLI-Befehle sind immer noch der schnellste Weg, um während eines Vorfalls oder einer Bereitstellung zu überprüfen, was der Cluster tut. Diese Anleitung zeigt die Themenbefehle, die Sie am häufigsten verwenden werden, und weist auf die Stellen hin, an denen Kafka Sie überraschen kann.

Voraussetzungen und Einrichtung

Um die Befehle in dieser Anleitung auszuführen, müssen Sie Zugriff auf einen Rechner haben, auf dem die Kafka-Binärdateien installiert sind. Alle Themenverwaltungsvorgänge werden mit dem Dienstprogramm kafka-topics.sh durchgeführt, das sich normalerweise im bin-Verzeichnis Ihrer Kafka-Installation befindet.

Alle Befehle erfordern die Adresse mindestens eines Kafka-Brokers, die mit --bootstrap-server angegeben wird. Ältere Kafka-Cluster zeigen möglicherweise noch Beispiele mit --zookeeper, aber die brokerbasierte Verwaltung ist das aktuelle Muster.

Für die folgenden Beispiele gehen wir davon aus, dass der Broker lokal auf dem Standardport läuft:

# Platzhalter für Standard-Broker-Adresse
BROKER_ADDRESS="localhost:9092"

1. Erstellen eines neuen Kafka-Themas

Das Erstellen eines Themas erfordert die Definition seines Namens sowie zwei kritischer Parameter, die sein Verhalten und seine Fehlertoleranz bestimmen: die Anzahl der Partitionen und den Replikationsfaktor.

Wesentliche Parameter

  • --topic <name>: Der Name des Themas.
  • --partitions <N>: Die Anzahl der Partitionen, in die das Thema aufgeteilt wird. Partitionen sind die Einheiten der Parallelität und Reihenfolge innerhalb eines Themas.
  • --replication-factor <N>: Die Anzahl der Kopien der Daten, die auf verschiedenen Brokern vorgehalten werden. Ein Replikationsfaktor von 1 bedeutet keine Redundanz.

Befehlsbeispiel: Erstellen von sales-data

Dieser Befehl erstellt ein Thema namens sales-data mit 3 Partitionen und einem Replikationsfaktor von 2 (d. h. es existieren 2 Kopien jeder Partition im Cluster).

kafka-topics.sh --create --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 3 \
  --replication-factor 2

Tipp: In der Produktion ist ein Replikationsfaktor von 3 üblich, da er mehrere Kopien jeder Partition auf verschiedenen Brokern vorhält. Die tatsächliche Fehlertoleranz hängt auch von der Broker-Platzierung, min.insync.replicas und den Bestätigungseinstellungen des Produzenten ab.

2. Auflisten aller Themen

Um alle derzeit im Kafka-Cluster verfügbaren Themen anzuzeigen, verwenden Sie das Flag --list.

Befehlsbeispiel

kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS

Beispielausgabe:

sales-data
logistics-stream
__consumer_offsets

3. Beschreiben der Themenkonfiguration

Die Überprüfung der vorhandenen Konfiguration, der Partitionsanzahl und der Broker-Zuweisung für ein bestimmtes Thema ist für die Fehlerbehebung und Verifizierung unerlässlich. Verwenden Sie das Flag --describe.

Befehlsbeispiel: Beschreiben von sales-data

kafka-topics.sh --describe --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS

Interpretation der Ausgabe:

Die Ausgabe zeigt die Konfiguration sowohl auf Themen- als auch auf Partitionsebene:

Topic: sales-data  PartitionCount: 3 ReplicationFactor: 2 Configs:
  Topic: sales-data  Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: sales-data  Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
  Topic: sales-data  Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  • Leader: Der Broker, der derzeit für die Bearbeitung von Lese-/Schreibvorgängen für diese Partition verantwortlich ist.
  • Replicas: Die Liste der Broker, die eine Kopie dieser Partition halten.
  • Isr (In-Sync Replicas): Die Replikate, die ausreichend synchronisiert sind, um für ein sicheres Failover in Frage zu kommen. Wenn die ISR schrumpft, überprüfen Sie die Broker-Gesundheit, die Festplattenlatenz und die Netzwerkverzögerung.

4. Ändern vorhandener Themen

Kafka bietet begrenzte Mechanismen zum Ändern von Themen nach der Erstellung. Die beiden häufigsten Änderungsaufgaben sind das Erhöhen der Partitionsanzahl und das Überschreiben von Standard-Broker-Konfigurationseinstellungen.

A. Erhöhen der Partitionen

Partitionen können nur erhöht, niemals verringert werden. Das Erhöhen der Partitionen hilft, die Parallelität der Verbraucher zu skalieren.

Warnung: Das Erhöhen der Partitionen ändert die Art und Weise, wie Nachrichten Partitionen zugeordnet (gehasht) werden. Wenn Ihre Produzenten auf schlüsselbasierte Reihenfolgegarantien angewiesen sind, kann das Erhöhen der Partitionen die geordnete Zustellung für vorhandene Schlüssel unterbrechen.

Wenn sales-data derzeit 3 Partitionen hat, können wir es auf 5 erhöhen:

kafka-topics.sh --alter --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 5

B. Ändern der themenspezifischen Konfiguration

Möglicherweise sehen Sie ältere Beispiele, die Themenkonfigurationen über kafka-topics.sh --alter --config ändern. Verwenden Sie bei aktuellen Kafka-Clustern bevorzugt kafka-configs.sh für Themenkonfigurationsänderungen, da es das dedizierte Verwaltungstool für dynamische Konfigurationen ist.

Beispiel: Festlegen einer Nachrichtenaufbewahrungszeit von 24 Stunden (86400000 Millisekunden) für sales-data.

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --add-config retention.ms=86400000

Um eine bestimmte Konfigurationsüberschreibung zu entfernen und zum Broker-Standard zurückzukehren, verwenden Sie --delete-config:

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --delete-config retention.ms

5. Löschen eines Kafka-Themas

Nicht mehr verwendete Themen sollten ordnungsgemäß gelöscht werden, um Speicherplatz freizugeben und die Cluster-Hygiene aufrechtzuerhalten.

Aktivieren der Themenlöschung

Kafka-Broker müssen das Löschen erlauben, bevor dieser Befehl Daten entfernen kann. In vielen modernen Bereitstellungen ist es aktiviert, aber gehen Sie nicht davon aus. Überprüfen Sie die Broker-Konfiguration auf:

delete.topic.enable=true

Befehlsbeispiel: Löschen von old-stream

Verwenden Sie das Flag --delete, um die Themenentfernung einzuleiten. Das Löschen von Themen ist oft asynchron, d. h. der Befehl sendet die Anforderung und das Löschen erfolgt im Hintergrund.

kafka-topics.sh --delete --topic old-stream \
  --bootstrap-server $BROKER_ADDRESS

Bestätigungsausgabe:

Löschen des Themas old-stream erfolgreich eingeleitet.

Zusammenfassung der Themenverwaltungsbefehle

Aktion Flag(s) Zweck Beispielparameter
Erstellen --create Ein neues Thema initialisieren. --partitions 5 --replication-factor 3
Auflisten --list Alle Themen im Cluster anzeigen. N/A
Beschreiben --describe Aktuelle Konfiguration und Layout anzeigen. --topic my-topic
Ändern (Partitionen) --alter Anzahl der Partitionen erhöhen. --partitions N (N > aktuelle Anzahl)
Ändern (Konfig.) kafka-configs.sh --alter Broker-Standardeinstellungen für ein bestimmtes Thema überschreiben. --add-config retention.ms=...
Löschen --delete Ein Thema dauerhaft entfernen. --topic my-topic

Nächste Schritte

  1. Üben Sie diese Befehle in einer Entwicklungs- oder Staging-Umgebung.
  2. Verwenden Sie kafka-configs.sh --describe vor und nach Konfigurationsänderungen, damit Sie wissen, welche Werte Themenüberschreibungen sind.
  3. Lernen Sie die entsprechenden CLI-Befehle für Produzenten- und Konsumententests (kafka-console-producer.sh und kafka-console-consumer.sh).