Как создавать и управлять топиками Kafka с помощью командной строки

Узнайте, как создавать, выводить список, описывать, изменять и удалять топики Kafka с помощью kafka-topics.sh из командной строки.

Как создавать и управлять топиками Kafka с помощью командной строки

Топики Apache Kafka — это место, куда ваши продюсеры записывают записи, а потребители их читают. Если вам нужно создать, проверить, изменить размер или удалить топик из терминала, основным инструментом является kafka-topics.sh.

Графические инструменты полезны, но команды CLI по-прежнему являются самым быстрым способом проверить, что происходит в кластере во время инцидента или развертывания. Это руководство показывает команды для работы с топиками, которые вы будете использовать чаще всего, и обращает внимание на места, где Kafka может вас удивить.

Предварительные требования и настройка

Чтобы выполнить команды из этого руководства, у вас должен быть доступ к машине, где установлены бинарные файлы Kafka. Все операции управления топиками выполняются с помощью утилиты kafka-topics.sh, которая обычно находится в каталоге bin вашей установки Kafka.

Все команды требуют указания адреса хотя бы одного брокера Kafka с помощью --bootstrap-server. В старых кластерах Kafka все еще могут встречаться примеры с использованием --zookeeper, но администрирование на основе брокеров является текущим шаблоном.

Для приведенных ниже примеров мы будем считать, что брокер работает локально на порту по умолчанию:

# Стандартный заполнитель адреса брокера
BROKER_ADDRESS="localhost:9092"

1. Создание нового топика Kafka

Создание топика требует определения его имени, а также двух критических параметров, которые определяют его поведение и отказоустойчивость: количество разделов и коэффициент репликации.

Основные параметры

  • --topic <имя>: Имя топика.
  • --partitions <N>: Количество разделов, на которые будет разбит топик. Разделы — это единицы параллелизма и упорядоченности внутри топика.
  • --replication-factor <N>: Количество копий данных, которые будут храниться на разных брокерах. Коэффициент репликации, равный 1, означает отсутствие избыточности.

Пример команды: создание sales-data

Эта команда создает топик с именем sales-data с 3 разделами и коэффициентом репликации 2 (то есть 2 копии каждого раздела будут существовать в кластере).

kafka-topics.sh --create --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 3 \
  --replication-factor 2

Совет: В производственной среде коэффициент репликации 3 является обычным, поскольку он хранит несколько копий каждого раздела на разных брокерах. Реальная отказоустойчивость также зависит от размещения брокеров, min.insync.replicas и настроек подтверждения продюсера.

2. Вывод списка всех топиков

Чтобы просмотреть все топики, доступные в данный момент в кластере Kafka, используйте флаг --list.

Пример команды

kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS

Пример вывода:

sales-data
logistics-stream
__consumer_offsets

3. Описание конфигурации топика

Проверка существующей конфигурации, количества разделов и назначения брокеров для конкретного топика необходима для устранения неполадок и проверки. Используйте флаг --describe.

Пример команды: описание sales-data

kafka-topics.sh --describe --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS

Интерпретация вывода:

Вывод показывает конфигурацию как на уровне топика, так и на уровне разделов:

Topic: sales-data  PartitionCount: 3 ReplicationFactor: 2 Configs:
  Topic: sales-data  Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: sales-data  Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
  Topic: sales-data  Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  • Leader: Брокер, в настоящее время отвечающий за обработку чтения/записи для этого раздела.
  • Replicas: Список брокеров, хранящих копию этого раздела.
  • Isr (In-Sync Replicas): Реплики, которые достаточно синхронизированы, чтобы быть кандидатами для безопасного переключения при сбое. Если ISR уменьшается, проверьте работоспособность брокера, задержки диска и сетевые задержки.

4. Изменение существующих топиков

Kafka предоставляет ограниченные механизмы для изменения топиков после их создания. Двумя наиболее распространенными задачами изменения являются увеличение количества разделов и переопределение настроек конфигурации брокера по умолчанию.

A. Увеличение количества разделов

Количество разделов можно только увеличить, но никогда не уменьшать. Увеличение количества разделов помогает масштабировать параллелизм потребителей.

Предупреждение: Увеличение количества разделов изменяет способ сопоставления (хеширования) сообщений с разделами. Если ваши продюсеры полагаются на гарантии упорядочения на основе ключей, увеличение количества разделов может нарушить упорядоченную доставку для существующих ключей.

Если sales-data в настоящее время имеет 3 раздела, мы можем увеличить их до 5:

kafka-topics.sh --alter --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 5

B. Изменение конфигурации, специфичной для топика

Вы можете встретить старые примеры изменения конфигурации топика через kafka-topics.sh --alter --config. В текущих кластерах Kafka для изменения конфигурации топиков предпочтительнее использовать kafka-configs.sh, поскольку это специальный инструмент администрирования для динамических конфигураций.

Пример: Установка времени хранения сообщений 24 часа (86400000 миллисекунд) для sales-data.

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --add-config retention.ms=86400000

Чтобы удалить определенное переопределение конфигурации и вернуться к настройкам брокера по умолчанию, используйте --delete-config:

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --delete-config retention.ms

5. Удаление топика Kafka

Топики, которые больше не используются, следует правильно удалять, чтобы освободить дисковое пространство и поддерживать гигиену кластера.

Включение удаления топиков

Брокеры Kafka должны разрешать удаление, прежде чем эта команда сможет удалить данные. Во многих современных развертываниях это включено, но не стоит предполагать. Проверьте конфигурацию брокера на наличие:

delete.topic.enable=true

Пример команды: удаление old-stream

Используйте флаг --delete, чтобы инициировать удаление топика. Удаление топика часто является асинхронным, то есть команда отправляет запрос, а удаление происходит в фоновом режиме.

kafka-topics.sh --delete --topic old-stream \
  --bootstrap-server $BROKER_ADDRESS

Вывод подтверждения:

Deletion of topic old-stream initiated successfully.

Сводка команд управления топиками

Действие Флаг(и) Назначение Пример параметров
Создание --create Инициализация нового топика. --partitions 5 --replication-factor 3
Список --list Показать все топики в кластере. Н/Д
Описание --describe Просмотр текущей конфигурации и структуры. --topic my-topic
Изменение (разделы) --alter Увеличение количества разделов. --partitions N (N > текущее количество)
Изменение (конфиг) kafka-configs.sh --alter Переопределение настроек брокера по умолчанию для конкретного топика. --add-config retention.ms=...
Удаление --delete Окончательное удаление топика. --topic my-topic

Следующие шаги

  1. Попрактикуйтесь в выполнении этих команд в среде разработки или промежуточной среде.
  2. Используйте kafka-configs.sh --describe до и после изменений конфигурации, чтобы знать, какие значения являются переопределениями топика.
  3. Изучите соответствующие команды CLI для тестирования продюсера и потребителя (kafka-console-producer.sh и kafka-console-consumer.sh).