Trigger Rule ONE_FAILED does not work in task group with mapped tasks
Apache Airflow version
2.7.0
What happened
I have the following DAG:
```python
from __future__ import annotations

from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule


@task
def get_records() -> list[str]:
    return ["a", "b", "c"]


@task
def submit_job(record: str) -> None:
    pass


@task
def fake_sensor(record: str) -> bool:
    raise RuntimeError("boo")


@task
def deliver_record(record: str) -> None:
    pass


@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record: str) -> None:
    pass


@task_group(group_id="deliver_records")
def deliver_record_task_group(record: str):
    (
        submit_job(record=record)
        >> fake_sensor(record=record)
        >> deliver_record(record=record)
        >> handle_failed_delivery(record=record)
    )


@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    deliver_record_task_group.expand(record=records)


demo_trigger_one_failed()
```
- `fake_sensor` is simulating a task that raises an exception. (It could be a `@task.sensor` raising an `AirflowSensorTimeout`; it doesn't matter, the behavior is the same.)
- `handle_failed_delivery`'s `TriggerRule.ONE_FAILED` means it is supposed to run whenever any task upstream fails. So when `fake_sensor` fails, `handle_failed_delivery` should run.
But this does not work. `handle_failed_delivery` is skipped, and (based on the UI) it's skipped very early, before it can know whether the upstream tasks have completed successfully or errored.
Here's what I see, progressively (see How to reproduce below for how I got this):
(Screenshots, in order: started; skipped too early; fake sensor about to fail; done, didn't run.)
If I remove the task group and instead do:

```python
@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    (
        submit_job(record=records)
        >> fake_sensor.expand(record=records)
        >> deliver_record.expand(record=records)
        >> handle_failed_delivery.expand(record=records)
    )
```
then it does the right thing:
(Screenshots, in order: started; waiting; done, triggered correctly.)
What you think should happen instead
The behavior with the task group should be the same as without the task group: the `handle_failed_delivery` task with `trigger_rule=TriggerRule.ONE_FAILED` should run when the upstream `fake_sensor` task fails.
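To make the expectation concrete, here is a rough sketch of the per-record end states I would expect (illustrative values only, assuming `handle_failed_delivery` itself succeeds when it runs):

```python
# Expected terminal states for each mapped record (illustrative, not actual Airflow output):
expected_states = {
    "deliver_records.submit_job": "success",
    "deliver_records.fake_sensor": "failed",              # raises RuntimeError("boo")
    "deliver_records.deliver_record": "upstream_failed",  # default all_success rule, upstream failed
    "deliver_records.handle_failed_delivery": "success",  # one_failed -> should run, not be skipped
}
```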
How to reproduce
1. Put the above DAG at a local path, `/tmp/dags/demo_trigger_one_failed.py`.
2. Run:

   ```bash
   docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash
   ```

3. In the container:

   ```bash
   airflow db init
   airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
   airflow scheduler --daemon
   airflow webserver
   ```

4. Open http://localhost:8080 on the host. Log in with airflow/airflow. Run the DAG.
I tested this with:
- apache/airflow:2.6.2-python3.10
- apache/airflow:2.6.3-python3.10
- apache/airflow:2.7.0-python3.10
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
n/a
Deployment
Other Docker-based deployment
Deployment details
This can be reproduced using standalone Docker images; see the "How to reproduce" steps above.
Anything else
I wonder if this is related to (or fixed by?) https://github.com/apache/airflow/issues/33446 -> https://github.com/apache/airflow/pull/33732 ? (The latter was "added to the Airflow 2.7.1 milestone 3 days ago." I can try to install that pre-release code in the container and see if it's fixed.)
edit: nope, not fixed
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I was curious if https://github.com/apache/airflow/pull/33732 (https://github.com/apache/airflow/commit/fe27031382e2034b59a23db1c6b9bdbfef259137) fixes the same issue I'm describing here. That fix is on main.
- In my git clone of this repo:

  ```bash
  git checkout main
  git pull
  ```

  (I'm at 3ae6b4e86fe807c00bd736c59df58733df2b9bf9)

  ```bash
  docker build . -f Dockerfile --pull --tag airflow-trigger-rule-test:0.0.1
  docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 airflow-trigger-rule-test:0.0.1 bash
  ```

- "In the container:" step 3 from above, run the DAG ...
Nope, it exhibits the same incorrect behavior of skipping `handle_failed_delivery` before the task group has finished, and not respecting the `trigger_rule`.
Seems as though it only affects mapped tasks. That is, it runs as expected if you replace `deliver_record_task_group.expand(record=records)` with `deliver_record_task_group(record=records)`.
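For reference, a minimal sketch of that working, non-mapped variant (it assumes the task and task-group definitions from the DAG in the issue body; the `_unmapped` DAG id is just a hypothetical name for illustration):

```python
# Sketch of the non-mapped variant that reportedly respects the trigger rule.
@dag(
    dag_id="demo_trigger_one_failed_unmapped",  # hypothetical id, to avoid clashing with the original
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed_unmapped() -> None:
    records = get_records()
    # Plain call instead of .expand(): the task group is not mapped,
    # and handle_failed_delivery's one_failed trigger rule then behaves as expected.
    deliver_record_task_group(record=records)


demo_trigger_one_failed_unmapped()
```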
Curiously, the Gantt chart shows the task group as having been queued for 24 hours.
Like @benbuckman reported, the task was skipped before submit_job or fake_sensor ran.
Debug logs:

```
[2023-09-02T23:21:23.108+0000] {retries.py:92} DEBUG - Running SchedulerJobRunner._schedule_all_dag_runs with retries. Try 1 of 3
[2023-09-02T23:21:23.110+0000] {scheduler_job_runner.py:1485} DEBUG - DAG demo_trigger_one_failed not changed structure, skipping dagrun.verify_integrity
[2023-09-02T23:21:23.111+0000] {dagrun.py:740} DEBUG - number of tis tasks for <DagRun demo_trigger_one_failed @ 2023-09-02 23:21:21.860553+00:00: manual__2023-09-02T23:21:21.860553+00:00, state:running, queued_at: 2023-09-02 23:21:21.885385+00:00. externally triggered: True>: 6 task(s)
[2023-09-02T23:21:23.112+0000] {dagrun.py:761} DEBUG - number of scheduleable tasks for <DagRun demo_trigger_one_failed @ 2023-09-02 23:21:21.860553+00:00: manual__2023-09-02T23:21:21.860553+00:00, state:running, queued_at: 2023-09-02 23:21:21.885385+00:00. externally triggered: True>: 2 task(s)
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1159} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]>
[2023-09-02T23:21:23.210+0000] {abstractoperator.py:573} DEBUG - Updated in place to become <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]>
[2023-09-02T23:21:23.215+0000] {abstractoperator.py:598} DEBUG - Expanding TIs upserted <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]>
[2023-09-02T23:21:23.218+0000] {abstractoperator.py:598} DEBUG - Expanding TIs upserted <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]>
[2023-09-02T23:21:23.224+0000] {taskinstance.py:956} DEBUG - Setting task state for <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [None]> to skipped
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'one_failed' requires one upstream task failure, but none were found. upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.fake_sensor'}
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'one_failed' requires one upstream task failure, but none were found. upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.fake_sensor'}
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.236+0000] {scheduler_job_runner.py:1512} DEBUG - Skipping SLA check for <DAG: demo_trigger_one_failed> because no tasks in DAG have SLAs
[2023-09-02T23:21:23.236+0000] {scheduler_job_runner.py:1504} DEBUG - callback is empty
[2023-09-02T23:21:23.241+0000] {scheduler_job_runner.py:414} INFO - 3 tasks up for execution:
<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [scheduled]>
<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [scheduled]>
<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [scheduled]>
```
> Seems as though it only affects mapped tasks.
Yes, thanks @RNHTTR for that clarification. I updated the issue title accordingly.
@ephraimbuddy the problem is in the method `get_relevant_upstream_map_indexes`: when we try to get the relevant map index for the upstream `deliver_record`, we call this method

```python
ti.get_relevant_upstream_map_indexes(
    upstream=ti.task.dag.task_dict[upstream_id],
    ti_count=expanded_ti_count,
    session=session,
)
```

with these values:

```python
ti.get_relevant_upstream_map_indexes(
    upstream="deliver_records.deliver_record",
    ti_count=3,  # we have 3 tis because it's a mapped task group
    session=session,
)
```

and this method doesn't take the mapped task group into account, so it returns -1 instead of the map index of the checked TI.
So we have two options:

- update `get_relevant_upstream_map_indexes` to make it handle mapped task groups.
- try to detect that the two tasks are in a mapped task group without calling this method; in that case we can return the map index of the checked TI (see the sketch below).
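A minimal sketch of what the second option amounts to (hypothetical helper, not Airflow's actual API):

```python
# Hypothetical illustration only: if the checked TI and its upstream task live in the
# same mapped task group, the relevant upstream map index is the TI's own map index.
def relevant_upstream_map_index(ti_map_index: int, in_same_mapped_group: bool) -> int:
    if in_same_mapped_group:
        # e.g. handle_failed_delivery[1] should only look at fake_sensor[1]
        return ti_map_index
    return -1  # current behavior: treated as if the upstream were unexpanded
```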
I'm already working on refactoring some queries in this class, including the one that has this bug:

```python
task_id_counts = session.execute(
    select(TaskInstance.task_id, func.count(TaskInstance.task_id))
    .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
    .where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))
    .group_by(TaskInstance.task_id)
).all()
```
Thanks @hussein-awala for digging into the fix so quickly.
Something else that's worth looking into and fixing here is why unit tests with `DebugExecutor` behave differently. Take this unit test for example (simplified again for demonstration):
```python
import unittest
from datetime import datetime, timezone

from airflow.exceptions import BackfillUnfinished
from airflow.executors.debug_executor import DebugExecutor
from airflow.models import DagBag
from airflow.models.taskinstance import TaskInstance

from .demo_trigger_one_failed import demo_trigger_one_failed


class TestDemoDag(unittest.TestCase):
    def test_handle_failed_delivery(self):
        dagbag = DagBag(include_examples=False, safe_mode=False)
        demo_dag = dagbag.get_dag("demo_trigger_one_failed")
        now = datetime.now(timezone.utc)

        # We need to use the slow DebugExecutor (not `dag.test()`) to run this
        # b/c of https://github.com/apache/airflow/discussions/32831
        demo_dag.clear()
        with self.assertRaises(BackfillUnfinished):
            demo_dag.run(
                start_date=now,
                end_date=now,
                executor=DebugExecutor(),
                run_at_least_once=True,
                verbose=True,
                disable_retry=True,
            )

        downstream_task_ids = list(demo_dag.task_group_dict["deliver_records"].children.keys())
        print(f"downstream_task_ids: {downstream_task_ids}")

        task_instance_states: dict[str, str | None] = {}  # task_id => state
        for task_id in downstream_task_ids:
            # (demo simplified w/ hard-coded 0 for single mapped task)
            ti = TaskInstance(demo_dag.task_dict[task_id], execution_date=now, map_index=0)
            task_instance_states[task_id] = ti.current_state()
        print(f"task_instance_states: {task_instance_states}")

        self.assertEqual("success", task_instance_states["deliver_records.submit_job"])
        self.assertEqual("failed", task_instance_states["deliver_records.fake_sensor"])
        self.assertEqual("upstream_failed", task_instance_states["deliver_records.deliver_record"])
        # Test says this ran and succeeded - but in actual scheduler/UI,
        # that's not true!
        self.assertEqual("success", task_instance_states["deliver_records.handle_failed_delivery"])
```
Put that in a file `test_demo_trigger_one_failed.py` next to the file with the DAG above, and run it with `python -m unittest path/to/test_demo_trigger_one_failed.py`.
Note:

- As commented inline, this uses `DebugExecutor`, not `dag.test()`, because the latter cannot test error cases.
- The error in `fake_sensor` is in the DAG itself (contrived for the demo); in a real test this would be stubbed.
`task_instance_states` at the end is shown to be:

```
task_instance_states:
{'deliver_records.submit_job': 'success',
 'deliver_records.fake_sensor': 'failed',
 'deliver_records.deliver_record': 'upstream_failed',
 'deliver_records.handle_failed_delivery': 'success'
}
```
and the test passes, asserting that `handle_failed_delivery` succeeded. This is misleading test output, because as we see above, in the real scheduler `handle_failed_delivery` doesn't run at all! (Its `ti.current_state()` should be `None`, not `success`.)
We have unit tests like this in our application; we were confident that our actual task with `trigger_rule=ONE_FAILED` would work correctly, and were very surprised when it broke in production.
In the process of fixing this for the "real" executors (`SequentialExecutor` in the simple demo above; `KubernetesExecutor` et al. in a real production application), it would be great if `DebugExecutor` behaved at parity with the others, so users can rely on their test coverage.
Thank you!
@hussein-awala thanks for debugging. I was busy with releases the past 2 days; getting to look at it now.
Ok. Found a fix for this but still need to reason more about it:

```diff
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 82282eb39d..31fc1bfa9b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2859,7 +2859,7 @@ class TaskInstance(Base, LoggingMixin):
         # and "ti_count == ancestor_ti_count" does not work, since the further
         # expansion may be of length 1.
         if not _is_further_mapped_inside(upstream, common_ancestor):
-            return ancestor_map_index
+            return abs(ancestor_map_index)
         # Otherwise we need a partial aggregation for values from selected task
         # instances in the ancestor's expansion context.
```
Update: This case is similar to the one for which I just raised a PR: https://github.com/apache/airflow/pull/34138
I can reproduce this in a test and am working on it.
> Ok. Found a fix for this but still need to reason more about it:
>
> ```diff
> diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
> index 82282eb39d..31fc1bfa9b 100644
> --- a/airflow/models/taskinstance.py
> +++ b/airflow/models/taskinstance.py
> @@ -2859,7 +2859,7 @@ class TaskInstance(Base, LoggingMixin):
>          # and "ti_count == ancestor_ti_count" does not work, since the further
>          # expansion may be of length 1.
>          if not _is_further_mapped_inside(upstream, common_ancestor):
> -            return ancestor_map_index
> +            return abs(ancestor_map_index)
>          # Otherwise we need a partial aggregation for values from selected task
>          # instances in the ancestor's expansion context.
> ```
Hi @uranusjr, can you chime in on this? I can't find a way to test this, but it actually solved this case for a mapped task group with `one_failed`. When we calculate `ancestor_map_index`, we multiply by `self.map_index`, which can be -1. Are there reasons why we chose not to care whether the `map_index` is negative?
The variable comes from here:
https://github.com/apache/airflow/blob/5744b424f3ddb3efdf5c2607d13e084955907cc8/airflow/models/taskinstance.py#L2855-L2856
So there are only two scenarios where `abs` would make a difference:

- Either `ancestor_ti_count` or `ti_count` is negative
- `self.map_index` is negative
The first point seems unlikely since those both count something and should be non-negative integers. Of course it's possible we implemented something wrong, but in that case that bug should be fixed so the two values are always either zero or positive.
The second is more viable, and the only possible negative `map_index` value is -1, i.e. the task instance is not expanded. This in turn is not right, since this function should only be called on a mapped task instance in the first place; a task instance with a map index of -1 should not appear here. And if it does, making -1 become 1 does not make sense as a fix.
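To make that concrete, here is a toy illustration of why `abs()` lands on an arbitrary index (it assumes the computation described above, roughly `ancestor_map_index = self.map_index * ancestor_ti_count // ti_count`; the numbers are made up):

```python
# Toy numbers only, to show what abs() would do for a TI wrongly treated as unexpanded.
ancestor_ti_count = 3  # the mapped task group expanded into 3 TIs
ti_count = 3
map_index = -1         # TI incorrectly treated as unexpanded

ancestor_map_index = map_index * ancestor_ti_count // ti_count  # -1 * 3 // 3 == -1
print(abs(ancestor_map_index))  # 1 -> always points at map_index 1, regardless of which TI is checked
```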
Any updates on this?
> Any updates on this?

The PR is awaiting reviews.
I'm reopening this issue, as the original PR (#34337) that should have solved it was reverted, and no other fix has been merged since then. I'd like to take the chance to improve @ephraimbuddy's fix so that issues like https://github.com/apache/airflow/issues/35541 won't reoccur.
I tried the patch and found an issue: if the `one_failed`-marked task is in an expanded task group, one upstream task failure will cause all `one_failed`-marked tasks to be triggered.
For example:

```
task1 -> taskA1
task2 -> taskA2
task3 -> taskA3
task4 -> taskA4
```

task2's failure will trigger taskA1/2/3/4.
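For comparison, the outcome one would expect if `one_failed` were evaluated per map index (illustrative states only, assuming the taskA* tasks carry the `one_failed` trigger rule):

```python
# Illustrative expectation only: each taskA instance should react to its own upstream.
expected_taskA_states = {
    0: "skipped",  # task1 succeeded -> taskA1 has no failed upstream
    1: "success",  # task2 failed    -> taskA2 (one_failed) runs
    2: "skipped",  # task3 succeeded
    3: "skipped",  # task4 succeeded
}
# With the patch as tried, all four taskA instances were triggered instead.
```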
I've resolved the issue for version 2.x, so hopefully it will be available starting with v2.10.5 or v2.11.0 (whichever comes first).
I'm leaving this issue open, as there are some arrangements to make before merging the corresponding PR to main (v3+).