Comment créer et gérer des sujets Kafka en ligne de commande

Apprenez à créer, lister, décrire, modifier et supprimer des sujets Kafka avec kafka-topics.sh depuis la ligne de commande.

Comment créer et gérer des sujets Kafka en ligne de commande

Les sujets Apache Kafka sont l'endroit où vos producteurs écrivent des enregistrements et vos consommateurs les lisent. Si vous devez créer, inspecter, redimensionner ou supprimer un sujet depuis un terminal, l'outil principal est kafka-topics.sh.

Les outils graphiques sont utiles, mais les commandes CLI restent le moyen le plus rapide de vérifier ce que fait le cluster lors d'un incident ou d'un déploiement. Ce guide présente les commandes de sujets que vous utiliserez le plus souvent et attire l'attention sur les endroits où Kafka peut vous surprendre.

Prérequis et configuration

Pour exécuter les commandes de ce guide, vous devez avoir accès à une machine où les binaires Kafka sont installés. Toutes les opérations de gestion des sujets sont effectuées à l'aide de l'utilitaire kafka-topics.sh, généralement situé dans le répertoire bin de votre installation Kafka.

Toutes les commandes nécessitent l'adresse d'au moins un courtier Kafka, spécifiée avec --bootstrap-server. Les anciens clusters Kafka peuvent encore montrer des exemples utilisant --zookeeper, mais l'administration basée sur les courtiers est le modèle actuel.

Pour les exemples ci-dessous, nous supposerons que le courtier fonctionne localement sur le port par défaut :

# Adresse standard du courtier (à remplacer)
BROKER_ADDRESS="localhost:9092"

1. Création d'un nouveau sujet Kafka

La création d'un sujet nécessite de définir son nom, ainsi que deux paramètres critiques qui dictent son comportement et sa tolérance aux pannes : le nombre de partitions et le facteur de réplication.

Paramètres essentiels

  • --topic <nom> : Le nom du sujet.
  • --partitions <N> : Le nombre de partitions dans lequel le sujet sera divisé. Les partitions sont les unités de parallélisme et d'ordre au sein d'un sujet.
  • --replication-factor <N> : Le nombre de copies des données qui seront conservées sur différents courtiers. Un facteur de réplication de 1 signifie aucune redondance.

Exemple de commande : Création de sales-data

Cette commande crée un sujet nommé sales-data avec 3 partitions et un facteur de réplication de 2 (ce qui signifie que 2 copies de chaque partition existeront sur le cluster).

kafka-topics.sh --create --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 3 \
  --replication-factor 2

Astuce : En production, un facteur de réplication de 3 est courant car il conserve plusieurs copies de chaque partition sur différents courtiers. La véritable tolérance aux pannes dépend également de l'emplacement des courtiers, de min.insync.replicas et des paramètres d'accusé de réception du producteur.

2. Liste de tous les sujets

Pour voir tous les sujets actuellement disponibles dans le cluster Kafka, utilisez l'option --list.

Exemple de commande

kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS

Exemple de sortie :

sales-data
logistics-stream
__consumer_offsets

3. Description de la configuration d'un sujet

Vérifier la configuration existante, le nombre de partitions et l'affectation des courtiers pour un sujet spécifique est essentiel pour le dépannage et la vérification. Utilisez l'option --describe.

Exemple de commande : Description de sales-data

kafka-topics.sh --describe --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS

Interprétation de la sortie :

La sortie affiche la configuration à la fois au niveau du sujet et au niveau de la partition :

Topic: sales-data  PartitionCount: 3 ReplicationFactor: 2 Configs:
  Topic: sales-data  Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: sales-data  Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
  Topic: sales-data  Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  • Leader : Le courtier actuellement responsable de la gestion des lectures/écritures pour cette partition.
  • Replicas : La liste des courtiers détenant une copie de cette partition.
  • Isr (In-Sync Replicas) : Les réplicas suffisamment à jour pour être éligibles à un basculement sûr. Si l'ISR diminue, vérifiez l'état du courtier, la latence du disque et la latence réseau.

4. Modification des sujets existants

Kafka fournit des mécanismes limités pour modifier les sujets après leur création. Les deux tâches de modification les plus courantes sont l'augmentation du nombre de partitions et le remplacement des paramètres de configuration par défaut du courtier.

A. Augmentation des partitions

Les partitions ne peuvent être augmentées, jamais diminuées. L'augmentation des partitions permet de faire évoluer le parallélisme des consommateurs.

Attention : L'augmentation des partitions modifie la façon dont les messages sont mappés (hachés) aux partitions. Si vos producteurs s'appuient sur des garanties d'ordre basées sur les clés, l'augmentation des partitions peut briser la livraison ordonnée pour les clés existantes.

Si sales-data a actuellement 3 partitions, nous pouvons l'augmenter à 5 :

kafka-topics.sh --alter --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 5

B. Modification de la configuration spécifique au sujet

Vous pouvez voir des exemples plus anciens modifiant les configurations de sujets via kafka-topics.sh --alter --config. Sur les clusters Kafka actuels, préférez kafka-configs.sh pour les modifications de configuration de sujets car c'est l'outil d'administration dédié aux configurations dynamiques.

Exemple : Définition d'un temps de rétention des messages de 24 heures (86400000 millisecondes) pour sales-data.

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --add-config retention.ms=86400000

Pour supprimer un remplacement de configuration spécifique et revenir à la valeur par défaut du courtier, utilisez --delete-config :

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --delete-config retention.ms

5. Suppression d'un sujet Kafka

Les sujets qui ne sont plus utilisés doivent être correctement supprimés pour libérer de l'espace disque et maintenir l'hygiène du cluster.

Activation de la suppression des sujets

Les courtiers Kafka doivent autoriser la suppression avant que cette commande ne puisse supprimer des données. Dans de nombreux déploiements modernes, elle est activée, mais ne le supposez pas. Vérifiez la configuration du courtier pour :

delete.topic.enable=true

Exemple de commande : Suppression de old-stream

Utilisez l'option --delete pour lancer la suppression du sujet. La suppression d'un sujet est souvent asynchrone, ce qui signifie que la commande soumet la demande et que la suppression se produit en arrière-plan.

kafka-topics.sh --delete --topic old-stream \
  --bootstrap-server $BROKER_ADDRESS

Sortie de confirmation :

Suppression du sujet old-stream initiée avec succès.

Résumé des commandes de gestion des sujets

Action Option(s) Objectif Exemples de paramètres
Créer --create Initialiser un nouveau sujet. --partitions 5 --replication-factor 3
Lister --list Afficher tous les sujets du cluster. N/A
Décrire --describe Voir la configuration et la disposition actuelles. --topic mon-sujet
Modifier (Partitions) --alter Augmenter le nombre de partitions. --partitions N (N > nombre actuel)
Modifier (Config) kafka-configs.sh --alter Remplacer les valeurs par défaut du courtier pour un sujet spécifique. --add-config retention.ms=...
Supprimer --delete Supprimer un sujet définitivement. --topic mon-sujet

Prochaines étapes

  1. Pratiquez ces commandes dans un environnement de développement ou de préproduction.
  2. Utilisez kafka-configs.sh --describe avant et après les modifications de configuration afin de savoir quelles valeurs sont des remplacements de sujets.
  3. Apprenez les commandes CLI correspondantes pour les tests de producteurs et de consommateurs (kafka-console-producer.sh et kafka-console-consumer.sh).