Elasticsearch _bulk API コマンドを使用した効率的なデータ管理

NDJSON の例、応答の確認、バッチサイズ設定、安全な再試行ガイダンスを含む、Elasticsearch _bulk API の正しい使用方法。

Elasticsearch _bulk API コマンドを使用した効率的なデータ管理

Elasticsearch の _bulk API は、アプリケーションがドキュメントごとに HTTP リクエストを 1 回行うことなく、多数のドキュメントをインデックス、更新、または削除する必要がある場合に適したツールです。多くの人がつまずくのはリクエスト本文です。これは、きれいに整形された JSON 配列ではなく、改行区切りの JSON です。

ログの読み込み、別のデータベースからのレコードの同期、またはクリーンアップ削除のバッチ適用を行う場合は、_bulk を使用します。HTTP リクエスト全体が成功しても、個々の操作が失敗する可能性があるため、応答内のすべての項目を検査する必要があります。

_bulk API の構造を理解する

_bulk API は、通常 NDJSON と呼ばれる改行区切りの JSON を受け入れます。各アクションは 1 行で定義されます。ドキュメント本文を必要とするアクションは、次の行をソースまたは更新ペイロードとして使用します。最終行も改行で終わる必要があります。

_bulk リクエストの主要コンポーネント:

  • アクションとメタデータ行: この行は、操作タイプ (indexcreateupdate、または delete)、ターゲットインデックス、およびオプションでドキュメント ID を指定します。最新の Elasticsearch API ではドキュメントタイプは使用されません。
  • ソース行: この行には、インデックスまたは更新される実際の JSON ドキュメントが含まれます。この行は delete 操作では省略されます。
  • 改行区切り: 各アクション/メタデータのペアとそれに対応するソース (該当する場合) は、改行文字 (\n) で区切る必要があります。リクエスト本文全体は改行文字で終わる必要があります。

構造例:

{ "index": { "_index": "my-index", "_id": "1" } }
{ "field1": "value1" }
{ "delete": { "_index": "my-index", "_id": "2" } }

または削除操作の場合:

curl -sS -H 'Content-Type: application/x-ndjson' \
  -X POST 'http://localhost:9200/_bulk' \
  --data-binary @bulk.ndjson

_bulk を使用した一般的な操作の実行

_bulk API は汎用性が高く、1 つのリクエスト内で複数の操作を混在させることができます。これが真の力であり、1 回のラウンドトリップで複雑なデータ操作を実行できます。

複数のドキュメントのインデックス作成

複数のドキュメントをインデックスするには、index アクションを使用します。指定された ID のドキュメントがすでに存在する場合、index はそれを上書きします。ドキュメントがまだ存在しない場合にのみインデックスされるようにするには、代わりに create アクションを使用します。

例: 2 つの新しいドキュメントをインデックスします。

{ "index": { "_index": "my-index", "_id": "1" } }
{ "field1": "value1", "field2": "value2" }
{ "index": { "_index": "my-index", "_id": "2" } }
{ "field1": "another_value", "field2": "different_value" }

ドキュメントの更新

ドキュメントの更新は、update アクションを使用して行うことができます。更新するドキュメント ID を指定し、変更するフィールドを含む部分的なドキュメントを提供します。更新にスクリプトを使用する場合は、update アクション内で行うことができます。

例: 既存のドキュメントのフィールドを更新します。

{ "update": { "_index": "my-index", "_id": "1" } }
{ "doc": { "field1": "updated_value" } }

ドキュメントの削除

ドキュメントを削除するには、delete アクションを使用し、削除するドキュメントの _index_id を指定します。削除操作にはソースドキュメントは必要ありません。

例: ドキュメントを削除します。

{ "delete": { "_index": "my-index", "_id": "2" } }

操作の組み合わせ

真の効率性は、これらの操作を混在させることにあります。新しいドキュメントをインデックスし、既存のドキュメントを更新し、他のドキュメントを削除することを、すべて同じ _bulk リクエストで行うことができます。

例: 1 つのリクエストでインデックス、更新、削除を行います。

{ "index": { "_index": "my-index", "_id": "3" } }
{ "field1": "new_document_field", "field2": "new_document_value" }
{ "update": { "_index": "my-index", "_id": "1" } }
{ "doc": { "field1": "further_updated_value" } }
{ "delete": { "_index": "my-index", "_id": "2" } }

応答の処理

_bulk API は、個々の操作の結果を詳細に示す JSON 応答を返します。この応答を解析して、すべての操作が成功したことを確認し、エラーを特定することが重要です。

応答には items 配列が含まれ、各要素はリクエスト内の操作の 1 つに対応し、同じ順序で並びます。各項目には、indexcreateupdate、または delete 操作と、そのステータス (例: createdupdateddeletednoop) およびその他の関連メタデータが含まれます。

応答スニペット例:

{
  "took": 150,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "my-index",
        "_id": "3",
        "version": 1,
        "result": "created",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 0,
        "_primary_term": 1
      }
    },
    {
      "update": {
        "_index": "my-index",
        "_id": "1",
        "version": 2,
        "result": "updated",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 1,
        "_primary_term": 1
      }
    },
    {
      "delete": {
        "_index": "my-index",
        "_id": "2",
        "version": 2,
        "result": "deleted",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 2,
        "_primary_term": 1
      }
    }
  ]
}

いずれかの操作が失敗した場合、応答のトップレベルの errors フィールドは true になり、失敗した操作の個々の項目には問題の詳細を示す error オブジェクトが含まれます。

ベストプラクティスとヒント

  • バッチサイズ: 非常に大きなバッチは、クライアントメモリ、調整ノード、データノードに負荷をかける可能性があります。控えめなペイロードから始め、スループットと拒否率を測定し、クラスターとドキュメントサイズに合わせて調整します。
  • エラー処理: 常に応答を解析してエラーを確認します。必要に応じて、一時的なエラーに対する再試行ロジックを実装します。
  • 改行区切り: 各 JSON オブジェクト間で改行文字 (\n) が正しく使用されていることを確認します。フォーマットが正しくないことは、_bulk API の失敗の一般的な原因です。
  • 並列化: 非常に高い取り込みレートの場合は、複数の _bulk リクエストを並行して送信することを検討しますが、クラスターの容量に注意してください。
  • createindex: ID がすでに存在する場合に操作を失敗させたい場合は create を使用します。既存のドキュメントの置き換えが許容される場合は index を使用します。
  • API クライアント: ほとんどの Elasticsearch クライアントライブラリは、_bulk リクエストを構築および実行するための便利なメソッドを提供し、手動によるフォーマットの一部を抽象化します。

実践的なポイント

_bulk API はリクエストのオーバーヘッドを削減するため高速ですが、クライアントが応答を個々の結果のリストとして扱う場合にのみ安全です。Content-Type: application/x-ndjson を指定して有効な NDJSON を送信し、クラスターが吸収できるサイズのバッチを維持し、一時的な理由で失敗した操作のみを再試行します。