如何使用命令行创建和管理 Kafka 主题

本专家指南提供了关于仅通过命令行界面 (CLI) 管理 Kafka 主题的全面、分步教程。学习使用 `kafka-topics.sh` 实用程序进行关键管理任务的基础命令:创建具有定义分区和副本因子的主题、通过描述验证配置、安全地增加分区数量以及执行主题删除。掌握这些可操作的命令,以确保 Kafka 集群管理强大而高效。

43 浏览量

如何使用命令行创建和管理 Kafka 主题

Apache Kafka 是一个分布式事件流平台,常用于高吞吐量数据管道、实时分析和微服务通信。Kafka 中最基本的组织单位是主题 (Topic),它是发布记录的类别或订阅名称。

虽然存在图形界面工具,但与 Kafka 基础设施交互和管理的最强大、最可靠和最常见的方式是直接通过命令行界面 (CLI)。掌握这些基本命令对于负责维护健康高效的 Kafka 集群的管理员和开发人员至关重要。本指南将提供一个分步教程,介绍如何使用 kafka-topics.sh 脚本来执行最常见的主题管理任务。

先决条件和设置

要执行本指南中的命令,您必须能够访问已安装 Kafka 二进制文件的机器。所有主题管理操作都是使用 kafka-topics.sh 实用程序执行的,该实用程序通常位于 Kafka 安装目录的 bin 文件夹中。

所有命令都需要至少一个 Kafka 代理的地址,使用 --bootstrap-server 标志指定。如果您使用的是旧版 Kafka (2.2 之前的版本),您可能仍需要使用 --zookeeper 标志,但 --bootstrap-server 是推荐和现代的标准。

对于以下示例,我们假设代理在本地默认端口上运行:

# 标准代理地址占位符
BROKER_ADDRESS="localhost:9092"

1. 创建新的 Kafka 主题

创建主题需要定义其名称,以及两个决定其行为和容错能力的关键参数:分区数和复制因子。

关键参数

  • --topic <name>: 主题的名称。
  • --partitions <N>: 主题将被分割成的分区数量。分区是主题内并行度和顺序的单位。
  • --replication-factor <N>: 将在不同代理之间维护的数据副本数量。复制因子为 1 意味着没有冗余。

创建 sales-data 的命令示例

此命令创建一个名为 sales-data 的主题,包含 3 个分区和 2 的复制因子(意味着集群中每个分区将存在 2 个副本)。

kafka-topics.sh --create --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS \n  --partitions 3 \n  --replication-factor 2

提示: 在拥有 N 个代理的生产环境中,通常建议复制因子为 3 以实现高可用性(允许在数据丢失前丢失两个代理),并且应根据预期的吞吐量和消费者并行需求来调整分区数量。

2. 列出所有主题

要查看 Kafka 集群中当前可用的所有主题,请使用 --list 标志。

命令示例

kafka-topics.sh --list --bootstrap-server $BROKER_ADDRESS

输出示例:

sales-data
logistics-stream
__consumer_offsets

3. 描述主题配置

检查特定主题的现有配置、分区计数和代理分配对于故障排除和验证至关重要。使用 --describe 标志。

描述 sales-data 的命令示例

kafka-topics.sh --describe --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS

输出解读:

输出同时显示了主题级别和分区级别的配置:

Topic: sales-data  PartitionCount: 3 ReplicationFactor: 2 Configs:
  Topic: sales-data  Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: sales-data  Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
  Topic: sales-data  Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  • Leader (首领): 当前负责处理该分区读/写的代理。
  • Replicas (副本): 持有该分区副本的代理列表。
  • Isr (In-Sync Replicas - 同步副本): 与 Leader 完全同步的副本子集。高可用性要求 Leader 位于 ISR 中。

4. 修改现有主题

Kafka 在创建后提供了有限的主题修改机制。两个最常见的修改任务是增加分区计数和覆盖默认的代理配置设置。

A. 增加分区

分区只能增加,永远不能减少。增加分区有助于扩展消费者的并行性。

警告: 增加分区会更改消息如何被映射(哈希)到分区。如果您的生产者依赖基于键的排序保证,增加分区可能会破坏现有键的有序传递。

如果 sales-data 当前有 3 个分区,我们可以将其增加到 5 个:

kafka-topics.sh --alter --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS \n  --partitions 5

B. 修改主题特定配置

您可以使用 --config 标志覆盖特定主题的全局代理设置(如消息保留时间或清理策略)。

示例:为 sales-data 设置消息保留时间为 24 小时(86400000 毫秒)。

kafka-topics.sh --alter --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS \n  --config retention.ms=86400000

要删除特定的配置覆盖并恢复为默认的代理设置,请使用 --delete-config 标志:

kafka-topics.sh --alter --topic sales-data \n  --bootstrap-server $BROKER_ADDRESS \n  --delete-config retention.ms

5. 删除 Kafka 主题

不再使用的主题应被正确删除,以回收磁盘空间并维护集群的整洁。

启用主题删除

默认情况下,Kafka 代理可能会禁用主题删除以确保安全。在删除主题之前,请确保在所有代理的 server.properties 文件中启用了以下设置:

delete.topic.enable=true

删除 old-stream 的命令示例

使用 --delete 标志启动主题移除。主题删除通常是异步的,意味着命令提交请求,而删除操作在后台发生。

kafka-topics.sh --delete --topic old-stream \n  --bootstrap-server $BROKER_ADDRESS

确认输出:

Deletion of topic old-stream initiated successfully.

主题管理命令摘要

操作 标志(们) 目的 示例参数
创建 --create 初始化一个新主题。 --partitions 5 --replication-factor 3
列出 --list 显示集群中所有主题。 不适用
描述 --describe 查看当前配置和布局。 --topic my-topic
修改 (分区) --alter 增加分区数量。 --partitions N (N > 当前计数)
修改 (配置) --alter --config 覆盖特定主题的代理默认设置。 --config retention.ms=...
删除 --delete 永久移除一个主题。 --topic my-topic

结论和后续步骤

命令行仍然是管理 Kafka 集群最强大和最灵活的接口。通过掌握 kafka-topics.sh 实用程序,您可以对主题创建参数、配置覆盖以及删除和描述等必要的管理操作获得精细的控制。

后续步骤:

  1. 在开发或分阶段环境中练习这些命令。
  2. 使用 --describe 命令探索高级配置选项,以查看所有可配置属性的完整列表(例如 cleanup.policymax.message.bytes)。
  3. 学习用于生产者和消费者测试的相应 CLI 命令 (kafka-console-producer.shkafka-console-consumer.sh)。