
Ensuring XCom return value can be mapped for dynamically-mapped `@task_group`s

Open · jroachgolf84 opened this pull request 8 months ago · 5 comments

Issue #51109 reported that dynamically mapping over a value stored under a custom key of a task's XCom return value led to unexpected behavior. Rather than mapping over the list stored at that key, Airflow dynamically mapped the TaskGroups over the key-value pairs of the return value itself.
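To make the symptom concrete, here is a plain-Python illustration (not Airflow code) of the values the mapped TaskGroup instances would receive in each case, using the `{"b": [2, 3]}` return value from the test DAG in this PR. It is a simplified model of the behavior as described in #51109, assuming that mapping over a dict yields one `(key, value)` pair per entry.

```python
# Plain-Python illustration only; Airflow's mapping machinery is far more involved.
return_value = {"b": [2, 3]}

# Intended: one mapped instance per element of the list stored under key "b".
intended = [{"b": item} for item in return_value["b"]]

# Reported in #51109: the dict's key-value pairs were mapped over instead.
reported = [{"b": pair} for pair in return_value.items()]

print(intended)  # [{'b': 2}, {'b': 3}]
print(reported)  # [{'b': ('b', [2, 3])}]
```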

This PR addresses this by applying the same validation logic already present in the `_TaskDecorator` class to `_TaskGroupFactory`. When attempting to map over a list stored under key `b` in the dict returned by task `t`, the following exception is now raised:

`ValueError: cannot map over XCom with custom key 'b' from <Task(_PythonDecoratedOperator): t>`

Instead of the faulty behavior described in #51109, the DAG now fails to parse. This matches the existing behavior of `@task`. The check is applied to both `.expand()` and `.expand_kwargs()`.

To test this, the following DAG was written. The result is now a DAG Import Error, with the stack trace below.

```python
import pendulum
from airflow.decorators import dag, task, task_group


@dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
def pipeline():
    @task
    def t():
        return {"b": [2, 3]}

    @task_group()
    def tg(a, b):
        pass

    tg.partial(a=1).expand(b=t()["b"])

pipeline()
```

```
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 132, in ensure_xcomarg_return_value
    ensure_xcomarg_return_value(v)
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 127, in ensure_xcomarg_return_value
    raise ValueError(f"cannot map over XCom with custom key {key!r} from {operator}")
ValueError: cannot map over XCom with custom key 'b' from <Task(_PythonDecoratedOperator): t>
```
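For readers unfamiliar with the check, here is a minimal plain-Python model of the kind of validation the traceback points to (`ensure_xcomarg_return_value`, which recurses into nested values). This is not Airflow's code: `XComRef`, `ensure_return_value`, and `TaskGroupFactorySketch` are hypothetical names, and the model assumes that indexing an XComArg (as in `t()["b"]`) yields a reference whose key is `"b"`, which is what the error message suggests. `"return_value"` is Airflow's default XCom key.

```python
from dataclasses import dataclass
from typing import Any

# Airflow's default XCom key; any other key counts as a "custom key" here.
XCOM_RETURN_KEY = "return_value"


@dataclass(frozen=True)
class XComRef:
    """Hypothetical stand-in for an XComArg: producing task and XCom key."""

    operator: str
    key: str = XCOM_RETURN_KEY


def ensure_return_value(value: Any) -> None:
    """Recursively reject any XComRef that points at a custom key."""
    if isinstance(value, XComRef):
        if value.key != XCOM_RETURN_KEY:
            raise ValueError(
                f"cannot map over XCom with custom key {value.key!r} from {value.operator}"
            )
    elif isinstance(value, dict):
        for v in value.values():
            ensure_return_value(v)
    elif isinstance(value, (list, tuple)):
        for v in value:
            ensure_return_value(v)


class TaskGroupFactorySketch:
    """Hypothetical factory: both mapping entry points run the same check."""

    def expand(self, **mapped_kwargs: Any) -> dict:
        ensure_return_value(mapped_kwargs)
        return mapped_kwargs

    def expand_kwargs(self, kwargs: Any) -> Any:
        ensure_return_value(kwargs)
        return kwargs


factory = TaskGroupFactorySketch()
factory.expand(b=XComRef("t"))  # default key: accepted
try:
    factory.expand(b=XComRef("t", key="b"))  # models t()["b"]
except ValueError as exc:
    print(exc)  # cannot map over XCom with custom key 'b' from t
```

Because the check recurses into dicts and lists, a custom-key reference is rejected whether it is passed directly to `.expand()` or nested inside a list of dicts passed to `.expand_kwargs()`.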

jroachgolf84 · Jun 09 '25 20:06

cc: @rawwar, do you mind taking a peek at this one?

jroachgolf84 · Jun 10 '25 13:06

Why is the expected behavior that mapping over a custom key is forbidden?

Would it be possible to add a fix suggestion (or a link to docs with fix suggestions) to the error message? It could be one of the following:

  • Encourage the user to split the task so that the mapped-on output is its only output. (Workaround suggested by @jroachgolf84)
  • Add a passthrough task that unpacks the specific output to map over. (Workaround mentioned in the issue)
  • Link to the "Mapping over a task group" docs (after adding a note on this topic).
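The passthrough workaround from the second bullet can be sketched in plain Python. In a real DAG, `t` and the hypothetical `unpack_b` would both be `@task` functions wired as `tg.partial(a=1).expand(b=unpack_b(t()))`; here the Airflow machinery is only simulated, assuming one mapped instance per list element.

```python
# Plain-Python simulation of the passthrough workaround. In a real DAG, t and
# unpack_b (a hypothetical name) would both be @task functions wired as:
#     tg.partial(a=1).expand(b=unpack_b(t()))


def t() -> dict:
    return {"b": [2, 3]}


def unpack_b(payload: dict) -> list:
    # Returning the list makes it this task's own return value, stored under
    # the default XCom key, so mapping over it no longer involves a custom key.
    return payload["b"]


# One mapped task-group instance per list element, with a=1 held constant.
mapped_instances = [{"a": 1, "b": item} for item in unpack_b(t())]
print(mapped_instances)  # [{'a': 1, 'b': 2}, {'a': 1, 'b': 3}]
```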

Dev-iL · Jun 12 '25 08:06

Custom keys were forbidden to reduce the workload (both design and implementation) when the feature was initially rolled out. Feel free to open an issue to discuss how it can happen and propose an implementation.

uranusjr · Jun 12 '25 08:06

From @rawwar, moving this here.

@jroachgolf84, the issue seems to have been reported for 2.x, while the PR targets 3.x. I'm going through the issue and will also review the PR. Can you confirm that you verified the bug in 3.x as well? And will you be raising a separate PR for the 2.x branch? (If yes, you need to raise it against v2-11-test.)

I verified this in 3.x. I will make sure to open a PR against v2-11-test.

jroachgolf84 · Jun 12 '25 15:06

@rawwar, a PR was opened against v2-11. Please see here: https://github.com/apache/airflow/pull/51668

jroachgolf84 · Jun 12 '25 18:06