Undesired "<SomeOperator>.execute cannot be called outside TaskInstance!" warning
Apache Airflow version
2.9.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Decorated operators using the classic API trigger warning message "<SomeOperator>.execute cannot be called outside TaskInstance!"
What you think should happen instead?
PR https://github.com/apache/airflow/pull/37937 introduced a new check to avoid the mixed usage of classic and decorated operators. Because a boundary condition of checking the decorators is missing, the operators cannot be decorated if a DAG uses the classic API.
Hopefully, the check should only be triggered if the operator is decorated with airflow.decorators.task
How to reproduce
Run the DAG below:
from airflow import DAG
from airflow.operators.python import PythonOperator
def deco(cls):
orig_init = cls.__init__
def new_init(self, *args, default_args=None, **kwargs):
orig_init(self, *args, **kwargs)
self.default_args = default_args
cls.__init__ = cls._apply_defaults(new_init)
return cls
@deco
class AlloyPythonOperator(PythonOperator):
def execute(self, context):
super().execute(context)
def no_ops():
pass
with DAG(
dag_id="test-dag",
catchup=False,
):
AlloyPythonOperator(
task_id="trigger-execute",
python_callable=no_ops,
)
Operating System
Ubuntu 22.04 LTS, but the issue is OS independant
Versions of Apache Airflow Providers
core
Deployment
Other Docker-based deployment
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
cc @dabla WDYT?
My 2 cents, this code use internals methods, as result it is expected that this approach could be broken on changes in airflow codebase
def deco(cls):
orig_init = cls.__init__
def new_init(self, *args, default_args=None, **kwargs):
orig_init(self, *args, **kwargs)
self.default_args = default_args
cls.__init__ = cls._apply_defaults(new_init)
return cls
WDYT
Seems like the deco method is considered as calling he execute method of the PythonOperator outside of the TaskInstance, this is the protection mechanism which would warn you when mixing task decorated tasks with native operators. But normally it should still work atm, as this only triggers a warning.
Ok think I understand, it's because the class AlloyPythonOperator is instantiated outside the class TaskInstance, due to the deco method, so the sentinel which acts as a guard isn't set and thus Airflow considers that the execute method is called outside the TaskInstance
Thanks for the investigation and feedback, @Taragolis and @dabla!
Yes, everything still works despite the warning message. I created this issue just as a "preventive" measure because @potiuk wrote in the closing comment of https://github.com/apache/airflow/pull/37937:
Having a warning is "good enough" for now. If we decide to do something more (for example allow such use to behave more "as expected") we can always add it later.
What would you recommend if we need to inject some common logic into the constructor and the execute method of around 15 different (standard) operators? Or shall I patch the TaskInsntance and BaseOperator classes to insert the __sentinel property?
Thanks for the investigation and feedback, @Taragolis and @dabla!
Yes, everything still works despite the warning message. I created this issue just as a "preventive" measure because @potiuk wrote in the closing comment of #37937:
Having a warning is "good enough" for now. If we decide to do something more (for example allow such use to behave more "as expected") we can always add it later.
What would you recommend if we need to inject some common logic into the constructor and the
executemethod of around 15 different (standard) operators? Or shall I patch theTaskInsntanceandBaseOperatorclasses to insert the__sentinelproperty?
In your example above, if you only have this "issue" of common logic with the PythonOpertor, then I would suggest you use the @task decorated method, there you could delegate to another non decorated method having all the common logic. Personally, I would not try to "mess" with native operators and start extending them.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.