
No Columns or ability to add field tags when using Job Event static lineage

Open davidsharp7 opened this issue 1 year ago • 0 comments

Given the following static lineage POST:

curl -X POST http://localhost:8080/api/v1/lineage \
  -i -H 'Content-Type: application/json' \
  -d '{
        "eventTime": "2024-12-28T20:52:00.001+10:00",
        "job": {
          "namespace": "my-namespace",
          "name": "newtestfoobarmeeeepppppppppp"
        },
        "outputs": [{
          "namespace": "my-namespace",
          "name": "pppppspooky",
          "facets": {
            "schema": {
              "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
              "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
              "fields": [
                { "name": "a", "type": "VARCHAR"},
                { "name": "b", "type": "VARCHAR"}
              ]
            }
          }
        }],
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
        "schemaURL": "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent"
      }'

it appears that the dataset's columns don't render in the UI and field-level tags can't be added.

Upon investigation, it looks like this is because the current dataset version is not updated in the OpenLineageDao when handling a Job Event:

    if (event.getInputs() != null) {
      for (Dataset dataset : event.getInputs()) {
        DatasetRecord record = upsertLineageDataset(daos, dataset, now, null, true);
        datasetInputs.add(record);
        insertDatasetFacets(daos, dataset, record, null, null, now);
        insertInputDatasetFacets(daos, dataset, record, null, null, now);
      }
    }

By adding the following, the current version is updated in the datasets table:

        daos.getDatasetDao()
            .updateVersion(
                record.getDatasetVersionRow().getDatasetUuid(),
                Instant.now(),
                record.getDatasetVersionRow().getUuid());

which resolves the columns not being displayed.
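To illustrate why the missing pointer update would hide the columns, here is a minimal in-memory sketch. It is not Marquez code: the class `CurrentVersionSketch` and its maps are hypothetical stand-ins for the datasets and dataset versions tables, and it assumes the UI resolves a dataset's fields through the dataset's current version.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified stand-in for the datasets / dataset versions tables.
// None of these names come from Marquez; they exist only for illustration.
public class CurrentVersionSketch {
  // dataset UUID -> current version UUID (absent = pointer never set)
  private final Map<UUID, UUID> currentVersionByDataset = new HashMap<>();
  // version UUID -> field names recorded for that version
  private final Map<UUID, List<String>> fieldsByVersion = new HashMap<>();

  // Stores a new version with its fields, but leaves the dataset's
  // current-version pointer untouched.
  public UUID insertVersion(List<String> fields) {
    UUID versionUuid = UUID.randomUUID();
    fieldsByVersion.put(versionUuid, List.copyOf(fields));
    return versionUuid;
  }

  // Points the dataset at the given version (the step the fix adds).
  public void updateVersion(UUID datasetUuid, UUID versionUuid) {
    currentVersionByDataset.put(datasetUuid, versionUuid);
  }

  // Assumed read path: columns are looked up via the current version only.
  public List<String> columnsForDataset(UUID datasetUuid) {
    UUID current = currentVersionByDataset.get(datasetUuid);
    if (current == null) {
      return List.of();
    }
    return fieldsByVersion.getOrDefault(current, List.of());
  }

  public static void main(String[] args) {
    CurrentVersionSketch store = new CurrentVersionSketch();
    UUID dataset = UUID.randomUUID();
    UUID version = store.insertVersion(List.of("a", "b"));

    // Version exists, but the dataset does not point at it yet.
    System.out.println(store.columnsForDataset(dataset)); // prints []

    store.updateVersion(dataset, version);
    System.out.println(store.columnsForDataset(dataset)); // prints [a, b]
  }
}
```

If the real read path works differently, the sketch does not apply; it only mirrors the behaviour described above, where the version row is written but the dataset never points at it.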

There is a subsequent step where we would need to propagate the tags that are linked to the dataset version fields. It looks like we can use the DAO:

        List<Field> dsvTags =
            daos.getDatasetFieldDao().findByDatasetVersion(record.getDatasetVersionRow().getUuid());
        daos.getDatasetVersionDao()
            .updateFields(
                record.getDatasetVersionRow().getUuid(),
                daos.getDatasetVersionDao().toPgObjectFields(dsvTags));

@wslulciuc does that sound like a fair way of doing it?

davidsharp7, Jun 27 '24 07:06