[BUG] Workflow Failure Policy Not Working For Sub-Workflows
Describe the bug
The following code does not appear to let all of the maybe_fail_task calls complete when one of them fails.
import time

from flytekit import task, workflow
from flytekit.core.workflow import WorkflowFailurePolicy

# Note: ResourceFactory is not defined in this snippet.


@workflow()
def parent_workflow() -> str:
    return randomly_fail_workflow()


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def randomly_fail_workflow() -> str:
    maybe_fail_task(idx=0)
    maybe_fail_task(idx=1)
    maybe_fail_task(idx=2)
    maybe_fail_task(idx=3)
    return "test"


@task(
    requests=ResourceFactory.gig_resources(1, 1, 1),
)  # type:ignore [misc]
def maybe_fail_task(idx: int) -> str:
    """Foo."""
    time.sleep(20 * float(idx))
    # Even indices fail; odd indices succeed.
    if idx % 2 == 0:
        raise ValueError("failed")
    print(idx)
    return str(idx)
Expected behavior
I would expect that any nodes inside of a workflow that has a WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE configured would be allowed to fail without aborting the rest of the workflow's nodes.
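To make the expectation concrete, here is a small plain-Python sketch of the two policies. It is not flytekit code: `Policy`, `maybe_fail`, and `run_nodes` are hypothetical names made up for illustration, and the nodes run sequentially here, whereas independent nodes in a real execution can run in parallel.

```python
from enum import Enum
from typing import List, Tuple


class Policy(Enum):
    # Hypothetical stand-ins for the two WorkflowFailurePolicy values.
    FAIL_IMMEDIATELY = 1
    FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = 2


def maybe_fail(idx: int) -> str:
    # Same failure rule as maybe_fail_task above, without the sleep.
    if idx % 2 == 0:
        raise ValueError("failed")
    return str(idx)


def run_nodes(policy: Policy, indices: List[int]) -> Tuple[List[int], bool]:
    """Run independent nodes in order; return (indices attempted, workflow failed)."""
    attempted: List[int] = []
    failed = False
    for idx in indices:
        attempted.append(idx)
        try:
            maybe_fail(idx)
        except ValueError:
            failed = True
            if policy is Policy.FAIL_IMMEDIATELY:
                break  # abort the remaining nodes
    return attempted, failed
```

Under the second policy the workflow is still reported as failed, but only after every node that could run has been attempted. That is the behavior I expected from the sub-workflow.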
Are you sure this issue hasn't been raised already?
- [x] Yes
Have you read the Code of Conduct?
- [x] Yes
#take
Hi @Sovietaced. Currently, failure_policy does not apply to sub-workflows because they inherit the execution context of the parent node. As a workaround, you can call a launch plan instead of the sub-workflow, since a launch plan runs with its own independent execution context. Please see the example code below, and let me know if you have any thoughts or suggestions.
import time

from flytekit import task, workflow, LaunchPlan
from flytekit.core.workflow import WorkflowFailurePolicy


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def randomly_fail_workflow() -> str:
    maybe_fail_task(idx=2)
    maybe_fail_task(idx=3)
    maybe_fail_task(idx=4)
    maybe_fail_task(idx=5)
    return "test"


@task
def maybe_fail_task(idx: int) -> str:
    """Foo."""
    time.sleep(20 * float(idx))
    if idx % 2 == 0:
        raise ValueError("failed")
    print(idx)
    return str(idx)


lp = LaunchPlan.get_or_create(randomly_fail_workflow)


@workflow()
def parent_workflow() -> str:
    return lp()
> Hi @Sovietaced. Currently, the failure_policy is not applicable to sub-workflows because they inherit the execution context from the parent node.
I wonder if it would be possible or make sense to allow for child workflow failure policies if they are non-nil.
I don't think a failure policy makes sense for sub-workflows, because workflow failure policies are handled at the top layer. Sub-workflows are just nodes.
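A toy model of that point, in plain Python rather than flytekit: the executor below only ever reads the policy of the top-level workflow, and a nested workflow is walked like any other node. `Task`, `Workflow`, and `execute` are hypothetical names for illustration only, and this is a sketch of the idea, not how the Flyte engine is actually implemented.

```python
from dataclasses import dataclass, field
from typing import List, Union

FAIL_IMMEDIATELY = "FAIL_IMMEDIATELY"
FAIL_AFTER = "FAIL_AFTER_EXECUTABLE_NODES_COMPLETE"


@dataclass
class Task:
    idx: int

    def fails(self) -> bool:
        # Same rule as maybe_fail_task: even indices fail.
        return self.idx % 2 == 0


@dataclass
class Workflow:
    policy: str
    nodes: List[Union[Task, "Workflow"]] = field(default_factory=list)


def execute(top: Workflow) -> List[int]:
    """Return the task indices that got to run. Only top.policy is consulted."""
    ran: List[int] = []

    def visit(wf: Workflow) -> bool:
        # Returns False when the whole execution should stop.
        for node in wf.nodes:
            if isinstance(node, Workflow):
                # The nested workflow's own policy is never read: it is just a node.
                if not visit(node):
                    return False
            else:
                ran.append(node.idx)
                if node.fails() and top.policy == FAIL_IMMEDIATELY:
                    return False
        return True

    visit(top)
    return ran


child = Workflow(FAIL_AFTER, [Task(0), Task(1), Task(2), Task(3)])
parent = Workflow(FAIL_IMMEDIATELY, [child])
```

In this model, `execute(parent)` stops after the first failing task even though `child` declares the more lenient policy, while `execute(child)` runs all four tasks. The second call is roughly analogous to the launch plan workaround, where the child gets its own top-level execution.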