高效使用Elasticsearch _bulk API命令管理数据
通过NDJSON示例、响应检查、批次大小调整和安全重试指导,正确使用Elasticsearch _bulk API。
高效使用Elasticsearch _bulk API命令管理数据
当您的应用需要索引、更新或删除大量文档,而不想为每个文档支付一次HTTP请求的开销时,Elasticsearch的_bulk API是合适的工具。容易让人困惑的是请求体:它是换行分隔的JSON,而不是一个格式美观的JSON数组。
在加载日志、同步其他数据库的记录或应用批量清理删除时,请使用_bulk。您仍然需要检查响应中的每个项目,因为单个操作可能失败,而整个HTTP请求却成功。
理解_bulk API的结构
_bulk API接受换行分隔的JSON,通常称为NDJSON。每个动作定义在一行上。需要文档主体的动作使用下一行作为源或更新负载。最后一行也必须以换行符结束。
_bulk请求的关键组成部分:
- 动作和元数据行: 此行指定操作类型(
index、create、update或delete)、目标索引,以及可选的文档ID。现代Elasticsearch API中不使用文档类型。 - 源行: 此行包含要索引或更新的实际JSON文档。对于
delete操作,此行被省略。 - 换行分隔符: 每个动作/元数据对及其对应的源(如果适用)必须由换行符(
\n)分隔。整个请求体应以换行符结束。
示例结构:
{ "index": { "_index": "my-index", "_id": "1" } }
{ "field1": "value1" }
{ "delete": { "_index": "my-index", "_id": "2" } }
或者对于删除操作:
curl -sS -H 'Content-Type: application/x-ndjson' \
-X POST 'http://localhost:9200/_bulk' \
--data-binary @bulk.ndjson
使用_bulk执行常见操作
_bulk API非常灵活,可以在单个请求中处理混合操作。这正是其真正强大之处,允许您在单次往返中执行复杂的数据操作。
索引多个文档
要索引多个文档,请使用index操作。如果具有指定ID的文档已存在,index将覆盖它。如果您希望确保仅在文档不存在时才索引,请改用create操作。
示例: 索引两个新文档。
{ "index": { "_index": "my-index", "_id": "1" } }
{ "field1": "value1", "field2": "value2" }
{ "index": { "_index": "my-index", "_id": "2" } }
{ "field1": "another_value", "field2": "different_value" }
更新文档
可以使用update操作更新文档。您指定要更新的文档ID,并提供包含要更改字段的部分文档。如果您想使用脚本进行更新,可以在update操作中实现。
示例: 更新现有文档中的字段。
{ "update": { "_index": "my-index", "_id": "1" } }
{ "doc": { "field1": "updated_value" } }
删除文档
要删除文档,请使用delete操作,指定要删除文档的_index和_id。删除操作不需要源文档。
示例: 删除一个文档。
{ "delete": { "_index": "my-index", "_id": "2" } }
组合操作
真正的效率来自于混合这些操作。您可以在同一个_bulk请求中索引新文档、更新现有文档和删除其他文档。
示例: 在一个请求中索引、更新和删除。
{ "index": { "_index": "my-index", "_id": "3" } }
{ "field1": "new_document_field", "field2": "new_document_value" }
{ "update": { "_index": "my-index", "_id": "1" } }
{ "doc": { "field1": "further_updated_value" } }
{ "delete": { "_index": "my-index", "_id": "2" } }
响应处理
_bulk API返回一个JSON响应,详细说明每个单独操作的结果。解析此响应以验证所有操作是否成功并识别任何错误至关重要。
响应将包含一个items数组,其中每个元素对应于请求中的一个操作,顺序相同。每个项目将包括index、create、update或delete操作,以及其状态(例如created、updated、deleted、noop)和其他相关元数据。
示例响应片段:
{
"took": 150,
"errors": false,
"items": [
{
"index": {
"_index": "my-index",
"_id": "3",
"version": 1,
"result": "created",
"_shards": {"total": 2, "successful": 1, "failed": 0},
"_seq_no": 0,
"_primary_term": 1
}
},
{
"update": {
"_index": "my-index",
"_id": "1",
"version": 2,
"result": "updated",
"_shards": {"total": 2, "successful": 1, "failed": 0},
"_seq_no": 1,
"_primary_term": 1
}
},
{
"delete": {
"_index": "my-index",
"_id": "2",
"version": 2,
"result": "deleted",
"_shards": {"total": 2, "successful": 1, "failed": 0},
"_seq_no": 2,
"_primary_term": 1
}
}
]
}
如果任何操作失败,响应中的顶级errors字段将为true,并且失败操作的单个项目将包含一个error对象,详细说明问题。
最佳实践和提示
- 批次大小: 非常大的批次可能会给客户端内存、协调节点和数据节点带来压力。从适中的负载开始,测量吞吐量和拒绝率,然后根据您的集群和文档大小进行调整。
- 错误处理: 始终解析响应中的错误。如有必要,为临时错误实现重试逻辑。
- 换行分隔符: 确保在每个JSON对象之间正确使用换行符(
\n)。格式不正确是_bulkAPI失败的常见原因。 - 并行化: 对于非常高的摄取速率,考虑并行发送多个
_bulk请求,但要注意集群的容量。 create与index: 当您希望操作在ID已存在时失败时,请使用create。当替换现有文档可以接受时,请使用index。- API客户端: 大多数Elasticsearch客户端库提供了方便的方法来构建和执行
_bulk请求,抽象掉了一些手动格式化的麻烦。
实用要点
_bulk API之所以快,是因为它减少了请求开销,但只有当您的客户端将响应视为单个结果的列表时,它才是安全的。发送有效的NDJSON,设置Content-Type: application/x-ndjson,保持批次大小在集群可吸收的范围内,并且仅重试因临时原因失败的操作。