Unify `run_sql` tasks across DAGs
Description
We have several unique implementations of a common run_sql task across our DAGs. This task was pulled out into common/sql.py in #4836:
https://github.com/WordPress/openverse/blob/41479444c02b394f991d566a6c0e819b706447be/catalog/dags/common/sql.py#L179-L202
We have several DAGs which can now use this run_sql function directly, rather than re-implementing their own:
-
delete_records:
https://github.com/WordPress/openverse/blob/9b4f727d87e026aa2ca7fb9afcda0c05666ef9e5/catalog/dags/database/delete_records/delete_records.py#L21-L38
-
batched_update(this one may require some additional work on either the base function or the call to accommodate thedry_runvariable):
https://github.com/WordPress/openverse/blob/cea805019e259ae903bda097acac2238f297c613/catalog/dags/database/batched_update/batched_update.py#L52-L82
-
add_license_url:
https://github.com/WordPress/openverse/blob/3e3799a716ee7942d574c08f0e5a4c4e3eeac5a4/catalog/dags/maintenance/add_license_url.py#L35-L54
Additional context
This also came up in the discussion of #4572
Can I try this? I've never contributed to this project before.
Hi @soysaucewaso, thank you for your interest in contributing to Openverse! I've assigned this issue to you. If you have any questions, you may leave them here.
Please check out our welcome and general setup documentation pages for getting started with setting up your local environment.
It looks as though airflow doesn't supports tasks calling other tasks. Just to clarify, should I change the DAG logic so it first runs the 1st task and then the 2nd? Having run_sql as a helper method abstracted away the run_sql call from the dag to the task which makes the dags logic less complex. Maybe we could have a common run sql helper method in common which implements all the logic, and then a run_sql task implemented like this so run_sql can also be a task.
def run_sql_helper(args):
logic ...
return hook
@task
def run_sql(args):
return run_sql_helper(args)
And then tasks could call run_sql_helper
What do you think?
@soysaucewaso in what cases would the task be called from another task? Ideally we'd want to unify all of the cases listed into a single run_sql task which could be imported and called in lieu of the current run_sql functions.
Thanks for the response. Currently the alternative run_sql functions are used by various tasks such as create_deleted_records and delete_records_from_media_table.
Here's the create_deleted_records task:
@task
@setup_deleted_db_columns_for_media_type
@setup_db_columns_for_media_type
def create_deleted_records(
*,
select_query: str,
deleted_reason: str,
media_type: str,
db_columns: list[Column] = None,
deleted_db_columns: list[Column] = None,
task: AbstractOperator = None,
postgres_conn_id: str = POSTGRES_CONN_ID,
):
"""
Select records from the given media table using the select query, and then for each
record create a corresponding record in the Deleted Media table.
"""
destination_cols = ", ".join([col.db_name for col in deleted_db_columns])
# To build the source columns, we first list all columns in the main media table
source_cols = ", ".join([col.db_name for col in db_columns])
# Then add the deleted-media specific columns.
# `deleted_on` is set to its insert value to get the current timestamp:
source_cols += f", {DELETED_ON.get_insert_value()}"
# `deleted_reason` is set to the given string
source_cols += f", '{deleted_reason}'"
# The provider, foreign_id pair uniquely identifies a record. When trying to
# add a record to the deleted_media table, if the record's (provider, foreign_id)
# pair is already present in the table, no additional record will be added and the
# existing record in the deleted_media table will not be updated. This preserves the
# record exactly as it was when it was first deleted.
unique_cols = f"({PROVIDER.db_name}, md5({FOREIGN_ID.db_name}))"
return run_sql(
sql_template=constants.CREATE_RECORDS_QUERY,
postgres_conn_id=postgres_conn_id,
task=task,
destination_table=f"deleted_{media_type}",
destination_cols=destination_cols,
source_table=media_type,
source_cols=source_cols,
select_query=select_query,
unique_cols=unique_cols,
)
Are you suggesting we delete tasks like create_deleted_records which call run_sql and instead implement this logic in the DAGs?
Maybe we should keep these tasks, but move them so they return the kwargs for run_sql and then make the DAGs call both tasks?
@AetherUnbound I forgot to tag you in my previous question
Is this issue open for contribution? I would like to contribute.
@mihirahuja1 Hey, feel free to take over. One of the problems I found was that the run_sql implementation in common/sql.py is defined as a task with @task while the other definitions don't.
From my understanding the airflow workflow is supposed to be defined declaratively, so airflow will throw an error when one @task function calls another @task function. There are plenty of ways that you could work around this though, good luck!
@soysaucewaso have some questions to ensure I understand the problem correctly.
1 For tasks that currently call run_sql, what's the preferred approach:
- Should we modify these tasks to return kwargs and have DAGs orchestrate both tasks?
- Should we move this logic directly into the DAGs?
- Or is there another approach you'd recommend?
- For cases with special parameters (like
dry_runinbatched_update), should these be incorporated into the common implementation or handled differently?
This would help me understand the best direction for implementing the solution. Thank you!
Hey, I'm not a major contributor on this project so I'm not sure. 1. The approach I would recommend because it wouldn't change the structure much is to move the logic from the run_sql task in common to a run_sql_helper function in common. Then make the run_sql task, and the other tasks call the run_sql_helper function. Ideally(from my understanding), we would move the run_sql-calling logic in other tasks from their function definitions into the DAG logic, but this would be very time consuming and not that helpful.
You could make run_sql_helper take special parameters, while having the run_sql task pass defaults.
I last pinged a contributor for this project over a month ago and there's been no response, so I think you can feel free to take whatever action you think would work.
I'd like to work on this issue. Please assign it to me.
Thank you, @krysal, for assigning me this PR. I am reviewing this PR and after start the work on this PR. Please explain to me for this PR if I missed something, or you may want to inform me from your side(if you want to provide me with other information for this PR).
Hello @krysal, please review this PR.
Hi, I’ve opened PR CODEBRAKERBOYY/openverse#1 to unify run_sql in delete_records.
Looking forward to your feedback!