理解Kafka命令行工具:CLI参考指南

通过这份全面的命令行界面(CLI)参考指南,释放Apache Kafka的强大功能。学习管理主题(`kafka-topics.sh`)、发送消息(`kafka-console-producer.sh`)、消费数据(`kafka-console-consumer.sh`)以及检查消费者组(`kafka-consumer-groups.sh`)的基本Kafka命令。本指南详细介绍了实际用例、参数以及有效管理和故障排除Kafka的最佳实践。

理解Kafka命令行工具:CLI参考指南

Kafka的命令行工具是回答基本运维问题的最快方式:这个主题是否存在?哪个broker领导这个分区?主题里有什么?为什么这个消费者组落后?这个客户端能否与集群进行身份验证?你不需要它们来完成所有任务,大多数生产变更仍应通过自动化进行,但在部署中断或深夜数据问题时,CLI通常是获取事实的最短路径。

下面的示例假设脚本在你的PATH中。在许多安装中,它们位于Kafka的bin/目录下,因此相同的命令可以以bin/kafka-topics.sh的形式运行。对于安全集群,大多数命令还需要--command-config client.properties,该文件包含SASL、SSL和其他客户端设置。

核心Kafka CLI工具

Kafka发行版通常包含一个bin/目录,其中包含各种脚本和可执行文件。我们将重点关注那些最常用于有效管理Kafka的工具。

1. kafka-topics.sh

这可以说是最常用的命令行工具。它允许你创建、列出、描述、删除、更改和管理Kafka主题。主题管理是组织Kafka中数据流的基础。

常用子命令和参数:

  • --create:创建一个新主题。
  • --list:列出集群中的所有主题。
  • --describe:提供特定主题的详细信息。
  • --delete:删除一个或多个主题。
  • --alter:修改现有主题的配置。
  • --topic <topic_name>:指定主题名称。
  • --partitions <num_partitions>:设置主题的分区数(与--create一起使用)。
  • --replication-factor <factor>:设置主题的复制因子(与--create一起使用)。
  • --bootstrap-server <host:port>:指定要连接的Kafka broker。

示例:

  • 创建一个名为my_topic的主题,包含3个分区,复制因子为2:

    kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server kafka-broker-1:9092,kafka-broker-2:9092
    
  • 列出集群中的所有主题:

    kafka-topics.sh --list --bootstrap-server kafka-broker-1:9092
    
  • 描述一个名为my_topic的主题:

    kafka-topics.sh --describe --topic my_topic --bootstrap-server kafka-broker-1:9092
    

    这将显示分区、领导者、副本和ISR(同步副本)等详细信息。

  • 删除一个名为old_topic的主题:

    kafka-topics.sh --delete --topic old_topic --bootstrap-server kafka-broker-1:9092
    

    注意:需要在Kafka broker配置中启用主题删除(delete.topic.enable=true)。

2. kafka-console-producer.sh

此工具允许你从标准输入向Kafka主题发送消息。它对于测试生产者、注入样本数据或手动发布消息非常有用。

常用参数:

  • --topic <topic_name>:指定目标主题。
  • --bootstrap-server <host:port>:指定要连接的Kafka broker。
  • --property <key>=<value>:允许设置生产者属性(例如key.serializervalue.serializer)。
  • --producer-property <key>=<value>:类似于--property,但专门用于生产者端配置。

示例:

  • my_topic发送消息:

    kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
    

    运行此命令后,你可以逐行输入消息。按Ctrl+C退出。

  • 发送带键的消息(JSON格式):

    kafka-console-producer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property parse.key=true --property key.separator=':'
    

    现在你可以输入key:value对,Kafka将使用指定的键发送它们。

3. kafka-console-consumer.sh

此工具订阅一个或多个Kafka主题,并将接收到的消息打印到标准输出。它对于测试消费者、检查主题中的数据以及调试生产者/消费者应用程序至关重要。

常用参数:

  • --topic <topic_name>:指定要消费的主题。
  • --bootstrap-server <host:port>:指定要连接的Kafka broker。
  • --group-id <group_id>:指定消费者组ID。这对于管理偏移量以及允许多个消费者共享消费负载非常重要。
  • --from-beginning:从主题日志的开头读取消息。
  • --offset <offset>:从特定偏移量开始消费。
  • --partition <partition_id>:从特定分区消费。
  • --property <key>=<value>:允许设置消费者属性(例如value.deserializer)。

示例:

  • 消费my_topic中的所有消息:

    kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
    
  • my_topic的开头开始消费,消费者组为my_group

    kafka-console-consumer.sh --topic my_topic --group-id my_group --from-beginning --bootstrap-server kafka-broker-1:9092
    
  • 消费消息并打印偏移量和键:

    kafka-console-consumer.sh --topic my_topic --bootstrap-server kafka-broker-1:9092 --property print.key=true --property key.separator="\t" --property print.offset=true --property print.headers=true
    

4. kafka-consumer-groups.sh

此工具用于管理和检查消费者组。它对于理解消费者滞后、重新分配分区以及排查消费问题至关重要。

常用子命令和参数:

  • --list:列出集群中的所有消费者组。
  • --describe:提供特定消费者组的详细信息,包括滞后。
  • --bootstrap-server <host:port>:指定要连接的Kafka broker。
  • --group <group_id>:指定消费者组ID。
  • --reset-offsets:重置消费者组的偏移量。
  • --topic <topic_name>:指定用于偏移量重置的主题。
  • --to-earliest:将偏移量重置为最早可用的消息。
  • --to-latest:将偏移量重置为最新可用的消息。
  • --execute:执行偏移量重置操作。

示例:

  • 列出所有消费者组:

    kafka-consumer-groups.sh --list --bootstrap-server kafka-broker-1:9092
    
  • 描述消费者组my_group并显示其滞后:

    kafka-consumer-groups.sh --describe --group my_group --bootstrap-server kafka-broker-1:9092
    

    输出将显示主题、分区、当前偏移量、日志结束偏移量和滞后。

  • my_groupmy_topic上的偏移量重置为最早可用的消息:

    kafka-consumer-groups.sh --group my_group --topic my_topic --reset-offsets --to-earliest --execute --bootstrap-server kafka-broker-1:9092
    

    谨慎使用此命令,因为它会影响消费者开始读取的位置。

5. kafka-log-dirs.sh

此工具有助于检查Kafka broker上的日志目录。它对于了解磁盘使用情况和定位主题数据非常有用。

常用参数:

  • --bootstrap-server <host:port>:指定要连接的Kafka broker。
  • --topic <topic_name>:过滤输出以显示特定主题的目录。

示例:

  • 列出broker上的日志目录:

    kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092
    
  • 显示特定主题的日志目录:

    kafka-log-dirs.sh --bootstrap-server kafka-broker-1:9092 --topic my_topic
    

6. kafka-preferred-replica-election.sh

此脚本为主题启动首选副本选举。首选副本是根据其复制因子被选为分区领导者的broker。如果broker发生故障并且非首选副本成为领导者,则可以使用此工具将领导权移回首选副本。

常用参数:

  • --topic <topic_name>:指定要选举首选副本的主题。
  • --broker-list <broker_id1,broker_id2,...>:指定以逗号分隔的broker ID列表。
  • --bootstrap-server <host:port>:指定要连接的Kafka broker。

示例:

  • my_topic选举首选副本:

    kafka-preferred-replica-election.sh --topic my_topic --bootstrap-server kafka-broker-1:9092
    
  • 为多个主题选举首选副本:

    kafka-preferred-replica-election.sh --topic topic1,topic2 --bootstrap-server kafka-broker-1:9092
    

重要考虑因素和最佳实践

  • --bootstrap-server是关键: 始终确保指定正确的--bootstrap-server参数以连接到你的Kafka集群。这通常是broker的host:port逗号分隔列表。
  • 环境: 这些命令通常位于Kafka安装目录的bin/目录中。你需要导航到此目录或确保Kafka的bin目录在你的系统PATH中。
  • 权限: 确保运行这些命令的用户具有必要的网络访问权限以到达Kafka broker。
  • 配置: 许多CLI工具可以通过--property--producer-property/--consumer-property参数接受Kafka客户端配置。这对于覆盖默认的序列化器/反序列化器或设置其他特定的客户端配置非常有用。
  • 安全性: 对于安全的Kafka集群(例如,使用SSL/TLS或SASL身份验证),你需要向这些工具传递额外的安全相关参数(例如指向客户端属性文件的--command-config)。
  • 主题删除: 请记住,主题删除是一个敏感操作,必须在Kafka broker的server.properties文件中使用delete.topic.enable=true显式启用。

在生产中安全使用CLI的方法

首先将CLI作为检查工具,其次才是变更工具。--list--describe和短时间的控制台读取风险较低。--delete--alter、分区增加和偏移量重置会改变集群行为,应尽可能通过与应用变更相同的审查路径。

一个实用的生产会话通常从客户端配置文件开始:

cat client.properties
# security.protocol=SASL_SSL
# sasl.mechanism=SCRAM-SHA-512
# sasl.jaas.config=...

然后每个命令都包含它:

kafka-topics.sh --bootstrap-server kafka-1:9093 --command-config client.properties --describe --topic orders

对于控制台消费者,避免意外加入真实的应用程序组。在检查数据时使用临时组ID,并使用--max-messages以便命令退出:

kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9093 \
  --command-config client.properties \
  --topic orders \
  --group debug-orders-$(date +%s) \
  --from-beginning \
  --max-messages 5 \
  --property print.key=true \
  --property print.offset=true

这个小习惯可以防止调试命令从实时服务中窃取分区。它还会留下更清晰的审计跟踪,因为组名使意图显而易见。

CLI在平淡无奇时表现最佳:一个命令用于检查,一个命令用于确认,以及任何更改状态的命令的清晰记录。

日常故障排除技巧

如果生产者说它正在成功写入,但消费者团队什么也没看到,请从主题开始:

kafka-topics.sh --bootstrap-server kafka-1:9092 --describe --topic orders

确认主题名称、分区数、领导者可用性和同步副本。主题名称中的拼写错误在开发集群中启用自动主题创建时,看起来可能完全像管道中断。在生产环境中,具有离线分区或ISR缩小的主题指向broker或复制问题,而不是应用程序代码。

接下来,使用临时组读取一个小样本:

kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic orders \
  --group debug-orders-$(date +%s) \
  --max-messages 10 \
  --property print.key=true \
  --property print.timestamp=true \
  --property print.offset=true

如果记录出现在那里,则Kafka拥有数据,问题可能出在真实的消费者组、其偏移量、其订阅或其处理逻辑上。如果没有记录出现,请检查生产者主题、序列化器、身份验证以及生产者是否正在写入不同的集群。

对于滞后问题,直接查看组:

kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group orders-writer

不要止步于总滞后。比较分区。单个分区具有较大滞后意味着与每个分区具有中等滞后不同的问题。单分区滞后通常意味着键倾斜或一个糟糕的消费者分配。均匀的滞后通常意味着整个应用程序比输入速率慢。

对于“发生了什么变化?”的问题,检查主题配置:

kafka-configs.sh \
  --bootstrap-server kafka-1:9092 \
  --entity-type topics \
  --entity-name orders \
  --describe

这是你发现保留更改、清理策略意外、压缩覆盖以及与服务的假设不同的消息大小设置的地方。

CLI输出不能替代监控,但它非常适合减少不确定性。在真实事件中,将几个命令输出粘贴到工单中可以节省每个人争论主题是否存在、记录是否存在以及组是否实际在移动的时间。

需要谨慎处理的命令

一些Kafka CLI命令看起来无害,因为它们很短。但它们并不无害。

kafka-topics.sh --alter --partitions 只会增加分区数;如果你后悔更改,它以后不会缩小。更多分区可以帮助消费者并行性,但它们也可以改变新记录的键分布,并破坏那些期望某个键范围的所有事件都落在较小分区集中的系统的假设。

kafka-consumer-groups.sh --reset-offsets --execute 会更改组接下来读取的位置。首先使用--dry-run,停止受影响的消费者,并记录旧的偏移量。重置为最早可能会将数据重播到非幂等的系统中。重置为最新可能会跳过业务仍期望处理的数据。

kafka-topics.sh --delete 取决于集群配置和策略,但当允许删除时,应像删除数据库表一样对待。检查集群,检查主题,并检查另一个环境是否使用相同的命名约定。如果真实服务依赖于它,那么名为orders-test的生产主题仍然是生产环境。

对于可重复的操作,将命令放入运行手册或脚本中,并为集群、主题、组和命令配置使用显式变量。CLI非常适合调查,但生产变更应该是平淡无奇的、经过审查的且易于审计的。