Allow DAGs to trigger on Datasets with wildcard/regex
Description
I would like to be able to have a DAG trigger on ANY dataset matching a wildcard/regex.
Use case/motivation
We have 2 (similar) use cases.
Trigger ingestion process on file arrival.
Files land in blob storage, which triggers events like:
{
"filename": "/raw/my/folder1/2024/04/15/file1.csv",
"event": "fileCreated"
}
and
{
"filename": "/raw/my/folder1/2024/04/15/file1.json",
"event": "fileCreated"
}
and
{
"filename": "/raw/my/folder1/2024/02/10/file1.json",
"event": "fileCreated"
}
These events are converted to Dataset update events, either by calling the Airflow API or by a DAG that polls a queue containing the above events. In the above case, three Datasets would be updated:
- ingestionblob://raw/my/folder1/2024/04/15/file1.csv
- ingestionblob://raw/my/folder1/2024/04/15/file1.json
- ingestionblob://raw/my/folder1/2024/02/10/file1.json
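A minimal sketch of the conversion step, assuming the `ingestionblob` scheme from the URIs above and that only `fileCreated` events should produce an update. The function name is hypothetical, and the sketch stops before any call to the Airflow API:

```python
import json
from typing import Optional

# Scheme taken from the example URIs above.
SCHEME = "ingestionblob"


def event_to_dataset_uri(raw_event: str) -> Optional[str]:
    """Map a blob-storage event (JSON string) to a dataset URI.

    Hypothetical helper: returns None for any event other than "fileCreated".
    """
    event = json.loads(raw_event)
    if event.get("event") != "fileCreated":
        return None
    # "/raw/my/folder1/..." -> "ingestionblob://raw/my/folder1/..."
    return f"{SCHEME}://{event['filename'].lstrip('/')}"
```

Each non-None result is the URI whose dataset update would then be emitted.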
Imagine that we have a DAG ingestCSVFromRawRealTime. I would like to be able to trigger that DAG on any dataset matching a wildcard such as ingestionblob://raw/my/folder1/{today.year}/{today.month}/{today.day}/*.csv.
The same applies to DAGs like ingestJsonFromRawRealTime or ingestBackfilledCsvFromRaw.
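The requested matching can be sketched outside Airflow with the standard-library `fnmatch` module. This is an illustration of the desired behavior, not an existing Airflow feature. It assumes that the date folders are zero-padded (`04`, not `4`), as in the example paths, and the helper names are hypothetical:

```python
from datetime import date
from fnmatch import fnmatchcase


def todays_csv_pattern(today: date) -> str:
    # Zero-padded year/month/day, matching folders like 2024/04/15.
    return f"ingestionblob://raw/my/folder1/{today:%Y/%m/%d}/*.csv"


def triggering_uris(updated: list[str], pattern: str) -> list[str]:
    # Keep only the updated dataset URIs that match the wildcard pattern.
    return [uri for uri in updated if fnmatchcase(uri, pattern)]


updated = [
    "ingestionblob://raw/my/folder1/2024/04/15/file1.csv",
    "ingestionblob://raw/my/folder1/2024/04/15/file1.json",
    "ingestionblob://raw/my/folder1/2024/02/10/file1.json",
]
matched = triggering_uris(updated, todays_csv_pattern(date(2024, 4, 15)))
```

Here only the first URI matches. Note that `fnmatch`'s `*` also matches `/`, so a real implementation would need to decide whether `*.csv` may match files in nested subfolders.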
Ideally, we could define capture groups in the regex of the Dataset trigger and pass the captured values to the DAG as parameters.
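A sketch of the capture-group idea using Python's `re` named groups. The pattern and the function name are hypothetical, and how the resulting dict would reach the DAG run (for example as its params or conf) is an assumption, since Airflow does not do this today:

```python
import re
from typing import Dict, Optional

# Hypothetical trigger regex; each named group would become a DAG parameter.
TRIGGER = re.compile(
    r"ingestionblob://raw/my/folder1/"
    r"(?P<year>\d{4})/(?P<month>\d{2})/(?P<day>\d{2})/"
    r"(?P<name>[^/]+)\.csv"
)


def params_from_uri(uri: str) -> Optional[Dict[str, str]]:
    # fullmatch: the whole URI must match, not just a prefix.
    m = TRIGGER.fullmatch(uri)
    return m.groupdict() if m else None
```

For `ingestionblob://raw/my/folder1/2024/04/15/file1.csv` this yields `{"year": "2024", "month": "04", "day": "15", "name": "file1"}`, while the `.json` URIs produce no match and therefore no run.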
Monitor database updates
Our (Databricks) DWH, like almost all other DBMSs, has a 3-level namespace: database.schema.table. We will create a mechanism to emit dataset events like:
- database1.bronze.table7
- database4.silver.view3
- database2.gold.streamingTable1
- ...
Sometimes we want to be notified of updates to a whole database, schema or table. Wildcard triggers would be beneficial in use cases such as:
- Airflow needs to trigger a sync in our BI tool (e.g. PowerBI, Tableau) if an object in the gold schema in database2 is updated.
- When data arrives in the bronze schema, some curation scripts need to be kicked off to generate a 'cleaned' silver table.
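Both use cases can be sketched as wildcard patterns over `database.schema.table` names. The route table and the DAG ids below are hypothetical, and this routing happens outside Airflow purely to illustrate the requested matching semantics:

```python
from fnmatch import fnmatchcase

# Hypothetical mapping from a wildcard trigger to the DAG it should start.
ROUTES = {
    "database2.gold.*": "sync_bi_tool",          # any object in database2.gold
    "*.bronze.*": "curate_bronze_to_silver",     # bronze schema in any database
}


def dags_to_trigger(updated_object: str) -> list[str]:
    # Return every DAG whose pattern matches the updated object name.
    return [dag for pattern, dag in ROUTES.items() if fnmatchcase(updated_object, pattern)]
```

With `fnmatch`, `*` also matches dots, so a stricter implementation might match each namespace level separately.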
Related issues
- 34534. However, with the new logical operators for Datasets, we can already trigger on more than one dataset.
Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Hello, could my colleague (@TiDeane) and I be assigned to this issue? We are working on a university project and would be grateful for the chance to implement this feature. Thank you in advance.
We would also be very grateful for any guidance anyone can offer.
Seconding the original proposal.
Hi @lotrias17, I just assigned this one to you. I cannot assign it to @TiDeane yet. Once @TiDeane leaves a comment, I should be able to assign it.
@sunank200 As you submitted a bunch of dataset-related PRs recently, I think you might be interested in joining the discussion 🙂
Hello, just commenting so you can assign me as well @Lee-W. I'm going to be working on this with @lotrias17.
Just assigned @TiDeane. Thanks!
We are running into problems with Session. How could we initialize a session in a function inside the DAG class (in models/dag.py) in order to query the database for the datasets related to the DAG? Thank you in advance.
Is @provide_session what you're looking for?
We haven't been able to get much progress on this feature, so unfortunately we're going to drop it so others with more experience can pick it up. We're very sorry, and hope for the best for future contributors.