Avoid resetting adopted task instances when retrying for kubernetes executor
Closes: https://github.com/apache/airflow/issues/39088
As described in https://github.com/apache/airflow/issues/39088#issuecomment-2093473579,
when an OperationalError occurs (in my case, MySQLdb.OperationalError),
adopt_or_reset_orphaned_tasks is retried, so executor.try_adopt_task_instances(tis_to_adopt_or_reset) is called a second time.
In the first attempt, all running pods were adopted successfully, so in the second attempt adopt_launched_task is not called even once. As a result, tis_to_flush_by_key in the second attempt still contains TIs that have already been adopted.
Those TIs are then flushed unnecessarily, which is disruptive because heavy tasks get terminated and retried.
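To make the sequence above easier to follow, here is a minimal toy model of it. This is only a sketch and not the actual Airflow code: ToyExecutor, Pod, the scheduler_label field, the skip_already_adopted flag, and the use of a running set to recognise adopted TIs are all assumptions made for illustration. Only the names try_adopt_task_instances, adopt_launched_task, and tis_to_flush_by_key come from the description.

```python
from dataclasses import dataclass, field


@dataclass
class Pod:
    ti_key: str           # hypothetical: key of the task instance the pod runs
    scheduler_label: str  # hypothetical: id of the scheduler job owning the pod


@dataclass
class ToyExecutor:
    """Toy stand-in for an executor; not the real KubernetesExecutor."""

    job_id: str
    pods: list[Pod]
    running: set[str] = field(default_factory=set)

    def adopt_launched_task(self, pod: Pod, tis_to_flush_by_key: dict) -> None:
        # Adopting relabels the pod and removes its TI from the flush candidates.
        pod.scheduler_label = self.job_id
        tis_to_flush_by_key.pop(pod.ti_key, None)
        self.running.add(pod.ti_key)

    def try_adopt_task_instances(self, ti_keys, skip_already_adopted: bool):
        tis_to_flush_by_key = {key: key for key in ti_keys}
        # Only pods still labelled with another scheduler are adoption candidates.
        for pod in self.pods:
            if pod.scheduler_label != self.job_id:
                self.adopt_launched_task(pod, tis_to_flush_by_key)
        if skip_already_adopted:
            # Assumed guard: do not flush TIs this executor already adopted.
            tis_to_flush_by_key = {
                key: ti
                for key, ti in tis_to_flush_by_key.items()
                if key not in self.running
            }
        return list(tis_to_flush_by_key.values())


keys = ["select_0", "select_1"]
executor = ToyExecutor(job_id="new", pods=[Pod(k, "old") for k in keys])

# Attempt 1: every pod is adopted, so nothing is left to flush.
first_attempt = executor.try_adopt_task_instances(keys, skip_already_adopted=False)

# The DB flush then fails (OperationalError) and the whole call is retried.
# Attempt 2 without a guard: no pod needs adopting, so every TI is flushed.
retry_without_guard = executor.try_adopt_task_instances(keys, skip_already_adopted=False)

# Attempt 2 with the guard: already adopted TIs are left alone.
retry_with_guard = executor.try_adopt_task_instances(keys, skip_already_adopted=True)
```

In this model the unguarded retry returns both TIs for flushing even though their pods are already owned by the new scheduler, while the guarded retry returns nothing.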
Notes
Please note that this PR only fixes the issue for the Kubernetes executor. (It might affect other executors as well, but I only use the Kubernetes executor at present and am not familiar with the others.)
Tested in my environment and confirmed that, in the second attempt, the task instances whose pods have already been adopted are no longer flushed:
[2024-05-04T13:03:12.681+0000] {scheduler_job_runner.py:935} INFO - Starting the scheduler loop
[2024-05-04T13:03:12.681+0000] {scheduler_job_runner.py:1612} INFO - Adopting or resetting orphaned tasks for active dag runs
[2024-05-04T13:03:12.682+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 1 of 3
....
[2024-05-04T13:03:18.691+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.31128888434353585 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689778), 'test__orphaned_test_dag', 'select_0', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689792), 'test__orphaned_test_dag', 'select_1', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689797), 'test__orphaned_test_dag', 'select_10', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689801), 'test__orphaned_test_dag', 'select_11', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689805), 'test__orphaned_test_dag', 'select_12', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689808), 'test__orphaned_test_dag', 'select_21', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689812), 'test__orphaned_test_dag', 'select_22', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689815), 'test__orphaned_test_dag', 'select_23', 'scheduled__2024-05-04T10:30:00+00:00', -1) ... displaying 10 of 32 total bound parameter sets ... (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689894), 'test__orphaned_test_dag', 'select_52', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689897), 'test__orphaned_test_dag', 'select_53', 'scheduled__2024-05-04T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
rowcount = cursor.executemany(statement, parameters)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
self.rowcount = sum(self.execute(query, arg) for arg in args)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
self.rowcount = sum(self.execute(query, arg) for arg in args)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
res = self._query(mogrified_query)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
db.query(q)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
_mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks
session.flush()
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
_emit_update_statements(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
c = connection._execute_20(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
rowcount = cursor.executemany(statement, parameters)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
self.rowcount = sum(self.execute(query, arg) for arg in args)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
self.rowcount = sum(self.execute(query, arg) for arg in args)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
res = self._query(mogrified_query)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
db.query(q)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689778), 'test__orphaned_test_dag', 'select_0', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689792), 'test__orphaned_test_dag', 'select_1', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689797), 'test__orphaned_test_dag', 'select_10', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689801), 'test__orphaned_test_dag', 'select_11', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689805), 'test__orphaned_test_dag', 'select_12', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689808), 'test__orphaned_test_dag', 'select_21', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689812), 'test__orphaned_test_dag', 'select_22', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689815), 'test__orphaned_test_dag', 'select_23', 'scheduled__2024-05-04T10:30:00+00:00', -1) ... displaying 10 of 32 total bound parameter sets ... (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689894), 'test__orphaned_test_dag', 'select_52', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689897), 'test__orphaned_test_dag', 'select_53', 'scheduled__2024-05-04T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-05-04T13:03:19.007+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
...
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_0', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_1', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_10', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_11', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_12', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_21', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_22', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_23', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
...
@jedcunningham @hussein-awala Please take a look at your convenience.
@jedcunningham @hussein-awala @potiuk @eladkal Hi, we are running a service based on Airflow and there are thousands of tasks running hourly. Many of them are heavy tasks and it is very annoying that every time our service is re-deployed, many of them get terminated and reset. (The details are described in https://github.com/apache/airflow/issues/39088) Could you please take a look at this and give your opinions? I would be more than happy to provide further information if needed 🙇
@jedcunningham @hussein-awala ping
Thank you all for your reviews! I have updated the PR and tested it manually again (on the source code of version 2.8.4, with some debug logs). Confirmed that it is working as expected:
[2024-06-05T11:57:46.698+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.10147814315789988 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696354), 'test__orphaned_test_dag', 'select_70', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696370), 'test__orphaned_test_dag', 'select_71', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696376), 'test__orphaned_test_dag', 'select_72', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696379), 'test__orphaned_test_dag', 'select_73', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696383), 'test__orphaned_test_dag', 'select_74', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696386), 'test__orphaned_test_dag', 'select_75', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696390), 'test__orphaned_test_dag', 'select_76', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696393), 'test__orphaned_test_dag', 'select_77', 'scheduled__2024-06-05T10:30:00+00:00', -1) ... displaying 10 of 32 total bound parameter sets ... (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696473), 'test__orphaned_test_dag', 'select_98', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696477), 'test__orphaned_test_dag', 'select_99', 'scheduled__2024-06-05T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
rowcount = cursor.executemany(statement, parameters)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
self.rowcount = sum(self.execute(query, arg) for arg in args)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
self.rowcount = sum(self.execute(query, arg) for arg in args)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
res = self._query(mogrified_query)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
db.query(q)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
_mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks
session.flush()
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
_emit_update_statements(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
c = connection._execute_20(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
rowcount = cursor.executemany(statement, parameters)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
self.rowcount = sum(self.execute(query, arg) for arg in args)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
self.rowcount = sum(self.execute(query, arg) for arg in args)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
res = self._query(mogrified_query)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
db.query(q)
File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696354), 'test__orphaned_test_dag', 'select_70', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696370), 'test__orphaned_test_dag', 'select_71', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696376), 'test__orphaned_test_dag', 'select_72', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696379), 'test__orphaned_test_dag', 'select_73', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696383), 'test__orphaned_test_dag', 'select_74', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696386), 'test__orphaned_test_dag', 'select_75', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696390), 'test__orphaned_test_dag', 'select_76', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696393), 'test__orphaned_test_dag', 'select_77', 'scheduled__2024-06-05T10:30:00+00:00', -1) ... displaying 10 of 32 total bound parameter sets ... (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696473), 'test__orphaned_test_dag', 'select_98', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696477), 'test__orphaned_test_dag', 'select_99', 'scheduled__2024-06-05T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-06-05T11:57:46.805+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
[2024-06-05T11:57:46.805+0000] {scheduler_job_runner.py:1622} INFO - Calling SchedulerJob.adopt_or_reset_orphaned_tasks method, with attempt <tenacity.AttemptManager object at 0x7f6b86100af0>
[2024-06-05T11:57:46.918+0000] {kubernetes_executor.py:554} INFO - tis_to_flush: []
....
[2024-06-05T11:57:47.207+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_70', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_71', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_72', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_73', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_74', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
...
@tanvn is it possible to cover this fix with unit tests to avoid regression?
@eladkal Let me take a look!
@eladkal @Lee-W Unit test added! PTAL.