Понимание инструментов командной строки Kafka: справочное руководство по CLI
Раскройте возможности Apache Kafka с помощью этого всеобъемлющего справочного руководства по интерфейсу командной строки (CLI). Изучите основные команды Kafka для управления темами (`kafka-topics.sh`), отправки сообщений (`kafka-console-producer.sh`), потребления данных (`kafka-console-consumer.sh`) и проверки групп потребителей (`kafka-consumer-groups.sh`). В этом руководстве подробно описаны практические примеры использования, аргументы и лучшие практики для эффективного администрирования и устранения неполадок Kafka.
Понимание инструментов командной строки Kafka: справочное руководство по CLI
Инструменты командной строки Kafka — это самый быстрый способ ответить на основные операционные вопросы: существует ли эта тема, какой брокер ведет эту партицию, что находится внутри темы, почему эта группа потребителей отстает и может ли этот клиент аутентифицироваться в кластере? Они не нужны для каждой задачи, и большинство производственных изменений все равно должны проходить через автоматизацию, но во время сломанного развертывания или ночного вопроса о данных CLI часто является самым коротким путем к фактам.
Примеры ниже предполагают, что скрипты находятся в вашем PATH. Во многих установках они находятся в каталоге bin/ Kafka, поэтому та же команда может быть запущена как bin/kafka-topics.sh. Для защищенных кластеров большинству команд также требуется --command-config client.properties, где этот файл содержит настройки SASL, SSL и другие клиентские настройки.
Основные инструменты CLI Kafka
Дистрибутивы Kafka обычно включают каталог bin/, содержащий различные скрипты и исполняемые файлы. Мы сосредоточимся на наиболее часто используемых для эффективного управления Kafka.
1. kafka-topics.sh
Это, пожалуй, самый часто используемый инструмент командной строки. Он позволяет создавать, перечислять, описывать, удалять, изменять и управлять темами Kafka. Управление темами является основополагающим для организации потоков данных в Kafka.
Общие подкоманды и аргументы:
--create: Создает новую тему.--list: Перечисляет все темы в кластере.--describe: Предоставляет подробную информацию о конкретных темах.--delete: Удаляет одну или несколько тем.--alter: Изменяет конфигурацию существующей темы.--topic <имя_темы>: Указывает имя темы.--partitions <количество_партиций>: Устанавливает количество партиций для темы (используется с--create).--replication-factor <фактор>: Устанавливает коэффициент репликации для темы (используется с--create).--bootstrap-server <хост:порт>: Указывает брокера Kafka для подключения.
Примеры:
Создать тему с именем
my_topicс 3 партициями и коэффициентом репликации 2:kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server kafka-broker-1:9092,kafka-broker-2:9092Перечислить все темы в кластере:
kafka-topics.sh --list --bootstrap-server kafka-broker-1:9092Описать тему с именем
my_topic:kafka-topics.sh --describe --topic my_topic --bootstrap-server kafka-broker-1:9092Это покажет детали, такие как партиции, лидер, реплики и ISR (синхронизированные реплики).
Удалить тему с именем
old_topic:kafka-topics.sh --delete --topic old_topic --bootstrap-server kafka-broker-1:9092Примечание: Удаление темы должно быть включено в конфигурации брокера Kafka (
delete.topic.enable=true).
2. kafka-console-producer.sh
Этот инструмент позволяет отправлять сообщения в тему Kafka из стандартного ввода. Он бесценен для тестирования продюсеров, вставки образцов данных или ручной публикации сообщений.
Общие аргументы:
--topic <имя_темы>: Указывает целевую тему.--bootstrap-server <хост:порт>: Указывает брокера Kafka для подключения.--property <ключ>=<значение>: Позволяет устанавливать свойства продюсера (например,key.serializer,value.serializer).--producer-property <ключ>=<значение>: Аналогично--property, но специально для конфигураций на стороне продюсера.
Примеры:
Отправить сообщения в
my_topic:kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092После запуска вы можете вводить сообщения строка за строкой. Нажмите
Ctrl+Cдля выхода.Отправить сообщения с ключами (формат JSON):
kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property parse.key=true --property key.separator=':'Теперь вы можете вводить пары
ключ:значение, и Kafka отправит их с указанным ключом.
3. kafka-console-consumer.sh
Этот инструмент подписывается на одну или несколько тем Kafka и выводит полученные сообщения в стандартный вывод. Он необходим для тестирования потребителей, проверки данных в темах и отладки приложений продюсера/потребителя.
Общие аргументы:
--topic <имя_темы>: Указывает тему(ы) для потребления.--bootstrap-server <хост:порт>: Указывает брокера Kafka для подключения.--group-id <идентификатор_группы>: Указывает идентификатор группы потребителей. Это важно для управления смещениями и позволяет нескольким потребителям разделять нагрузку потребления.--from-beginning: Читает сообщения с начала журнала темы.--offset <смещение>: Начинает потребление с определенного смещения.--partition <идентификатор_партиции>: Потребляет из определенной партиции.--property <ключ>=<значение>: Позволяет устанавливать свойства потребителя (например,value.deserializer).
Примеры:
Потребить все сообщения из
my_topic:kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092Потребить сообщения с начала
my_topicдля группы потребителейmy_group:kafka-console-consumer.sh --topic my_topic --group-id my_group --from-beginning --bootstrap-server kafka-broker-1:9092Потребить сообщения с выводом смещений и ключей:
kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property print.key=true --property key.separator="\t" --property print.offset=true --property print.headers=true
4. kafka-consumer-groups.sh
Этот инструмент используется для управления и проверки групп потребителей. Он жизненно важен для понимания отставания потребителей, перераспределения партиций и устранения проблем с потреблением.
Общие подкоманды и аргументы:
--list: Перечисляет все группы потребителей в кластере.--describe: Предоставляет подробную информацию о конкретных группах потребителей, включая отставание.--bootstrap-server <хост:порт>: Указывает брокера Kafka для подключения.--group <идентификатор_группы>: Указывает идентификатор группы потребителей.--reset-offsets: Сбрасывает смещения для группы потребителей.--topic <имя_темы>: Указывает тему для сброса смещения.--to-earliest: Сбрасывает смещения на самое раннее доступное сообщение.--to-latest: Сбрасывает смещения на самое последнее доступное сообщение.--execute: Выполняет операцию сброса смещения.
Примеры:
Перечислить все группы потребителей:
kafka-consumer-groups.sh --list --bootstrap-server kafka-broker-1:9092Описать группу потребителей
my_groupи показать ее отставание:kafka-consumer-groups.sh --describe --group my_group --bootstrap-server kafka-broker-1:9092Вывод покажет тему, партицию, текущее смещение, конечное смещение журнала и отставание.
Сбросить смещения для
my_groupнаmy_topicдо самого раннего доступного сообщения:kafka-consumer-groups.sh --group my_group --topic my_topic --reset-offsets --to-earliest --execute --bootstrap-server kafka-broker-1:9092Используйте эту команду с осторожностью, так как она влияет на то, с какого места потребители начнут читать.
5. kafka-log-dirs.sh
Этот инструмент помогает проверить каталоги журналов на брокерах Kafka. Он может быть полезен для понимания использования диска и поиска данных темы.
Общие аргументы:
--bootstrap-server <хост:порт>: Указывает брокера Kafka для подключения.--topic <имя_темы>: Фильтрует вывод для отображения каталогов для конкретной темы.
Примеры:
Перечислить каталоги журналов на брокере:
kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092Показать каталоги журналов для конкретной темы:
kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092 --topic my_topic
6. kafka-preferred-replica-election.sh
Этот скрипт инициирует выборы предпочтительных реплик для тем. Предпочтительная реплика — это брокер, который выбран в качестве лидера для партиции на основе ее коэффициента репликации. Если брокер выходит из строя и непредпочтительная реплика становится лидером, этот инструмент можно использовать для возврата лидерства предпочтительной реплике.
Общие аргументы:
--topic <имя_темы>: Указывает тему, для которой нужно выбрать предпочтительные реплики.--broker-list <идентификатор_брокера1,идентификатор_брокера2,...>: Указывает разделенный запятыми список идентификаторов брокеров.--bootstrap-server <хост:порт>: Указывает брокера Kafka для подключения.
Примеры:
Выбрать предпочтительные реплики для
my_topic:kafka-preferred-replica-election.sh --topic my_topic --bootstrap-server kafka-broker-1:9092Выбрать предпочтительные реплики для нескольких тем:
kafka-preferred-replica-election.sh --topic topic1,topic2 --bootstrap-server kafka-broker-1:9092
Важные соображения и лучшие практики
--bootstrap-server— ключевой момент: Всегда указывайте правильный аргумент--bootstrap-serverдля подключения к вашему кластеру Kafka. Обычно это разделенный запятыми списокхост:портдля ваших брокеров.- Окружение: Эти команды обычно находятся в каталоге
bin/вашей установки Kafka. Вам нужно перейти в этот каталог или убедиться, что каталогbinKafka находится в вашем PATH. - Разрешения: Убедитесь, что пользователь, запускающий эти команды, имеет необходимый сетевой доступ к брокерам Kafka.
- Конфигурация: Многие инструменты CLI могут принимать конфигурации клиента Kafka через аргументы
--propertyили--producer-property/--consumer-property. Это полезно для переопределения стандартных сериализаторов/десериализаторов или установки других конкретных конфигураций клиента. - Безопасность: Для защищенных кластеров Kafka (например, с SSL/TLS или аутентификацией SASL) вам нужно будет передавать дополнительные аргументы, связанные с безопасностью (например,
--command-config, указывающий на файл свойств клиента), этим инструментам. - Удаление темы: Помните, что удаление темы — это чувствительная операция, и она должна быть явно включена в файле
server.propertiesброкера Kafka с помощьюdelete.topic.enable=true.
Безопасный способ использования CLI в продакшене
Используйте CLI в первую очередь как инструмент проверки и во вторую — как инструмент изменения. --list, --describe и короткие чтения консоли имеют низкий риск. --delete, --alter, увеличение партиций и сброс смещений изменяют поведение кластера и должны проходить тот же путь проверки, что и изменения приложений, когда это возможно.
Практическая производственная сессия обычно начинается с файла конфигурации клиента:
cat client.properties
# security.protocol=SASL_SSL
# sasl.mechanism=SCRAM-SHA-512
# sasl.jaas.config=...
Затем каждая команда включает его:
kafka-topics.sh --bootstrap-server kafka-1:9093 --command-config client.properties --describe --topic orders
Для консольных потребителей избегайте случайного присоединения к реальной группе приложений. Используйте временный идентификатор группы при проверке данных и используйте --max-messages, чтобы команда завершалась:
kafka-console-consumer.sh \
--bootstrap-server kafka-1:9093 \
--command-config client.properties \
--topic orders \
--group debug-orders-$(date +%s) \
--from-beginning \
--max-messages 5 \
--property print.key=true \
--property print.offset=true
Эта небольшая привычка предотвращает кражу партиций отладочной командой у работающего сервиса. Она также оставляет более чистый след аудита, потому что имя группы делает намерение очевидным.
CLI лучше всего, когда он скучен: одна команда для проверки, одна команда для подтверждения и четкая запись любой команды, изменяющей состояние.
Повседневные рецепты устранения неполадок
Если продюсер говорит, что запись успешна, но команда потребителей ничего не видит, начните с темы:
kafka-topics.sh --bootstrap-server kafka-1:9092 --describe --topic orders
Подтвердите имя темы, количество партиций, доступность лидера и синхронизированные реплики. Опечатка в имени темы может выглядеть точно так же, как сломанный конвейер, когда в кластере разработки включено автоматическое создание тем. В продакшене тема с офлайн-партициями или уменьшающимся ISR указывает на проблему с брокером или репликацией, прежде чем указывать на код приложения.
Затем прочитайте небольшую выборку с временной группой:
kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092 \
--topic orders \
--group debug-orders-$(date +%s) \
--max-messages 10 \
--property print.key=true \
--property print.timestamp=true \
--property print.offset=true
Если записи появляются там, Kafka имеет данные, и проблема, вероятно, в реальной группе потребителей, ее смещениях, ее подписках или ее логике обработки. Если записи не появляются, проверьте тему продюсера, сериализаторы, аутентификацию и то, не пишет ли продюсер в другой кластер.
Для вопросов об отставании сразу переходите к группе:
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group orders-writer
Не останавливайтесь на общем отставании. Сравните партиции. Одна партиция с большим отставанием означает другую проблему, чем каждая партиция с умеренным отставанием. Отставание на одной партиции часто означает перекос ключей или одно плохое назначение потребителя. Равномерное отставание обычно означает, что все приложение медленнее, чем скорость ввода.
Для вопросов "что изменилось?" проверьте конфигурацию темы:
kafka-configs.sh \
--bootstrap-server kafka-1:9092 \
--entity-type topics \
--entity-name orders \
--describe
Здесь вы ловите изменения срока хранения, сюрпризы политики очистки, переопределения сжатия и настройки размера сообщения, которые отличаются от предположений сервиса.
Вывод CLI не заменяет мониторинг, но он отлично подходит для уменьшения неопределенности. В реальном инциденте несколько выводов команд, вставленных в тикет, могут избавить всех от споров о том, существует ли тема, присутствуют ли записи и движется ли группа на самом деле.
Команды, к которым стоит относиться с осторожностью
Некоторые команды CLI Kafka выглядят безобидно, потому что они короткие. Они не безобидны.
kafka-topics.sh --alter --partitions только увеличивает количество партиций; он не уменьшает их позже, если вы пожалеете об изменении. Больше партиций может помочь параллелизму потребителей, но они также могут изменить распределение ключей для новых записей и нарушить предположения в системах, которые ожидали, что все события для диапазона ключей попадут в меньший набор партиций.
kafka-consumer-groups.sh --reset-offsets --execute изменяет то, где группа будет читать дальше. Сначала используйте --dry-run, остановите затронутых потребителей и запишите старые смещения. Сброс на самое раннее может воспроизвести данные в системы, которые не являются идемпотентными. Сброс на самое последнее может пропустить данные, которые бизнес все еще ожидает обработать.
kafka-topics.sh --delete зависит от конфигурации и политики кластера, но когда удаление разрешено, к нему следует относиться как к удалению таблицы базы данных. Проверьте кластер, проверьте тему и проверьте, не использует ли другое окружение то же соглашение об именах. Производственная тема с именем orders-test все еще является производственной, если от нее зависят реальные сервисы.
Для повторяющихся операций поместите команду в runbook или скрипт с явными переменными для кластера, темы, группы и конфигурации команды. CLI отлично подходит для расследования, но производственные изменения должны быть скучными, проверенными и легкими для аудита.