Как создавать и управлять топиками Kafka с помощью командной строки

Это руководство от экспертов предоставляет исчерпывающее пошаговое руководство по управлению топиками Kafka исключительно через интерфейс командной строки (CLI). Изучите основные команды с использованием утилиты `kafka-topics.sh` для выполнения важнейших административных задач: создание топиков с заданным количеством разделов и коэффициентом репликации, проверка конфигурации с помощью описания, безопасное увеличение количества разделов и удаление топиков. Освойте эти практические команды для обеспечения надежного и эффективного администрирования кластера Kafka.

34 просмотров

Как создавать и управлять топиками Kafka с помощью командной строки

Apache Kafka — это распределенная платформа потоковой передачи событий, часто используемая для высокопроизводительных конвейеров данных, аналитики в реальном времени и обмена данными между микросервисами. Фундаментальной организационной единицей в Kafka является Топик (Topic) — категория или название ленты, в которую публикуются записи.

Несмотря на существование графических инструментов, наиболее надежный, стабильный и распространенный способ взаимодействия с инфраструктурой Kafka и управления ею — это непосредственно через интерфейс командной строки (CLI). Освоение этих основных команд имеет решающее значение для администраторов и разработчиков, ответственных за поддержание работоспособного и эффективного кластера Kafka. В этом руководстве представлено пошаговое описание использования скрипта kafka-topics.sh для выполнения наиболее распространенных задач по управлению топиками.

Предварительные условия и настройка

Для выполнения команд, описанных в этом руководстве, у вас должен быть доступ к машине, на которой установлены бинарные файлы Kafka. Все операции по управлению топиками выполняются с помощью утилиты kafka-topics.sh, которая обычно находится в каталоге bin вашей установки Kafka.

Для всех команд требуется адрес хотя бы одного брокера Kafka, который указывается с помощью флага --bootstrap-server. Если вы используете более старую версию Kafka (до 2.2), вы, возможно, все еще полагаетесь на флаг --zookeeper, но --bootstrap-server является рекомендуемым и современным стандартом.

Для приведенных ниже примеров мы будем считать, что брокер работает локально на порте по умолчанию:

# Заполнитель стандартного адреса брокера
BROKER_ADDRESS="localhost:9092"

1. Создание нового топика Kafka

Для создания топика требуется определить его имя, а также два критически важных параметра, которые определяют его поведение и отказоустойчивость: количество разделов (partitions) и коэффициент репликации (replication factor).

Основные параметры

  • --topic <name>: Имя топика.
  • --partitions <N>: Количество разделов, на которое будет разбит топик. Разделы (партиции) являются единицами параллелизма и упорядочивания в топике.
  • --replication-factor <N>: Количество копий данных, которые будут храниться на разных брокерах. Коэффициент репликации, равный 1, означает отсутствие избыточности.

Пример команды: Создание sales-data

Эта команда создает топик с именем sales-data с 3 разделами и коэффициентом репликации 2 (что означает, что 2 копии каждого раздела будут существовать в кластере).

kafka-topics.sh --create --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS \n  --partitions 3 \n  --replication-factor 2

Совет: В производственной среде с N брокерами часто рекомендуется коэффициент репликации 3 для обеспечения высокой доступности (что позволяет потерять два брокера до потери данных), а количество разделов должно быть настроено в зависимости от ожидаемой пропускной способности и потребностей в параллелизме потребителей.

2. Просмотр всех топиков

Чтобы просмотреть все топики, доступные в данный момент в кластере Kafka, используйте флаг --list.

Пример команды

kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS

Пример вывода:

sales-data
logistics-stream
__consumer_offsets

3. Просмотр конфигурации топика

Проверка существующей конфигурации, количества разделов и назначений брокеров для конкретного топика необходима для устранения неполадок и проверки. Используйте флаг --describe.

Пример команды: Описание sales-data

kafka-topics.sh --describe --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS

Интерпретация вывода:

Вывод показывает конфигурацию как на уровне топика, так и на уровне раздела:

Topic: sales-data  PartitionCount: 3 ReplicationFactor: 2 Configs:
  Topic: sales-data  Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: sales-data  Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
  Topic: sales-data  Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  • Leader (Лидер): Брокер, который в настоящее время отвечает за обработку операций чтения/записи для этого раздела.
  • Replicas (Реплики): Список брокеров, хранящих копию этого раздела.
  • Isr (In-Sync Replicas, Синхронизированные реплики): Подмножество реплик, которые полностью синхронизированы с Лидером. Высокая доступность требует, чтобы Лидер находился в ISR.

4. Изменение существующих топиков

Kafka предоставляет ограниченные механизмы для изменения топиков после их создания. Две наиболее распространенные задачи изменения — это увеличение количества разделов и переопределение настроек конфигурации брокера по умолчанию.

A. Увеличение количества разделов

Количество разделов можно только увеличить, но никогда не уменьшить. Увеличение количества разделов помогает масштабировать параллелизм потребителей.

Внимание: Увеличение количества разделов изменяет способ сопоставления (хеширования) сообщений с разделами. Если ваши продюсеры полагаются на гарантии упорядоченности на основе ключей, увеличение количества разделов может нарушить упорядоченную доставку для существующих ключей.

Если sales-data в настоящее время имеет 3 раздела, мы можем увеличить их до 5:

kafka-topics.sh --alter --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS \n  --partitions 5

B. Изменение конфигурации для конкретного топика

Вы можете переопределить глобальные настройки брокера (например, время хранения сообщений или политики очистки) для отдельных топиков с помощью флага --config.

Пример: Установка времени хранения сообщений в 24 часа (86400000 миллисекунд) для sales-data.

kafka-topics.sh --alter --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS \n  --config retention.ms=86400000

Чтобы удалить определенное переопределение конфигурации и вернуться к настройке брокера по умолчанию, используйте флаг --delete-config:

kafka-topics.sh --alter --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS \n  --delete-config retention.ms

5. Удаление топика Kafka

Топики, которые больше не используются, должны быть правильно удалены, чтобы освободить место на диске и поддерживать гигиену кластера.

Включение удаления топиков

По умолчанию брокеры Kafka могут отключать удаление топиков в целях безопасности. Прежде чем вы сможете удалить топик, убедитесь, что в файле server.properties на всех брокерах включена следующая настройка:

delete.topic.enable=true

Пример команды: Удаление old-stream

Используйте флаг --delete, чтобы инициировать удаление топика. Удаление топика часто является асинхронным, то есть команда отправляет запрос, а само удаление происходит в фоновом режиме.

kafka-topics.sh --delete --topic old-stream \n  --bootstrap-server $BROKER_ADDRESS

Вывод подтверждения:

Deletion of topic old-stream initiated successfully.

Сводка команд управления топиками

Действие Флаг(и) Назначение Пример параметров
Создать --create Инициализация нового топика. --partitions 5 --replication-factor 3
Список --list Показать все топики в кластере. Н/Д
Описание --describe Просмотр текущей конфигурации и структуры. --topic my-topic
Изменить (разделы) --alter Увеличение количества разделов. --partitions N (N > текущее количество)
Изменить (конфигурация) --alter --config Переопределение настроек брокера по умолчанию для конкретного топика. --config retention.ms=...
Удалить --delete Окончательное удаление топика. --topic my-topic

Заключение и дальнейшие шаги

Командная строка остается самым мощным и гибким интерфейсом для управления вашим кластером Kafka. Освоив утилиту kafka-topics.sh, вы получаете детальный контроль над параметрами создания топиков, переопределением конфигурации и необходимыми административными действиями, такими как удаление и описание.

Дальнейшие шаги:

  1. Потренируйтесь использовать эти команды в среде разработки или промежуточной среде.
  2. Изучите расширенные параметры конфигурации, используя команду --describe, чтобы увидеть полный список настраиваемых свойств (например, cleanup.policy, max.message.bytes).
  3. Изучите соответствующие команды CLI для тестирования продюсеров и потребителей (kafka-console-producer.sh и kafka-console-consumer.sh).