如何使用命令行创建和管理Kafka主题
学习如何使用kafka-topics.sh从命令行创建、列出、描述、修改和删除Kafka主题。
如何使用命令行创建和管理Kafka主题
Apache Kafka主题是生产者写入记录、消费者读取记录的地方。如果你需要从终端创建、检查、调整大小或删除主题,主要工具是kafka-topics.sh。
图形化工具很有用,但在事件或部署期间,CLI命令仍然是验证集群状态的最快方式。本指南展示了你最常使用的主题命令,并指出了Kafka可能让你感到意外的地方。
前提条件和设置
要执行本指南中的命令,你必须能够访问安装了Kafka二进制文件的机器。所有主题管理操作都使用kafka-topics.sh工具执行,该工具通常位于Kafka安装目录的bin目录中。
所有命令都需要至少一个Kafka代理的地址,通过--bootstrap-server指定。较旧的Kafka集群可能仍会展示使用--zookeeper的示例,但基于代理的管理是当前的模式。
在下面的示例中,我们假设代理在本地默认端口上运行:
# 标准代理地址占位符
BROKER_ADDRESS="localhost:9092"
1. 创建新的Kafka主题
创建主题需要定义其名称,以及两个决定其行为和容错能力的关键参数:分区数和复制因子。
基本参数
--topic <name>:主题的名称。--partitions <N>:主题将被分割成的分区数。分区是主题内并行性和排序的单位。--replication-factor <N>:在不同代理之间维护的数据副本数。复制因子为1表示没有冗余。
命令示例:创建sales-data
此命令创建一个名为sales-data的主题,包含3个分区和复制因子2(意味着每个分区在集群中有2个副本)。
kafka-topics.sh --create --topic sales-data \
--bootstrap-server $BROKER_ADDRESS \
--partitions 3 \
--replication-factor 2
提示: 在生产环境中,复制因子通常为3,因为它将每个分区的多个副本保存在不同的代理上。真正的容错能力还取决于代理放置、
min.insync.replicas和生产者确认设置。
2. 列出所有主题
要查看Kafka集群中当前可用的所有主题,请使用--list标志。
命令示例
kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS
输出示例:
sales-data
logistics-stream
__consumer_offsets
3. 描述主题配置
检查特定主题的现有配置、分区数和代理分配对于故障排除和验证至关重要。请使用--describe标志。
命令示例:描述sales-data
kafka-topics.sh --describe --topic sales-data \
--bootstrap-server $BROKER_ADDRESS
输出解释:
输出显示主题级别和分区级别的配置:
Topic: sales-data PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: sales-data Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sales-data Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: sales-data Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- Leader: 当前负责处理该分区读写操作的代理。
- Replicas: 持有该分区副本的代理列表。
- Isr(同步副本): 足够跟上进度、有资格进行安全故障转移的副本。如果ISR缩小,请检查代理健康状况、磁盘延迟和网络延迟。
4. 修改现有主题
Kafka提供了有限的机制来在创建后修改主题。最常见的两种修改任务是增加分区数和覆盖默认代理配置设置。
A. 增加分区
分区只能增加,不能减少。增加分区有助于扩展消费者并行性。
警告: 增加分区会改变消息映射(哈希)到分区的方式。如果你的生产者依赖于基于键的排序保证,增加分区可能会破坏现有键的有序传递。
如果sales-data当前有3个分区,我们可以将其增加到5个:
kafka-topics.sh --alter --topic sales-data \
--bootstrap-server $BROKER_ADDRESS \
--partitions 5
B. 修改主题特定配置
你可能会看到通过kafka-topics.sh --alter --config更改主题配置的旧示例。在当前Kafka集群上,建议使用kafka-configs.sh进行主题配置更改,因为它是专门用于动态配置的管理工具。
示例:为sales-data设置消息保留时间为24小时(86400000毫秒)。
kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
--alter --entity-type topics --entity-name sales-data \
--add-config retention.ms=86400000
要删除特定的配置覆盖并恢复为代理默认值,请使用--delete-config:
kafka-configs.sh --bootstrap-server $BROKER_ADDRESS \
--alter --entity-type topics --entity-name sales-data \
--delete-config retention.ms
5. 删除Kafka主题
不再使用的主题应妥善删除,以回收磁盘空间并维护集群整洁。
启用主题删除
Kafka代理必须允许删除,此命令才能移除数据。在许多现代部署中,它是启用的,但不要假设。检查代理配置:
delete.topic.enable=true
命令示例:删除old-stream
使用--delete标志启动主题移除。主题删除通常是异步的,意味着命令提交请求,删除在后台进行。
kafka-topics.sh --delete --topic old-stream \
--bootstrap-server $BROKER_ADDRESS
确认输出:
Deletion of topic old-stream initiated successfully.
主题管理命令总结
| 操作 | 标志 | 目的 | 示例参数 |
|---|---|---|---|
| 创建 | --create |
初始化新主题。 | --partitions 5 --replication-factor 3 |
| 列出 | --list |
显示集群中的所有主题。 | 无 |
| 描述 | --describe |
查看当前配置和布局。 | --topic my-topic |
| 修改(分区) | --alter |
增加分区数。 | --partitions N(N > 当前数量) |
| 修改(配置) | kafka-configs.sh --alter |
覆盖特定主题的代理默认值。 | --add-config retention.ms=... |
| 删除 | --delete |
永久移除主题。 | --topic my-topic |
下一步
- 在开发或暂存环境中练习这些命令。
- 在配置更改前后使用
kafka-configs.sh --describe,以便知道哪些值是主题覆盖。 - 学习生产者和消费者测试的相应CLI命令(
kafka-console-producer.sh和kafka-console-consumer.sh)。