Come Creare e Gestire Topic Kafka Utilizzando la Riga di Comando
Impara come creare, elencare, descrivere, modificare ed eliminare topic Kafka con kafka-topics.sh dalla riga di comando.
Come Creare e Gestire Topic Kafka Utilizzando la Riga di Comando
I topic di Apache Kafka sono il luogo in cui i tuoi produttori scrivono i record e i tuoi consumatori li leggono. Se hai bisogno di creare, ispezionare, ridimensionare o eliminare un topic da un terminale, lo strumento principale è kafka-topics.sh.
Gli strumenti grafici sono utili, ma i comandi CLI sono ancora il modo più veloce per verificare cosa sta facendo il cluster durante un incidente o un deployment. Questa guida mostra i comandi per i topic che utilizzerai più spesso e segnala i punti in cui Kafka può sorprenderti.
Prerequisiti e Configurazione
Per eseguire i comandi in questa guida, devi avere accesso a una macchina in cui sono installati i binari di Kafka. Tutte le operazioni di gestione dei topic vengono eseguite utilizzando l'utilità kafka-topics.sh, che si trova tipicamente nella directory bin della tua installazione Kafka.
Tutti i comandi richiedono l'indirizzo di almeno un broker Kafka, specificato con --bootstrap-server. I cluster Kafka più vecchi potrebbero ancora mostrare esempi che utilizzano --zookeeper, ma l'amministrazione basata su broker è il modello attuale.
Per gli esempi seguenti, assumeremo che il broker sia in esecuzione localmente sulla porta predefinita:
# Segnaposto per l'indirizzo del broker standard
BROKER_ADDRESS="localhost:9092"
1. Creazione di un Nuovo Topic Kafka
Creare un topic richiede la definizione del suo nome, insieme a due parametri critici che ne determinano il comportamento e la tolleranza ai guasti: il numero di partizioni e il fattore di replica.
Parametri Essenziali
--topic <nome>: Il nome del topic.--partitions <N>: Il numero di partizioni in cui il topic sarà suddiviso. Le partizioni sono le unità di parallelismo e ordinamento all'interno di un topic.--replication-factor <N>: Il numero di copie dei dati che verranno mantenute su diversi broker. Un fattore di replica pari a 1 significa nessuna ridondanza.
Esempio di Comando: Creazione di sales-data
Questo comando crea un topic chiamato sales-data con 3 partizioni e un fattore di replica di 2 (il che significa che 2 copie di ogni partizione esisteranno nel cluster).
kafka-topics.sh --create --topic sales-data \
--bootstrap-server $BROKER_ADDRESS \
--partitions 3 \
--replication-factor 2
Suggerimento: In produzione, un fattore di replica di 3 è comune perché mantiene più copie di ogni partizione su broker diversi. La reale tolleranza ai guasti dipende anche dal posizionamento dei broker, da
min.insync.replicase dalle impostazioni di acknowledgment del produttore.
2. Elenco di Tutti i Topic
Per visualizzare tutti i topic attualmente disponibili nel cluster Kafka, utilizza il flag --list.
Esempio di Comando
kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS
Esempio di Output:
sales-data
logistics-stream
__consumer_offsets
3. Descrizione della Configurazione del Topic
Controllare la configurazione esistente, il conteggio delle partizioni e l'assegnazione dei broker per un topic specifico è essenziale per la risoluzione dei problemi e la verifica. Utilizza il flag --describe.
Esempio di Comando: Descrizione di sales-data
kafka-topics.sh --describe --topic sales-data \
--bootstrap-server $BROKER_ADDRESS
Interpretazione dell'Output:
L'output mostra la configurazione sia a livello di topic che a livello di partizione:
Topic: sales-data PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: sales-data Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sales-data Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: sales-data Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- Leader: Il broker attualmente responsabile della gestione delle letture/scritture per quella partizione.
- Replicas: L'elenco dei broker che contengono una copia di quella partizione.
- Isr (In-Sync Replicas): Le repliche che sono sufficientemente aggiornate per essere idonee a un failover sicuro. Se ISR si riduce, controlla lo stato di salute del broker, la latenza del disco e il ritardo di rete.
4. Modifica dei Topic Esistenti
Kafka fornisce meccanismi limitati per la modifica dei topic dopo la creazione. I due compiti di modifica più comuni sono l'aumento del numero di partizioni e la sovrascrittura delle impostazioni di configurazione predefinite del broker.
A. Aumento delle Partizioni
Le partizioni possono essere solo aumentate, mai diminuite. L'aumento delle partizioni aiuta a scalare il parallelismo dei consumatori.
Attenzione: L'aumento delle partizioni modifica il modo in cui i messaggi vengono mappati (hash) alle partizioni. Se i tuoi produttori si basano su garanzie di ordinamento basate sulla chiave, l'aumento delle partizioni potrebbe rompere la consegna ordinata per le chiavi esistenti.
Se sales-data ha attualmente 3 partizioni, possiamo aumentarle a 5:
kafka-topics.sh --alter --topic sales-data \
--bootstrap-server $BROKER_ADDRESS \
--partitions 5
B. Modifica della Configurazione Specifica del Topic
Potresti vedere esempi più vecchi che modificano le configurazioni dei topic tramite kafka-topics.sh --alter --config. Sui cluster Kafka attuali, preferisci kafka-configs.sh per le modifiche alla configurazione dei topic perché è lo strumento di amministrazione dedicato per le configurazioni dinamiche.
Esempio: Impostazione di un tempo di conservazione dei messaggi di 24 ore (86400000 millisecondi) per sales-data.
kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
--alter --entity-type topics --entity-name sales-data \
--add-config retention.ms=86400000
Per rimuovere una specifica sovrascrittura di configurazione e tornare al valore predefinito del broker, utilizza --delete-config:
kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
--alter --entity-type topics --entity-name sales-data \
--delete-config retention.ms
5. Eliminazione di un Topic Kafka
I topic che non sono più in uso dovrebbero essere eliminati correttamente per recuperare spazio su disco e mantenere l'igiene del cluster.
Abilitazione dell'Eliminazione dei Topic
I broker Kafka devono consentire l'eliminazione prima che questo comando possa rimuovere i dati. In molte implementazioni moderne è abilitata, ma non dare per scontato. Controlla la configurazione del broker per:
delete.topic.enable=true
Esempio di Comando: Eliminazione di old-stream
Utilizza il flag --delete per avviare la rimozione del topic. L'eliminazione del topic è spesso asincrona, il che significa che il comando invia la richiesta e l'eliminazione avviene in background.
kafka-topics.sh --delete --topic old-stream \
--bootstrap-server $BROKER_ADDRESS
Output di Conferma:
Eliminazione del topic old-stream avviata con successo.
Riepilogo dei Comandi di Gestione dei Topic
| Azione | Flag | Scopo | Parametri di Esempio |
|---|---|---|---|
| Creazione | --create |
Inizializzare un nuovo topic. | --partitions 5 --replication-factor 3 |
| Elenco | --list |
Mostrare tutti i topic nel cluster. | N/D |
| Descrizione | --describe |
Visualizzare la configurazione e la disposizione attuali. | --topic my-topic |
| Modifica (Partizioni) | --alter |
Aumentare il numero di partizioni. | --partitions N (N > conteggio attuale) |
| Modifica (Config) | kafka-configs.sh --alter |
Sovrascrivere le impostazioni predefinite del broker per un topic specifico. | --add-config retention.ms=... |
| Eliminazione | --delete |
Rimuovere un topic in modo permanente. | --topic my-topic |
Passi Successivi
- Esercitati con questi comandi in un ambiente di sviluppo o staging.
- Utilizza
kafka-configs.sh --describeprima e dopo le modifiche alla configurazione in modo da sapere quali valori sono sovrascritture del topic. - Impara i corrispondenti comandi CLI per il test di produttori e consumatori (
kafka-console-producer.shekafka-console-consumer.sh).