如何使用命令行创建和管理Kafka主题

学习如何使用kafka-topics.sh从命令行创建、列出、描述、修改和删除Kafka主题。

如何使用命令行创建和管理Kafka主题

Apache Kafka主题是生产者写入记录、消费者读取记录的地方。如果你需要从终端创建、检查、调整大小或删除主题,主要工具是kafka-topics.sh

图形化工具很有用,但在事件或部署期间,CLI命令仍然是验证集群状态的最快方式。本指南展示了你最常使用的主题命令,并指出了Kafka可能让你感到意外的地方。

前提条件和设置

要执行本指南中的命令,你必须能够访问安装了Kafka二进制文件的机器。所有主题管理操作都使用kafka-topics.sh工具执行,该工具通常位于Kafka安装目录的bin目录中。

所有命令都需要至少一个Kafka代理的地址,通过--bootstrap-server指定。较旧的Kafka集群可能仍会展示使用--zookeeper的示例,但基于代理的管理是当前的模式。

在下面的示例中,我们假设代理在本地默认端口上运行:

# 标准代理地址占位符
BROKER_ADDRESS="localhost:9092"

1. 创建新的Kafka主题

创建主题需要定义其名称,以及两个决定其行为和容错能力的关键参数:分区数和复制因子。

基本参数

  • --topic <name>:主题的名称。
  • --partitions <N>:主题将被分割成的分区数。分区是主题内并行性和排序的单位。
  • --replication-factor <N>:在不同代理之间维护的数据副本数。复制因子为1表示没有冗余。

命令示例:创建sales-data

此命令创建一个名为sales-data的主题,包含3个分区和复制因子2(意味着每个分区在集群中有2个副本)。

kafka-topics.sh --create --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 3 \
  --replication-factor 2

提示: 在生产环境中,复制因子通常为3,因为它将每个分区的多个副本保存在不同的代理上。真正的容错能力还取决于代理放置、min.insync.replicas和生产者确认设置。

2. 列出所有主题

要查看Kafka集群中当前可用的所有主题,请使用--list标志。

命令示例

kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS

输出示例:

sales-data
logistics-stream
__consumer_offsets

3. 描述主题配置

检查特定主题的现有配置、分区数和代理分配对于故障排除和验证至关重要。请使用--describe标志。

命令示例:描述sales-data

kafka-topics.sh --describe --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS

输出解释:

输出显示主题级别和分区级别的配置:

Topic: sales-data  PartitionCount: 3 ReplicationFactor: 2 Configs:
  Topic: sales-data  Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: sales-data  Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
  Topic: sales-data  Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  • Leader: 当前负责处理该分区读写操作的代理。
  • Replicas: 持有该分区副本的代理列表。
  • Isr(同步副本): 足够跟上进度、有资格进行安全故障转移的副本。如果ISR缩小,请检查代理健康状况、磁盘延迟和网络延迟。

4. 修改现有主题

Kafka提供了有限的机制来在创建后修改主题。最常见的两种修改任务是增加分区数和覆盖默认代理配置设置。

A. 增加分区

分区只能增加,不能减少。增加分区有助于扩展消费者并行性。

警告: 增加分区会改变消息映射(哈希)到分区的方式。如果你的生产者依赖于基于键的排序保证,增加分区可能会破坏现有键的有序传递。

如果sales-data当前有3个分区,我们可以将其增加到5个:

kafka-topics.sh --alter --topic sales-data \
  --bootstrap-server $BROKER_ADDRESS \
  --partitions 5

B. 修改主题特定配置

你可能会看到通过kafka-topics.sh --alter --config更改主题配置的旧示例。在当前Kafka集群上,建议使用kafka-configs.sh进行主题配置更改,因为它是专门用于动态配置的管理工具。

示例:为sales-data设置消息保留时间为24小时(86400000毫秒)。

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --add-config retention.ms=86400000

要删除特定的配置覆盖并恢复为代理默认值,请使用--delete-config

kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
  --alter --entity-type topics --entity-name sales-data \
  --delete-config retention.ms

5. 删除Kafka主题

不再使用的主题应妥善删除,以回收磁盘空间并维护集群整洁。

启用主题删除

Kafka代理必须允许删除,此命令才能移除数据。在许多现代部署中,它是启用的,但不要假设。检查代理配置:

delete.topic.enable=true

命令示例:删除old-stream

使用--delete标志启动主题移除。主题删除通常是异步的,意味着命令提交请求,删除在后台进行。

kafka-topics.sh --delete --topic old-stream \
  --bootstrap-server $BROKER_ADDRESS

确认输出:

Deletion of topic old-stream initiated successfully.

主题管理命令总结

操作 标志 目的 示例参数
创建 --create 初始化新主题。 --partitions 5 --replication-factor 3
列出 --list 显示集群中的所有主题。
描述 --describe 查看当前配置和布局。 --topic my-topic
修改(分区) --alter 增加分区数。 --partitions N(N > 当前数量)
修改(配置) kafka-configs.sh --alter 覆盖特定主题的代理默认值。 --add-config retention.ms=...
删除 --delete 永久移除主题。 --topic my-topic

下一步

  1. 在开发或暂存环境中练习这些命令。
  2. 在配置更改前后使用kafka-configs.sh --describe,以便知道哪些值是主题覆盖。
  3. 学习生产者和消费者测试的相应CLI命令(kafka-console-producer.shkafka-console-consumer.sh)。