
Implement `on_killed` support for tasks during DAG run timeout

Open MRLab12 opened this issue 1 year ago • 2 comments


This PR addresses issue #41036 by calling the `on_kill` method of tasks that are still running when a DAG run reaches its timeout.

Key changes:

  1. Added a new configuration option `kill_tasks_on_dagrun_timeout` to control whether tasks should be killed when a DAG run times out (default: `False`).
  2. Updated logic in `_schedule_dag_run` to call each running task's `on_kill` if `kill_tasks_on_dagrun_timeout` is enabled.
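
The two changes above can be sketched roughly as follows. This is a simplified, standalone illustration and not the actual scheduler code: `SketchTask`, `SketchDagRun`, and `handle_dagrun_timeout` are hypothetical stand-ins, and only the `kill_tasks_on_dagrun_timeout` flag and the `on_kill` hook come from the PR description.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta


class SketchTask:
    """Hypothetical stand-in for an operator; only the on_kill hook matters here."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.killed = False

    def on_kill(self):
        # A real operator would clean up subprocesses or remote jobs here.
        self.killed = True


@dataclass
class SketchDagRun:
    """Hypothetical stand-in for a DAG run with a timeout."""

    start_date: datetime
    timeout: timedelta
    running_tasks: list = field(default_factory=list)
    state: str = "running"


def handle_dagrun_timeout(dag_run, now, kill_tasks_on_dagrun_timeout=False):
    """Fail a timed-out run; call on_kill on running tasks only if the flag is set."""
    if now - dag_run.start_date <= dag_run.timeout:
        return False  # the run has not timed out, nothing to do
    if kill_tasks_on_dagrun_timeout:
        for task in dag_run.running_tasks:
            task.on_kill()
    dag_run.state = "failed"  # assumption: this sketch marks the run as failed
    return True
```

With the default of `False`, a timed-out run is still marked failed in this sketch, but the running tasks are left untouched, which keeps existing behavior unchanged unless the option is enabled.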

Testing:

  • Added unit test for the new timeout handling logic
  • Verified that the new configuration option works as expected

MRLab12 avatar Aug 20 '24 21:08 MRLab12

I'm having some issues with the test I added. Looking at other tests in `test_scheduler_job`, I see that `session.refresh(dr)` is called before asserting the DAG run state. In my test, that call causes a missing state error. Has anyone encountered this before?

Also, I want to check that the `on_kill` method of `BashOperator` is called, but I haven't gotten the mock to work yet and the test is currently failing. Looking into it.
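
For reference, one way to assert that `on_kill` was called is to patch the method on the class rather than on a single instance, using `unittest.mock.patch.object` with `autospec=True`. The sketch below is standalone and does not import Airflow: `FakeBashOperator` and `kill_running_tasks` are hypothetical stand-ins for the operator and for the code under test.

```python
from unittest import mock


class FakeBashOperator:
    """Hypothetical stand-in for BashOperator, so the sketch runs without Airflow."""

    def __init__(self, task_id):
        self.task_id = task_id

    def on_kill(self):
        raise RuntimeError("the real on_kill should not run inside the test")


def kill_running_tasks(tasks):
    """Hypothetical stand-in for the code under test."""
    for task in tasks:
        task.on_kill()


def run_check():
    task = FakeBashOperator("sleep_task")
    # Patching the class attribute means any instance's on_kill hits the mock.
    # autospec=True keeps the real signature, so the mock receives `self`.
    with mock.patch.object(FakeBashOperator, "on_kill", autospec=True) as mocked:
        kill_running_tasks([task])
    mocked.assert_called_once_with(task)
    return mocked.call_count
```

A possible reason for a mock never being hit is that the code under test calls the method on a different object than the one that was patched. Patching at the class level avoids that for instances of the same class, though it would not help if the code under test works with a different class altogether.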

I wanted to get this draft PR up to check that I made the correct changes. Hopefully I understood the issue correctly.

MRLab12 avatar Aug 20 '24 21:08 MRLab12

Apologies for the merge mess!

MRLab12 avatar Aug 28 '24 19:08 MRLab12