Come Creare e Gestire Topic Kafka Tramite Riga di Comando
Apache Kafka è una piattaforma di streaming di eventi distribuita, spesso utilizzata per pipeline di dati ad alto throughput, analisi in tempo reale e comunicazione tra microservizi. L'unità organizzativa fondamentale all'interno di Kafka è il Topic, un nome di categoria o feed a cui vengono pubblicati i record.
Sebbene esistano strumenti grafici, il modo più robusto, affidabile e comune per interagire e gestire l'infrastruttura Kafka è direttamente tramite l'interfaccia a riga di comando (CLI). Padroneggiare questi comandi essenziali è fondamentale per gli amministratori e gli sviluppatori responsabili del mantenimento di un cluster Kafka sano ed efficiente. Questa guida fornisce un tutorial passo passo sull'utilizzo dello script kafka-topics.sh per eseguire le attività di gestione dei topic più comuni.
Prerequisiti e Configurazione
Per eseguire i comandi di questa guida, devi avere accesso a una macchina su cui sono installati i binari di Kafka. Tutte le operazioni di gestione dei topic vengono eseguite utilizzando l'utility kafka-topics.sh, solitamente presente nella directory bin della tua installazione Kafka.
Tutti i comandi richiedono l'indirizzo di almeno un broker Kafka, specificato utilizzando il flag --bootstrap-server. Se stai utilizzando una versione precedente di Kafka (precedente alla 2.2), potresti ancora fare affidamento sul flag --zookeeper, ma --bootstrap-server è lo standard moderno e raccomandato.
Per gli esempi seguenti, assumeremo che il broker sia in esecuzione localmente sulla porta predefinita:
# Segnaposto per l'indirizzo standard del broker
BROKER_ADDRESS="localhost:9092"
1. Creazione di un Nuovo Topic Kafka
La creazione di un topic richiede la definizione del suo nome, insieme a due parametri critici che ne determinano il comportamento e la tolleranza ai guasti: il numero di partizioni e il fattore di replica.
Parametri Essenziali
--topic <nome>: Il nome del topic.--partitions <N>: Il numero di partizioni in cui il topic verrà suddiviso. Le partizioni sono le unità di parallelismo e ordinamento all'interno di un topic.--replication-factor <N>: Il numero di copie dei dati che verranno mantenute su broker diversi. Un fattore di replica di 1 significa nessuna ridondanza.
Esempio di Comando: Creazione di sales-data
Questo comando crea un topic denominato sales-data con 3 partizioni e un fattore di replica di 2 (il che significa che esisteranno 2 copie di ogni partizione nel cluster).
kafka-topics.sh --create --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 3 \n --replication-factor 2
Suggerimento: In un ambiente di produzione con N broker, un fattore di replica di 3 è spesso raccomandato per l'alta disponibilità (consentendo la perdita di due broker prima che si verifichi la perdita di dati), e il numero di partizioni dovrebbe essere ottimizzato in base al throughput previsto e alle esigenze di parallelismo dei consumer.
2. Elenco di Tutti i Topic
Per visualizzare tutti i topic attualmente disponibili nel cluster Kafka, usa il flag --list.
Esempio di Comando
kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS
Esempio di Output:
sales-data
logistics-stream
__consumer_offsets
3. Descrizione della Configurazione del Topic
Verificare la configurazione esistente, il numero di partizioni e l'assegnazione dei broker per un topic specifico è essenziale per la risoluzione dei problemi e la verifica. Usa il flag --describe.
Esempio di Comando: Descrizione di sales-data
kafka-topics.sh --describe --topic sales-data \n --bootstrap-server $BROKER_ADDRESS
Interpretazione dell'Output:
L'output mostra la configurazione sia a livello di topic che a livello di partizione:
Topic: sales-data PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: sales-data Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sales-data Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: sales-data Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- Leader: Il broker attualmente responsabile della gestione delle letture/scritture per quella partizione.
- Replicas: L'elenco dei broker che detengono una copia di quella partizione.
- Isr (In-Sync Replicas): Il sottoinsieme di repliche completamente sincronizzate con il Leader. L'alta disponibilità richiede che il Leader sia nell'ISR.
4. Modifica di Topic Esistenti
Kafka fornisce meccanismi limitati per la modifica dei topic dopo la creazione. Le due attività di modifica più comuni sono l'aumento del numero di partizioni e la sovrascrittura delle impostazioni di configurazione predefinite dei broker.
A. Aumento delle Partizioni
Le partizioni possono solo essere aumentate, mai diminuite. L'aumento delle partizioni aiuta a scalare il parallelismo dei consumer.
Attenzione: L'aumento delle partizioni modifica il modo in cui i messaggi vengono mappati (hashing) alle partizioni. Se i tuoi producer si basano su garanzie di ordinamento basate su chiavi, l'aumento delle partizioni potrebbe interrompere la consegna ordinata per le chiavi esistenti.
Se sales-data ha attualmente 3 partizioni, possiamo aumentarlo a 5:
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 5
B. Modifica della Configurazione Specifica del Topic
Puoi sovrascrivere le impostazioni globali dei broker (come il tempo di conservazione dei messaggi o le policy di pulizia) per i singoli topic utilizzando il flag --config.
Esempio: Impostazione di un tempo di conservazione dei messaggi di 24 ore (86400000 millisecondi) per sales-data.
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --config retention.ms=86400000
Per rimuovere una specifica sovrascrittura di configurazione e ripristinare l'impostazione predefinita del broker, usa il flag --delete-config:
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --delete-config retention.ms
5. Eliminazione di un Topic Kafka
I topic non più in uso dovrebbero essere eliminati correttamente per recuperare spazio su disco e mantenere l'igiene del cluster.
Abilitazione dell'Eliminazione dei Topic
Per impostazione predefinita, i broker Kafka potrebbero disabilitare l'eliminazione dei topic per sicurezza. Prima di poter eliminare un topic, assicurati che la seguente impostazione sia abilitata nel tuo file server.properties su tutti i broker:
delete.topic.enable=true
Esempio di Comando: Eliminazione di old-stream
Usa il flag --delete per avviare la rimozione del topic. L'eliminazione dei topic è spesso asincrona, il che significa che il comando invia la richiesta e l'eliminazione avviene in background.
kafka-topics.sh --delete --topic old-stream \n --bootstrap-server $BROKER_ADDRESS
Output di Conferma:
Deletion of topic old-stream initiated successfully.
Riepilogo dei Comandi di Gestione dei Topic
| Azione | Flag/i | Scopo | Parametri Esempio |
|---|---|---|---|
| Crea | --create |
Inizializza un nuovo topic. | --partitions 5 --replication-factor 3 |
| Elenca | --list |
Mostra tutti i topic nel cluster. | N/A |
| Descrivi | --describe |
Visualizza la configurazione e il layout corrente. | --topic my-topic |
| Modifica (Partizioni) | --alter |
Aumenta il numero di partizioni. | --partitions N (N > conteggio corrente) |
| Modifica (Config) | --alter --config |
Sovrascrivi i valori predefiniti del broker per un topic specifico. | --config retention.ms=... |
| Elimina | --delete |
Rimuove un topic in modo permanente. | --topic my-topic |
Conclusione e Prossimi Passi
La riga di comando rimane l'interfaccia più potente e flessibile per la gestione del tuo cluster Kafka. Padroneggiando l'utility kafka-topics.sh, ottieni un controllo granulare sui parametri di creazione dei topic, sulle sovrascritture di configurazione e sulle azioni amministrative necessarie come l'eliminazione e la descrizione.
Prossimi Passi:
- Prova questi comandi in un ambiente di sviluppo o staging.
- Esplora le opzioni di configurazione avanzate utilizzando il comando
--describeper vedere l'elenco completo delle proprietà configurabili (ad esempio,cleanup.policy,max.message.bytes). - Impara i comandi CLI corrispondenti per il test di producer e consumer (
kafka-console-producer.shekafka-console-consumer.sh).