
Variable.get inside of a custom Timetable breaks the Scheduler

Open Gollum999 opened this issue 3 years ago • 12 comments

Apache Airflow version

2.3.4

What happened

If you try to use Variable.get from inside of a custom Timetable, the Scheduler will break with errors like:

 scheduler | [2022-09-20 10:19:36,104] {variable.py:269} ERROR - Unable to retrieve variable from secrets backend (MetastoreBackend). Checking subsequent secrets backend.
 scheduler | Traceback (most recent call last):
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/variable.py", line 265, in get_variable_from_secrets
 scheduler | var_val = secrets_backend.get_variable(key=key)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
 scheduler | return func(*args, session=session, **kwargs)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/contextlib.py", line 126, in __exit__
 scheduler | next(self.gen)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 33, in create_session
 scheduler | session.commit()
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1435, in commit
 scheduler | self._transaction.commit(_to_root=self.future)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 829, in commit
 scheduler | self._prepare_impl()
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
 scheduler | self.session.dispatch.before_commit(self.session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
 scheduler | fn(*args, **kw)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/sqlalchemy.py", line 341, in _validate_commit
 scheduler | raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
 scheduler | RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
 scheduler | [2022-09-20 10:19:36,105] {plugins_manager.py:264} ERROR - Failed to import plugin /home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py
 scheduler | Traceback (most recent call last):
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/plugins_manager.py", line 256, in load_plugins_from_plugin_directory
 scheduler | loader.exec_module(mod)
 scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
 scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
 scheduler | File "/home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py", line 9, in <module>
 scheduler | class CustomTimetable(CronDataIntervalTimetable):
 scheduler | File "/home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py", line 10, in CustomTimetable
 scheduler | def __init__(self, *args, something=Variable.get('something'), **kwargs):
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/variable.py", line 138, in get
 scheduler | raise KeyError(f'Variable {key} does not exist')
 scheduler | KeyError: 'Variable something does not exist'
 scheduler | [2022-09-20 10:19:36,179] {scheduler_job.py:769} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
 scheduler | Traceback (most recent call last):
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 752, in _execute
 scheduler | self._run_scheduler_loop()
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 840, in _run_scheduler_loop
 scheduler | num_queued_tis = self._do_scheduling(session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 914, in _do_scheduling
 scheduler | self._start_queued_dagruns(session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1086, in _start_queued_dagruns
 scheduler | dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
 scheduler | return func(*args, **kwargs)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagbag.py", line 179, in get_dag
 scheduler | self._add_dag_from_db(dag_id=dag_id, session=session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagbag.py", line 254, in _add_dag_from_db
 scheduler | dag = row.dag
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/serialized_dag.py", line 209, in dag
 scheduler | dag = SerializedDAG.from_dict(self.data)  # type: Any
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1099, in from_dict
 scheduler | return cls.deserialize_dag(serialized_obj['dag'])
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1021, in deserialize_dag
 scheduler | v = _decode_timetable(v)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 189, in _decode_timetable
 scheduler | raise _TimetableNotRegistered(importable_string)
 scheduler | airflow.serialization.serialized_objects._TimetableNotRegistered: Timetable class 'custom_timetable.CustomTimetable' is not registered

Note that in this case, the Variable in question does exist, and the KeyError is a red herring.

If you add a default_var, things seem to work, though I wouldn't trust it since there is clearly some context where it will fail to load the Variable and will always fall back to the default. Additionally, this still raises the UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS! error, which I assume is a bad thing.

What you think should happen instead

I'm not sure whether or not this should be allowed. In my case, I was able to work around the error by making all Timetable initializer args required (no default values) and pulling the Variable.get out into a wrapper function.
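The workaround described above could look roughly like the sketch below. To keep it runnable without Airflow, `lookup_variable` and `BaseTimetable` are hypothetical stand-ins for `Variable.get` and `CronDataIntervalTimetable`, and `make_custom_timetable` is an assumed name for the wrapper function.

```python
# Stand-ins so the sketch runs without Airflow installed (all hypothetical):
# _VARIABLES / lookup_variable play the role of the metadata DB and Variable.get,
# BaseTimetable plays the role of CronDataIntervalTimetable.
_VARIABLES = {"something": "foo"}
lookup_calls = []


def lookup_variable(key):
    """Pretend DB read; records each call so we can see when it happens."""
    lookup_calls.append(key)
    return _VARIABLES[key]


class BaseTimetable:
    def __init__(self, cron, timezone):
        self.cron = cron
        self.timezone = timezone


class CustomTimetable(BaseTimetable):
    # 'something' is keyword-only and required: there is no default, so nothing
    # is evaluated when the class body runs (i.e. when the plugin is imported).
    def __init__(self, *args, something, **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)


def make_custom_timetable(*args, **kwargs):
    """Wrapper that performs the lookup at call time, from the DAG file."""
    return CustomTimetable(*args, something=lookup_variable("something"), **kwargs)


# Defining the class triggered no lookup.
assert lookup_calls == []

timetable = make_custom_timetable(cron="0 0 * * *", timezone="UTC")
print(timetable._something, lookup_calls)
```

With this shape, the lookup only happens where the DAG file calls the wrapper, not while the plugin module is being imported. In real Airflow, a timetable with extra constructor arguments would presumably also need `serialize`/`deserialize` overrides so it can be rebuilt from the serialized DAG; that part is omitted here.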

How to reproduce

custom_timetable.py

#!/usr/bin/env python3
from __future__ import annotations

from airflow.models.variable import Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something=Variable.get('something'), **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)


class CustomTimetablePlugin(AirflowPlugin):
    name = 'custom_timetable_plugin'
    timetables = [CustomTimetable]

test_custom_timetable.py

#!/usr/bin/env python3
import datetime

import pendulum
from airflow.decorators import dag, task
from custom_timetable import CustomTimetable


@dag(
    start_date=datetime.datetime(2022, 9, 19),
    timetable=CustomTimetable(cron='0 0 * * *', timezone=pendulum.UTC),
)
def test_custom_timetable():
    @task
    def a_task():
        print('hello')

    a_task()


dag = test_custom_timetable()


if __name__ == '__main__':
    dag.cli()

airflow variables set something foo
airflow dags trigger test_custom_timetable

Operating System

CentOS Stream 8

Versions of Apache Airflow Providers

None

Deployment

Other

Deployment details

I was able to reproduce this with:

  • Standalone mode, SQLite DB, SequentialExecutor
  • Self-hosted deployment, Postgres DB, CeleryExecutor

Anything else

Related: #21895

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

Gollum999 avatar Sep 20 '22 16:09 Gollum999

This happens because we load timetable plugins in the scheduler process (decoding the timetable during deserialization). So with Variable.get, we are using a new session that is different from the scheduler session. When that new session commits, it throws an error because of the different sessions. It doesn't seem like a simple fix, but I'm taking a look.

ephraimbuddy avatar Sep 21 '22 14:09 ephraimbuddy

> This happens because we load timetable plugins in the scheduler process (decoding the timetable during deserialization). So with Variable.get, we are using a new session that is different from the scheduler session. When that new session commits, it throws an error because of the different sessions. It doesn't seem like a simple fix, but I'm taking a look.

Storing a session in ThreadLocal, and checking whether it is set there, might be a possibility.

potiuk avatar Sep 21 '22 14:09 potiuk

> Storing a session in ThreadLocal, and checking whether it is set there, might be a possibility.

I suspect that users may create their own session object in a timetable class by using Airflow's create_session or provide_session. In that case this issue will come up again, and ThreadLocal will not solve it?

ephraimbuddy avatar Sep 21 '22 14:09 ephraimbuddy

> I suspect that users may create their own session object in a timetable class by using Airflow's create_session or provide_session. In that case this issue will come up again, and ThreadLocal will not solve it?

Indeed. I doubt users would want to create their own session, but you are right: we would have to handle it in all "user"-facing APIs (which we currently have no definition of). And that would exclude "saving" anything anyway, as that would imply a commit. So if we can find a way to not create the custom timetable in a commit-protected section, that would be much better.

potiuk avatar Sep 21 '22 15:09 potiuk

The "fix" for this is to be able to pass the session down to airflow.secrets.metastore.get_variable, but that isn't currently possible when calling Variable.

The underlying issue here is that @provide_session does the wrong thing; or, looking deeper, session handling in Airflow is too magic.

ashb avatar Sep 21 '22 16:09 ashb

Something like this might fix the problem, but I've not thought about the wider implications:

diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index 83ed310011..ef1d316432 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -125,6 +125,7 @@ class Variable(Base, LoggingMixin):
         key: str,
         default_var: Any = __NO_DEFAULT_SENTINEL,
         deserialize_json: bool = False,
+        **kwargs,
     ) -> Any:
         """
         Gets a value for an Airflow Variable Key
@@ -133,7 +134,7 @@ class Variable(Base, LoggingMixin):
         :param default_var: Default value of the Variable if the Variable doesn't exist
         :param deserialize_json: Deserialize the value to a Python dict
         """
-        var_val = Variable.get_variable_from_secrets(key=key)
+        var_val = Variable.get_variable_from_secrets(key=key, **kwargs)
         if var_val is None:
             if default_var is not cls.__NO_DEFAULT_SENTINEL:
                 return default_var
@@ -256,7 +257,7 @@ class Variable(Base, LoggingMixin):
             return None
 
     @staticmethod
-    def get_variable_from_secrets(key: str) -> str | None:
+    def get_variable_from_secrets(key: str, **backend_kwargs) -> str | None:
         """
         Get Airflow Variable by iterating over all Secret Backends.
 
@@ -265,7 +266,7 @@ class Variable(Base, LoggingMixin):
         """
         for secrets_backend in ensure_secrets_loaded():
             try:
-                var_val = secrets_backend.get_variable(key=key)
+                var_val = secrets_backend.get_variable(key=key, **backend_kwargs)
                 if var_val is not None:
                     return var_val
             except Exception:

ashb avatar Sep 21 '22 16:09 ashb

> Storing a session in ThreadLocal, and checking whether it is set there, might be a possibility.

That's basically what SQLA does for us. That's why, when @provide_session commits, the main session in the scheduler catches the commit and throws the error: it's the same session object!

ashb avatar Sep 21 '22 17:09 ashb
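
The point about the session being the same object can be modeled with the standard library alone. The sketch below is not SQLAlchemy's or Airflow's actual code: `FakeSession`, `ThreadLocalRegistry`, `create_session`, `get_variable` and `prohibit_commit` are simplified, assumed stand-ins that only mimic the behavior described in this thread.

```python
import threading
from contextlib import contextmanager


class FakeSession:
    """Minimal stand-in for an ORM session with a before-commit check."""

    def __init__(self):
        self.commit_guard_active = False

    def commit(self):
        if self.commit_guard_active:
            raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")


class ThreadLocalRegistry:
    """Hands out one session per thread, like a scoped session factory."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        if not hasattr(self._local, "session"):
            self._local.session = self._factory()
        return self._local.session


Session = ThreadLocalRegistry(FakeSession)


@contextmanager
def create_session():
    """Model of a helper that asks for 'a session' and commits on exit."""
    session = Session()
    yield session
    session.commit()


def get_variable(key):
    # The caller passes no session, so the helper fetches one itself.
    with create_session():
        return f"value-of-{key}"


@contextmanager
def prohibit_commit(session):
    """Model of the scheduler's commit-protected section."""
    session.commit_guard_active = True
    try:
        yield
    finally:
        session.commit_guard_active = False


scheduler_session = Session()
assert Session() is scheduler_session  # same thread, same object

error = None
with prohibit_commit(scheduler_session):
    try:
        get_variable("something")
    except RuntimeError as exc:
        error = str(exc)

print(error)
```

In this model the helper never creates a second session: it receives the scheduler's own object from the registry, so its commit on exit trips the guard.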

Yeah. I thought about creating a new session and storing it additionally in thread-local storage (independently of our main session, or replacing it temporarily).

potiuk avatar Sep 21 '22 17:09 potiuk

@ashb, I tried your fix and it doesn't seem to fix it. The problem, as I understand it, is that while decoding the timetable we try to load the plugin. The error occurs here, as a result of the RuntimeError(UNEXPECTED COMMIT) raised by calling Variable.get('something') without supplying the scheduler session. In this case, Variable.get('something') used a session created by its provide_session here.

ephraimbuddy avatar Sep 21 '22 17:09 ephraimbuddy

Sorry, yes, my diff along with passing the session to Variable.get. Let me think how to get that ...

Oh, it's quite easy. To get access to the current session, call airflow.settings.Session() -- that'll return the thread-local active session, and that, coupled with the previous fix, means it won't try to commit.

ashb avatar Sep 21 '22 17:09 ashb

Same issue/discussion in https://github.com/apache/airflow/discussions/26533

potiuk avatar Sep 21 '22 21:09 potiuk

> Sorry, yes, my diff along with passing the session to Variable.get. Let me think how to get that ...
>
> Oh, it's quite easy. To get access to the current session, call airflow.settings.Session() -- that'll return the thread-local active session, and that, coupled with the previous fix, means it won't try to commit.

It seems to work without needing to apply the previous diff, but I'm still testing.

ephraimbuddy avatar Sep 22 '22 08:09 ephraimbuddy

The solution of using airflow.settings.Session() works fine. However, once the plugin fails to register during webserver startup (e.g. because the variable has not been added yet), the webserver must be restarted even after the variable is added.

ephraimbuddy avatar Sep 23 '22 07:09 ephraimbuddy

I’d say this should be considered a user error. We can add a note in the documentation explaining that value retrievals (not just Variable, but all db and config accesses in general) should be done lazily. This is just how Python works.

uranusjr avatar Sep 23 '22 07:09 uranusjr
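
The "how Python works" part can be shown without Airflow: a default argument expression is evaluated once, when the `def` statement runs, not on each call. In the sketch below, `fetch_setting`, `EagerTimetable` and `LazyTimetable` are hypothetical names used only for illustration, and the lazy property is one possible way to defer the read.

```python
evaluations = []


def fetch_setting(key):
    """Hypothetical stand-in for a DB or config read."""
    evaluations.append(key)
    return f"value-of-{key}"


class EagerTimetable:
    # The default expression runs once, while this class body is executed,
    # which for a plugin means at import time.
    def __init__(self, something=fetch_setting("eager")):
        self.something = something


# The value was already fetched although no instance exists yet.
assert evaluations == ["eager"]


class LazyTimetable:
    # None is a sentinel; the fetch is deferred until the value is needed.
    def __init__(self, something=None):
        self._something = something

    @property
    def something(self):
        if self._something is None:
            self._something = fetch_setting("lazy")
        return self._something


lazy = LazyTimetable()
assert evaluations == ["eager"]  # constructing the object did not fetch

print(lazy.something)
print(evaluations)
```

The eager version performs its read as a side effect of importing the module, which is the situation reported in this issue. The lazy version only reads when `something` is first accessed.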

> I’d say this should be considered a user error. We can add a note in the documentation explaining that value retrievals (not just Variable, but all db and config accesses in general) should be done lazily. This is just how Python works.

I tend to agree with @uranusjr, with one twist. We should make sure this kind of error is handled better: the error message should explicitly say that you should not perform any database operation inside a custom Timetable. Adding documentation is not enough. People won't read the documentation, and we can expect more issues like this popping up. I think it should be possible to catch this error and turn it into a more explanatory message.

potiuk avatar Sep 23 '22 09:09 potiuk
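
The "catch and turn into a more explanatory message" idea might be sketched as below. This is only an illustration of translate-and-re-raise: `TimetablePluginError` and `load_plugin` are hypothetical names, not Airflow APIs. Note also that in the traceback at the top of this issue the RuntimeError is logged inside variable.py and then surfaces as a KeyError, so a real change would likely need to hook in at a different point.

```python
class TimetablePluginError(RuntimeError):
    """Hypothetical error type carrying a more helpful message."""


def load_plugin(import_plugin, path):
    """Run a plugin import callable and translate the commit-guard error."""
    try:
        return import_plugin()
    except RuntimeError as exc:
        if "UNEXPECTED COMMIT" not in str(exc):
            raise  # unrelated error: leave it untouched
        raise TimetablePluginError(
            f"Plugin {path} accessed the database while being imported. "
            "Do not perform database operations (such as Variable.get) at "
            "import time or in a custom Timetable's default arguments; "
            "fetch such values lazily instead."
        ) from exc


def broken_plugin():
    # Simulates the commit-guard failure quoted in the issue description.
    raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")


message = None
cause = None
try:
    load_plugin(broken_plugin, "plugins/custom_timetable.py")
except TimetablePluginError as exc:
    message = str(exc)
    cause = exc.__cause__

print(message)
```

Chaining with `from exc` keeps the original RuntimeError available as `__cause__`, so the low-level traceback is not lost.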

> I’d say this should be considered a user error. We can add a note in the documentation explaining that value retrievals (not just Variable, but all db and config accesses in general) should be done lazily. This is just how Python works.
>
> I tend to agree with @uranusjr, with one twist. We should make sure this kind of error is handled better: the error message should explicitly say that you should not perform any database operation inside a custom Timetable. Adding documentation is not enough. People won't read the documentation, and we can expect more issues like this popping up. I think it should be possible to catch this error and turn it into a more explanatory message.

Oops. Reading this now.

ephraimbuddy avatar Sep 24 '22 06:09 ephraimbuddy