[BUG] Workflow Failure Policy Not Working For Sub-Workflows

Open Sovietaced opened this issue 10 months ago • 3 comments

Describe the bug

The following code does not appear to let all of the maybe_fail_task calls complete when one of them fails.

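import time

from flytekit import task, workflow
from flytekit.core.workflow import WorkflowFailurePolicy

# ResourceFactory is not defined in this report; it is assumed to be a
# project-specific helper for building resource requests.


@task(
    requests=ResourceFactory.gig_resources(1, 1, 1),
)  # type:ignore [misc]
def maybe_fail_task(idx: int) -> str:
    """Foo."""
    time.sleep(20 * float(idx))
    # Even indices fail: idx 0 fails immediately, idx 2 after 40 seconds.
    if idx % 2 == 0:
        raise ValueError("failed")
    print(idx)
    return str(idx)


# The failure policy is set on the sub-workflow only.
@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def randomly_fail_workflow() -> str:
    maybe_fail_task(idx=0)
    maybe_fail_task(idx=1)
    maybe_fail_task(idx=2)
    maybe_fail_task(idx=3)

    return "test"


@workflow()
def parent_workflow() -> str:
    return randomly_fail_workflow()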

Expected behavior

I would expect that when a workflow is configured with WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE, any of its nodes can fail without aborting the remaining nodes in that workflow, including when the workflow runs as a sub-workflow.
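
To make this expectation concrete, here is a toy model in plain Python. It is not Flyte's scheduler: it assumes the nodes are independent and runs them one after another, whereas Flyte can run them concurrently. `FailurePolicy`, `maybe_fail`, and `run_nodes` are hypothetical names introduced for illustration; only the enum member names are borrowed from flytekit's `WorkflowFailurePolicy`.

```python
from enum import Enum, auto
from typing import Dict, List, Sequence, Tuple


class FailurePolicy(Enum):
    """Stand-in for flytekit's WorkflowFailurePolicy (member names only)."""

    FAIL_IMMEDIATELY = auto()
    FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = auto()


def maybe_fail(idx: int) -> str:
    """Same failure rule as maybe_fail_task in the report, without the sleep."""
    if idx % 2 == 0:
        raise ValueError("failed")
    return str(idx)


def run_nodes(
    policy: FailurePolicy, indices: Sequence[int]
) -> Tuple[Dict[int, str], List[int], List[int]]:
    """Run independent nodes in order and return (outputs, failed, aborted)."""
    outputs: Dict[int, str] = {}
    failed: List[int] = []
    aborted: List[int] = []
    for position, idx in enumerate(indices):
        try:
            outputs[idx] = maybe_fail(idx)
        except ValueError:
            failed.append(idx)
            if policy is FailurePolicy.FAIL_IMMEDIATELY:
                # The first failure aborts every node that has not run yet.
                aborted = list(indices[position + 1:])
                break
            # Otherwise keep going; the workflow is marked failed at the end.
    return outputs, failed, aborted


if __name__ == "__main__":
    for policy in FailurePolicy:
        print(policy.name, run_nodes(policy, [0, 1, 2, 3]))
```

In this model, with indices 0 to 3, `FAIL_IMMEDIATELY` stops at the first failure (idx 0) and never runs 1, 2, or 3, while `FAIL_AFTER_EXECUTABLE_NODES_COMPLETE` lets 1 and 3 finish and records 0 and 2 as failed. The workflow still ends in failure in both cases; the policy only decides whether the other nodes get to run.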

Additional context to reproduce

No response

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • [x] Yes

Have you read the Code of Conduct?

  • [x] Yes

Sovietaced, Mar 13 '25 18:03

#take

popojk, May 18 '25 07:05

Hi @Sovietaced. Currently, the failure_policy is not applicable to sub-workflows because they inherit the execution context from the parent node. As a workaround, you can use a sub-launch plan instead of a sub-workflow, since launch plans have their own independent execution contexts. Please refer to the example code below, and let me know if you have any thoughts or suggestions.

import time
from flytekit import task, workflow, LaunchPlan
from flytekit.core.workflow import WorkflowFailurePolicy


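@task
def maybe_fail_task(idx: int) -> str:
    """Foo."""
    time.sleep(20 * float(idx))
    # Even indices fail: idx 2 after 40 seconds, idx 4 after 80 seconds.
    if idx % 2 == 0:
        raise ValueError("failed")
    print(idx)
    return str(idx)


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def randomly_fail_workflow() -> str:
    maybe_fail_task(idx=2)
    maybe_fail_task(idx=3)
    maybe_fail_task(idx=4)
    maybe_fail_task(idx=5)

    return "test"


# Wrap the workflow in a launch plan so that it runs in its own execution
# context instead of inheriting the parent's.
lp = LaunchPlan.get_or_create(randomly_fail_workflow)


@workflow()
def parent_workflow() -> str:
    # Call the launch plan, not the workflow function itself.
    return lp()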

popojk, Jun 16 '25 10:06

> Hi @Sovietaced. Currently, the failure_policy is not applicable to sub-workflows because they inherit the execution context from the parent node.

I wonder if it would be possible or make sense to allow for child workflow failure policies if they are non-nil.

Sovietaced, Jun 16 '25 14:06

I don't think a failure policy makes sense for subworkflows, because failure policies for a workflow are handled at the top layer. Subworkflows are just nodes.

kumare3, Jun 17 '25 22:06
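
The explanation given in this thread (the policy is read at the top layer, and a sub-workflow is just another node) can be sketched as a toy model. This is plain Python, not Flyte's implementation: `ToyWorkflow`, `flatten`, and `execute` are hypothetical names, tasks are represented only by their `idx`, and sequential execution is a simplification.

```python
from dataclasses import dataclass
from typing import List, Optional, Union

FAIL_IMMEDIATELY = "FAIL_IMMEDIATELY"
FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = "FAIL_AFTER_EXECUTABLE_NODES_COMPLETE"


@dataclass
class ToyWorkflow:
    """Hypothetical stand-in for a workflow; not a flytekit class."""

    # An int is a task idx; a ToyWorkflow is a sub-workflow node.
    nodes: List[Union[int, "ToyWorkflow"]]
    # None means no policy was set on this workflow.
    failure_policy: Optional[str] = None


def flatten(wf: ToyWorkflow) -> List[int]:
    """Expand sub-workflow nodes in place; their own failure_policy is ignored."""
    tasks: List[int] = []
    for node in wf.nodes:
        tasks.extend(flatten(node) if isinstance(node, ToyWorkflow) else [node])
    return tasks


def execute(top: ToyWorkflow) -> List[int]:
    """Return the task indices that were allowed to run (even idx = failure)."""
    # Only the top-level workflow's policy is consulted.
    policy = top.failure_policy or FAIL_IMMEDIATELY
    ran: List[int] = []
    for idx in flatten(top):
        ran.append(idx)
        if idx % 2 == 0 and policy == FAIL_IMMEDIATELY:
            break  # the first failure aborts everything that has not run yet
    return ran


child = ToyWorkflow(
    nodes=[0, 1, 2, 3],
    failure_policy=FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
)

# Policy set on the sub-workflow only, as in the original report.
policy_on_child_only = execute(ToyWorkflow(nodes=[child]))

# Policy set on the top-level workflow instead.
policy_on_parent = execute(
    ToyWorkflow(nodes=[child], failure_policy=FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
)
```

In this model the policy set only on the child has no effect, so `policy_on_child_only` stops after task 0, while setting it on the top-level workflow lets all four tasks run. In flytekit terms that would correspond to passing `failure_policy=` to the `@workflow` decorator of `parent_workflow`. The thread does not confirm that this resolves the reported case, so treat it as something to verify rather than a documented fix.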