高效使用Elasticsearch _bulk API命令管理数据

通过NDJSON示例、响应检查、批次大小调整和安全重试指导,正确使用Elasticsearch _bulk API。

高效使用Elasticsearch _bulk API命令管理数据

当您的应用需要索引、更新或删除大量文档,而不想为每个文档支付一次HTTP请求的开销时,Elasticsearch的_bulk API是合适的工具。容易让人困惑的是请求体:它是换行分隔的JSON,而不是一个格式美观的JSON数组。

在加载日志、同步其他数据库的记录或应用批量清理删除时,请使用_bulk。您仍然需要检查响应中的每个项目,因为单个操作可能失败,而整个HTTP请求却成功。

理解_bulk API的结构

_bulk API接受换行分隔的JSON,通常称为NDJSON。每个动作定义在一行上。需要文档主体的动作使用下一行作为源或更新负载。最后一行也必须以换行符结束。

_bulk请求的关键组成部分:

  • 动作和元数据行: 此行指定操作类型(indexcreateupdatedelete)、目标索引,以及可选的文档ID。现代Elasticsearch API中不使用文档类型。
  • 源行: 此行包含要索引或更新的实际JSON文档。对于delete操作,此行被省略。
  • 换行分隔符: 每个动作/元数据对及其对应的源(如果适用)必须由换行符(\n)分隔。整个请求体应以换行符结束。

示例结构:

{ "index": { "_index": "my-index", "_id": "1" } }
{ "field1": "value1" }
{ "delete": { "_index": "my-index", "_id": "2" } }

或者对于删除操作:

curl -sS -H 'Content-Type: application/x-ndjson' \
  -X POST 'http://localhost:9200/_bulk' \
  --data-binary @bulk.ndjson

使用_bulk执行常见操作

_bulk API非常灵活,可以在单个请求中处理混合操作。这正是其真正强大之处,允许您在单次往返中执行复杂的数据操作。

索引多个文档

要索引多个文档,请使用index操作。如果具有指定ID的文档已存在,index将覆盖它。如果您希望确保仅在文档不存在时才索引,请改用create操作。

示例: 索引两个新文档。

{ "index": { "_index": "my-index", "_id": "1" } }
{ "field1": "value1", "field2": "value2" }
{ "index": { "_index": "my-index", "_id": "2" } }
{ "field1": "another_value", "field2": "different_value" }

更新文档

可以使用update操作更新文档。您指定要更新的文档ID,并提供包含要更改字段的部分文档。如果您想使用脚本进行更新,可以在update操作中实现。

示例: 更新现有文档中的字段。

{ "update": { "_index": "my-index", "_id": "1" } }
{ "doc": { "field1": "updated_value" } }

删除文档

要删除文档,请使用delete操作,指定要删除文档的_index_id。删除操作不需要源文档。

示例: 删除一个文档。

{ "delete": { "_index": "my-index", "_id": "2" } }

组合操作

真正的效率来自于混合这些操作。您可以在同一个_bulk请求中索引新文档、更新现有文档和删除其他文档。

示例: 在一个请求中索引、更新和删除。

{ "index": { "_index": "my-index", "_id": "3" } }
{ "field1": "new_document_field", "field2": "new_document_value" }
{ "update": { "_index": "my-index", "_id": "1" } }
{ "doc": { "field1": "further_updated_value" } }
{ "delete": { "_index": "my-index", "_id": "2" } }

响应处理

_bulk API返回一个JSON响应,详细说明每个单独操作的结果。解析此响应以验证所有操作是否成功并识别任何错误至关重要。

响应将包含一个items数组,其中每个元素对应于请求中的一个操作,顺序相同。每个项目将包括indexcreateupdatedelete操作,以及其状态(例如createdupdateddeletednoop)和其他相关元数据。

示例响应片段:

{
  "took": 150,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "my-index",
        "_id": "3",
        "version": 1,
        "result": "created",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 0,
        "_primary_term": 1
      }
    },
    {
      "update": {
        "_index": "my-index",
        "_id": "1",
        "version": 2,
        "result": "updated",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 1,
        "_primary_term": 1
      }
    },
    {
      "delete": {
        "_index": "my-index",
        "_id": "2",
        "version": 2,
        "result": "deleted",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 2,
        "_primary_term": 1
      }
    }
  ]
}

如果任何操作失败,响应中的顶级errors字段将为true,并且失败操作的单个项目将包含一个error对象,详细说明问题。

最佳实践和提示

  • 批次大小: 非常大的批次可能会给客户端内存、协调节点和数据节点带来压力。从适中的负载开始,测量吞吐量和拒绝率,然后根据您的集群和文档大小进行调整。
  • 错误处理: 始终解析响应中的错误。如有必要,为临时错误实现重试逻辑。
  • 换行分隔符: 确保在每个JSON对象之间正确使用换行符(\n)。格式不正确是_bulk API失败的常见原因。
  • 并行化: 对于非常高的摄取速率,考虑并行发送多个_bulk请求,但要注意集群的容量。
  • createindex 当您希望操作在ID已存在时失败时,请使用create。当替换现有文档可以接受时,请使用index
  • API客户端: 大多数Elasticsearch客户端库提供了方便的方法来构建和执行_bulk请求,抽象掉了一些手动格式化的麻烦。

实用要点

_bulk API之所以快,是因为它减少了请求开销,但只有当您的客户端将响应视为单个结果的列表时,它才是安全的。发送有效的NDJSON,设置Content-Type: application/x-ndjson,保持批次大小在集群可吸收的范围内,并且仅重试因临时原因失败的操作。