Эффективное управление данными с помощью команды _bulk API Elasticsearch

Используйте Elasticsearch _bulk API правильно: примеры NDJSON, проверка ответов, настройка размера пакетов и безопасные повторные попытки.

Эффективное управление данными с помощью команды _bulk API Elasticsearch

Elasticsearch _bulk API — это правильный инструмент, когда вашему приложению нужно индексировать, обновлять или удалять много документов без затрат на один HTTP-запрос на документ. Чаще всего проблемы возникают из-за тела запроса: это JSON, разделённый переводами строк, а не один красиво отформатированный JSON-массив.

Используйте _bulk, когда вы загружаете логи, синхронизируете записи из другой базы данных или применяете пакетное удаление. Вам всё равно нужно проверять каждый элемент в ответе, потому что одна операция может завершиться ошибкой, в то время как весь HTTP-запрос выполнится успешно.

Понимание структуры _bulk API

_bulk API принимает JSON, разделённый переводами строк, обычно называемый NDJSON. Каждое действие определяется на одной строке. Действия, требующие тела документа, используют следующую строку как источник или полезную нагрузку обновления. Последняя строка также должна заканчиваться переводом строки.

Ключевые компоненты запроса _bulk:

  • Строка действия и метаданных: Эта строка указывает тип операции (index, create, update или delete), целевой индекс и, опционально, идентификатор документа. Типы документов не используются в современных API Elasticsearch.
  • Строка источника: Эта строка содержит фактический JSON-документ для индексации или обновления. Эта строка опускается для операций delete.
  • Разделитель перевода строки: Каждая пара действия/метаданных и соответствующий источник (если применимо) должны быть разделены символом перевода строки (\n). Всё тело запроса должно заканчиваться символом перевода строки.

Пример структуры:

{ "index": { "_index": "my-index", "_id": "1" } }
{ "field1": "value1" }
{ "delete": { "_index": "my-index", "_id": "2" } }

Или для операции удаления:

curl -sS -H 'Content-Type: application/x-ndjson' \
  -X POST 'http://localhost:9200/_bulk' \
  --data-binary @bulk.ndjson

Выполнение распространённых операций с помощью _bulk

_bulk API универсален и может обрабатывать смесь операций в одном запросе. В этом и заключается его истинная сила, позволяющая выполнять сложные манипуляции с данными за один круговой обход.

Индексация нескольких документов

Для индексации нескольких документов используется действие index. Если документ с указанным ID уже существует, index перезапишет его. Если вы хотите гарантировать, что документ будет проиндексирован только в том случае, если он ещё не существует, используйте вместо этого действие create.

Пример: Индексация двух новых документов.

{ "index": { "_index": "my-index", "_id": "1" } }
{ "field1": "value1", "field2": "value2" }
{ "index": { "_index": "my-index", "_id": "2" } }
{ "field1": "another_value", "field2": "different_value" }

Обновление документов

Обновление документов можно выполнить с помощью действия update. Вы указываете идентификатор документа для обновления и предоставляете частичный документ с полями, которые хотите изменить. Если вы хотите использовать скрипт для обновления, вы можете сделать это в рамках действия update.

Пример: Обновление поля в существующем документе.

{ "update": { "_index": "my-index", "_id": "1" } }
{ "doc": { "field1": "updated_value" } }

Удаление документов

Для удаления документов используется действие delete с указанием _index и _id удаляемого документа. Для операций удаления исходный документ не требуется.

Пример: Удаление документа.

{ "delete": { "_index": "my-index", "_id": "2" } }

Комбинирование операций

Настоящая эффективность достигается при смешивании этих операций. Вы можете индексировать новые документы, обновлять существующие и удалять другие в одном запросе _bulk.

Пример: Индексация, обновление и удаление в одном запросе.

{ "index": { "_index": "my-index", "_id": "3" } }
{ "field1": "new_document_field", "field2": "new_document_value" }
{ "update": { "_index": "my-index", "_id": "1" } }
{ "doc": { "field1": "further_updated_value" } }
{ "delete": { "_index": "my-index", "_id": "2" } }

Обработка ответов

_bulk API возвращает JSON-ответ, который детализирует результат каждой отдельной операции. Крайне важно разобрать этот ответ, чтобы убедиться, что все операции прошли успешно, и выявить любые ошибки.

Ответ будет содержать массив items, где каждый элемент соответствует одной из операций в вашем запросе в том же порядке. Каждый элемент будет включать операцию index, create, update или delete, а также её статус (например, created, updated, deleted, noop) и другие соответствующие метаданные.

Пример фрагмента ответа:

{
  "took": 150,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "my-index",
        "_id": "3",
        "version": 1,
        "result": "created",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 0,
        "_primary_term": 1
      }
    },
    {
      "update": {
        "_index": "my-index",
        "_id": "1",
        "version": 2,
        "result": "updated",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 1,
        "_primary_term": 1
      }
    },
    {
      "delete": {
        "_index": "my-index",
        "_id": "2",
        "version": 2,
        "result": "deleted",
        "_shards": {"total": 2, "successful": 1, "failed": 0},
        "_seq_no": 2,
        "_primary_term": 1
      }
    }
  ]
}

Если какая-либо операция завершилась ошибкой, поле errors верхнего уровня в ответе будет true, а отдельный элемент для неудачной операции будет содержать объект error с описанием проблемы.

Лучшие практики и советы

  • Размер пакета: Очень большие пакеты могут нагружать память клиента, координирующие узлы и узлы данных. Начните с умеренных полезных нагрузок, измерьте пропускную способность и уровень отказов, затем отрегулируйте под ваш кластер и размер документа.
  • Обработка ошибок: Всегда анализируйте ответ на наличие ошибок. При необходимости реализуйте логику повторных попыток для временных ошибок.
  • Разделители перевода строки: Убедитесь, что символы перевода строки (\n) правильно используются между каждым JSON-объектом. Неправильное форматирование — частая причина сбоев _bulk API.
  • Параллелизация: Для очень высоких скоростей приёма данных рассмотрите возможность отправки нескольких запросов _bulk параллельно, но учитывайте ёмкость вашего кластера.
  • create против index: Используйте create, когда хотите, чтобы операция завершилась ошибкой, если ID уже существует. Используйте index, когда замена существующего документа допустима.
  • Клиенты API: Большинство клиентских библиотек Elasticsearch предоставляют удобные методы для построения и выполнения запросов _bulk, абстрагируя часть ручного форматирования.

Практический вывод

_bulk API быстр, потому что снижает накладные расходы на запросы, но он безопасен только тогда, когда ваш клиент обрабатывает ответ как список отдельных результатов. Отправляйте корректный NDJSON с Content-Type: application/x-ndjson, поддерживайте размер пакетов, который ваш кластер может поглотить, и повторяйте только те операции, которые завершились ошибкой по временным причинам.