Come Creare e Gestire Topic Kafka Utilizzando la Riga di Comando

Impara come creare, elencare, descrivere, modificare ed eliminare topic Kafka con kafka-topics.sh dalla riga di comando.

Come Creare e Gestire Topic Kafka Utilizzando la Riga di Comando

I topic di Apache Kafka sono il luogo in cui i tuoi produttori scrivono i record e i tuoi consumatori li leggono. Se hai bisogno di creare, ispezionare, ridimensionare o eliminare un topic da un terminale, lo strumento principale è kafka-topics.sh.

Gli strumenti grafici sono utili, ma i comandi CLI sono ancora il modo più veloce per verificare cosa sta facendo il cluster durante un incidente o un deployment. Questa guida mostra i comandi per i topic che utilizzerai più spesso e segnala i punti in cui Kafka può sorprenderti.

Prerequisiti e Configurazione

Per eseguire i comandi in questa guida, devi avere accesso a una macchina in cui sono installati i binari di Kafka. Tutte le operazioni di gestione dei topic vengono eseguite utilizzando l'utilità kafka-topics.sh, che si trova tipicamente nella directory bin della tua installazione Kafka.

Tutti i comandi richiedono l'indirizzo di almeno un broker Kafka, specificato con --bootstrap-server. I cluster Kafka più vecchi potrebbero ancora mostrare esempi che utilizzano --zookeeper, ma l'amministrazione basata su broker è il modello attuale.

Per gli esempi seguenti, assumeremo che il broker sia in esecuzione localmente sulla porta predefinita:

# Segnaposto per l'indirizzo del broker standard
BROKER_ADDRESS="localhost:9092"

1. Creazione di un Nuovo Topic Kafka

Creare un topic richiede la definizione del suo nome, insieme a due parametri critici che ne determinano il comportamento e la tolleranza ai guasti: il numero di partizioni e il fattore di replica.

Parametri Essenziali

  • --topic <nome>: Il nome del topic.
  • --partitions <N>: Il numero di partizioni in cui il topic sarà suddiviso. Le partizioni sono le unità di parallelismo e ordinamento all'interno di un topic.
  • --replication-factor <N>: Il numero di copie dei dati che verranno mantenute su diversi broker. Un fattore di replica pari a 1 significa nessuna ridondanza.

Esempio di Comando: Creazione di sales-data

Questo comando crea un topic chiamato sales-data con 3 partizioni e un fattore di replica di 2 (il che significa che 2 copie di ogni partizione esisteranno nel cluster).

kafka-topics.sh --create --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 3 \
  --replication-factor 2

Suggerimento: In produzione, un fattore di replica di 3 è comune perché mantiene più copie di ogni partizione su broker diversi. La reale tolleranza ai guasti dipende anche dal posizionamento dei broker, da min.insync.replicas e dalle impostazioni di acknowledgment del produttore.

2. Elenco di Tutti i Topic

Per visualizzare tutti i topic attualmente disponibili nel cluster Kafka, utilizza il flag --list.

Esempio di Comando

kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS

Esempio di Output:

sales-data
logistics-stream
__consumer_offsets

3. Descrizione della Configurazione del Topic

Controllare la configurazione esistente, il conteggio delle partizioni e l'assegnazione dei broker per un topic specifico è essenziale per la risoluzione dei problemi e la verifica. Utilizza il flag --describe.

Esempio di Comando: Descrizione di sales-data

kafka-topics.sh --describe --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS

Interpretazione dell'Output:

L'output mostra la configurazione sia a livello di topic che a livello di partizione:

Topic: sales-data  PartitionCount: 3 ReplicationFactor: 2 Configs:
  Topic: sales-data  Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: sales-data  Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
  Topic: sales-data  Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  • Leader: Il broker attualmente responsabile della gestione delle letture/scritture per quella partizione.
  • Replicas: L'elenco dei broker che contengono una copia di quella partizione.
  • Isr (In-Sync Replicas): Le repliche che sono sufficientemente aggiornate per essere idonee a un failover sicuro. Se ISR si riduce, controlla lo stato di salute del broker, la latenza del disco e il ritardo di rete.

4. Modifica dei Topic Esistenti

Kafka fornisce meccanismi limitati per la modifica dei topic dopo la creazione. I due compiti di modifica più comuni sono l'aumento del numero di partizioni e la sovrascrittura delle impostazioni di configurazione predefinite del broker.

A. Aumento delle Partizioni

Le partizioni possono essere solo aumentate, mai diminuite. L'aumento delle partizioni aiuta a scalare il parallelismo dei consumatori.

Attenzione: L'aumento delle partizioni modifica il modo in cui i messaggi vengono mappati (hash) alle partizioni. Se i tuoi produttori si basano su garanzie di ordinamento basate sulla chiave, l'aumento delle partizioni potrebbe rompere la consegna ordinata per le chiavi esistenti.

Se sales-data ha attualmente 3 partizioni, possiamo aumentarle a 5:

kafka-topics.sh --alter --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 5

B. Modifica della Configurazione Specifica del Topic

Potresti vedere esempi più vecchi che modificano le configurazioni dei topic tramite kafka-topics.sh --alter --config. Sui cluster Kafka attuali, preferisci kafka-configs.sh per le modifiche alla configurazione dei topic perché è lo strumento di amministrazione dedicato per le configurazioni dinamiche.

Esempio: Impostazione di un tempo di conservazione dei messaggi di 24 ore (86400000 millisecondi) per sales-data.

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --add-config retention.ms=86400000

Per rimuovere una specifica sovrascrittura di configurazione e tornare al valore predefinito del broker, utilizza --delete-config:

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --delete-config retention.ms

5. Eliminazione di un Topic Kafka

I topic che non sono più in uso dovrebbero essere eliminati correttamente per recuperare spazio su disco e mantenere l'igiene del cluster.

Abilitazione dell'Eliminazione dei Topic

I broker Kafka devono consentire l'eliminazione prima che questo comando possa rimuovere i dati. In molte implementazioni moderne è abilitata, ma non dare per scontato. Controlla la configurazione del broker per:

delete.topic.enable=true

Esempio di Comando: Eliminazione di old-stream

Utilizza il flag --delete per avviare la rimozione del topic. L'eliminazione del topic è spesso asincrona, il che significa che il comando invia la richiesta e l'eliminazione avviene in background.

kafka-topics.sh --delete --topic old-stream \
  --bootstrap-server $BROKER_ADDRESS

Output di Conferma:

Eliminazione del topic old-stream avviata con successo.

Riepilogo dei Comandi di Gestione dei Topic

Azione Flag Scopo Parametri di Esempio
Creazione --create Inizializzare un nuovo topic. --partitions 5 --replication-factor 3
Elenco --list Mostrare tutti i topic nel cluster. N/D
Descrizione --describe Visualizzare la configurazione e la disposizione attuali. --topic my-topic
Modifica (Partizioni) --alter Aumentare il numero di partizioni. --partitions N (N > conteggio attuale)
Modifica (Config) kafka-configs.sh --alter Sovrascrivere le impostazioni predefinite del broker per un topic specifico. --add-config retention.ms=...
Eliminazione --delete Rimuovere un topic in modo permanente. --topic my-topic

Passi Successivi

  1. Esercitati con questi comandi in un ambiente di sviluppo o staging.
  2. Utilizza kafka-configs.sh --describe prima e dopo le modifiche alla configurazione in modo da sapere quali valori sono sovrascritture del topic.
  3. Impara i corrispondenti comandi CLI per il test di produttori e consumatori (kafka-console-producer.sh e kafka-console-consumer.sh).