Allow DAGs to trigger on Datasets with wildcard/regex

Open w0ut0 opened this issue 1 year ago • 8 comments

Description

I would like to be able to have a DAG trigger on ANY dataset whose URI matches a wildcard/regex.

Use case/motivation

We have two similar use cases.

Trigger ingestion process on file arrival.

Files land in blob storage, which triggers events like:

{
  "filename": "/raw/my/folder1/2024/04/15/file1.csv",
  "event": "fileCreated"
}

and

{
  "filename": "/raw/my/folder1/2024/04/15/file1.json",
  "event": "fileCreated"
}

and

{
  "filename": "/raw/my/folder1/2024/02/10/file1.json",
  "event": "fileCreated"
}

These events are converted into Dataset update events, either by calling the Airflow API or by a DAG that polls a queue containing the above events. In this example, three Datasets would be updated:
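A minimal sketch of the conversion step, assuming the dataset URI is simply the file path placed under the ingestionblob:// scheme (as in the three URIs listed in this issue) and that only fileCreated events should update a dataset. The helper event_to_dataset_uri and the SCHEME constant are hypothetical, not part of Airflow; how the resulting URI is then reported to Airflow is left out.

```python
import json

# Assumption: a dataset URI is the blob path under this hypothetical scheme.
SCHEME = "ingestionblob://"


def event_to_dataset_uri(raw_event):
    """Convert a blob-storage event (JSON string) into a dataset URI.

    Returns None for events that should not update a dataset.
    """
    event = json.loads(raw_event)
    if event.get("event") != "fileCreated":
        return None
    # "/raw/my/..." -> "ingestionblob://raw/my/..."
    return SCHEME + event["filename"].lstrip("/")


events = [
    '{"filename": "/raw/my/folder1/2024/04/15/file1.csv", "event": "fileCreated"}',
    '{"filename": "/raw/my/folder1/2024/04/15/file1.json", "event": "fileCreated"}',
    '{"filename": "/raw/my/folder1/2024/02/10/file1.json", "event": "fileCreated"}',
]
uris = [uri for uri in map(event_to_dataset_uri, events) if uri is not None]
for uri in uris:
    print(uri)
```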

  • ingestionblob://raw/my/folder1/2024/04/15/file1.csv
  • ingestionblob://raw/my/folder1/2024/04/15/file1.json
  • ingestionblob://raw/my/folder1/2024/02/10/file1.json

Imagine that we have a DAG ingestCSVFromRawRealTime. I would like to be able to trigger that DAG on any dataset matching a wildcard pattern such as ingestionblob://raw/my/folder1/{today.year}/{today.month}/{today.day}/*.csv.

The same applies to DAGs such as ingestJsonFromRawRealTime or ingestBackfilledCsvFromRaw.
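To make the requested behaviour concrete, here is a sketch that decides which DAGs a dataset URI should trigger using shell-style globs from Python's fnmatch module. The DAG_PATTERNS mapping and dags_to_trigger are hypothetical; ingestBackfilledCsvFromRaw is left out because the issue does not say which pattern it would use. Two details are worth noting: the example URIs use zero-padded months and days (2024/04/15), so the date is rendered with strftime rather than {today.month}, which would give "4"; and fnmatch's "*" also matches "/", so files in subfolders of the day folder match as well.

```python
import fnmatch
from datetime import date

# Hypothetical mapping from DAG id to a URI glob; "{day}" is filled with
# the date rendered as YYYY/MM/DD.
DAG_PATTERNS = {
    "ingestCSVFromRawRealTime": "ingestionblob://raw/my/folder1/{day}/*.csv",
    "ingestJsonFromRawRealTime": "ingestionblob://raw/my/folder1/{day}/*.json",
}


def dags_to_trigger(uri, today):
    """Return the ids of the DAGs whose pattern (for `today`) matches `uri`."""
    day = today.strftime("%Y/%m/%d")  # zero-padded, e.g. 2024/04/15
    return sorted(
        dag_id
        for dag_id, pattern in DAG_PATTERNS.items()
        # fnmatchcase is case-sensitive on every OS; "*" also crosses "/".
        if fnmatch.fnmatchcase(uri, pattern.format(day=day))
    )


today = date(2024, 4, 15)
print(dags_to_trigger("ingestionblob://raw/my/folder1/2024/04/15/file1.csv", today))
# ['ingestCSVFromRawRealTime']
print(dags_to_trigger("ingestionblob://raw/my/folder1/2024/02/10/file1.json", today))
# []
```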

Ideally, we could define capture groups in the Dataset trigger's regex and pass the captured values to the DAG as parameters.
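The capture-group idea could look roughly like this, using named groups from Python's re module so the captured values come out as a ready-made parameter dict. The TRIGGER regex and params_for helper are hypothetical illustrations of the proposal; how the dict would actually reach the triggered DAG run (for example as its conf) is an open design question this sketch does not answer.

```python
import re

# Hypothetical trigger regex with named capture groups. [^/]+ keeps the
# file name inside a single path segment.
TRIGGER = re.compile(
    r"ingestionblob://raw/my/folder1/"
    r"(?P<year>\d{4})/(?P<month>\d{2})/(?P<day>\d{2})/(?P<name>[^/]+)\.csv"
)


def params_for(uri):
    """Return the captured groups as a dict of DAG params, or None if no match."""
    match = TRIGGER.fullmatch(uri)
    return match.groupdict() if match else None


print(params_for("ingestionblob://raw/my/folder1/2024/04/15/file1.csv"))
# {'year': '2024', 'month': '04', 'day': '15', 'name': 'file1'}
print(params_for("ingestionblob://raw/my/folder1/2024/04/15/file1.json"))
# None
```

Using fullmatch rather than search means the whole URI has to fit the pattern, so a trigger cannot fire on a URI that merely contains a matching substring.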

Monitor database updates

Our (Databricks) DWH, like almost all other DBMSs, has a three-level namespace: database.schema.table. We will build a mechanism that emits dataset events such as

  • database1.bronze.table7
  • database4.silver.view3
  • database2.gold.streamingTable1
  • ...
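One way to express "a whole database, a schema, or a single table" against these three-level names is a segment-wise prefix check. The in_scope helper below is a hypothetical sketch, not an Airflow feature; comparing the split segments rather than raw string prefixes keeps the scope "database2" from accidentally covering "database20".

```python
def in_scope(name, scope):
    """Return True if a database.schema.table name falls under a scope.

    A scope is a prefix of the three-level name: "database2" covers the
    whole database, "database2.gold" a single schema, and a full
    "database2.gold.streamingTable1" a single table.
    """
    parts = name.split(".")
    scope_parts = scope.split(".")
    if len(parts) != 3 or not 1 <= len(scope_parts) <= 3:
        raise ValueError("expected database.schema.table and a 1-3 level scope")
    # Compare whole segments, not characters.
    return parts[: len(scope_parts)] == scope_parts


events = ["database1.bronze.table7", "database4.silver.view3", "database2.gold.streamingTable1"]
print([e for e in events if in_scope(e, "database2")])
# ['database2.gold.streamingTable1']
```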

Sometimes we want to be notified of updates to a whole database, schema, or table. Wildcard triggers would be beneficial in use cases such as:

  • Airflow needs to trigger a sync in our BI tool (e.g. Power BI, Tableau) whenever an object in the gold schema of database2 is updated.
  • When data arrives in the bronze schema, curation scripts need to be kicked off to generate a 'cleaned' silver table.
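The two use cases above amount to a small routing table from wildcard patterns to actions. The sketch below uses fnmatch globs; ROUTES, the action names, and actions_for are hypothetical. Because fnmatch's "*" also matches dots, a pattern like "*.bronze.*" only behaves as "any database, bronze schema, any table" under the assumption that every name has exactly three dot-free levels.

```python
from fnmatch import fnmatchcase

# Hypothetical routing table for the two use cases; assumes names are
# always exactly three dot-free levels (database.schema.table).
ROUTES = {
    "database2.gold.*": "sync_bi_tool",
    "*.bronze.*": "run_curation_scripts",
}


def actions_for(name):
    """Return the (hypothetical) actions whose pattern matches a table name."""
    return [action for pattern, action in ROUTES.items() if fnmatchcase(name, pattern)]


for name in ["database1.bronze.table7", "database4.silver.view3", "database2.gold.streamingTable1"]:
    print(name, actions_for(name))
# database1.bronze.table7 ['run_curation_scripts']
# database4.silver.view3 []
# database2.gold.streamingTable1 ['sync_bi_tool']
```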

Related issues

  • #34534. However, with the new logical operators for Datasets, we can already trigger on more than one dataset.

Are you willing to submit a PR?

  • [ ] Yes I am willing to submit a PR!

w0ut0 · Apr 15 '24 07:04

Hello, could a colleague (@TiDeane) and I be assigned to this issue? We are working on a university project and would be glad to implement this feature. Thank you in advance.

lotrias17 · Apr 22 '24 09:04

Also, we would be very grateful for any guidance anyone can offer.

lotrias17 · Apr 22 '24 09:04

I second the original proposal.

kovla · Apr 22 '24 15:04

Hi @lotrias17, I just assigned this one to you. I cannot assign it to @TiDeane yet; once @TiDeane leaves a comment here, I should be able to.

Lee-W · Apr 30 '24 09:04

@sunank200, since you've submitted a bunch of dataset-related PRs recently, you might be interested in joining the discussion 🙂

Lee-W · Apr 30 '24 09:04

Hello, I'm just commenting so that you can assign me as well, @Lee-W. I'll be working on this with @lotrias17.

TiDeane · Apr 30 '24 12:04

Just assigned @TiDeane. Thanks!

Lee-W · Apr 30 '24 12:04

We are running into problems with Session. How can we initialize a session inside a method of the DAG class (in models/dag.py) so that we can query the database for the datasets related to the DAG? Thank you in advance.

lotrias17 · May 18 '24 03:05

Is @provide_session what you're looking for?

Lee-W · May 28 '24 14:05
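For readers following the thread: Airflow's decorator is provide_session in airflow.utils.session, and it opens a session for the call when the caller did not pass one. The sketch below only imitates that pattern so it can run without Airflow installed. FakeSession, create_session, KNOWN_URIS and matching_dataset_uris are hypothetical stand-ins, and unlike the real decorator this simplified version only checks keyword arguments and does no commit/rollback handling.

```python
import functools
import re
from contextlib import contextmanager

# Hypothetical in-memory stand-in for the dataset table.
KNOWN_URIS = [
    "ingestionblob://raw/my/folder1/2024/04/15/file1.csv",
    "ingestionblob://raw/my/folder1/2024/04/15/file1.json",
    "ingestionblob://raw/my/folder1/2024/02/10/file1.json",
]


class FakeSession:
    """Stand-in for a database session; only exposes the stored URIs."""

    def __init__(self, uris):
        self.uris = list(uris)
        self.closed = False

    def close(self):
        self.closed = True


@contextmanager
def create_session():
    """Open a session and always close it, even if the body raises."""
    session = FakeSession(KNOWN_URIS)
    try:
        yield session
    finally:
        session.close()


def provide_session(func):
    """Inject a fresh session unless the caller passed session=... explicitly."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if "session" in kwargs:
            return func(*args, **kwargs)
        with create_session() as session:
            return func(*args, session=session, **kwargs)

    return wrapper


@provide_session
def matching_dataset_uris(pattern, session=None):
    """Return the stored dataset URIs that fully match a regex."""
    regex = re.compile(pattern)
    return [uri for uri in session.uris if regex.fullmatch(uri)]


print(matching_dataset_uris(r"ingestionblob://raw/my/folder1/2024/04/\d{2}/[^/]+\.json"))
# ['ingestionblob://raw/my/folder1/2024/04/15/file1.json']
```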

We haven't been able to make much progress on this feature, so unfortunately we're going to drop it so that others with more experience can pick it up. We're very sorry, and we wish future contributors the best.

TiDeane · Jun 14 '24 10:06