[BUG] sync_node_executions fails for workflows with BranchNode and FailureNodes

Open cjidboon94 opened this issue 8 months ago • 9 comments

Flyte & Flytekit version

flytekit==1.15 Flyte-binary v1.15

Describe the bug

When using flytekit's FlyteRemote and calling remote.wait on an execution of a workflow that contains a BranchNode, I get the following error because of a subcall to sync_node_execution:

   execution = remote.wait(execution=execution, timeout=timeout)
../../../../../anaconda3/envs/flyte/lib/python3.10/site-packages/flytekit/remote/remote.py:2384: in wait
    execution = self.sync_execution(execution, sync_nodes=sync_nodes)
../../../../../anaconda3/envs/flyte/lib/python3.10/site-packages/flytekit/remote/remote.py:2479: in sync_execution
    node_execs[n.id.node_id] = self.sync_node_execution(n, node_mapping)  
E           AttributeError: 'FlyteBranchNode' object has no attribute 'interface'

../../../../../anaconda3/envs/flyte/lib/python3.10/site-packages/flytekit/remote/remote.py:2648: AttributeError

Similarly, when using remote.wait on an execution that has FailureNodes specified and the execution fails, I get the following error: ValueError: Missing node from mapping: fn0

Expected behavior

That a call to FlyteRemote.wait works for all possible workflows (including ones with FailureNodes and FlyteBranchNodes) as expected.

Additional context to reproduce

  import datetime

  from flytekit.configuration import Config
  from flytekit.exceptions.user import FlyteTimeout
  from flytekit.models.core.execution import WorkflowExecutionPhase
  from flytekit.remote import FlyteRemote

  # Trigger workflow using local code
  # (run, CONFIG, PROJECT and DOMAIN are my own helper and settings, not shown here)
  execution_id = run("workflow.py", "workflow")
  # Observe it
  remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
  execution = remote.fetch_execution(name=execution_id)
  # Assert that it ran under 120 seconds. Wait throws an exception in case of timeout
  try:
    execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=2))
  except FlyteTimeout as timeout:
    assert False, timeout
  print("Execution Error:", execution.error)
  assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"

where the workflow in workflow.py is any workflow that has either conditionals (which create a FlyteBranchNode) or a FailureNode.

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • [x] Yes

Have you read the Code of Conduct?

  • [x] Yes

cjidboon94 avatar Jun 05 '25 17:06 cjidboon94

This is a known issue: FlyteRemote.wait() and sync_node_execution() fail with an AttributeError when encountering FlyteBranchNode (from conditionals) or workflows with FailureNode. The error occurs because FlyteBranchNode and FlyteGateNode do not have an interface attribute, but the code attempts to access it, leading to the exception. For FailureNode, a ValueError about a missing node mapping can also occur. These issues have been reported in the community and are not yet fully resolved in Flytekit v1.15 (issue 3628, issue 3265, issue 4398).

Currently, there is no official workaround that enables remote.wait() to work seamlessly with workflows containing these node types. Users have implemented custom wrappers with try/except to handle the exceptions, but a robust fix would require changes in Flytekit to properly handle branch and failure nodes during remote execution synchronization. Would you like more detail on the technical root cause or possible workarounds?
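A minimal sketch of the kind of try/except wrapper mentioned above. It is not an official workaround. It assumes that remote.wait accepts a sync_nodes keyword (the traceback in the report shows wait passing sync_nodes on to sync_execution), and the helper name wait_with_fallback is hypothetical. The remote argument is duck-typed, so anything with a compatible wait method works:

```python
import datetime
from typing import Any, Optional


def wait_with_fallback(
    remote: Any,
    execution: Any,
    timeout: Optional[datetime.timedelta] = None,
) -> Any:
    """Wait for an execution; if node syncing raises, wait again without it.

    Hypothetical helper, not part of flytekit. `remote` is expected to expose
    wait(execution=..., timeout=..., sync_nodes=...), which is an assumption
    based on the traceback in this issue.
    """
    try:
        # First attempt: default behaviour, including per-node synchronization.
        return remote.wait(execution=execution, timeout=timeout)
    except (AttributeError, ValueError) as err:
        # These are the two error types reported in this issue.
        # Catching them this broadly can hide unrelated bugs.
        print(f"Node sync failed ({err!r}); retrying without node sync")
        return remote.wait(execution=execution, timeout=timeout, sync_nodes=False)
```

With this fallback the returned execution still carries its phase and error, but node-level details are not populated. The timeout also starts over on the second call, so the total wait can be up to twice the requested value.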

This is an AI-generated response and your feedback is appreciated! Please leave a 👍 if this is helpful and 👎 if it is not.

Mention @runllm to ask follow-up questions. See runllm.com for more!

runllm[bot] avatar Jun 05 '25 17:06 runllm[bot]

#take

machichima avatar Jun 06 '25 06:06 machichima

Hi @cjidboon94 ,

I cannot reproduce the BranchNode error. I used flytekit version 1.5.0 and the following workflow:

import flytekit as fl
from flytekit import conditional


@fl.task
def calculate_circle_circumference(radius: float) -> float:
    return 2 * 3.14 * radius  # Task to calculate the circumference of a circle


@fl.task
def calculate_circle_area(radius: float) -> float:
    return 3.14 * radius * radius  # Task to calculate the area of a circle


@fl.workflow
def wf(radius: float = 0.5) -> float:
    return (
        conditional("shape_properties")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(calculate_circle_area(radius=radius))
    )

I then used code similar to what you provided for remote.wait. It succeeds with no error.

Could you please provide the code that causes the error?

Thanks!

machichima avatar Jun 08 '25 03:06 machichima

@machichima I'll check for you. The workflow I'm using has subworkflows inside of conditionals, so the combination of the two might be causing it. Let me whip up a minimal reproducible example.

cjidboon94 avatar Jun 08 '25 08:06 cjidboon94

@machichima This is my minimal example that reproduces the FlyteBranchNode error. It seems to happen when a task/workflow's outputs are used as inputs to the conditionals. If I replace the nop task with the Echo task echo_radius, the error goes away, by the way.

import flytekit as fl
from flytekit import conditional
from flytekit.core.task import Echo

echo_radius = Echo(name="noop", inputs={"radius": float})

@fl.task
def calculate_circle_circumference(radius: float) -> float:
    return 2 * 3.14 * radius  # Task to calculate the circumference of a circle


@fl.task
def calculate_circle_area(radius: float) -> float:
    return 3.14 * radius * radius  # Task to calculate the area of a circle


@fl.task
def nop(radius: float) -> float:
    return radius  # Task that does nothing, effectively a no-op


@fl.workflow
def wf(radius: float, get_area: bool, get_circumference: bool):
    echoed_radius = nop(radius=radius)
    (
        conditional("if_area")
        .if_(get_area.is_true())
        .then(calculate_circle_area(radius=radius))
        .else_()
        .then(echo_radius(echoed_radius))
    )
    (
        conditional("if_circumference")
        .if_(get_circumference.is_true())
        .then(calculate_circle_circumference(radius=echoed_radius))
        .else_()
        .then(echo_radius(echoed_radius))
    )

cjidboon94 avatar Jun 08 '25 09:06 cjidboon94

@machichima This is my minimal example that reproduces the FlyteBranchNode error. It seems to happen when a task/workflow's outputs are used as inputs to the conditionals. If I replace the nop task with the Echo task echo_radius, the error goes away, by the way.

Thanks a lot for this! I'll look into it

machichima avatar Jun 08 '25 12:06 machichima

Hi @cjidboon94 ,

I tried your example in the newest flytekit version (1.16.0) and the BranchNode error does not occur. Could you please upgrade flytekit and give it a try?

I think the problem is that the BranchNode is somehow not regarded as a parent_node (if it is treated as a parent node, the error does not occur). I am not sure whether this is a version issue: the error occurs for me sometimes but not always.

The error occurs here, where the code does not distinguish a BranchNode from other workflow nodes:

https://github.com/flyteorg/flytekit/blob/3c6b61d4396f312def03ab6ccc6a839480c16288/flytekit/remote/remote.py#L2707-L2714
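To illustrate the failure mode described above, here is a small self-contained sketch. It does not use flytekit and is not the actual fix: TaskLikeNode, BranchLikeNode and interface_or_none are hypothetical stand-ins. The only point is that reading .interface unconditionally raises AttributeError on a node type that lacks it, while a guarded lookup lets the caller handle that case separately:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TaskLikeNode:
    """Stand-in for a node type that carries an interface."""
    interface: str


class BranchLikeNode:
    """Stand-in for a node type without an `interface` attribute."""


def interface_or_none(node: Any) -> Optional[Any]:
    """Return node.interface if present, else None instead of raising."""
    return getattr(node, "interface", None)


# Unconditional access fails on the branch-like node, as in the traceback.
try:
    BranchLikeNode().interface
except AttributeError as err:
    print("unguarded access:", err)

# Guarded access lets the caller branch on the result instead.
for node in (TaskLikeNode(interface="radius: float -> float"), BranchLikeNode()):
    print(type(node).__name__, "->", interface_or_none(node))
```

Whether flytekit should skip such nodes, or sync them through a dedicated code path, is a design decision for the maintainers; the sketch only shows the shape of the guard.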

machichima avatar Jun 08 '25 15:06 machichima

I tried your example in the newest flytekit version (1.16.0) and the BranchNode error does not occur. Could you please upgrade flytekit and give it a try?

I just upgraded my flytekit to version 1.16.0, but I still get an error for both my minimal example above as well as my actual code.

cjidboon94 avatar Jun 10 '25 08:06 cjidboon94

I just upgraded my flytekit to version 1.16.0, but I still get an error for both my minimal example above as well as my actual code.

Thank you for trying this out! I will then investigate this a bit more

machichima avatar Jun 10 '25 11:06 machichima