Как создавать и управлять топиками Kafka с помощью командной строки
Apache Kafka — это распределенная платформа потоковой передачи событий, часто используемая для высокопроизводительных конвейеров данных, аналитики в реальном времени и обмена данными между микросервисами. Фундаментальной организационной единицей в Kafka является Топик (Topic) — категория или название ленты, в которую публикуются записи.
Несмотря на существование графических инструментов, наиболее надежный, стабильный и распространенный способ взаимодействия с инфраструктурой Kafka и управления ею — это непосредственно через интерфейс командной строки (CLI). Освоение этих основных команд имеет решающее значение для администраторов и разработчиков, ответственных за поддержание работоспособного и эффективного кластера Kafka. В этом руководстве представлено пошаговое описание использования скрипта kafka-topics.sh для выполнения наиболее распространенных задач по управлению топиками.
Предварительные условия и настройка
Для выполнения команд, описанных в этом руководстве, у вас должен быть доступ к машине, на которой установлены бинарные файлы Kafka. Все операции по управлению топиками выполняются с помощью утилиты kafka-topics.sh, которая обычно находится в каталоге bin вашей установки Kafka.
Для всех команд требуется адрес хотя бы одного брокера Kafka, который указывается с помощью флага --bootstrap-server. Если вы используете более старую версию Kafka (до 2.2), вы, возможно, все еще полагаетесь на флаг --zookeeper, но --bootstrap-server является рекомендуемым и современным стандартом.
Для приведенных ниже примеров мы будем считать, что брокер работает локально на порте по умолчанию:
# Заполнитель стандартного адреса брокера
BROKER_ADDRESS="localhost:9092"
1. Создание нового топика Kafka
Для создания топика требуется определить его имя, а также два критически важных параметра, которые определяют его поведение и отказоустойчивость: количество разделов (partitions) и коэффициент репликации (replication factor).
Основные параметры
--topic <name>: Имя топика.--partitions <N>: Количество разделов, на которое будет разбит топик. Разделы (партиции) являются единицами параллелизма и упорядочивания в топике.--replication-factor <N>: Количество копий данных, которые будут храниться на разных брокерах. Коэффициент репликации, равный 1, означает отсутствие избыточности.
Пример команды: Создание sales-data
Эта команда создает топик с именем sales-data с 3 разделами и коэффициентом репликации 2 (что означает, что 2 копии каждого раздела будут существовать в кластере).
kafka-topics.sh --create --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 3 \n --replication-factor 2
Совет: В производственной среде с N брокерами часто рекомендуется коэффициент репликации 3 для обеспечения высокой доступности (что позволяет потерять два брокера до потери данных), а количество разделов должно быть настроено в зависимости от ожидаемой пропускной способности и потребностей в параллелизме потребителей.
2. Просмотр всех топиков
Чтобы просмотреть все топики, доступные в данный момент в кластере Kafka, используйте флаг --list.
Пример команды
kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS
Пример вывода:
sales-data
logistics-stream
__consumer_offsets
3. Просмотр конфигурации топика
Проверка существующей конфигурации, количества разделов и назначений брокеров для конкретного топика необходима для устранения неполадок и проверки. Используйте флаг --describe.
Пример команды: Описание sales-data
kafka-topics.sh --describe --topic sales-data \n --bootstrap-server $BROKER_ADDRESS
Интерпретация вывода:
Вывод показывает конфигурацию как на уровне топика, так и на уровне раздела:
Topic: sales-data PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: sales-data Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sales-data Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: sales-data Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- Leader (Лидер): Брокер, который в настоящее время отвечает за обработку операций чтения/записи для этого раздела.
- Replicas (Реплики): Список брокеров, хранящих копию этого раздела.
- Isr (In-Sync Replicas, Синхронизированные реплики): Подмножество реплик, которые полностью синхронизированы с Лидером. Высокая доступность требует, чтобы Лидер находился в ISR.
4. Изменение существующих топиков
Kafka предоставляет ограниченные механизмы для изменения топиков после их создания. Две наиболее распространенные задачи изменения — это увеличение количества разделов и переопределение настроек конфигурации брокера по умолчанию.
A. Увеличение количества разделов
Количество разделов можно только увеличить, но никогда не уменьшить. Увеличение количества разделов помогает масштабировать параллелизм потребителей.
Внимание: Увеличение количества разделов изменяет способ сопоставления (хеширования) сообщений с разделами. Если ваши продюсеры полагаются на гарантии упорядоченности на основе ключей, увеличение количества разделов может нарушить упорядоченную доставку для существующих ключей.
Если sales-data в настоящее время имеет 3 раздела, мы можем увеличить их до 5:
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 5
B. Изменение конфигурации для конкретного топика
Вы можете переопределить глобальные настройки брокера (например, время хранения сообщений или политики очистки) для отдельных топиков с помощью флага --config.
Пример: Установка времени хранения сообщений в 24 часа (86400000 миллисекунд) для sales-data.
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --config retention.ms=86400000
Чтобы удалить определенное переопределение конфигурации и вернуться к настройке брокера по умолчанию, используйте флаг --delete-config:
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --delete-config retention.ms
5. Удаление топика Kafka
Топики, которые больше не используются, должны быть правильно удалены, чтобы освободить место на диске и поддерживать гигиену кластера.
Включение удаления топиков
По умолчанию брокеры Kafka могут отключать удаление топиков в целях безопасности. Прежде чем вы сможете удалить топик, убедитесь, что в файле server.properties на всех брокерах включена следующая настройка:
delete.topic.enable=true
Пример команды: Удаление old-stream
Используйте флаг --delete, чтобы инициировать удаление топика. Удаление топика часто является асинхронным, то есть команда отправляет запрос, а само удаление происходит в фоновом режиме.
kafka-topics.sh --delete --topic old-stream \n --bootstrap-server $BROKER_ADDRESS
Вывод подтверждения:
Deletion of topic old-stream initiated successfully.
Сводка команд управления топиками
| Действие | Флаг(и) | Назначение | Пример параметров |
|---|---|---|---|
| Создать | --create |
Инициализация нового топика. | --partitions 5 --replication-factor 3 |
| Список | --list |
Показать все топики в кластере. | Н/Д |
| Описание | --describe |
Просмотр текущей конфигурации и структуры. | --topic my-topic |
| Изменить (разделы) | --alter |
Увеличение количества разделов. | --partitions N (N > текущее количество) |
| Изменить (конфигурация) | --alter --config |
Переопределение настроек брокера по умолчанию для конкретного топика. | --config retention.ms=... |
| Удалить | --delete |
Окончательное удаление топика. | --topic my-topic |
Заключение и дальнейшие шаги
Командная строка остается самым мощным и гибким интерфейсом для управления вашим кластером Kafka. Освоив утилиту kafka-topics.sh, вы получаете детальный контроль над параметрами создания топиков, переопределением конфигурации и необходимыми административными действиями, такими как удаление и описание.
Дальнейшие шаги:
- Потренируйтесь использовать эти команды в среде разработки или промежуточной среде.
- Изучите расширенные параметры конфигурации, используя команду
--describe, чтобы увидеть полный список настраиваемых свойств (например,cleanup.policy,max.message.bytes). - Изучите соответствующие команды CLI для тестирования продюсеров и потребителей (
kafka-console-producer.shиkafka-console-consumer.sh).