Ensuring only the XCom return value can be mapped over for dynamically-mapped `@task_group`s
#51109 outlined an issue where dynamically mapping a TaskGroup over an XCom referenced with a custom key produced unexpected behavior. Instead of using the list stored at that key, Airflow dynamically mapped the TaskGroup over the key-value pairs of the return value itself.
This PR addresses the issue by applying the same logic already present in the `_TaskDecorator` class to `_TaskGroupFactory`. When attempting to map over a list stored at key `b` in the dict returned by task `t`, the following exception is raised:
`ValueError: cannot map over XCom with custom key 'b' from <Task(_PythonDecoratedOperator): t>`
Instead of silently applying the faulty mapping described in #51109, the DAG now fails to parse. This matches the existing behavior of `@task`. The check is applied to both `.expand()` and `.expand_kwargs()`.
To test this, the following DAG was written. It now produces a DAG import error with the stack trace below.
```python
import pendulum

from airflow.decorators import dag, task, task_group


@dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
def pipeline():
    @task
    def t():
        return {"b": [2, 3]}

    @task_group()
    def tg(a, b):
        pass

    # t()["b"] references the XCom at custom key "b", so this now fails at parse time.
    tg.partial(a=1).expand(b=t()["b"])


pipeline()
```
```
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 132, in ensure_xcomarg_return_value
    ensure_xcomarg_return_value(v)
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 127, in ensure_xcomarg_return_value
    raise ValueError(f"cannot map over XCom with custom key {key!r} from {operator}")
ValueError: cannot map over XCom with custom key 'b' from <Task(_PythonDecoratedOperator): t>
```
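The check behind this error can be pictured as a recursive walk over the mapped arguments that rejects any XCom reference whose key is not the default `return_value`. The snippet below is a minimal, self-contained approximation, not Airflow's actual code. Only the function name `ensure_xcomarg_return_value`, its recursion into nested values, and the error message come from the traceback above. `FakeXComArg` and `validate_expand_kwargs` are hypothetical stand-ins for Airflow's `XComArg` and the hook in `_TaskGroupFactory`.

```python
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import Any

# Airflow's default XCom key for a task's return value.
XCOM_RETURN_KEY = "return_value"


@dataclass(frozen=True)
class FakeXComArg:
    """Hypothetical stand-in for an XComArg: one operator's XCom at one key."""

    operator: str
    key: str = XCOM_RETURN_KEY

    def iter_references(self):
        # A plain XCom reference points at exactly one (operator, key) pair.
        yield self.operator, self.key


def ensure_xcomarg_return_value(arg: Any) -> None:
    """Raise if any XCom reference nested in ``arg`` uses a non-default key."""
    if isinstance(arg, FakeXComArg):
        for operator, key in arg.iter_references():
            if key != XCOM_RETURN_KEY:
                raise ValueError(
                    f"cannot map over XCom with custom key {key!r} from {operator}"
                )
    elif isinstance(arg, (str, bytes)):
        # Strings are iterable, but they are literal values, not containers.
        return
    elif isinstance(arg, Mapping):
        # .expand(**kwargs) and dict literals: check every value.
        for v in arg.values():
            ensure_xcomarg_return_value(v)
    elif isinstance(arg, Iterable):
        # .expand_kwargs([...]) and list literals: check every element.
        for v in arg:
            ensure_xcomarg_return_value(v)


def validate_expand_kwargs(**mapped_kwargs: Any) -> None:
    """Hypothetical hook: run the check on everything passed to .expand()."""
    ensure_xcomarg_return_value(mapped_kwargs)
```

With this sketch, `validate_expand_kwargs(b=FakeXComArg("t"))` passes because it uses the default key, while `validate_expand_kwargs(b=FakeXComArg("t", key="b"))`, the analogue of `t()["b"]`, raises the `ValueError`. Failing at parse time is the design choice here: the DAG author sees the problem immediately instead of getting a silently wrong number of mapped TaskGroup instances.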
cc: @rawwar, do you mind taking a peek at this one?
Why is the expected behavior that mapping over a custom key is forbidden?
Would it be possible to add a fix suggestion (or a link to docs with fix suggestions) to the error message? It could be one of the following:
- Encourage the user to split the task so that the mapped-over output is its only output. (Workaround suggested by @jroachgolf84)
- Add a passthrough task that unpacks the specific output to map over. (Workaround mentioned in the issue)
- Link to the "Mapping over a task group" docs (after adding a note on this topic).
Custom keys were forbidden to reduce the workload (both design and implementation) when the feature was initially rolled out. Feel free to open an issue to discuss how they could be supported and to propose an implementation.
From @rawwar, moving this here.
@jroachgolf84, the issue appears to have been reported for 2.x, while this PR targets 3.x. I'm going through the issue and will also review the PR. Can you confirm that you verified the bug in 3.x as well? And will you be raising a separate PR for the 2.x branch? (If so, it needs to target `v2-11-test`.)
I verified this in 3.x. I will make sure to create a PR against `v2-11-test`.
@rawwar, a PR was opened for v2-11. Please see here: https://github.com/apache/airflow/pull/51668