了解 Kafka 命令行工具:CLI 参考指南
Apache Kafka 是一个强大的分布式事件流平台,能够实现高吞吐量、容错和可扩展的实时数据管道。虽然可以通过其 API 以编程方式管理和交互 Kafka,但其命令行界面 (CLI) 工具提供了一种直接且高效的方式来执行基本的管理任务、管理主题、与消费者交互以及监控集群健康状况。本指南提供了最常用的 Kafka CLI 工具的全面参考,详细介绍了它们的用途、基本参数和实际用例。
了解这些工具对于 Kafka 管理员、开发人员以及任何参与管理或故障排除 Kafka 集群的人员都至关重要。它们允许快速检查、操作和诊断,而无需为每个简单的操作编写自定义脚本或应用程序。
核心 Kafka CLI 工具
Kafka 发行版通常包含一个 bin/ 目录,其中包含各种脚本和可执行文件。我们将重点介绍最常用于有效管理 Kafka 的工具。
1. kafka-topics.sh
这可以说是使用频率最高的命令行工具。它允许您创建、列出、描述、删除、修改和管理 Kafka 主题(topics)。主题管理是组织 Kafka 内数据流的基础。
常用子命令和参数:
--create: 创建新主题。--list: 列出集群中的所有主题。--describe: 提供有关特定主题的详细信息。--delete: 删除一个或多个主题。--alter: 修改现有主题的配置。--topic <topic_name>: 指定主题名称。--partitions <num_partitions>: 设置主题的分区数量(与--create一起使用)。--replication-factor <factor>: 设置主题的副本因子(与--create一起使用)。--bootstrap-server <host:port>: 指定要连接的 Kafka 代理(broker)。
示例:
-
创建一个名为
my_topic、具有 3 个分区和副本因子为 2 的主题:
bash kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server kafka-broker-1:9092,kafka-broker-2:9092 -
列出集群中的所有主题:
bash kafka-topics.sh --list --bootstrap-server kafka-broker-1:9092 -
描述名为
my_topic的主题:
bash kafka-topics.sh --describe --topic my_topic --bootstrap-server kafka-broker-1:9092
这将显示分区、领导者(leader)、副本(replicas)和 ISRs(同步副本,In-Sync Replicas)等详细信息。 -
删除名为
old_topic的主题:
bash kafka-topics.sh --delete --topic old_topic --bootstrap-server kafka-broker-1:9092
注意:需要在 Kafka 代理配置中启用主题删除 (delete.topic.enable=true)。
2. kafka-console-producer.sh
该工具允许您从标准输入将消息发送到 Kafka 主题。它对于测试生产者、注入示例数据或手动发布消息非常有用。
常用参数:
--topic <topic_name>: 指定目标主题。--bootstrap-server <host:port>: 指定要连接的 Kafka 代理。--property <key>=<value>: 允许设置生产者属性(例如,key.serializer,value.serializer)。--producer-property <key>=<value>: 类似于--property,但专用于生产者端配置。
示例:
-
向
my_topic发送消息:
bash kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
运行此命令后,您可以逐行键入消息。按Ctrl+C退出。 -
发送带键(key)的消息(JSON 格式):
bash kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property parse.key=true --property key.separator=':'
现在您可以键入key:value对,Kafka 将使用指定的键发送它们。
3. kafka-console-consumer.sh
此工具订阅一个或多个 Kafka 主题并将其接收到的消息打印到标准输出。它对于测试消费者、检查主题中的数据以及调试生产者/消费者应用程序至关重要。
常用参数:
--topic <topic_name>: 指定要消费的主题。--bootstrap-server <host:port>: 指定要连接的 Kafka 代理。--group-id <group_id>: 指定消费者组 ID。这对于管理偏移量(offsets)并允许多个消费者分摊消费负载至关重要。--from-beginning: 从主题日志的开头读取消息。--offset <offset>: 从特定的偏移量开始消费。--partition <partition_id>: 从特定的分区消费。--property <key>=<value>: 允许设置消费者属性(例如,value.deserializer)。
示例:
-
消费
my_topic中的所有消息:
bash kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 -
为消费者组
my_group从my_topic的开头消费消息:
bash kafka-console-consumer.sh --topic my_topic --group-id my_group --from-beginning --bootstrap-server kafka-broker-1:9092 -
消费并打印带有偏移量和键的消息:
bash kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property print.key=true --property key.separator="\t" --property print.offset=true --property print.headers=true
4. kafka-consumer-groups.sh
此工具用于管理和检查消费者组。它对于理解消费者滞后(lag)、重新分配分区以及排查消费问题至关重要。
常用子命令和参数:
--list: 列出集群中的所有消费者组。--describe: 提供有关特定消费者组的详细信息,包括滞后(lag)。--bootstrap-server <host:port>: 指定要连接的 Kafka 代理。--group <group_id>: 指定消费者组 ID。--reset-offsets: 重置消费者组的偏移量。--topic <topic_name>: 指定用于重置偏移量的主题。--to-earliest: 将偏移量重置为最早可用的消息。--to-latest: 将偏移量重置为最新可用的消息。--execute: 执行偏移量重置操作。
示例:
-
列出所有消费者组:
bash kafka-consumer-groups.sh --list --bootstrap-server kafka-broker-1:9092 -
描述消费者组
my_group并显示其滞后量:
bash kafka-consumer-groups.sh --describe --group my_group --bootstrap-server kafka-broker-1:9092
输出将显示主题、分区、当前偏移量、日志末尾偏移量和滞后量。 -
将
my_group在my_topic上的偏移量重置为最早可用的消息:
bash kafka-consumer-groups.sh --group my_group --topic my_topic --reset-offsets --to-earliest --execute --bootstrap-server kafka-broker-1:9092
请谨慎使用此命令,因为它会影响消费者开始读取的位置。
5. kafka-log-dirs.sh
此工具用于检查 Kafka 代理上的日志目录。它有助于了解磁盘使用情况和查找主题数据。
常用参数:
--bootstrap-server <host:port>: 指定要连接的 Kafka 代理。--topic <topic_name>: 过滤输出,仅显示特定主题的目录。
示例:
-
列出代理上的日志目录:
bash kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092 -
显示特定主题的日志目录:
bash kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092 --topic my_topic
6. kafka-preferred-replica-election.sh
该脚本启动主题的首选副本选举。首选副本是根据其副本因子被选为分区领导者(leader)的代理。如果某个代理失败,并且非首选副本成为领导者,则可以使用此工具将领导权移回首选副本。
常用参数:
--topic <topic_name>: 指定要选举首选副本的主题。--broker-list <broker_id1,broker_id2,...>: 指定逗号分隔的代理 ID 列表。--bootstrap-server <host:port>: 指定要连接的 Kafka 代理。
示例:
-
为
my_topic选举首选副本:
bash kafka-preferred-replica-election.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 -
为多个主题选举首选副本:
bash kafka-preferred-replica-election.sh --topic topic1,topic2 --bootstrap-server kafka-broker-1:9092
重要注意事项和最佳实践
--bootstrap-server参数是关键: 务必确保指定正确的--bootstrap-server参数来连接您的 Kafka 集群。这通常是代理的host:port逗号分隔列表。- 环境: 这些命令通常位于 Kafka 安装目录的
bin/目录下。您需要导航到此目录,或确保 Kafka 的bin目录已添加到系统的 PATH 中。 - 权限: 确保运行这些命令的用户具有必要的网络访问权限来连接 Kafka 代理。
- 配置: 许多 CLI 工具可以通过
--property或--producer-property/--consumer-property参数接受 Kafka 客户端配置。这对于覆盖默认的序列化器/反序列化器或设置其他特定的客户端配置非常有用。 - 安全性: 对于安全的 Kafka 集群(例如,使用 SSL/TLS 或 SASL 身份验证),您需要向这些工具传递额外的安全相关参数(例如,指向客户端属性文件的
--command-config)。 - 主题删除: 请记住,主题删除是一项敏感操作,必须使用
delete.topic.enable=true在 Kafka 代理的server.properties文件中明确启用。
总结
Kafka 的命令行工具提供了强大且易于访问的界面,用于管理和交互您的 Kafka 集群。掌握 kafka-topics.sh、kafka-console-producer.sh、kafka-console-consumer.sh 和 kafka-consumer-groups.sh 等工具对于高效的 Kafka 运维、故障排除和开发至关重要。通过理解它们的参数和用例,您可以显着简化工作流程,并对事件流基础设施获得更深入的了解。
经常参考这些命令不仅有助于您执行日常管理任务,还能使您更有效地诊断和解决问题。随着您对 Kafka 越来越熟悉,您可以探索 bin/ 目录中可用于更高级操作的其他实用脚本。