Erstellen und Verwalten von Kafka-Themen über die Befehlszeile
Erfahren Sie, wie Sie Kafka-Themen mit kafka-topics.sh über die Befehlszeile erstellen, auflisten, beschreiben, ändern und löschen.
Erstellen und Verwalten von Kafka-Themen über die Befehlszeile
Apache Kafka-Themen sind der Ort, an den Ihre Produzenten Datensätze schreiben und Ihre Konsumenten sie lesen. Wenn Sie ein Thema über ein Terminal erstellen, überprüfen, vergrößern oder löschen müssen, ist das Hauptwerkzeug kafka-topics.sh.
Grafische Werkzeuge sind nützlich, aber CLI-Befehle sind immer noch der schnellste Weg, um während eines Vorfalls oder einer Bereitstellung zu überprüfen, was der Cluster tut. Diese Anleitung zeigt die Themenbefehle, die Sie am häufigsten verwenden werden, und weist auf die Stellen hin, an denen Kafka Sie überraschen kann.
Voraussetzungen und Einrichtung
Um die Befehle in dieser Anleitung auszuführen, müssen Sie Zugriff auf einen Rechner haben, auf dem die Kafka-Binärdateien installiert sind. Alle Themenverwaltungsvorgänge werden mit dem Dienstprogramm kafka-topics.sh durchgeführt, das sich normalerweise im bin-Verzeichnis Ihrer Kafka-Installation befindet.
Alle Befehle erfordern die Adresse mindestens eines Kafka-Brokers, die mit --bootstrap-server angegeben wird. Ältere Kafka-Cluster zeigen möglicherweise noch Beispiele mit --zookeeper, aber die brokerbasierte Verwaltung ist das aktuelle Muster.
Für die folgenden Beispiele gehen wir davon aus, dass der Broker lokal auf dem Standardport läuft:
# Platzhalter für Standard-Broker-Adresse
BROKER_ADDRESS="localhost:9092"
1. Erstellen eines neuen Kafka-Themas
Das Erstellen eines Themas erfordert die Definition seines Namens sowie zwei kritischer Parameter, die sein Verhalten und seine Fehlertoleranz bestimmen: die Anzahl der Partitionen und den Replikationsfaktor.
Wesentliche Parameter
--topic <name>: Der Name des Themas.--partitions <N>: Die Anzahl der Partitionen, in die das Thema aufgeteilt wird. Partitionen sind die Einheiten der Parallelität und Reihenfolge innerhalb eines Themas.--replication-factor <N>: Die Anzahl der Kopien der Daten, die auf verschiedenen Brokern vorgehalten werden. Ein Replikationsfaktor von 1 bedeutet keine Redundanz.
Befehlsbeispiel: Erstellen von sales-data
Dieser Befehl erstellt ein Thema namens sales-data mit 3 Partitionen und einem Replikationsfaktor von 2 (d. h. es existieren 2 Kopien jeder Partition im Cluster).
kafka-topics.sh --create --topic sales-data \
--bootstrap-server $BROKER_ADDRESS \
--partitions 3 \
--replication-factor 2
Tipp: In der Produktion ist ein Replikationsfaktor von 3 üblich, da er mehrere Kopien jeder Partition auf verschiedenen Brokern vorhält. Die tatsächliche Fehlertoleranz hängt auch von der Broker-Platzierung,
min.insync.replicasund den Bestätigungseinstellungen des Produzenten ab.
2. Auflisten aller Themen
Um alle derzeit im Kafka-Cluster verfügbaren Themen anzuzeigen, verwenden Sie das Flag --list.
Befehlsbeispiel
kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS
Beispielausgabe:
sales-data
logistics-stream
__consumer_offsets
3. Beschreiben der Themenkonfiguration
Die Überprüfung der vorhandenen Konfiguration, der Partitionsanzahl und der Broker-Zuweisung für ein bestimmtes Thema ist für die Fehlerbehebung und Verifizierung unerlässlich. Verwenden Sie das Flag --describe.
Befehlsbeispiel: Beschreiben von sales-data
kafka-topics.sh --describe --topic sales-data \
--bootstrap-server $BROKER_ADDRESS
Interpretation der Ausgabe:
Die Ausgabe zeigt die Konfiguration sowohl auf Themen- als auch auf Partitionsebene:
Topic: sales-data PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: sales-data Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sales-data Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: sales-data Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- Leader: Der Broker, der derzeit für die Bearbeitung von Lese-/Schreibvorgängen für diese Partition verantwortlich ist.
- Replicas: Die Liste der Broker, die eine Kopie dieser Partition halten.
- Isr (In-Sync Replicas): Die Replikate, die ausreichend synchronisiert sind, um für ein sicheres Failover in Frage zu kommen. Wenn die ISR schrumpft, überprüfen Sie die Broker-Gesundheit, die Festplattenlatenz und die Netzwerkverzögerung.
4. Ändern vorhandener Themen
Kafka bietet begrenzte Mechanismen zum Ändern von Themen nach der Erstellung. Die beiden häufigsten Änderungsaufgaben sind das Erhöhen der Partitionsanzahl und das Überschreiben von Standard-Broker-Konfigurationseinstellungen.
A. Erhöhen der Partitionen
Partitionen können nur erhöht, niemals verringert werden. Das Erhöhen der Partitionen hilft, die Parallelität der Verbraucher zu skalieren.
Warnung: Das Erhöhen der Partitionen ändert die Art und Weise, wie Nachrichten Partitionen zugeordnet (gehasht) werden. Wenn Ihre Produzenten auf schlüsselbasierte Reihenfolgegarantien angewiesen sind, kann das Erhöhen der Partitionen die geordnete Zustellung für vorhandene Schlüssel unterbrechen.
Wenn sales-data derzeit 3 Partitionen hat, können wir es auf 5 erhöhen:
kafka-topics.sh --alter --topic sales-data \
--bootstrap-server $BROKER_ADDRESS \
--partitions 5
B. Ändern der themenspezifischen Konfiguration
Möglicherweise sehen Sie ältere Beispiele, die Themenkonfigurationen über kafka-topics.sh --alter --config ändern. Verwenden Sie bei aktuellen Kafka-Clustern bevorzugt kafka-configs.sh für Themenkonfigurationsänderungen, da es das dedizierte Verwaltungstool für dynamische Konfigurationen ist.
Beispiel: Festlegen einer Nachrichtenaufbewahrungszeit von 24 Stunden (86400000 Millisekunden) für sales-data.
kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
--alter --entity-type topics --entity-name sales-data \
--add-config retention.ms=86400000
Um eine bestimmte Konfigurationsüberschreibung zu entfernen und zum Broker-Standard zurückzukehren, verwenden Sie --delete-config:
kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
--alter --entity-type topics --entity-name sales-data \
--delete-config retention.ms
5. Löschen eines Kafka-Themas
Nicht mehr verwendete Themen sollten ordnungsgemäß gelöscht werden, um Speicherplatz freizugeben und die Cluster-Hygiene aufrechtzuerhalten.
Aktivieren der Themenlöschung
Kafka-Broker müssen das Löschen erlauben, bevor dieser Befehl Daten entfernen kann. In vielen modernen Bereitstellungen ist es aktiviert, aber gehen Sie nicht davon aus. Überprüfen Sie die Broker-Konfiguration auf:
delete.topic.enable=true
Befehlsbeispiel: Löschen von old-stream
Verwenden Sie das Flag --delete, um die Themenentfernung einzuleiten. Das Löschen von Themen ist oft asynchron, d. h. der Befehl sendet die Anforderung und das Löschen erfolgt im Hintergrund.
kafka-topics.sh --delete --topic old-stream \
--bootstrap-server $BROKER_ADDRESS
Bestätigungsausgabe:
Löschen des Themas old-stream erfolgreich eingeleitet.
Zusammenfassung der Themenverwaltungsbefehle
| Aktion | Flag(s) | Zweck | Beispielparameter |
|---|---|---|---|
| Erstellen | --create |
Ein neues Thema initialisieren. | --partitions 5 --replication-factor 3 |
| Auflisten | --list |
Alle Themen im Cluster anzeigen. | N/A |
| Beschreiben | --describe |
Aktuelle Konfiguration und Layout anzeigen. | --topic my-topic |
| Ändern (Partitionen) | --alter |
Anzahl der Partitionen erhöhen. | --partitions N (N > aktuelle Anzahl) |
| Ändern (Konfig.) | kafka-configs.sh --alter |
Broker-Standardeinstellungen für ein bestimmtes Thema überschreiben. | --add-config retention.ms=... |
| Löschen | --delete |
Ein Thema dauerhaft entfernen. | --topic my-topic |
Nächste Schritte
- Üben Sie diese Befehle in einer Entwicklungs- oder Staging-Umgebung.
- Verwenden Sie
kafka-configs.sh --describevor und nach Konfigurationsänderungen, damit Sie wissen, welche Werte Themenüberschreibungen sind. - Lernen Sie die entsprechenden CLI-Befehle für Produzenten- und Konsumententests (
kafka-console-producer.shundkafka-console-consumer.sh).