
Avoid resetting adopted task instances when retrying for kubernetes executor

Open tanvn opened this issue 1 year ago • 2 comments

Closes: https://github.com/apache/airflow/issues/39088

As described in https://github.com/apache/airflow/issues/39088#issuecomment-2093473579, when an OperationalError occurs (in my case, MySQLdb.OperationalError), the method executor.try_adopt_task_instances(tis_to_adopt_or_reset) is retried. If the first attempt has already adopted all running pods, adopt_launched_task is not called even once on the retry, so tis_to_flush_by_key in the second attempt still contains TIs that have already been adopted. Those TIs are then flushed unnecessarily, which is annoying because heavy tasks get terminated and retried.
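The failure mode above can be reproduced in a few lines outside Airflow. The following is only a minimal sketch under stated assumptions, not the actual KubernetesExecutor code: ToyExecutor, TransientDbError, unadopted_pods, skip_adopted and adopt_with_retries are hypothetical names invented for illustration, and the retry loop is a plain stand-in for the scheduler's retry logic. It shows the idea of skipping TIs that the executor already tracks as running before deciding what to flush.

```python
from dataclasses import dataclass, field


class TransientDbError(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.OperationalError."""


@dataclass
class ToyExecutor:
    """Toy model of an executor that adopts orphaned pods (not Airflow code)."""

    # Keys of task instances this executor already tracks as running.
    running: set = field(default_factory=set)
    # Keys whose pods have not been adopted by this executor yet.
    unadopted_pods: set = field(default_factory=set)
    # Hypothetical switch: True models the fix, False models the old behaviour.
    skip_adopted: bool = True

    def try_adopt_task_instances(self, tis):
        """Return the keys that could not be adopted and would be reset."""
        to_flush = set(tis)
        for key in sorted(self.unadopted_pods):
            # Analogue of adopt_launched_task: adopting a pod removes its key
            # from the flush candidates and marks it as running.
            self.running.add(key)
            to_flush.discard(key)
        self.unadopted_pods.clear()
        if self.skip_adopted:
            # The fix being sketched: a key that is already running here was
            # adopted in an earlier attempt, so it must not be flushed.
            to_flush -= self.running
        return sorted(to_flush)


def adopt_with_retries(executor, tis, attempts=3):
    """Call the executor, failing once after adoption to force a retry."""
    for attempt in range(1, attempts + 1):
        try:
            to_reset = executor.try_adopt_task_instances(tis)
            if attempt == 1:
                # Simulate the DB connection dropping after pods were adopted.
                raise TransientDbError("Lost connection during flush")
            return to_reset
        except TransientDbError:
            if attempt == attempts:
                raise
```

With skip_adopted=False, the second attempt returns every TI as a reset candidate even though all pods were adopted in the first attempt; with skip_adopted=True, it returns an empty list.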

Notes

Please note that this PR only fixes the issue for the Kubernetes executor. (It might happen with other executors as well, but I am only using the Kubernetes executor at present and am not familiar with the others.)



tanvn avatar May 04 '24 12:05 tanvn

Tested in my environment and confirmed that, on the second attempt, the task instances whose pods have already been adopted are no longer flushed:

[2024-05-04T13:03:12.681+0000] {scheduler_job_runner.py:935} INFO - Starting the scheduler loop
[2024-05-04T13:03:12.681+0000] {scheduler_job_runner.py:1612} INFO - Adopting or resetting orphaned tasks for active dag runs
[2024-05-04T13:03:12.682+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 1 of 3
....
[2024-05-04T13:03:18.691+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.31128888434353585 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689778), 'test__orphaned_test_dag', 'select_0', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689792), 'test__orphaned_test_dag', 'select_1', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689797), 'test__orphaned_test_dag', 'select_10', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689801), 'test__orphaned_test_dag', 'select_11', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689805), 'test__orphaned_test_dag', 'select_12', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689808), 'test__orphaned_test_dag', 'select_21', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689812), 'test__orphaned_test_dag', 'select_22', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689815), 'test__orphaned_test_dag', 'select_23', 'scheduled__2024-05-04T10:30:00+00:00', -1)  ... displaying 10 of 32 total bound parameter sets ...  (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689894), 'test__orphaned_test_dag', 'select_52', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689897), 'test__orphaned_test_dag', 'select_53', 'scheduled__2024-05-04T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689778), 'test__orphaned_test_dag', 'select_0', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689792), 'test__orphaned_test_dag', 'select_1', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689797), 'test__orphaned_test_dag', 'select_10', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689801), 'test__orphaned_test_dag', 'select_11', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689805), 'test__orphaned_test_dag', 'select_12', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689808), 'test__orphaned_test_dag', 'select_21', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689812), 'test__orphaned_test_dag', 'select_22', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689815), 'test__orphaned_test_dag', 'select_23', 'scheduled__2024-05-04T10:30:00+00:00', -1)  ... displaying 10 of 32 total bound parameter sets ...  (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689894), 'test__orphaned_test_dag', 'select_52', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689897), 'test__orphaned_test_dag', 'select_53', 'scheduled__2024-05-04T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-05-04T13:03:19.007+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
...

[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_0', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_1', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_10', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_11', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_12', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_21', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_22', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_23', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
...

tanvn avatar May 04 '24 13:05 tanvn

@jedcunningham @hussein-awala Please take a look at your convenience.

tanvn avatar May 05 '24 05:05 tanvn

@jedcunningham @hussein-awala @potiuk @eladkal Hi, we run a service based on Airflow, with thousands of tasks running hourly. Many of them are heavy tasks, and it is very annoying that every time our service is redeployed, many of them get terminated and reset. (The details are described in https://github.com/apache/airflow/issues/39088.) Could you please take a look at this and share your opinions? I would be more than happy to provide further information if needed 🙇

tanvn avatar May 08 '24 06:05 tanvn

@jedcunningham @hussein-awala ping

tanvn avatar May 21 '24 05:05 tanvn

Thank you all for your reviews! I have updated the PR and tested it manually again (on the source code of version 2.8.4, with some debug logs added), and confirmed that it works as expected:

[2024-06-05T11:57:46.698+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.10147814315789988 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696354), 'test__orphaned_test_dag', 'select_70', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696370), 'test__orphaned_test_dag', 'select_71', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696376), 'test__orphaned_test_dag', 'select_72', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696379), 'test__orphaned_test_dag', 'select_73', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696383), 'test__orphaned_test_dag', 'select_74', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696386), 'test__orphaned_test_dag', 'select_75', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696390), 'test__orphaned_test_dag', 'select_76', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696393), 'test__orphaned_test_dag', 'select_77', 'scheduled__2024-06-05T10:30:00+00:00', -1)  ... displaying 10 of 32 total bound parameter sets ...  (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696473), 'test__orphaned_test_dag', 'select_98', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696477), 'test__orphaned_test_dag', 'select_99', 'scheduled__2024-06-05T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696354), 'test__orphaned_test_dag', 'select_70', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696370), 'test__orphaned_test_dag', 'select_71', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696376), 'test__orphaned_test_dag', 'select_72', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696379), 'test__orphaned_test_dag', 'select_73', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696383), 'test__orphaned_test_dag', 'select_74', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696386), 'test__orphaned_test_dag', 'select_75', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696390), 'test__orphaned_test_dag', 'select_76', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696393), 'test__orphaned_test_dag', 'select_77', 'scheduled__2024-06-05T10:30:00+00:00', -1)  ... displaying 10 of 32 total bound parameter sets ...  (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696473), 'test__orphaned_test_dag', 'select_98', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696477), 'test__orphaned_test_dag', 'select_99', 'scheduled__2024-06-05T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-06-05T11:57:46.805+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
[2024-06-05T11:57:46.805+0000] {scheduler_job_runner.py:1622} INFO - Calling SchedulerJob.adopt_or_reset_orphaned_tasks method, with attempt <tenacity.AttemptManager object at 0x7f6b86100af0>
[2024-06-05T11:57:46.918+0000] {kubernetes_executor.py:554} INFO - tis_to_flush: []

....

[2024-06-05T11:57:47.207+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_70', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_71', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_72', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_73', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_74', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
...

tanvn avatar Jun 05 '24 12:06 tanvn

@tanvn is it possible to cover this fix with unit tests to avoid regression?

eladkal avatar Jun 05 '24 15:06 eladkal

is it possible to cover this fix with unit tests to avoid regression?

@eladkal Let me take a look!

tanvn avatar Jun 06 '24 01:06 tanvn

@eladkal @Lee-W Unit test added! PTAL.

tanvn avatar Jun 07 '24 05:06 tanvn