# `Variable.get` inside of a custom Timetable breaks the Scheduler
### Apache Airflow version
2.3.4
### What happened
If you try to use `Variable.get` from inside a custom Timetable, the Scheduler breaks with errors like:
```
scheduler | [2022-09-20 10:19:36,104] {variable.py:269} ERROR - Unable to retrieve variable from secrets backend (MetastoreBackend). Checking subsequent secrets backend.
scheduler | Traceback (most recent call last):
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/variable.py", line 265, in get_variable_from_secrets
scheduler | var_val = secrets_backend.get_variable(key=key)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
scheduler | return func(*args, session=session, **kwargs)
scheduler | File "/opt/conda/envs/production/lib/python3.9/contextlib.py", line 126, in __exit__
scheduler | next(self.gen)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 33, in create_session
scheduler | session.commit()
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1435, in commit
scheduler | self._transaction.commit(_to_root=self.future)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 829, in commit
scheduler | self._prepare_impl()
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
scheduler | self.session.dispatch.before_commit(self.session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
scheduler | fn(*args, **kw)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/sqlalchemy.py", line 341, in _validate_commit
scheduler | raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
scheduler | RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
scheduler | [2022-09-20 10:19:36,105] {plugins_manager.py:264} ERROR - Failed to import plugin /home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py
scheduler | Traceback (most recent call last):
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/plugins_manager.py", line 256, in load_plugins_from_plugin_directory
scheduler | loader.exec_module(mod)
scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
scheduler | File "/home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py", line 9, in <module>
scheduler | class CustomTimetable(CronDataIntervalTimetable):
scheduler | File "/home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py", line 10, in CustomTimetable
scheduler | def __init__(self, *args, something=Variable.get('something'), **kwargs):
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/variable.py", line 138, in get
scheduler | raise KeyError(f'Variable {key} does not exist')
scheduler | KeyError: 'Variable something does not exist'
scheduler | [2022-09-20 10:19:36,179] {scheduler_job.py:769} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
scheduler | Traceback (most recent call last):
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 752, in _execute
scheduler | self._run_scheduler_loop()
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 840, in _run_scheduler_loop
scheduler | num_queued_tis = self._do_scheduling(session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 914, in _do_scheduling
scheduler | self._start_queued_dagruns(session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1086, in _start_queued_dagruns
scheduler | dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagbag.py", line 179, in get_dag
scheduler | self._add_dag_from_db(dag_id=dag_id, session=session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagbag.py", line 254, in _add_dag_from_db
scheduler | dag = row.dag
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/serialized_dag.py", line 209, in dag
scheduler | dag = SerializedDAG.from_dict(self.data) # type: Any
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1099, in from_dict
scheduler | return cls.deserialize_dag(serialized_obj['dag'])
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1021, in deserialize_dag
scheduler | v = _decode_timetable(v)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 189, in _decode_timetable
scheduler | raise _TimetableNotRegistered(importable_string)
scheduler | airflow.serialization.serialized_objects._TimetableNotRegistered: Timetable class 'custom_timetable.CustomTimetable' is not registered
```
Note that in this case, the Variable in question does exist, and the KeyError is a red herring.
If you add a `default_var`, things seem to work, though I wouldn't trust it: there is clearly some context in which it fails to load the Variable and always falls back to the default. Additionally, this still raises the `UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!` error, which I assume is a bad thing.
### What you think should happen instead
I'm not sure whether this should be allowed at all. In my case, I was able to work around the error by making all Timetable initializer arguments required (no default values) and pulling the `Variable.get` call out into a wrapper function, roughly as sketched below.
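A minimal sketch of that workaround (the wrapper name `make_custom_timetable` is mine, not from the report; the names mirror the reproduction below):

```python
from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something, **kwargs):  # required kwarg, no default value
        self._something = something
        super().__init__(*args, **kwargs)


def make_custom_timetable(*args, **kwargs):
    # Variable.get runs only when a DAG file calls this wrapper, never at
    # import time when the scheduler loads the plugin module.
    return CustomTimetable(*args, something=Variable.get('something'), **kwargs)
```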
### How to reproduce
`custom_timetable.py`:
```python
#!/usr/bin/env python3
from __future__ import annotations

from airflow.models.variable import Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something=Variable.get('something'), **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)


class CustomTimetablePlugin(AirflowPlugin):
    name = 'custom_timetable_plugin'
    timetables = [CustomTimetable]
```
`test_custom_timetable.py`:
```python
#!/usr/bin/env python3
import datetime

import pendulum

from airflow.decorators import dag, task
from custom_timetable import CustomTimetable


@dag(
    start_date=datetime.datetime(2022, 9, 19),
    timetable=CustomTimetable(cron='0 0 * * *', timezone=pendulum.UTC),
)
def test_custom_timetable():
    @task
    def a_task():
        print('hello')

    a_task()


dag = test_custom_timetable()

if __name__ == '__main__':
    dag.cli()
```
```bash
airflow variables set something foo
airflow dags trigger test_custom_timetable
```
### Operating System
CentOS Stream 8
### Versions of Apache Airflow Providers
None
### Deployment
Other
### Deployment details
I was able to reproduce this with:
- Standalone mode, SQLite DB, SequentialExecutor
- Self-hosted deployment, Postgres DB, CeleryExecutor
### Anything else
Related: #21895
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This happens because we load timetable plugins in the scheduler process (decoding the timetable during deserialization). So with `Variable.get`, we are using a new session, different from the scheduler session. When that new session commits, it throws an error because of the different sessions. It doesn't seem like a simple fix, but I'm taking a look.
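For reference, this is roughly what `airflow.utils.session.create_session` (the context manager behind `@provide_session`) does; the commit on exit is exactly what the scheduler's HA-lock guard intercepts (simplified sketch):

```python
from contextlib import contextmanager

from airflow import settings


@contextmanager
def create_session():
    # Creates a session, yields it, and tears it down, committing on success.
    session = settings.Session()
    try:
        yield session
        session.commit()  # <- intercepted by the scheduler's guard as "UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!"
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```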
> This happens because we load timetable plugins in the scheduler process (decoding the timetable during deserialization). So with `Variable.get`, we are using a new session, different from the scheduler session. When that new session commits, it throws an error because of the different sessions. It doesn't seem like a simple fix, but I'm taking a look.
Storing a session in ThreadLocal and checking if it is set there might be a possibility.
> Storing a session in ThreadLocal and checking if it is set there might be a possibility.
I suspect that users may create their own `session` object in a timetable class by using Airflow's `create_session` or `provide_session`; in that case, this issue will come up again, and ThreadLocal will not solve it?
> I suspect that users may create their own `session` object in a timetable class by using Airflow's `create_session` or `provide_session`; in that case, this issue will come up again, and ThreadLocal will not solve it?
Indeed. I doubt users would want to create their own session, but you are right that we would have to handle it in all "user-facing" APIs (which we currently have no definition of). And that would exclude "saving" anything anyway, as that would imply a commit. So if we can find a way to not create the custom timetable inside a commit-protected section, that would be way better.
The "fix" for this is to be able to pass the session down to airflow.secrets.metastore.get_variable, but that isn't currently possible when calling Variable.
The underlying issue here is that `@provide_session` does the wrong thing; or, looking deeper, session handling in Airflow is too magic.
Something like this might fix the problem, but I've not thought about the wider implications:
```diff
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index 83ed310011..ef1d316432 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -125,6 +125,7 @@ class Variable(Base, LoggingMixin):
         key: str,
         default_var: Any = __NO_DEFAULT_SENTINEL,
         deserialize_json: bool = False,
+        **kwargs,
     ) -> Any:
         """
         Gets a value for an Airflow Variable Key
@@ -133,7 +134,7 @@
         :param default_var: Default value of the Variable if the Variable doesn't exist
         :param deserialize_json: Deserialize the value to a Python dict
         """
-        var_val = Variable.get_variable_from_secrets(key=key)
+        var_val = Variable.get_variable_from_secrets(key=key, **kwargs)
         if var_val is None:
             if default_var is not cls.__NO_DEFAULT_SENTINEL:
                 return default_var
@@ -256,7 +257,7 @@
         return None
 
     @staticmethod
-    def get_variable_from_secrets(key: str) -> str | None:
+    def get_variable_from_secrets(key: str, **backend_kwargs) -> str | None:
         """
         Get Airflow Variable by iterating over all Secret Backends.
 
@@ -265,7 +266,7 @@
         """
         for secrets_backend in ensure_secrets_loaded():
             try:
-                var_val = secrets_backend.get_variable(key=key)
+                var_val = secrets_backend.get_variable(key=key, **backend_kwargs)
                 if var_val is not None:
                     return var_val
             except Exception:
```
> Storing a session in ThreadLocal and checking if it is set there might be a possibility.
That's basically what SQLAlchemy does for us. That's why, when `@provide_session` commits, the main session in the scheduler catches the commit and throws the error, because it's the same session object!
Yeah. I thought about creating a new session and storing it additionally in thread-local storage (independently of our main session, or replacing it temporarily).
@ashb, I tried your fix and it doesn't seem to fix it. The problem, as I understand it, is that during decoding of the timetable we try to load the plugin; the error occurs here, as a result of the RuntimeError (UNEXPECTED COMMIT) arising from calling `Variable.get('something')` without supplying the scheduler session. In this case, `Variable.get('something')` used a session created by its own `provide_session` here.
Sorry, yes, my diff along with passing the session to `Variable.get`. Let me think how to get that ...
Oh, it's quite easy. To get access to the current session, use `airflow.settings.Session()`; that'll return the thread-local active session, and that, coupled with the previous fix, means it won't try to commit.
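A minimal sketch of that suggestion (the helper name `get_variable_with_active_session` is mine, not from the thread):

```python
from __future__ import annotations

from airflow import settings
from airflow.models.variable import Variable


def get_variable_with_active_session(key: str) -> str | None:
    # settings.Session is a scoped_session: calling it returns the session
    # already active on this thread (in the scheduler, the scheduler's own
    # session), so no new session is created and nothing extra is committed.
    session = settings.Session()
    row = session.query(Variable).filter(Variable.key == key).one_or_none()
    return row.val if row is not None else None
```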
Same issue/discussion in https://github.com/apache/airflow/discussions/26533
> Sorry, yes, my diff along with passing the session to `Variable.get`. Let me think how to get that ...
>
> Oh, it's quite easy. To get access to the current session, use `airflow.settings.Session()`; that'll return the thread-local active session, and that, coupled with the previous fix, means it won't try to commit.
It seems to work without needing to apply the previous diff but I'm still testing
The solution of using `airflow.settings.Session()` works fine; however, once the plugin fails to register during webserver startup (e.g. because the variable has not yet been added), the webserver must be restarted even after adding the variable.
I'd say this should be considered a user error. We can add a note in the documentation explaining that value retrievals (not just `Variable`, but all DB and config accesses in general) should be done lazily. This is just how Python works.
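A hypothetical illustration of that advice, reusing the names from the reproduction above: resolve the value where it is used, not in a function signature default, which Python evaluates once at import time:

```python
from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    # Eager (breaks the scheduler): a default argument such as
    #   def __init__(self, *args, something=Variable.get('something'), **kwargs)
    # runs Variable.get as soon as the plugin module is imported.

    @property
    def _something(self) -> str:
        # Lazy: the database is touched only when the value is actually
        # needed, e.g. inside a running task, never at plugin-import time.
        return Variable.get('something')
```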
> I'd say this should be considered a user error. We can add a note in the documentation explaining that value retrievals (not just `Variable`, but all DB and config accesses in general) should be done lazily. This is just how Python works.
I tend to agree with @uranusjr, with one twist: we should handle this in a way that produces a better error. The error message should explicitly say that you should not make any database operation inside a custom Timetable. Adding documentation is not enough; people won't read the documentation, and we can expect more issues like this popping up. I think it should be possible to catch this error and turn it into a more explanatory message.
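A rough, hypothetical sketch of what that could look like (not an actual Airflow change; the wrapper and the message are mine):

```python
from importlib.abc import Loader
from types import ModuleType


def exec_plugin_module(loader: Loader, mod: ModuleType) -> None:
    # Hypothetical wrapper around the loader.exec_module call in
    # plugins_manager, re-raising the HA-lock guard error with guidance.
    try:
        loader.exec_module(mod)
    except RuntimeError as err:
        if "UNEXPECTED COMMIT" in str(err):
            raise RuntimeError(
                f"Plugin module {mod.__name__!r} accessed the database "
                "(e.g. via Variable.get) while being imported. Database "
                "access is not allowed when the scheduler loads plugins "
                "such as custom Timetables; retrieve values lazily instead."
            ) from err
        raise
```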
> I'd say this should be considered a user error. We can add a note in the documentation explaining that value retrievals (not just `Variable`, but all DB and config accesses in general) should be done lazily. This is just how Python works.
>
> I tend to agree with @uranusjr, with one twist: we should handle this in a way that produces a better error. The error message should explicitly say that you should not make any database operation inside a custom Timetable. Adding documentation is not enough; people won't read the documentation, and we can expect more issues like this popping up. I think it should be possible to catch this error and turn it into a more explanatory message.
Oops. Reading this now.