Implement `on_killed` support for tasks during DAG run timeout
This PR addresses issue #41036 by adding support for the on_killed callback on tasks that are still running when a DAG run reaches its timeout.
Key changes:
- Added a new configuration option
kill_tasks_on_dagrun_timeoutto control whether tasks should be killed when a DAG run times out (default: False). - Updated logic in
_schedule_dag_runto call taskon_killifkill_tasks_on_dagrun_timeoutis enabled.
Testing:
- Added unit test for the new timeout handling logic
- Verified that the new configuration option works as expected
I'm having some issues with the test I added. Looking at other tests in test_scheduler_job I see that session.refresh(dr) is used before asserting the dag run state. In my test it causes a missing state error. Has anyone encountered this before?
Also. I want to check that the on_kill method of BashOperator is called, but haven't gotten the Mock to work and it is currently failing. Looking into it.
I wanted to get this draft PR up to check that I did the correct changes. Hopefully I understood the issue correctly.
Apologies for the merge mess!