Kafka-Topics über die Befehlszeile erstellen und verwalten
Apache Kafka ist eine verteilte Event-Streaming-Plattform, die häufig für Hochdurchsatz-Datenpipelines, Echtzeitanalysen und die Kommunikation von Microservices eingesetzt wird. Die grundlegende Organisationseinheit innerhalb von Kafka ist das Topic, ein Kategorie- oder Feed-Name, unter dem Datensätze veröffentlicht werden.
Obwohl grafische Tools existieren, ist die robusteste, zuverlässigste und gebräuchlichste Methode zur Interaktion und Verwaltung der Kafka-Infrastruktur direkt über die Befehlszeilenschnittstelle (CLI). Die Beherrschung dieser grundlegenden Befehle ist für Administratoren und Entwickler, die für die Pflege eines gesunden und effizienten Kafka-Clusters verantwortlich sind, von entscheidender Bedeutung. Diese Anleitung bietet ein Schritt-für-Schritt-Tutorial zur Verwendung des Skripts kafka-topics.sh, um die gängigsten Aufgaben der Topic-Verwaltung durchzuführen.
Voraussetzungen und Einrichtung
Um die Befehle in dieser Anleitung auszuführen, müssen Sie Zugriff auf eine Maschine haben, auf der die Kafka-Binärdateien installiert sind. Alle Topic-Verwaltungsvorgänge werden mit dem Dienstprogramm kafka-topics.sh durchgeführt, das sich typischerweise im bin-Verzeichnis Ihrer Kafka-Installation befindet.
Alle Befehle erfordern die Adresse mindestens eines Kafka-Brokers, die mit dem Flag --bootstrap-server angegeben wird. Wenn Sie eine ältere Kafka-Version (vor 2.2) verwenden, verlassen Sie sich möglicherweise immer noch auf das Flag --zookeeper, aber --bootstrap-server ist der empfohlene und moderne Standard.
Für die folgenden Beispiele gehen wir davon aus, dass der Broker lokal auf dem Standardport läuft:
# Platzhalter für Standard-Broker-Adresse
BROKER_ADDRESS="localhost:9092"
1. Erstellen eines neuen Kafka-Topics
Das Erstellen eines Topics erfordert die Definition seines Namens sowie zweier kritischer Parameter, die sein Verhalten und seine Fehlertoleranz bestimmen: die Anzahl der Partitionen und der Replikationsfaktor.
Wesentliche Parameter
--topic <Name>: Der Name des Topics.--partitions <N>: Die Anzahl der Partitionen, in die das Topic aufgeteilt wird. Partitionen sind die Einheiten der Parallelität und Reihenfolge innerhalb eines Topics.--replication-factor <N>: Die Anzahl der Datenkopien, die über verschiedene Broker hinweg aufrechterhalten werden. Ein Replikationsfaktor von 1 bedeutet keine Redundanz.
Befehlsbeispiel: Erstellen von sales-data
Dieser Befehl erstellt ein Topic namens sales-data mit 3 Partitionen und einem Replikationsfaktor von 2 (was bedeutet, dass 2 Kopien jeder Partition im Cluster vorhanden sein werden).
kf-topics.sh --create --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 3 \n --replication-factor 2
Tipp: In einer Produktionsumgebung mit N Brokern wird oft ein Replikationsfaktor von 3 für hohe Verfügbarkeit empfohlen (was den Verlust von zwei Brokern vor Datenverlust zulässt), und die Anzahl der Partitionen sollte basierend auf dem erwarteten Durchsatz und den Anforderungen an die Consumer-Parallelität angepasst werden.
2. Alle Topics auflisten
Um alle derzeit im Kafka-Cluster verfügbaren Topics anzuzeigen, verwenden Sie das Flag --list.
Befehlsbeispiel
kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS
Beispielausgabe:
sales-data
logistics-stream
__consumer-offsets
3. Topic-Konfiguration beschreiben
Die Überprüfung der bestehenden Konfiguration, der Partitionsanzahl und der Broker-Zuweisung für ein bestimmtes Topic ist für die Fehlerbehebung und Verifizierung unerlässlich. Verwenden Sie das Flag --describe.
Befehlsbeispiel: Beschreiben von sales-data
kafka-topics.sh --describe --topic sales-data \n --bootstrap-server $BROKER_ADDRESS
Ausgabeinterpretation:
Die Ausgabe zeigt die Konfiguration sowohl auf Topic-Ebene als auch auf Partitionsebene:
Topic: sales-data PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: sales-data Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sales-data Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: sales-data Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- Leader: Der Broker, der derzeit für die Bearbeitung von Lese-/Schreibvorgängen für diese Partition verantwortlich ist.
- Replicas: Die Liste der Broker, die eine Kopie dieser Partition halten.
- ISR (In-Sync Replicas): Die Untermenge der Replikate, die vollständig mit dem Leader synchronisiert sind. Hohe Verfügbarkeit erfordert, dass der Leader im ISR ist.
4. Vorhandene Topics ändern
Kafka bietet begrenzte Mechanismen zur Änderung von Topics nach der Erstellung. Die beiden häufigsten Änderungsaufgaben sind die Erhöhung der Partitionsanzahl und das Überschreiben von Standard-Broker-Konfigurationseinstellungen.
A. Partitionen erhöhen
Partitionen können nur erhöht, niemals verringert werden. Das Erhöhen von Partitionen hilft, die Consumer-Parallelität zu skalieren.
Warnung: Das Erhöhen von Partitionen ändert, wie Nachrichten Partitionen zugeordnet (gehasht) werden. Wenn Ihre Producer auf schlüsselbasierte Reihenfolgegarantien angewiesen sind, kann das Erhöhen von Partitionen die geordnete Zustellung für bestehende Schlüssel unterbrechen.
Wenn sales-data derzeit 3 Partitionen hat, können wir diese auf 5 erhöhen:
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 5
B. Topic-spezifische Konfiguration ändern
Sie können globale Broker-Einstellungen (wie Nachrichtenspeicherzeit oder Bereinigungsrichtlinien) für einzelne Topics mithilfe des Flags --config überschreiben.
Beispiel: Festlegen einer Nachrichtenspeicherzeit von 24 Stunden (86400000 Millisekunden) für sales-data.
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --config retention.ms=86400000
Um eine bestimmte Konfigurationsüberschreibung zu entfernen und zur Standard-Broker-Einstellung zurückzukehren, verwenden Sie das Flag --delete-config:
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --delete-config retention.ms
5. Löschen eines Kafka-Topics
Nicht mehr verwendete Topics sollten ordnungsgemäß gelöscht werden, um Speicherplatz freizugeben und die Cluster-Hygiene zu gewährleisten.
Topic-Löschung aktivieren
Standardmäßig können Kafka-Broker die Topic-Löschung aus Sicherheitsgründen deaktivieren. Bevor Sie ein Topic löschen können, stellen Sie sicher, dass die folgende Einstellung in Ihrer server.properties-Datei auf allen Brokern aktiviert ist:
delete.topic.enable=true
Befehlsbeispiel: Löschen von old-stream
Verwenden Sie das Flag --delete, um die Topic-Entfernung zu initiieren. Die Topic-Löschung ist oft asynchron, d.h. der Befehl übermittelt die Anforderung, und die Löschung erfolgt im Hintergrund.
kafka-topics.sh --delete --topic old-stream \n --bootstrap-server $BROKER_ADDRESS
Bestätigungsausgabe:
Deletion of topic old-stream initiated successfully.
Zusammenfassung der Topic-Verwaltungsbefehle
| Aktion | Flag(s) | Zweck | Beispielparameter |
|---|---|---|---|
| Erstellen | --create |
Initialisiert ein neues Topic. | --partitions 5 --replication-factor 3 |
| Auflisten | --list |
Zeigt alle Topics im Cluster an. | N/A |
| Beschreiben | --describe |
Zeigt die aktuelle Konfiguration und das Layout an. | --topic my-topic |
| Ändern (Partitionen) | --alter |
Erhöht die Anzahl der Partitionen. | --partitions N (N > aktuelle Anzahl) |
| Ändern (Konfiguration) | --alter --config |
Überschreibt Broker-Standardwerte für ein bestimmtes Topic. | --config retention.ms=... |
| Löschen | --delete |
Entfernt ein Topic dauerhaft. | --topic my-topic |
Fazit und nächste Schritte
Die Befehlszeile bleibt die leistungsfähigste und flexibelste Schnittstelle zur Verwaltung Ihres Kafka-Clusters. Durch die Beherrschung des Dienstprogramms kafka-topics.sh erhalten Sie eine detaillierte Kontrolle über Topic-Erstellungsparameter, Konfigurationsüberschreibungen und notwendige administrative Aktionen wie Löschen und Beschreiben.
Nächste Schritte:
- Üben Sie diese Befehle in einer Entwicklungs- oder Staging-Umgebung.
- Erkunden Sie erweiterte Konfigurationsoptionen mit dem Befehl
--describe, um die vollständige Liste der konfigurierbaren Eigenschaften (z.B.cleanup.policy,max.message.bytes) anzuzeigen. - Lernen Sie die entsprechenden CLI-Befehle für das Testen von Producer und Consumer (
kafka-console-producer.shundkafka-console-consumer.sh).