如何使用命令行创建和管理 Kafka 主题
Apache Kafka 是一个分布式事件流平台,常用于高吞吐量数据管道、实时分析和微服务通信。Kafka 中最基本的组织单位是主题 (Topic),它是发布记录的类别或订阅名称。
虽然存在图形界面工具,但与 Kafka 基础设施交互和管理的最强大、最可靠和最常见的方式是直接通过命令行界面 (CLI)。掌握这些基本命令对于负责维护健康高效的 Kafka 集群的管理员和开发人员至关重要。本指南将提供一个分步教程,介绍如何使用 kafka-topics.sh 脚本来执行最常见的主题管理任务。
先决条件和设置
要执行本指南中的命令,您必须能够访问已安装 Kafka 二进制文件的机器。所有主题管理操作都是使用 kafka-topics.sh 实用程序执行的,该实用程序通常位于 Kafka 安装目录的 bin 文件夹中。
所有命令都需要至少一个 Kafka 代理的地址,使用 --bootstrap-server 标志指定。如果您使用的是旧版 Kafka (2.2 之前的版本),您可能仍需要使用 --zookeeper 标志,但 --bootstrap-server 是推荐和现代的标准。
对于以下示例,我们假设代理在本地默认端口上运行:
# 标准代理地址占位符
BROKER_ADDRESS="localhost:9092"
1. 创建新的 Kafka 主题
创建主题需要定义其名称,以及两个决定其行为和容错能力的关键参数:分区数和复制因子。
关键参数
--topic <name>: 主题的名称。--partitions <N>: 主题将被分割成的分区数量。分区是主题内并行度和顺序的单位。--replication-factor <N>: 将在不同代理之间维护的数据副本数量。复制因子为 1 意味着没有冗余。
创建 sales-data 的命令示例
此命令创建一个名为 sales-data 的主题,包含 3 个分区和 2 的复制因子(意味着集群中每个分区将存在 2 个副本)。
kafka-topics.sh --create --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 3 \n --replication-factor 2
提示: 在拥有 N 个代理的生产环境中,通常建议复制因子为 3 以实现高可用性(允许在数据丢失前丢失两个代理),并且应根据预期的吞吐量和消费者并行需求来调整分区数量。
2. 列出所有主题
要查看 Kafka 集群中当前可用的所有主题,请使用 --list 标志。
命令示例
kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS
输出示例:
sales-data
logistics-stream
__consumer_offsets
3. 描述主题配置
检查特定主题的现有配置、分区计数和代理分配对于故障排除和验证至关重要。使用 --describe 标志。
描述 sales-data 的命令示例
kafka-topics.sh --describe --topic sales-data \n --bootstrap-server $BROKER_ADDRESS
输出解读:
输出同时显示了主题级别和分区级别的配置:
Topic: sales-data PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: sales-data Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sales-data Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: sales-data Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- Leader (首领): 当前负责处理该分区读/写的代理。
- Replicas (副本): 持有该分区副本的代理列表。
- Isr (In-Sync Replicas - 同步副本): 与 Leader 完全同步的副本子集。高可用性要求 Leader 位于 ISR 中。
4. 修改现有主题
Kafka 在创建后提供了有限的主题修改机制。两个最常见的修改任务是增加分区计数和覆盖默认的代理配置设置。
A. 增加分区
分区只能增加,永远不能减少。增加分区有助于扩展消费者的并行性。
警告: 增加分区会更改消息如何被映射(哈希)到分区。如果您的生产者依赖基于键的排序保证,增加分区可能会破坏现有键的有序传递。
如果 sales-data 当前有 3 个分区,我们可以将其增加到 5 个:
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --partitions 5
B. 修改主题特定配置
您可以使用 --config 标志覆盖特定主题的全局代理设置(如消息保留时间或清理策略)。
示例:为 sales-data 设置消息保留时间为 24 小时(86400000 毫秒)。
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --config retention.ms=86400000
要删除特定的配置覆盖并恢复为默认的代理设置,请使用 --delete-config 标志:
kafka-topics.sh --alter --topic sales-data \n --bootstrap-server $BROKER_ADDRESS \n --delete-config retention.ms
5. 删除 Kafka 主题
不再使用的主题应被正确删除,以回收磁盘空间并维护集群的整洁。
启用主题删除
默认情况下,Kafka 代理可能会禁用主题删除以确保安全。在删除主题之前,请确保在所有代理的 server.properties 文件中启用了以下设置:
delete.topic.enable=true
删除 old-stream 的命令示例
使用 --delete 标志启动主题移除。主题删除通常是异步的,意味着命令提交请求,而删除操作在后台发生。
kafka-topics.sh --delete --topic old-stream \n --bootstrap-server $BROKER_ADDRESS
确认输出:
Deletion of topic old-stream initiated successfully.
主题管理命令摘要
| 操作 | 标志(们) | 目的 | 示例参数 |
|---|---|---|---|
| 创建 | --create |
初始化一个新主题。 | --partitions 5 --replication-factor 3 |
| 列出 | --list |
显示集群中所有主题。 | 不适用 |
| 描述 | --describe |
查看当前配置和布局。 | --topic my-topic |
| 修改 (分区) | --alter |
增加分区数量。 | --partitions N (N > 当前计数) |
| 修改 (配置) | --alter --config |
覆盖特定主题的代理默认设置。 | --config retention.ms=... |
| 删除 | --delete |
永久移除一个主题。 | --topic my-topic |
结论和后续步骤
命令行仍然是管理 Kafka 集群最强大和最灵活的接口。通过掌握 kafka-topics.sh 实用程序,您可以对主题创建参数、配置覆盖以及删除和描述等必要的管理操作获得精细的控制。
后续步骤:
- 在开发或分阶段环境中练习这些命令。
- 使用
--describe命令探索高级配置选项,以查看所有可配置属性的完整列表(例如cleanup.policy、max.message.bytes)。 - 学习用于生产者和消费者测试的相应 CLI 命令 (
kafka-console-producer.sh和kafka-console-consumer.sh)。