[BUG] sync_node_executions fails for workflows with BranchNode and FailureNodes
Flyte & Flytekit version
flytekit==1.15 Flyte-binary v1.15
Describe the bug
When using flytekit's FlyteRemote and calling remote.wait on an execution of a workflow that contains a BranchNode, I get the following error because of a subcall to sync_node_execution:
execution = remote.wait(execution=execution, timeout=timeout)
../../../../../anaconda3/envs/flyte/lib/python3.10/site-packages/flytekit/remote/remote.py:2384: in wait
execution = self.sync_execution(execution, sync_nodes=sync_nodes)
../../../../../anaconda3/envs/flyte/lib/python3.10/site-packages/flytekit/remote/remote.py:2479: in sync_execution
node_execs[n.id.node_id] = self.sync_node_execution(n, node_mapping)
E AttributeError: 'FlyteBranchNode' object has no attribute 'interface'
../../../../../anaconda3/envs/flyte/lib/python3.10/site-packages/flytekit/remote/remote.py:2648: AttributeError
Similarly when using remote.wait on an execution that has FailureNodes specified and the execution fails I get the following error:
ValueError: Missing node from mapping: fn0
Expected behavior
That a call to FlyteRemote.wait works for all possible workflows (including ones with FailureNodes and FlyteBranchNodes) as expected.
Additional context to reproduce
# Trigger workflow using local code
execution_id = run("workflow.py","workflow")
# Observe it
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
execution = remote.fetch_execution(name=execution_id)
# Assert that it ran under 120 seconds. Wait throws an exception in case of timeout
try:
    execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=2))
except FlyteTimeout as timeout:
    assert False, timeout
print("Execution Error:", execution.error)
assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"
where the workflow in workflow.py is any workflow that has either conditionals (which create a FlyteBranchNode) or a FailureNode.
Are you sure this issue hasn't been raised already?
- [x] Yes
Have you read the Code of Conduct?
- [x] Yes
This is a known issue: FlyteRemote.wait() and sync_node_execution() fail with an AttributeError when encountering FlyteBranchNode (from conditionals) or workflows with FailureNode. The error occurs because FlyteBranchNode and FlyteGateNode do not have an interface attribute, but the code attempts to access it, leading to the exception. For FailureNode, a ValueError about a missing node mapping can also occur. These issues have been reported in the community and are not yet fully resolved in Flytekit v1.15 (issue 3628, issue 3265, issue 4398).
Currently, there is no official workaround that enables remote.wait() to work seamlessly with workflows containing these node types. Users have implemented custom wrappers with try/except to handle the exceptions, but a robust fix would require changes in Flytekit to properly handle branch and failure nodes during remote execution synchronization. Would you like more detail on the technical root cause or possible workarounds?
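One way such a wrapper might look is sketched below. It is not an official fix. It assumes that `remote.wait` accepts the `sync_nodes` keyword that appears in the traceback above (`self.sync_execution(execution, sync_nodes=sync_nodes)`), and the helper name `wait_with_fallback` is hypothetical. The idea is to try the normal wait first and, if node synchronization raises the AttributeError or ValueError described in this issue, to retry without syncing nodes.

```python
def wait_with_fallback(remote, execution, timeout=None):
    """Wait for an execution, skipping node sync if node sync raises.

    `remote` is expected to behave like flytekit's FlyteRemote; the
    `sync_nodes` keyword is an assumption taken from the traceback.
    """
    try:
        # Normal path: wait and also sync the node executions.
        return remote.wait(execution=execution, timeout=timeout)
    except (AttributeError, ValueError):
        # Assumption: these are the node-sync errors from this issue
        # ("no attribute 'interface'" / "Missing node from mapping").
        # Retry while only syncing the workflow-level execution.
        return remote.wait(execution=execution, timeout=timeout, sync_nodes=False)
```

In the reproduction snippet this would replace the direct call, for example `execution = wait_with_fallback(remote, execution, timeout=datetime.timedelta(minutes=2))`. Two caveats: the retry starts a fresh timeout, and without node sync the returned execution is not expected to carry per-node details, only workflow-level fields such as the phase and error.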
Sources:
- GitHub Issue: Flytekit 'FlyteBranchNode' object has no attribute 'interface'
- GitHub Issue: FlyteRemote fetch execution with gate node
- issue 4398
#take
Hi @cjidboon94 ,
I cannot reproduce the BranchNode error. I use flytekit version 1.5.0 and the following workflow:
import flytekit as fl
from flytekit import conditional

@fl.task
def calculate_circle_circumference(radius: float) -> float:
    return 2 * 3.14 * radius  # Task to calculate the circumference of a circle

@fl.task
def calculate_circle_area(radius: float) -> float:
    return 3.14 * radius * radius  # Task to calculate the area of a circle

@fl.workflow
def wf(radius: float = 0.5) -> float:
    return (
        conditional("shape_properties")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(calculate_circle_area(radius=radius))
    )
I then used code similar to the snippet you provided for remote.wait. It succeeds with no error.
Could you please provide the code that causes the error?
Thanks!
@machichima I'll check for you. The workflow I'm using has subworkflows inside of conditionals so the combination of the two might be leading to that. Let me whip up a minimal example that is reproducible
@machichima This is my minimal example that reproduces the FlyteBranchNode error.
It seems to be related to having some task or workflow whose outputs are inputs to the conditionals.
If I replace the nop task with the Echo task echo_radius, the error goes away, by the way.
import flytekit as fl
from flytekit import conditional
from flytekit.core.task import Echo

echo_radius = Echo(name="noop", inputs={"radius": float})

@fl.task
def calculate_circle_circumference(radius: float) -> float:
    return 2 * 3.14 * radius  # Task to calculate the circumference of a circle

@fl.task
def calculate_circle_area(radius: float) -> float:
    return 3.14 * radius * radius  # Task to calculate the area of a circle

@fl.task
def nop(radius: float) -> float:
    return radius  # Task that does nothing, effectively a no-op

@fl.workflow
def wf(radius: float, get_area: bool, get_circumference: bool):
    echoed_radius = nop(radius=radius)
    (
        conditional("if_area")
        .if_(get_area.is_true())
        .then(calculate_circle_area(radius=radius))
        .else_()
        .then(echo_radius(echoed_radius))
    )
    (
        conditional("if_circumference")
        .if_(get_circumference.is_true())
        .then(calculate_circle_circumference(radius=echoed_radius))
        .else_()
        .then(echo_radius(echoed_radius))
    )
Thanks a lot for this! I'll look into it
Hi @cjidboon94 ,
I tried your example in the newest flytekit version (1.16.0) and the BranchNode error does not occur. Could you please upgrade flytekit and give it a try?
I think the problem is that the BranchNode is somehow regarded as not being a parent_node (if it is treated as a parent node, the error does not occur). I am not sure whether this is a version issue; sometimes the error occurs for me and sometimes it does not.
The error occurs here, where the code does not distinguish a BranchNode from other workflow nodes:
https://github.com/flyteorg/flytekit/blob/3c6b61d4396f312def03ab6ccc6a839480c16288/flytekit/remote/remote.py#L2707-L2714
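The failure mode described above can be illustrated with a small standalone sketch. It does not reproduce flytekit's code: `TaskLikeNode`, `BranchLikeNode`, and `output_names` are hypothetical stand-ins, and the guard shown is only one possible way to avoid reading an `interface` attribute that a branch node does not have.

```python
class TaskLikeNode:
    """Stand-in for a node type that carries an interface."""

    def __init__(self, interface):
        self.interface = interface


class BranchLikeNode:
    """Stand-in for a branch node: it has no `interface` attribute."""


def output_names(node):
    """Return the node's output names, or an empty list if it has no interface."""
    # Accessing node.interface directly would raise AttributeError for
    # BranchLikeNode, which mirrors the error reported in this issue.
    interface = getattr(node, "interface", None)
    if interface is None:
        return []
    return sorted(interface["outputs"])
```

Calling `output_names(BranchLikeNode())` returns an empty list instead of raising, while nodes that do have an interface are handled as before.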
I just upgraded my flytekit to version 1.16.0, but this still gives me an error for both my minimal example above and my actual code.
Thank you for trying this out! I will then investigate this a bit more