理解 Kafka 命令行工具:CLI 参考指南

借助这份全面的命令行界面 (CLI) 参考指南,释放 Apache Kafka 的强大功能。学习用于管理主题 (`kafka-topics.sh`)、发送消息 (`kafka-console-producer.sh`)、消费数据 (`kafka-console-consumer.sh`) 以及检查消费者组 (`kafka-consumer-groups.sh`) 的基本 Kafka 命令。本指南详细介绍了实际用例、参数以及有效 Kafka 管理和故障排除的最佳实践。

37 浏览量

了解 Kafka 命令行工具:CLI 参考指南

Apache Kafka 是一个强大的分布式事件流平台,能够实现高吞吐量、容错和可扩展的实时数据管道。虽然可以通过其 API 以编程方式管理和交互 Kafka,但其命令行界面 (CLI) 工具提供了一种直接且高效的方式来执行基本的管理任务、管理主题、与消费者交互以及监控集群健康状况。本指南提供了最常用的 Kafka CLI 工具的全面参考,详细介绍了它们的用途、基本参数和实际用例。

了解这些工具对于 Kafka 管理员、开发人员以及任何参与管理或故障排除 Kafka 集群的人员都至关重要。它们允许快速检查、操作和诊断,而无需为每个简单的操作编写自定义脚本或应用程序。

核心 Kafka CLI 工具

Kafka 发行版通常包含一个 bin/ 目录,其中包含各种脚本和可执行文件。我们将重点介绍最常用于有效管理 Kafka 的工具。

1. kafka-topics.sh

这可以说是使用频率最高的命令行工具。它允许您创建、列出、描述、删除、修改和管理 Kafka 主题(topics)。主题管理是组织 Kafka 内数据流的基础。

常用子命令和参数:

  • --create: 创建新主题。
  • --list: 列出集群中的所有主题。
  • --describe: 提供有关特定主题的详细信息。
  • --delete: 删除一个或多个主题。
  • --alter: 修改现有主题的配置。
  • --topic <topic_name>: 指定主题名称。
  • --partitions <num_partitions>: 设置主题的分区数量(与 --create 一起使用)。
  • --replication-factor <factor>: 设置主题的副本因子(与 --create 一起使用)。
  • --bootstrap-server <host:port>: 指定要连接的 Kafka 代理(broker)。

示例:

  • 创建一个名为 my_topic、具有 3 个分区和副本因子为 2 的主题:
    bash kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server kafka-broker-1:9092,kafka-broker-2:9092

  • 列出集群中的所有主题:
    bash kafka-topics.sh --list --bootstrap-server kafka-broker-1:9092

  • 描述名为 my_topic 的主题:
    bash kafka-topics.sh --describe --topic my_topic --bootstrap-server kafka-broker-1:9092
    这将显示分区、领导者(leader)、副本(replicas)和 ISRs(同步副本,In-Sync Replicas)等详细信息。

  • 删除名为 old_topic 的主题:
    bash kafka-topics.sh --delete --topic old_topic --bootstrap-server kafka-broker-1:9092
    注意:需要在 Kafka 代理配置中启用主题删除 (delete.topic.enable=true)。

2. kafka-console-producer.sh

该工具允许您从标准输入将消息发送到 Kafka 主题。它对于测试生产者、注入示例数据或手动发布消息非常有用。

常用参数:

  • --topic <topic_name>: 指定目标主题。
  • --bootstrap-server <host:port>: 指定要连接的 Kafka 代理。
  • --property <key>=<value>: 允许设置生产者属性(例如,key.serializervalue.serializer)。
  • --producer-property <key>=<value>: 类似于 --property,但专用于生产者端配置。

示例:

  • my_topic 发送消息:
    bash kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
    运行此命令后,您可以逐行键入消息。按 Ctrl+C 退出。

  • 发送带键(key)的消息(JSON 格式):
    bash kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property parse.key=true --property key.separator=':'
    现在您可以键入 key:value 对,Kafka 将使用指定的键发送它们。

3. kafka-console-consumer.sh

此工具订阅一个或多个 Kafka 主题并将其接收到的消息打印到标准输出。它对于测试消费者、检查主题中的数据以及调试生产者/消费者应用程序至关重要。

常用参数:

  • --topic <topic_name>: 指定要消费的主题。
  • --bootstrap-server <host:port>: 指定要连接的 Kafka 代理。
  • --group-id <group_id>: 指定消费者组 ID。这对于管理偏移量(offsets)并允许多个消费者分摊消费负载至关重要。
  • --from-beginning: 从主题日志的开头读取消息。
  • --offset <offset>: 从特定的偏移量开始消费。
  • --partition <partition_id>: 从特定的分区消费。
  • --property <key>=<value>: 允许设置消费者属性(例如,value.deserializer)。

示例:

  • 消费 my_topic 中的所有消息:
    bash kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092

  • 为消费者组 my_groupmy_topic 的开头消费消息:
    bash kafka-console-consumer.sh --topic my_topic --group-id my_group --from-beginning --bootstrap-server kafka-broker-1:9092

  • 消费并打印带有偏移量和键的消息:
    bash kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property print.key=true --property key.separator="\t" --property print.offset=true --property print.headers=true

4. kafka-consumer-groups.sh

此工具用于管理和检查消费者组。它对于理解消费者滞后(lag)、重新分配分区以及排查消费问题至关重要。

常用子命令和参数:

  • --list: 列出集群中的所有消费者组。
  • --describe: 提供有关特定消费者组的详细信息,包括滞后(lag)。
  • --bootstrap-server <host:port>: 指定要连接的 Kafka 代理。
  • --group <group_id>: 指定消费者组 ID。
  • --reset-offsets: 重置消费者组的偏移量。
  • --topic <topic_name>: 指定用于重置偏移量的主题。
  • --to-earliest: 将偏移量重置为最早可用的消息。
  • --to-latest: 将偏移量重置为最新可用的消息。
  • --execute: 执行偏移量重置操作。

示例:

  • 列出所有消费者组:
    bash kafka-consumer-groups.sh --list --bootstrap-server kafka-broker-1:9092

  • 描述消费者组 my_group 并显示其滞后量:
    bash kafka-consumer-groups.sh --describe --group my_group --bootstrap-server kafka-broker-1:9092
    输出将显示主题、分区、当前偏移量、日志末尾偏移量和滞后量。

  • my_groupmy_topic 上的偏移量重置为最早可用的消息:
    bash kafka-consumer-groups.sh --group my_group --topic my_topic --reset-offsets --to-earliest --execute --bootstrap-server kafka-broker-1:9092
    请谨慎使用此命令,因为它会影响消费者开始读取的位置。

5. kafka-log-dirs.sh

此工具用于检查 Kafka 代理上的日志目录。它有助于了解磁盘使用情况和查找主题数据。

常用参数:

  • --bootstrap-server <host:port>: 指定要连接的 Kafka 代理。
  • --topic <topic_name>: 过滤输出,仅显示特定主题的目录。

示例:

  • 列出代理上的日志目录:
    bash kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092

  • 显示特定主题的日志目录:
    bash kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092 --topic my_topic

6. kafka-preferred-replica-election.sh

该脚本启动主题的首选副本选举。首选副本是根据其副本因子被选为分区领导者(leader)的代理。如果某个代理失败,并且非首选副本成为领导者,则可以使用此工具将领导权移回首选副本。

常用参数:

  • --topic <topic_name>: 指定要选举首选副本的主题。
  • --broker-list <broker_id1,broker_id2,...>: 指定逗号分隔的代理 ID 列表。
  • --bootstrap-server <host:port>: 指定要连接的 Kafka 代理。

示例:

  • my_topic 选举首选副本:
    bash kafka-preferred-replica-election.sh --topic my_topic --bootstrap-server kafka-broker-1:9092

  • 为多个主题选举首选副本:
    bash kafka-preferred-replica-election.sh --topic topic1,topic2 --bootstrap-server kafka-broker-1:9092

重要注意事项和最佳实践

  • --bootstrap-server 参数是关键: 务必确保指定正确的 --bootstrap-server 参数来连接您的 Kafka 集群。这通常是代理的 host:port 逗号分隔列表。
  • 环境: 这些命令通常位于 Kafka 安装目录的 bin/ 目录下。您需要导航到此目录,或确保 Kafka 的 bin 目录已添加到系统的 PATH 中。
  • 权限: 确保运行这些命令的用户具有必要的网络访问权限来连接 Kafka 代理。
  • 配置: 许多 CLI 工具可以通过 --property--producer-property/--consumer-property 参数接受 Kafka 客户端配置。这对于覆盖默认的序列化器/反序列化器或设置其他特定的客户端配置非常有用。
  • 安全性: 对于安全的 Kafka 集群(例如,使用 SSL/TLS 或 SASL 身份验证),您需要向这些工具传递额外的安全相关参数(例如,指向客户端属性文件的 --command-config)。
  • 主题删除: 请记住,主题删除是一项敏感操作,必须使用 delete.topic.enable=true 在 Kafka 代理的 server.properties 文件中明确启用。

总结

Kafka 的命令行工具提供了强大且易于访问的界面,用于管理和交互您的 Kafka 集群。掌握 kafka-topics.shkafka-console-producer.shkafka-console-consumer.shkafka-consumer-groups.sh 等工具对于高效的 Kafka 运维、故障排除和开发至关重要。通过理解它们的参数和用例,您可以显着简化工作流程,并对事件流基础设施获得更深入的了解。

经常参考这些命令不仅有助于您执行日常管理任务,还能使您更有效地诊断和解决问题。随着您对 Kafka 越来越熟悉,您可以探索 bin/ 目录中可用于更高级操作的其他实用脚本。