Add deferrable_timeout to initialize _timeout_sec of TaskStateTrigger
Use execution_timeout when initializing _timeout_sec of TaskStateTrigger
Previously, the statement 'self._timeout_sec = 60' fixed the timeout for checking whether the dag run exists at 60 seconds. This caused a timeout even when the dag run was still queued or scheduled because of resource constraints, for example when hundreds of dag runs exist and are pending their transition into the running state.
So I changed it to use execution_timeout instead of the fixed 60 seconds. However, I'm not sure whether using execution_timeout makes sense, so I'd like some suggestions.
I know the current logic of the deferred mode of ExternalTaskSensor has some bugs. I want to address all of the following issues, but I'll do it step by step. This is the first step toward fixing the deferred mode of ExternalTaskSensor: #34204 #34205 #34207
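To make the failure mode concrete, here is a minimal sketch that replays the old polling loop against a simulated clock instead of sleeping. The function name simulate_fixed_window and the queued_for_sec input (how long the dag run stays queued or scheduled) are hypothetical; only the "check the deadline first, then check for a running dag run" order follows the trigger code.

```python
def simulate_fixed_window(queued_for_sec: float, timeout_sec: float, poll_interval: float) -> str:
    """Replay the old TaskStateTrigger wait loop on a fake clock (no real sleeping).

    queued_for_sec: hypothetical time the dag run spends queued/scheduled.
    Returns "timeout" if the fixed window expires first, otherwise "running".
    """
    elapsed = 0.0
    while True:
        # Deadline check comes first, as in the original loop.
        if elapsed >= timeout_sec:
            return "timeout"  # the trigger would yield TriggerEvent({"status": "timeout"})
        if elapsed >= queued_for_sec:
            return "running"  # count_running_dags() would now be non-zero
        elapsed += poll_interval
```

With the hard-coded 60 seconds, a dag run that waits five minutes in the queue produces "timeout" even though it would have started: simulate_fixed_window(300, 60, 5) returns "timeout", while simulate_fixed_window(300, 3600, 5) returns "running".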
@uranusjr I don't know how to reply to the comment above, so I wrote a new comment.
I don't think I need to make the argument optional, since BaseOperator provides a default value for execution_timeout and types it as timedelta or None. You can see this in airflow/models/baseoperator.py, line 763. So I added a None case to cover every possible type of BaseOperator's execution_timeout.
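A sketch of what "covering the None case" could look like when converting execution_timeout into seconds. The helper name is hypothetical, and falling back to the previous 60 seconds when execution_timeout is None is an assumption; the comment above does not say what None should map to.

```python
from __future__ import annotations

from datetime import timedelta

# The value previously hard-coded as `self._timeout_sec = 60`.
FALLBACK_TIMEOUT_SEC = 60.0


def timeout_sec_from_execution_timeout(execution_timeout: timedelta | None) -> float:
    """Convert BaseOperator.execution_timeout (timedelta or None) into seconds.

    Assumption: None falls back to the old 60-second window.
    """
    if execution_timeout is None:
        return FALLBACK_TIMEOUT_SEC
    return execution_timeout.total_seconds()
```

For example, timeout_sec_from_execution_timeout(timedelta(minutes=30)) gives 1800.0, and None gives 60.0.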
I changed it to use the 'timeout' parameter.
I don't think I need to make this parameter optional, since BaseSensorOperator has a default value for timeout: https://github.com/apache/airflow/blob/6618c5f90d037d57e9f3bf1e90cd0712426d6caa/airflow/sensors/base.py#L138
and validates that timeout is an int or float: https://github.com/apache/airflow/blob/6618c5f90d037d57e9f3bf1e90cd0712426d6caa/airflow/sensors/base.py#L185-L186
@hussein-awala Hello, I changed it to use the timeout parameter, falling back to 60 seconds when timeout equals 'conf.getfloat("sensors", "default_timeout")'.
@hussein-awala Hello, I changed it to use the timeout parameter. Could I get a review of the change?
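A sketch of that fallback as a hypothetical helper, resolve_timeout_sec. The conf.getfloat("sensors", "default_timeout") lookup is replaced by a plain default_timeout argument so the sketch runs without Airflow, and the type check only mirrors the int/float validation in BaseSensorOperator; it is not that code.

```python
from __future__ import annotations

FALLBACK_TIMEOUT_SEC = 60.0


def resolve_timeout_sec(timeout: float, default_timeout: float) -> float:
    """Pick the trigger's inner timeout from the sensor's `timeout`.

    `default_timeout` stands in for conf.getfloat("sensors", "default_timeout").
    """
    # Mirrors the int/float check that BaseSensorOperator performs on `timeout`.
    if not isinstance(timeout, (int, float)):
        raise TypeError(f"timeout must be int or float, got {type(timeout).__name__}")
    # `timeout` was left at the configured default: keep the old 60-second window.
    if timeout == default_timeout:
        return FALLBACK_TIMEOUT_SEC
    return float(timeout)
```

A sensor that leaves timeout at the configured default keeps the 60-second window; any explicitly set timeout is used as-is.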
By using the timeout parameter, you will have the same problem I explained before, where:
min(self.start_date + execution_timeout, self.trigger_timeout)
The trigger_timeout in this formula is calculated from the timeout parameter:
https://github.com/apache/airflow/blob/b75f9e880614fa0427e7d24a1817955f5de658b3/airflow/models/taskinstance.py#L2454-L2457
I suggest adding a new parameter to the trigger to override the timeout instead of using the timeout param.
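A simplified paraphrase of the quoted formula, to show why deriving _timeout_sec from timeout (or execution_timeout) cannot extend the trigger's life: the trigger is stopped at whichever deadline comes first. The function name and the None handling are assumptions for illustration; the linked taskinstance.py lines are the authoritative logic.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def effective_deadline(
    start_date: datetime,
    execution_timeout: timedelta | None,
    trigger_timeout: datetime | None,
) -> datetime | None:
    """Earliest of start_date + execution_timeout and trigger_timeout (None = unset)."""
    if execution_timeout is None:
        return trigger_timeout
    exec_deadline = start_date + execution_timeout
    if trigger_timeout is None:
        return exec_deadline
    return min(exec_deadline, trigger_timeout)
```

With a one-hour execution_timeout and a trigger_timeout ten minutes after start, the deadline is the ten-minute mark, so an inner _timeout_sec larger than ten minutes never takes effect.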
@hussein-awala Thank you for your feedback.
I added a deferrable_timeout parameter instead of overriding timeout, but I'm wondering whether this is a proper name.
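A sketch of how a dedicated deferrable_timeout argument could initialize _timeout_sec independently of the sensor's timeout. TaskStateTriggerSketch is a hypothetical stand-in that shows only timeout initialization, not the real TaskStateTrigger signature; the 60-second default and the non-negative check are assumptions.

```python
from __future__ import annotations

FALLBACK_TIMEOUT_SEC = 60.0


class TaskStateTriggerSketch:
    """Hypothetical stand-in for TaskStateTrigger, showing only timeout initialization."""

    def __init__(
        self,
        dag_id: str,
        task_id: str,
        poll_interval: float = 5.0,
        deferrable_timeout: float | None = None,
    ) -> None:
        self.dag_id = dag_id
        self.task_id = task_id
        self.poll_interval = poll_interval
        if deferrable_timeout is not None and deferrable_timeout < 0:
            raise ValueError("deferrable_timeout must be non-negative")
        # Not derived from the sensor's `timeout`, so it does not feed into trigger_timeout.
        self._timeout_sec = (
            FALLBACK_TIMEOUT_SEC if deferrable_timeout is None else float(deferrable_timeout)
        )
```

Keeping the override separate means users who never set it see the previous 60-second behaviour unchanged.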
Implemented execution_timeout in place of a fixed 60-second timeout (self._timeout_sec) for checking dag run existence in the ExternalTaskSensor's trigger (TaskStateTrigger). The previous fixed timeout caused issues when numerous dag runs were pending transition into running states. With execution_timeout, the window for the dag run to start now follows the task's own configured timeout instead of a hard-coded value.
Seeking feedback on the use of execution_timeout and open to suggestions. I'm addressing the deferred mode issues in ExternalTaskSensor step by step; this PR (Pull Request) contains the first set of fixes for the deferred mode:
#34204 #34205 #34207
Note: The current deferred mode logic has known bugs, which I plan to address incrementally. Refer to the Pull Request Guidelines for more details. Fundamental code changes require an Airflow Improvement Proposal (AIP), new dependencies must comply with the ASF 3rd Party License Policy, and backwards incompatible changes must be documented in a newsfragment file named {pr_number}.significant.rst or {issue_number}.significant.rst.
This might be slightly off topic for this PR, but when talking about the behaviour of sensors in general, I feel there's a discrepancy between what a timeout does in normal mode versus deferrable mode.
When the timeout is reached in non-deferrable mode (using reschedule mode), an AirflowSensorTimeout is raised and the task fails without retrying, which is the correct behaviour IMO.
I would like to see this be consistent across ALL sensors regardless of whether it is running in deferrable mode or not.
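To illustrate the consistency being asked for, here is a sketch of a deferrable sensor's completion callback that turns a trigger "timeout" event into a non-retryable failure, as reschedule mode does. SensorTimeoutError and handle_trigger_event are hypothetical stand-ins (for AirflowSensorTimeout and a callback such as execute_complete) so the sketch runs without Airflow.

```python
from __future__ import annotations

from typing import Any


class SensorTimeoutError(Exception):
    """Hypothetical stand-in for AirflowSensorTimeout (fail without retrying)."""


def handle_trigger_event(event: dict[str, Any]) -> None:
    """Sketch of a sensor's completion callback for the trigger's final event."""
    status = event.get("status")
    if status == "success":
        return
    if status == "timeout":
        # Same outcome as a timed-out sensor in reschedule mode: no retry.
        raise SensorTimeoutError("External task did not reach the expected state in time")
    raise RuntimeError(f"Unexpected trigger event: {event!r}")
```

Mapping "timeout" to a dedicated exception type, rather than a generic error, is what lets the scheduler distinguish "give up now" from "retry".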
@hussein-awala
From the behavior you mentioned, it seems that whether I use execution_timeout or timeout to set _timeout_sec, both lead to the same termination conditions. On further reflection, I'm questioning the need to explicitly check, using _timeout_sec, whether the dag_run is in a running state. This check appears to produce an outcome that overlaps with what the existing timeout parameter of ExternalTaskSensor already achieves.
I want to change the internal loop logic of the async def run method as follows. FROM
while True:
    delta = utcnow() - self.trigger_start_time
    if delta.total_seconds() < self._timeout_sec:
        # mypy confuses typing here
        if await self.count_running_dags() == 0:  # type: ignore[call-arg]
            self.log.info("Waiting for DAG to start execution...")
            await asyncio.sleep(self.poll_interval)
    else:
        yield TriggerEvent({"status": "timeout"})
        return
    # mypy confuses typing here
    if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
        yield TriggerEvent({"status": "success"})
        return
    self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
    await asyncio.sleep(self.poll_interval)
TO
while True:
    if await self.count_running_dags() == 0:  # type: ignore[call-arg]
        self.log.info("Waiting for DAG to start execution...")
        await asyncio.sleep(self.poll_interval)
    # mypy confuses typing here
    if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
        yield TriggerEvent({"status": "success"})
        return
    self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
    await asyncio.sleep(self.poll_interval)
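A runnable sketch of the proposed loop, with the database queries replaced by stand-in callables and asyncio.wait_for standing in for the triggerer stopping the trigger at trigger_timeout (an assumption for illustration, not how Airflow enforces it). poll_until_done, first_event and FakeDb are hypothetical names. One deliberate variation: after sleeping while no dag run is running, the sketch uses continue, whereas the proposed loop falls through to count_tasks() and then sleeps a second time.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def poll_until_done(
    count_running_dags: Callable[[], Awaitable[int]],
    count_tasks: Callable[[], Awaitable[int]],
    expected: int,
    poll_interval: float,
) -> AsyncIterator[dict]:
    """Proposed loop: no inner _timeout_sec; the overall deadline is enforced outside."""
    while True:
        if await count_running_dags() == 0:
            # Dag run still queued/scheduled: wait, then re-check from the top.
            await asyncio.sleep(poll_interval)
            continue
        if await count_tasks() == expected:
            yield {"status": "success"}
            return
        await asyncio.sleep(poll_interval)


async def first_event(events: AsyncIterator[dict], deadline_sec: float) -> dict:
    """Stand-in for the triggerer cancelling the trigger at its trigger_timeout."""

    async def _next() -> dict:
        return await events.__anext__()

    return await asyncio.wait_for(_next(), timeout=deadline_sec)


class FakeDb:
    """Hypothetical database: the dag run starts after `queued_polls` checks and
    all tasks finish after `running_polls` further task checks."""

    def __init__(self, queued_polls: int, running_polls: int, expected: int) -> None:
        self.queued_polls = queued_polls
        self.running_polls = running_polls
        self.expected = expected
        self.dag_checks = 0
        self.task_checks = 0

    async def count_running_dags(self) -> int:
        self.dag_checks += 1
        return 1 if self.dag_checks > self.queued_polls else 0

    async def count_tasks(self) -> int:
        self.task_checks += 1
        return self.expected if self.task_checks > self.running_polls else 0
```

A dag run that stays queued for a while still ends in "success" as long as it finishes before the outer deadline; if it never starts, the outer deadline (here asyncio.TimeoutError) is what ends the wait, which is the overlap with the sensor's timeout described above.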
Instead of making the change immediately, I've added a deprecation message in this PR. I believe this approach allows a smoother transition and gives users adequate notice before the functionality is eventually removed.
If there's anything I've misunderstood, please let me know.
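A sketch of one way such a deprecation message could be emitted, assuming (hypothetically) that the warning fires whenever a caller explicitly sets the inner dag-run timeout. init_timeout_sec is an illustrative name, and the standard-library DeprecationWarning stands in for whatever warning class the project prefers.

```python
from __future__ import annotations

import warnings

FALLBACK_TIMEOUT_SEC = 60.0


def init_timeout_sec(deferrable_timeout: float | None = None) -> float:
    """Keep the inner dag-run timeout working, but warn that it is going away."""
    if deferrable_timeout is None:
        return FALLBACK_TIMEOUT_SEC
    warnings.warn(
        "The inner dag-run timeout of TaskStateTrigger is deprecated and will be removed; "
        "rely on the sensor's timeout / execution_timeout instead.",
        DeprecationWarning,
        stacklevel=2,  # point the warning at the caller, not this helper
    )
    return float(deferrable_timeout)
```

Callers relying on the default see no warning; only those explicitly configuring the soon-to-be-removed behaviour are notified.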
@hussein-awala Hello, I'm waiting for your answer. Could you recommend how to deal with this? Or, if this issue has been resolved by another PR, please let me know whether I can close this PR.
I will take a look tonight
@hussein-awala Hello,
I understand you have a busy schedule, and I just wanted to gently remind you about the review of my pull request. You mentioned planning to review it last night, and I was wondering if you had a chance to look at it. Your guidance is greatly appreciated, and I’m more than willing to make any necessary adjustments.
Thank you very much for your time and support.
@hussein-awala Hello, I hope this message finds you well. I wanted to follow up on the pull request I submitted. It has been a few weeks since I last mentioned it, and I would appreciate your feedback on how to proceed.
Thank you for your time.
There has been no feedback or response for several weeks. Therefore, I will close this PR. This content may be addressed in a different PR. Thank you.