data-prepper

Set pipeline for bulk request in OpenSearch sink

Open miguel-vila opened this issue 1 year ago • 3 comments

We want to use the same index and the same data source, and compute multiple text embedding fields, each one using a different model. A document in the index would look like this:

{
  "id": "doc-id",
  "passage_text": "foo bar",
  "passage_embedding_model1": [....],
  "passage_embedding_model2": [....]
}

In order to do this we would need multiple pipelines that read from the same data source and write to the same target index.

The index would be created without a default ingest pipeline; instead, we would create one ingest pipeline per model, each targeting a different field:

{
  "text_embedding": {
    "model_id": "<model_id_1>",
    "field_map": {
      "passage_text": "passage_embedding_model1"
    }
  }
}
{
  "text_embedding": {
    "model_id": "<model_id_2>",
    "field_map": {
      "passage_text": "passage_embedding_model2"
    }
  }
}
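For reference, registering these as two named ingest pipelines would look roughly like this (the pipeline names embedding-model1 and embedding-model2 are placeholders, not names from the original setup):

```
PUT _ingest/pipeline/embedding-model1
{
  "processors": [
    {
      "text_embedding": {
        "model_id": "<model_id_1>",
        "field_map": {
          "passage_text": "passage_embedding_model1"
        }
      }
    }
  ]
}

PUT _ingest/pipeline/embedding-model2
{
  "processors": [
    {
      "text_embedding": {
        "model_id": "<model_id_2>",
        "field_map": {
          "passage_text": "passage_embedding_model2"
        }
      }
    }
  ]
}
```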

but the OpenSearch sink for each data-prepper pipeline should:

  • use a different ingest pipeline
  • do an upsert so as not to remove the embedding field in case the document was already created by the other pipeline

The bulk endpoint has a pipeline parameter, which I think can be used for this, but I don't think the OpenSearch sink receives a pipeline parameter.
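For example, the pipeline can be passed as a query parameter on the bulk request (a sketch; the index name my-index and pipeline name embedding-model1 are placeholders):

```
POST my-index/_bulk?pipeline=embedding-model1
{ "index": { "_id": "doc-id" } }
{ "passage_text": "foo bar" }
```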

This PR uses the pipeline value when making that request, but I'm not sure what other changes would be required to support this.

miguel-vila avatar Sep 20 '24 14:09 miguel-vila

@sb2k16 thanks! I see that the pipeline parameter is being set for the CREATE and index operations, but not for the UPDATE/UPSERTs. I tried to do the same (something like pipeline.ifPresent(updateOperationBuilder::pipeline);) and it seems UpdateOperation doesn't have a pipeline parameter.

miguel-vila avatar Sep 23 '24 21:09 miguel-vila

Yes @miguel-vila. The Bulk API update operation does not support the pipeline parameter.

sb2k16 avatar Sep 25 '24 01:09 sb2k16

@sb2k16 I think the _bulk endpoint does support it. I ran a test:

First, I upserted a document with a pipeline parameter. The document was created, and the pipeline was used. But repeating the same bulk request with a different pipeline (targeting the same document with the same fields) has no effect: the second pipeline doesn't seem to be called.
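Concretely, the kind of request I mean is something like the following, with the pipeline given as a query parameter on the bulk request (index and pipeline names here are placeholders):

```
POST my-index/_bulk?pipeline=embedding-model1
{ "update": { "_id": "doc-id" } }
{ "doc": { "passage_text": "foo bar" }, "doc_as_upsert": true }
```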

Is there any way we could do this? Our main objectives are:

  • to have an index with multiple text embedding fields
  • when we want to add a new text embedding field, we can update all the old documents so they have the new field (whether this is through a new data-prepper pipeline or something else)

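For the backfill part, one option outside Data Prepper might be the _update_by_query API, which does accept a pipeline query parameter. A sketch (index and pipeline names are placeholders) that reprocesses only documents still missing the new field:

```
POST my-index/_update_by_query?pipeline=embedding-model2
{
  "query": {
    "bool": {
      "must_not": {
        "exists": { "field": "passage_embedding_model2" }
      }
    }
  }
}
```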
miguel-vila avatar Sep 25 '24 10:09 miguel-vila

Regarding the update action, could we have a simple validation to prevent setting the pipeline when the action is update?

dlvenable avatar Nov 12 '24 21:11 dlvenable