Running tasks marked as skipped on DagRun timeout

Open erdos2n opened this issue 2 years ago • 34 comments

Apache Airflow version

2.5.2

What happened

Users are experiencing the following:

  • A DAG begins to run
  • Task(s) go into running state, as expected
  • The DagRun times out, marking any currently running task as SKIPPED
  • Because tasks are not marked as failed, the on_failure_callback never gets invoked

Here are some example logs:

[2023-03-22, 16:30:02 PDT] {local_task_job.py:266} WARNING - DagRun timed out after 4:00:02.394287.
[2023-03-22, 16:30:07 PDT] {local_task_job.py:266} WARNING - DagRun timed out after 4:00:07.447373.
[2023-03-22, 16:30:07 PDT] {local_task_job.py:272} WARNING - State of this instance has been externally set to skipped. Terminating instance.
[2023-03-22, 16:30:07 PDT] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 8515. PIDs of all processes in the group: [8515]

What you think should happen instead

Once a DagRun times out, tasks that are currently RUNNING should be marked as FAILED, and downstream tasks should be marked as UPSTREAM_FAILED.

How to reproduce

The following DAG will reproduce this intermittently:

import time
import logging

from airflow.decorators import dag, task
from datetime import datetime, timedelta



@task
def task_1():
    import random
    pulses = random.randint(5, 10)
    for i in range(pulses):
        logging.info(f"pulsing: pulse...{i}")
        time.sleep(4)


@task
def task_2():
    import random
    pulses = random.randint(10, 20)
    for i in range(pulses):
        logging.info(f"pulsing: pulse...{i}")
        time.sleep(5)

@task
def downstream_finished_task():
    logging.info("task finished")
    time.sleep(20)

@dag(dag_id="dagrun_interval_test",
     schedule_interval="*/5 * * * *",
     start_date=datetime(2023, 3, 23),
     dagrun_timeout=timedelta(seconds=30),
     catchup=False)
def my_dag():
    return [task_1(), task_2()] >> downstream_finished_task()


dag = my_dag()

  • Running tasks marked skipped
  • Downstream tasks left with no status

See the attached screenshot.

Operating System

macOS

Versions of Apache Airflow Providers

N/A

Deployment

Astronomer

Deployment details

Airflow Version 2.5.2

Anything else

This happens every time a DagRun times out.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

erdos2n avatar Mar 23 '23 21:03 erdos2n

The dag timed out, so the dag status is marked as failed. Tasks did not finish, so why should they be set to failed? https://github.com/apache/airflow/blob/3239720c6e9f8a868c6a74871be24049b367aa34/airflow/jobs/scheduler_job.py#L1302-L1315

eladkal avatar Mar 23 '23 23:03 eladkal

@eladkal Maybe they should not be set to fail, but they should also not be set to skipped. The task was not skipped, technically speaking.

An issue that has come up is that a user wants an alert for a specific task failure, so they don't want to set the on_failure_callback on the dag level. That specific task gets marked skipped on a dagrun_timeout and the on_failure_callback isn't triggered.
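
For illustration, here is a minimal sketch of that setup (the task and callback names are made up): a task-level on_failure_callback that never fires on a DagRun timeout, because the task ends up SKIPPED rather than FAILED.

from airflow.decorators import task

def alert_oncall(context):
    # hypothetical alerting hook; never invoked on DagRun timeout,
    # because the task is marked SKIPPED, not FAILED
    print(f"ALERT: {context['task_instance'].task_id} failed")

@task(on_failure_callback=alert_oncall)
def long_running_task():
    import time
    time.sleep(600)  # outlives the dagrun_timeout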

I believe it's worth discussing either marking these tasks that are stopped mid-run as FAILED or introducing a new state into the task instance.

I'm curious if SHUTDOWN makes more sense in this instance. It seems to fit what is occurring more than skipped.

SHUTDOWN # External request to shut down (e.g. marked failed when running)

https://github.com/apache/airflow/blob/main/airflow/utils/state.py#L42

Thoughts?

erdos2n avatar Mar 24 '23 12:03 erdos2n

Instead of setting the state to SKIPPED, I propose calling handle_failure such that the callbacks are executed.
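
In rough code, the idea would be something like this (a sketch, not the actual scheduler logic):

from airflow.utils.state import TaskInstanceState

def end_running_tasks_on_timeout(dag_run, session):
    # sketch: on DagRun timeout, fail running tasks instead of skipping them
    for ti in dag_run.get_task_instances(session=session):
        if ti.state == TaskInstanceState.RUNNING:
            # runs on_failure_callback and the normal failure handling
            ti.handle_failure("DagRun timed out", session=session)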

wolfier avatar Mar 24 '23 16:03 wolfier

Hey Alan, I did some more digging. handle_failure is only called on failures, which means that my initial proposal of SHUTDOWN would not work anyway. But if the remaining tasks are marked as upstream_failed, then they would trigger the handle_failure callback.

So in short, adding the handle_failure callback would work, or marking downstream tasks as upstream_failed would work. Thoughts?

erdos2n avatar Mar 24 '23 16:03 erdos2n

Just to clarify: if the goal is to change the current behavior from skipped to failed, this is a breaking change and cannot happen before Airflow 3.

Before discussing how to get it done, I suggest first discussing whether this should be done. I'm not convinced that setting tasks to failed when the dag times out is the desired behavior.

eladkal avatar Mar 24 '23 16:03 eladkal

The goal is to change the behavior, but not necessarily from skipped to failed; just something to trigger the handle_failure method so callbacks behave the way users expect.

erdos2n avatar Mar 24 '23 20:03 erdos2n

The goal is to change the behavior, but not necessarily from skipped to failed; just something to trigger the handle_failure method so callbacks behave the way users expect.

Since the task didn't fail, I don't see the need to run the failure callback for every stopped task. The dag failure callback is enough to handle this case: we can check whether the run failed due to a timeout, and select the skipped tasks from the metadata to do what we need to do. WDYT?
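
Something like this sketch (the timeout check and the handling are illustrative):

from airflow.utils.state import TaskInstanceState

def on_dag_failure(context):
    dag_run = context["dag_run"]
    # select the tasks that were stopped when the run timed out
    skipped = [
        ti for ti in dag_run.get_task_instances()
        if ti.state == TaskInstanceState.SKIPPED
    ]
    for ti in skipped:
        print(f"{ti.task_id} was stopped when the run timed out")

# attached at the DAG level, e.g. DAG(..., on_failure_callback=on_dag_failure)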

hussein-awala avatar Mar 25 '23 14:03 hussein-awala

Well, this user wants a callback when this specific task fails, so not at the dag level. Could be that we need an on_skipped callback. Thoughts?

erdos2n avatar Mar 25 '23 14:03 erdos2n

Well, this user wants a callback when this specific task fails, so not at the dag level. Could be that we need an on_skipped callback. Thoughts?

I'm OK with adding on_skipped_callback (regardless of what we discuss here, this is probably something we should add)
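
Usage might look something like this (a sketch of the proposed parameter; it does not exist yet as of this discussion):

from airflow.decorators import task

def notify_skipped(context):
    print(f"{context['task_instance'].task_id} was skipped")

@task(on_skipped_callback=notify_skipped)  # proposed parameter
def watched_task():
    ...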

eladkal avatar Apr 12 '23 18:04 eladkal

Should we scope this issue to adding on_skipped_callback? @erdos2n is that a suitable solution for your use case?

eladkal avatar Apr 27 '23 11:04 eladkal

@erdos2n would you have an update on the last question from Elad?

pankajkoti avatar Jul 21 '23 20:07 pankajkoti

Experiencing the same issue, and it is my opinion that because the Airflow Scheduler is SIGTERM'ing running tasks, that is a legitimate reason to mark them as failed. The task was running, now it is not, and it did not complete successfully; that is a task failure, not a skipped task.

seanmuth avatar Jul 21 '23 20:07 seanmuth

Hello, I'm of the opinion that an on_skipped_callback would be a good addition.

erdos2n avatar Jul 21 '23 20:07 erdos2n

I believe the question is what does it mean when a dagrun times out.

If dagrun timeout means "I need everything to stop including the task instances" then forcing task termination is appropriate. I don't agree with setting the ending state to skipped if a task was in the running state, since the task was in the middle of execution.

Looking at @RNHTTR's PR, I see the logic is to mark all tasks that are unfinished as skipped:

            TaskInstanceState.SCHEDULED,
            TaskInstanceState.QUEUED,
            TaskInstanceState.RUNNING,
            TaskInstanceState.SHUTDOWN,
            TaskInstanceState.RESTARTING,
            TaskInstanceState.UP_FOR_RETRY,
            TaskInstanceState.UP_FOR_RESCHEDULE,
            TaskInstanceState.DEFERRED,

Instead, I think it should be more refined.

The scheduled and queued states should be set to skipped IF that was their first attempt (checking try_number). Though this may not work for sensors that are rescheduled and are in the middle of being scheduled / queued.

The rest of the states should be set to failed because they imply the task instance was attempted. Tasks that were attempted should be failed.
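
In sketch form (illustrative only, not the actual scheduler code, and the sensor caveat above still applies):

from airflow.utils.state import TaskInstanceState

NOT_YET_ATTEMPTED = {TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED}

def end_state_on_timeout(ti):
    # first attempt that never started running: skipping seems reasonable
    if ti.state in NOT_YET_ATTEMPTED and ti.try_number <= 1:
        return TaskInstanceState.SKIPPED
    # anything that was attempted (running, up_for_retry, deferred, ...): fail it
    return TaskInstanceState.FAILED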


It is worth noting that the PR was written and released for an Airflow version (2.0.0) where active dagruns were determined by task instance states instead of the dagrun state, as pointed out by issues/13407. In Airflow 2.6.x, active dagruns are determined by the state of the dagrun and not the task instance states. This means it does not matter which state the running task ends up as: skipped, failed, or even running.

Referring back to the question I posed earlier, depending on what it means when the dagrun times out, the state of a running task should reflect that definition.

wolfier avatar Jul 21 '23 22:07 wolfier

I agree with @wolfier. If a task was running, I feel it should proceed to Failed / Shutdown instead of Skipped. Wouldn't Skipped mean that it was never attempted and never went to the Running state at all?

Looking at our definitions for the states: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instances

Failed / Shutdown sounds more reasonable than the Skipped state.

pankajkoti avatar Jul 26 '23 06:07 pankajkoti

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Aug 26 '23 00:08 github-actions[bot]

This issue has been closed because it has not received response from the issue author.

github-actions[bot] avatar Sep 02 '23 06:09 github-actions[bot]

Can this be re-opened? We also encountered this, and were very surprised that the on_failure_callback was not fired: it only runs on task failure, but the task that was running when the timeout was hit was skipped, not failed.

First, that behavior seems wrong: if a task is taking too long and hits the dagrun_timeout, I would expect that task (as well as the DAG) to fail.

Second, @hussein-awala wrote,

I don't see the need to run the failure callback in every stopped task, the dag failure callback is enough to handle this case

But what is the "dag failure callback"? I don't see a callback like that in these docs: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html

(Do you mean the sla_miss_callback? i.e. set the DAG's SLA to the same as the DAG's dagrun_timeout?)

A DAG-level failure callback would be very nice to have.

Thank you.

benbuckman avatar Sep 07 '23 20:09 benbuckman

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Sep 24 '23 00:09 github-actions[bot]

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Oct 10 '23 00:10 github-actions[bot]

This issue has been closed because it has not received response from the issue author.

github-actions[bot] avatar Oct 17 '23 00:10 github-actions[bot]

I agree with @pankajkoti

Failed / Shutdown sounds more reasonable than the Skipped state.

I think we should re-open this issue

raphaelauv avatar Feb 05 '24 09:02 raphaelauv

@pankajkoti could you please re-open the issue, thanks

raphaelauv avatar Aug 24 '24 13:08 raphaelauv

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Sep 24 '24 00:09 github-actions[bot]

no stale

raphaelauv avatar Sep 24 '24 04:09 raphaelauv

I agree that this should be reopened.
IMO skipped makes no sense for the task instance state, as the task was running but was terminated before it finished because of the dag run timeout.
Besides that, we tried a workaround using the on_skipped_callback to set the TI state to failed, but it seems that the callback is not executed in that scenario (Airflow 2.9.3). Does anyone know why? Thanks

jledrumics avatar Sep 24 '24 09:09 jledrumics

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Oct 26 '24 17:10 github-actions[bot]

no stale

raphaelauv avatar Oct 26 '24 17:10 raphaelauv

Related: #42005

RNHTTR avatar Nov 02 '24 21:11 RNHTTR

I agree that this should be reopened. IMO skipped makes no sense for the task instance state, as the task was running but was terminated before it finished because of the dag run timeout. Besides that, we tried a workaround using the on_skipped_callback to set the TI state to failed, but it seems that the callback is not executed in that scenario (Airflow 2.9.3). Does anyone know why? Thanks

We are running into the same issue too. In our case, our dags time out, which can cause some running tasks to be marked skipped; these finish with this log line:

[2024-11-22, 20:27:36 UTC] {taskinstance.py:3092} ERROR - Received SIGTERM. Terminating subprocesses.

We believe this means that AirflowSkipException is not being raised, so on_skipped_callback will not run at all. In order to actually call on_skipped_callback, something would need to specifically raise AirflowSkipException.

Can anyone else confirm whether our understanding is correct? At the end of the day, we would like a way to have an `on_kill` callback function that sends out an email if any task is killed.
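
One possible direction (an untested sketch): BaseOperator exposes an on_kill() hook that runs when the task process is externally terminated, so a custom operator could send the email there. The recipient address is a placeholder, and this assumes SMTP is configured so that airflow.utils.email.send_email works.

from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email

class AlertingPythonOperator(PythonOperator):
    def on_kill(self):
        # called when the task process is killed (e.g. SIGTERM on timeout)
        send_email(
            to="oncall@example.com",  # placeholder recipient
            subject=f"Task {self.task_id} was killed",
            html_content="The task was terminated before it completed.",
        )
        super().on_kill()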

jrmidkiff avatar Nov 22 '24 20:11 jrmidkiff