
Trigger Rule ONE_FAILED does not work in task group with mapped tasks

Open benbuckman opened this issue 2 years ago • 17 comments

Apache Airflow version

2.7.0

What happened

I have the following DAG:

from __future__ import annotations
from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule

@task
def get_records() -> list[str]:
    return ["a", "b", "c"]


@task
def submit_job(record: str) -> None:
    pass

@task
def fake_sensor(record: str) -> bool:
    raise RuntimeError("boo")


@task
def deliver_record(record: str) -> None:
    pass


@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record: str) -> None:
    pass


@task_group(group_id="deliver_records")
def deliver_record_task_group(record: str):
    (
        submit_job(record=record)
        >> fake_sensor(record=record)
        >> deliver_record(record=record)
        >> handle_failed_delivery(record=record)
    )

@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    deliver_record_task_group.expand(record=records)


demo_trigger_one_failed()
  • fake_sensor is simulating a task that raises an exception. (It could be a @task.sensor raising an AirflowSensorTimeout; it doesn't matter, the behavior is the same.)
  • handle_failed_delivery's TriggerRule.ONE_FAILED means it is supposed to run whenever any task upstream fails. So when fake_sensor fails, handle_failed_delivery should run.

But this does not work. handle_failed_delivery is skipped, and (based on the UI) it's skipped very early, before it can know if the upstream tasks have completed successfully or errored.

Here's what I see, progressively (see How to reproduce below for how I got this):

[Screenshots: started ... skipped too early ... fake sensor about to fail ... done, didn't run]

If I remove the task group and instead do,

@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    (
        submit_job(record=records)
        >> fake_sensor.expand(record=records)
        >> deliver_record.expand(record=records)
        >> handle_failed_delivery.expand(record=records)
    )

then it does the right thing:

[Screenshots: started ... waiting ... done, triggered correctly]

What you think should happen instead

The behavior with the task group should be the same as without the task group: the handle_failed_delivery task with trigger_rule=TriggerRule.ONE_FAILED should be run when the upstream fake_sensor task fails.

How to reproduce

  1. Put the above DAG at a local path, /tmp/dags/demo_trigger_one_failed.py.

  2. docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash

  3. In the container:

    airflow db init
    airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
    airflow scheduler --daemon
    airflow webserver
    
  4. Open http://localhost:8080 on the host. Log in with airflow / airflow. Run the DAG.

I tested this with:

  • apache/airflow:2.6.2-python3.10
  • apache/airflow:2.6.3-python3.10
  • apache/airflow:2.7.0-python3.10

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

n/a

Deployment

Other Docker-based deployment

Deployment details

This can be reproduced using standalone Docker images; see the "How to reproduce" steps above.

Anything else

I wonder if this is related to (or fixed by?) https://github.com/apache/airflow/issues/33446 -> https://github.com/apache/airflow/pull/33732? (The latter was "added to the Airflow 2.7.1 milestone 3 days ago"; I can try installing that pre-release code in the container to see if it's fixed.) Edit: nope, not fixed.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!


benbuckman avatar Sep 01 '23 19:09 benbuckman

I was curious if https://github.com/apache/airflow/pull/33732 (https://github.com/apache/airflow/commit/fe27031382e2034b59a23db1c6b9bdbfef259137) fixes the same issue I'm describing here. That fix is on main.

  1. In my git clone of this repo:

    git checkout main
    git pull

    (I'm at 3ae6b4e86fe807c00bd736c59df58733df2b9bf9)

  2. Build the image and run it:

    docker build . -f Dockerfile --pull --tag airflow-trigger-rule-test:0.0.1
    docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 airflow-trigger-rule-test:0.0.1 bash

  3. Follow "In the container:" (step 3 above) and run the DAG ...

Nope – it exhibits the same incorrect behavior of skipping handle_failed_delivery before the task group has finished, and not respecting the trigger_rule.

benbuckman avatar Sep 01 '23 20:09 benbuckman

Seems as though it only affects mapped tasks. That is, it runs as expected if you replace deliver_record_task_group.expand(record=records) with deliver_record_task_group(record=records).
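
For reference, a minimal sketch of that unmapped variant (reusing the task definitions and imports from the DAG above; the dag_id here is made up for illustration):

@dag(
    dag_id="demo_trigger_one_failed_unmapped",  # hypothetical id for illustration
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed_unmapped() -> None:
    records = get_records()
    # Plain call instead of .expand(record=records): the task group is not
    # mapped, and the ONE_FAILED rule then fires as expected.
    deliver_record_task_group(record=records)


demo_trigger_one_failed_unmapped()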

RNHTTR avatar Sep 02 '23 22:09 RNHTTR

Curiously, the Gantt chart shows the task group as having been queued for 24 hours.


RNHTTR avatar Sep 02 '23 22:09 RNHTTR

Like @benbuckman reported, the task was skipped before submit_job or fake_sensor ran.

debug logs:

[2023-09-02T23:21:23.108+0000] {retries.py:92} DEBUG - Running SchedulerJobRunner._schedule_all_dag_runs with retries. Try 1 of 3
[2023-09-02T23:21:23.110+0000] {scheduler_job_runner.py:1485} DEBUG - DAG demo_trigger_one_failed not changed structure, skipping dagrun.verify_integrity
[2023-09-02T23:21:23.111+0000] {dagrun.py:740} DEBUG - number of tis tasks for <DagRun demo_trigger_one_failed @ 2023-09-02 23:21:21.860553+00:00: manual__2023-09-02T23:21:21.860553+00:00, state:running, queued_at: 2023-09-02 23:21:21.885385+00:00. externally triggered: True>: 6 task(s)
[2023-09-02T23:21:23.112+0000] {dagrun.py:761} DEBUG - number of scheduleable tasks for <DagRun demo_trigger_one_failed @ 2023-09-02 23:21:21.860553+00:00: manual__2023-09-02T23:21:21.860553+00:00, state:running, queued_at: 2023-09-02 23:21:21.885385+00:00. externally triggered: True>: 2 task(s)
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1159} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]>
[2023-09-02T23:21:23.210+0000] {abstractoperator.py:573} DEBUG - Updated in place to become <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]>
[2023-09-02T23:21:23.215+0000] {abstractoperator.py:598} DEBUG - Expanding TIs upserted <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]>
[2023-09-02T23:21:23.218+0000] {abstractoperator.py:598} DEBUG - Expanding TIs upserted <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]>
[2023-09-02T23:21:23.224+0000] {taskinstance.py:956} DEBUG - Setting task state for <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [None]> to skipped
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'one_failed' requires one upstream task failure, but none were found. upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.fake_sensor'}
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'one_failed' requires one upstream task failure, but none were found. upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.fake_sensor'}
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.236+0000] {scheduler_job_runner.py:1512} DEBUG - Skipping SLA check for <DAG: demo_trigger_one_failed> because no tasks in DAG have SLAs
[2023-09-02T23:21:23.236+0000] {scheduler_job_runner.py:1504} DEBUG - callback is empty
[2023-09-02T23:21:23.241+0000] {scheduler_job_runner.py:414} INFO - 3 tasks up for execution:
	<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [scheduled]>
	<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [scheduled]>
	<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [scheduled]>

RNHTTR avatar Sep 02 '23 23:09 RNHTTR

Seems as though it only affects mapped tasks.

Yes, thanks @RNHTTR for that clarification. I updated the issue title accordingly.

benbuckman avatar Sep 02 '23 23:09 benbuckman

@ephraimbuddy the problem is in the method get_relevant_upstream_map_indexes. When we try to get the relevant map index for the upstream deliver_record, we call this method:

ti.get_relevant_upstream_map_indexes(
    upstream=ti.task.dag.task_dict[upstream_id],
    ti_count=expanded_ti_count,
    session=session,
)

we call it with these values:

ti.get_relevant_upstream_map_indexes(
    upstream="deliver_records.deliver_record",
    ti_count=3, # we have 3 tis because it's a mapped task group
    session=session,
)

and this method doesn't take the mapped task group into account, so it returns -1 instead of the same map index as the TI being checked.

So we have two options:

  1. Update get_relevant_upstream_map_indexes to handle mapped task groups.
  2. Detect that the two tasks are in the same mapped task group without calling this method; in that case we can return the map index of the TI being checked (see the sketch below).
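
As a purely illustrative sketch of option 2 (this is not the actual Airflow internals; in_same_mapped_task_group is a hypothetical helper), the idea is to short-circuit before falling back to the existing method:

def relevant_upstream_map_index(ti, upstream_task, expanded_ti_count, session):
    # Hypothetical helper: True when both tasks belong to the same mapped
    # task group. In that case the relevant upstream TI is simply the one
    # with the same map_index as the TI being checked.
    if in_same_mapped_task_group(ti.task, upstream_task):
        return ti.map_index
    # Otherwise fall back to the existing method, which today returns -1
    # for this case and causes the one_failed rule to be mis-evaluated.
    return ti.get_relevant_upstream_map_indexes(
        upstream=upstream_task,
        ti_count=expanded_ti_count,
        session=session,
    )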

I'm already working on refactoring some queries in this class, including the one that has this bug:

                task_id_counts = session.execute(
                    select(TaskInstance.task_id, func.count(TaskInstance.task_id))
                    .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
                    .where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))
                    .group_by(TaskInstance.task_id)
                ).all()

hussein-awala avatar Sep 04 '23 23:09 hussein-awala

Thanks @hussein-awala for digging into the fix so quickly.

Something else worth looking into and fixing here is why unit tests with DebugExecutor behave differently. Take this unit test, for example (simplified again for demonstration):

import unittest
from datetime import datetime, timezone

from airflow.exceptions import BackfillUnfinished
from airflow.executors.debug_executor import DebugExecutor
from airflow.models import DagBag
from airflow.models.taskinstance import TaskInstance

from .demo_trigger_one_failed import demo_trigger_one_failed

class TestDemoDag(unittest.TestCase):
    def test_handle_failed_delivery(self):
        dagbag = DagBag(include_examples=False, safe_mode=False)
        demo_dag = dagbag.get_dag("demo_trigger_one_failed")
        now = datetime.now(timezone.utc)

        # We need to use the slow DebugExecutor (not `dag.test()`) to run this
        # b/c of https://github.com/apache/airflow/discussions/32831
        demo_dag.clear()
        with self.assertRaises(BackfillUnfinished):
            demo_dag.run(
                start_date=now,
                end_date=now,
                executor=DebugExecutor(),
                run_at_least_once=True,
                verbose=True,
                disable_retry=True,
            )

        downstream_task_ids = list(demo_dag.task_group_dict["deliver_records"].children.keys())
        print(f"downstream_task_ids: {downstream_task_ids}")

        task_instance_states: dict[str, str | None] = {}  # task_id => state

        for task_id in downstream_task_ids:
            # (demo simplified w/ hard-coded 0 for single mapped task)
            ti = TaskInstance(demo_dag.task_dict[task_id], execution_date=now, map_index=0)
            task_instance_states[task_id] = ti.current_state()

        print(f"task_instance_states: {task_instance_states}")

        self.assertEqual("success", task_instance_states["deliver_records.submit_job"])
        self.assertEqual("failed", task_instance_states["deliver_records.fake_sensor"])
        self.assertEqual("upstream_failed", task_instance_states["deliver_records.deliver_record"])

        # Test says this ran and succeeded - but in actual scheduler/UI,
        # that's not true!
        self.assertEqual("success", task_instance_states["deliver_records.handle_failed_delivery"])

Put that in a file test_demo_trigger_one_failed.py next to the file with the DAG above, and run it with python -m unittest path/to/test_demo_trigger_one_failed.py.

Note,

  • As commented inline, this uses DebugExecutor not dag.test() because the latter cannot test error cases.
  • The error in fake_sensor is in the DAG itself (contrived for the demo); in a real test this would be stubbed.

task_instance_states at the end is shown to be:

task_instance_states: 
{'deliver_records.submit_job': 'success',
 'deliver_records.fake_sensor': 'failed',
 'deliver_records.deliver_record': 'upstream_failed',
 'deliver_records.handle_failed_delivery': 'success'
}

and the test passes, asserting that handle_failed_delivery succeeded. This is misleading test output because, as we saw above, in the real scheduler handle_failed_delivery doesn't run at all! (Its ti.current_state() should be None, not success.)
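
For comparison, a sketch of what the final assertion would look like if it matched what the real scheduler actually does (per the debug logs above, the task instance is marked skipped and never runs; whether current_state() reports None or "skipped" may depend on the version, so this sketch accepts either):

        # Sketch only: assert the behavior observed with the real scheduler,
        # where handle_failed_delivery never runs, instead of the "success"
        # that the DebugExecutor run reports.
        self.assertIn(
            task_instance_states["deliver_records.handle_failed_delivery"],
            (None, "skipped"),
        )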

We have unit tests like this in our application and were confident that our actual task with trigger_rule=ONE_FAILED would work correctly, so we were very surprised when it broke in production.

As part of fixing this for the "real" executors (SequentialExecutor in the simple demo above; KubernetesExecutor et al. in a real production application), it would be great if DebugExecutor were brought to parity with the others, so users can rely on their test coverage.

Thank you!

benbuckman avatar Sep 05 '23 02:09 benbuckman

@hussein-awala thanks for debugging. I was busy with releases the past two days; looking at it now.

ephraimbuddy avatar Sep 05 '23 11:09 ephraimbuddy

Ok. Found a fix for this but still need to reason more about it:

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 82282eb39d..31fc1bfa9b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2859,7 +2859,7 @@ class TaskInstance(Base, LoggingMixin):
         # and "ti_count == ancestor_ti_count" does not work, since the further
         # expansion may be of length 1.
         if not _is_further_mapped_inside(upstream, common_ancestor):
-            return ancestor_map_index
+            return abs(ancestor_map_index)
 
         # Otherwise we need a partial aggregation for values from selected task
         # instances in the ancestor's expansion context.

ephraimbuddy avatar Sep 05 '23 16:09 ephraimbuddy

Update: this case is similar to the one I just raised a PR for: https://github.com/apache/airflow/pull/34138

I can reproduce this in a test and am working on it.

ephraimbuddy avatar Sep 06 '23 15:09 ephraimbuddy

Ok. Found a fix for this but still need to reason more about it:

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 82282eb39d..31fc1bfa9b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2859,7 +2859,7 @@ class TaskInstance(Base, LoggingMixin):
         # and "ti_count == ancestor_ti_count" does not work, since the further
         # expansion may be of length 1.
         if not _is_further_mapped_inside(upstream, common_ancestor):
-            return ancestor_map_index
+            return abs(ancestor_map_index)
 
         # Otherwise we need a partial aggregation for values from selected task
         # instances in the ancestor's expansion context.

Hi @uranusjr, can you chime in on this? I can't find a way to test this, but it actually solves this case for a mapped task group with one_failed. When we calculate ancestor_map_index, we multiply by self.map_index, which can be -1. Is there a reason we chose not to care whether the map_index is negative?

ephraimbuddy avatar Sep 07 '23 11:09 ephraimbuddy

The variable comes from here:

https://github.com/apache/airflow/blob/5744b424f3ddb3efdf5c2607d13e084955907cc8/airflow/models/taskinstance.py#L2855-L2856

So there are only two scenarios where abs would make a difference:

  1. Either ancestor_ti_count or ti_count is negative
  2. self.map_index is negative

The first seems unlikely, since both of those count something and should be non-negative integers. Of course it's possible we implemented something wrong, but in that case that bug should be fixed so the two values are always zero or positive.

The second is more plausible, and the only possible negative map_index value is -1, i.e. the task instance is not expanded. That in turn is not right, since this function should only be called on a mapped task instance in the first place. A task instance with a map index of -1 should not appear here, and if it does, turning -1 into 1 does not make sense as a fix.
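
For context, the computation at the referenced lines is roughly of the form self.map_index * ancestor_ti_count // ti_count (a paraphrase, not a quote of the source). A quick worked example with the values from this issue shows why abs() only masks the symptom:

# Paraphrase of the computation at the referenced lines (not the exact source):
ti_count = 3                 # expanded TI count in this example run
ancestor_ti_count = 3        # mapped TI count of the common ancestor group
map_index = -1               # an unexpanded TI, which should not reach this code
ancestor_map_index = map_index * ancestor_ti_count // ti_count

print(ancestor_map_index)       # -1
print(abs(ancestor_map_index))  # 1: arbitrarily points at map index 1, which is
                                # why wrapping it in abs() is not a real fix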

uranusjr avatar Sep 08 '23 11:09 uranusjr

Any updates on this?

benbuckman avatar Oct 26 '23 14:10 benbuckman

Any updates on this?

The PR is awaiting reviews

ephraimbuddy avatar Oct 26 '23 17:10 ephraimbuddy

I'm reopening this issue, as the original PR (#34337) that should have solved it was reverted and no other fix has been merged since then. I'd like to take the chance to improve @ephraimbuddy's fix so that issues like https://github.com/apache/airflow/issues/35541 won't recur.

shahar1 avatar Jun 15 '24 09:06 shahar1

I tried the patch and found an issue. If the one_failed-marked task is in an expanded task group, a single upstream task failure causes all of the one_failed-marked task instances to be triggered.

for example:

task1 -> taskA1
task2 -> taskA2
task3 -> taskA3
task4 -> taskA4

task2's failure will trigger taskA1/2/3/4.
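
To spell out the regression (an illustration using the names from the example above; the expectation is the per-map-index behavior discussed earlier in this thread):

# task2 is the only failed upstream, so only its group's one_failed task
# should fire; with the patch, every mapped group's one_failed task fires.
expected_triggered = {"taskA2"}
observed_triggered = {"taskA1", "taskA2", "taskA3", "taskA4"}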

mis98zb avatar Jul 23 '24 06:07 mis98zb

I've resolved the issue for version 2.x, so hopefully the fix will be available starting with v2.10.5/v2.11.0 (whichever comes first). I'm leaving this issue open, as there are some arrangements to make before merging the corresponding PR to main (v3+).

shahar1 avatar Dec 18 '24 06:12 shahar1