
Add deferrable_timeout to initialize _timeout_sec of TaskStateTrigger

Open Xiroo opened this issue 2 years ago • 13 comments

Use execution_timeout when initializing _timeout_sec of TaskStateTrigger

Previously, the statement 'self._timeout_sec = 60' fixed the timeout for checking whether the DAG run exists at 60 seconds. This caused timeouts even when the DAG run was queued or scheduled, for example when hundreds of DAG runs exist and are still waiting to transition into the running state because of resource limits.

So I changed it to use execution_timeout instead of a fixed 60 seconds. However, I'm not sure whether using execution_timeout makes sense, so I'd like some suggestions.

I know the current deferred-mode logic of ExternalTaskSensor has some bugs. I want to address all of the following issues, but I'll do it step by step. This is the first step toward fixing the deferred mode of ExternalTaskSensor: #34204 #34205 #34207



Xiroo, Oct 02 '23 10:10

@uranusjr I don't know how to reply to the comment above, so I'm writing a new comment.

I don't think I need to make the argument optional, since BaseOperator has a default value for execution_timeout and its type is timedelta or None. You can see it in airflow/models/baseoperator.py, line 763. So I added a None case to cover every possible type of BaseOperator's execution_timeout.
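A minimal sketch of the conversion described above, assuming the trigger receives the operator's execution_timeout (a timedelta or None) and keeps the previous fixed 60 seconds when it is None. The helper name resolve_timeout_sec is hypothetical and not part of Airflow.

```python
from __future__ import annotations

from datetime import timedelta

# Previous hard-coded value of TaskStateTrigger._timeout_sec.
DEFAULT_TIMEOUT_SEC = 60.0


def resolve_timeout_sec(execution_timeout: timedelta | None) -> float:
    """Hypothetical helper: derive _timeout_sec from execution_timeout.

    execution_timeout may be None (the BaseOperator default), in which case
    the old fixed 60-second window is kept.
    """
    if execution_timeout is None:
        return DEFAULT_TIMEOUT_SEC
    return execution_timeout.total_seconds()


print(resolve_timeout_sec(None))                   # 60.0
print(resolve_timeout_sec(timedelta(minutes=30)))  # 1800.0
```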

Xiroo, Oct 02 '23 14:10

I changed it to use the 'timeout' parameter.

I don't think I need to make this parameter optional, since BaseSensorOperator has a default value for timeout: https://github.com/apache/airflow/blob/6618c5f90d037d57e9f3bf1e90cd0712426d6caa/airflow/sensors/base.py#L138

and it has validation logic that requires timeout to be an int or float: https://github.com/apache/airflow/blob/6618c5f90d037d57e9f3bf1e90cd0712426d6caa/airflow/sensors/base.py#L185-L186
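For illustration, a hypothetical stand-in (not Airflow's BaseSensorOperator) showing the kind of guarantee the comment relies on: timeout always has a numeric default and is rejected unless it is a non-negative int or float. ValueError replaces the exception Airflow raises, and the 7-day DEFAULT_SENSOR_TIMEOUT is an assumed stand-in for conf.getfloat("sensors", "default_timeout").

```python
from __future__ import annotations

# Assumed stand-in for conf.getfloat("sensors", "default_timeout");
# the real value comes from the Airflow configuration.
DEFAULT_SENSOR_TIMEOUT = 60 * 60 * 24 * 7.0


class SketchSensor:
    """Hypothetical sensor showing a defaulted, validated timeout."""

    def __init__(self, timeout: float = DEFAULT_SENSOR_TIMEOUT) -> None:
        self.timeout = timeout
        self._validate_timeout()

    def _validate_timeout(self) -> None:
        # The isinstance check runs first, so a non-number never reaches "< 0".
        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
            raise ValueError("timeout must be a non-negative int or float")


print(SketchSensor().timeout)  # 604800.0
```

Because the value is always a number after construction, code that reads timeout does not need an Optional branch, which is the argument made above.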

Xiroo, Oct 02 '23 17:10

@hussein-awala Hello, I changed it to use the timeout parameter and set the value to 60 if timeout equals 'conf.getfloat("sensors", "default_timeout")'.
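A minimal sketch of that fallback, assuming the configured sensor default is passed in as a plain number rather than read through conf.getfloat("sensors", "default_timeout"). The helper name trigger_timeout_sec is hypothetical.

```python
def trigger_timeout_sec(timeout: float, configured_default: float, fallback: float = 60.0) -> float:
    """Hypothetical: pick the wait window for the DAG-run existence check.

    If the sensor's timeout was left at the configured default, keep the old
    60-second window; otherwise use the timeout the user set explicitly.
    """
    if timeout == configured_default:
        return fallback
    return float(timeout)


print(trigger_timeout_sec(604800.0, 604800.0))  # 60.0
print(trigger_timeout_sec(3600, 604800.0))      # 3600.0
```

One limitation of comparing against the default: a user who explicitly sets timeout to exactly the configured default is treated as if they had not set it at all.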

Xiroo, Oct 17 '23 12:10

@hussein-awala Hello, I changed it to use the timeout parameter. Could I get a review of this change?

Xiroo, Oct 25 '23 12:10

> @hussein-awala Hello, I changed it to use the timeout parameter and set the value to 60 if timeout equals 'conf.getfloat("sensors", "default_timeout")'.

By using the timeout parameter, you will have the same problem I explained before, where:

min(self.start_date + execution_timeout, self.trigger_timeout) 

the trigger_timeout in this formula is calculated from the timeout parameter: https://github.com/apache/airflow/blob/b75f9e880614fa0427e7d24a1817955f5de658b3/airflow/models/taskinstance.py#L2454-L2457
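To make the reviewer's point concrete, here is a sketch of the formula above, assuming trigger_timeout is the defer time plus the sensor's timeout, as the comment describes. effective_trigger_deadline and its parameters are illustrative names, not Airflow API. Because the trigger is cut off at this deadline anyway, an internal _timeout_sec taken from timeout or execution_timeout would not expire meaningfully earlier than the trigger itself.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def effective_trigger_deadline(
    start_date: datetime,
    deferred_at: datetime,
    timeout: timedelta | None,
    execution_timeout: timedelta | None,
) -> datetime | None:
    """Sketch of min(self.start_date + execution_timeout, self.trigger_timeout)."""
    # trigger_timeout is derived from the sensor's timeout at defer time.
    trigger_timeout = deferred_at + timeout if timeout is not None else None
    if execution_timeout is None:
        return trigger_timeout
    execution_deadline = start_date + execution_timeout
    if trigger_timeout is None:
        return execution_deadline
    return min(execution_deadline, trigger_timeout)


start = datetime(2023, 10, 27, 12, 0)
deadline = effective_trigger_deadline(
    start, start + timedelta(seconds=5), timeout=timedelta(hours=1), execution_timeout=timedelta(minutes=30)
)
print(deadline)  # 2023-10-27 12:30:00
```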

I suggest adding a new parameter to the trigger to override the timeout instead of using the timeout parameter.

hussein-awala, Oct 27 '23 17:10

@hussein-awala Thank you for your feedback.

I added a deferrable_timeout parameter instead of overriding timeout, but I'm wondering whether that is a good name.
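A minimal sketch of the new parameter, assuming it is optional and that omitting it keeps the previous 60-second behaviour. SketchTaskStateTrigger is a hypothetical stand-in: the real TaskStateTrigger subclasses BaseTrigger and takes more arguments.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any


class SketchTaskStateTrigger:
    """Hypothetical, trimmed-down trigger with a deferrable_timeout override."""

    def __init__(self, poll_interval: float = 5.0, deferrable_timeout: float | None = None) -> None:
        self.poll_interval = poll_interval
        # Previously hard-coded to 60 seconds; now overridable per trigger.
        self._timeout_sec = 60.0 if deferrable_timeout is None else float(deferrable_timeout)
        self.trigger_start_time = datetime.now(timezone.utc)

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Triggers are rebuilt from these kwargs, so the new argument must be included.
        return (
            f"{__name__}.SketchTaskStateTrigger",
            {"poll_interval": self.poll_interval, "deferrable_timeout": self._timeout_sec},
        )

    def dag_run_wait_expired(self, now: datetime) -> bool:
        """Same comparison as the trigger loop: stop waiting once _timeout_sec has elapsed."""
        return (now - self.trigger_start_time).total_seconds() >= self._timeout_sec


trigger = SketchTaskStateTrigger(deferrable_timeout=600)
print(trigger.dag_run_wait_expired(trigger.trigger_start_time + timedelta(seconds=599)))  # False
```

The serialize() detail matters because Airflow recreates a trigger in the triggerer process from its serialized kwargs; a constructor argument that is not serialized would silently fall back to its default there.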

Xiroo, Oct 31 '23 15:10

Implemented execution_timeout in place of a fixed 60-second timeout (self._timeout_sec) for checking DAG run existence in the ExternalTaskSensor. The previous fixed timeout caused issues when numerous DAG runs were pending transition into running states. Now, with execution_timeout, the wait follows the task's configured execution time instead of a fixed window.

Seeking feedback on the use of execution_timeout and open to suggestions. Currently addressing deferred mode issues in ExternalTaskSensor step by step. This PR (Pull Request) addresses the first set of fixes for the deferred mode:

#34204 #34205 #34207

Note: There are known bugs in the current deferred-mode logic, and the plan is to address them incrementally. Refer to the Pull Request Guidelines for more details. Fundamental code changes require an Airflow Improvement Proposal (AIP), and new dependencies must comply with the ASF 3rd Party License Policy. Additionally, document backwards incompatible changes in newsfragments, following the naming convention.

itstalmeez, Dec 06 '23 20:12

This might be slightly off topic for this PR, but when talking about the behaviour of sensors in general, I feel like there's a discrepancy between what a timeout does in normal mode versus deferrable mode.

When the timeout is reached in non-deferrable mode (with reschedule mode), an AirflowSensorTimeout is raised and the task fails without retrying, which is the correct behaviour IMO.

I would like to see this be consistent across ALL sensors, regardless of whether they are running in deferrable mode or not.
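One way that consistency could look, sketched under assumptions: the trigger reports a timeout with a {"status": "timeout"} event (as TaskStateTrigger's loop does when its wait expires), and the sensor's resume handler turns that into a non-retryable timeout rather than an ordinary failure. SensorTimeoutError and SensorFailedError are local stand-ins (the first playing the role of AirflowSensorTimeout), and execute_complete is a plain function here, not the operator method.

```python
from __future__ import annotations

from typing import Any


class SensorTimeoutError(Exception):
    """Local stand-in for a non-retryable sensor timeout (cf. AirflowSensorTimeout)."""


class SensorFailedError(Exception):
    """Local stand-in for an ordinary, retryable task failure."""


def execute_complete(event: dict[str, Any]) -> None:
    """Hypothetical resume handler for a deferred sensor."""
    status = event.get("status")
    if status == "success":
        return
    if status == "timeout":
        # Mirror the non-deferrable behaviour: a timeout fails without retrying.
        raise SensorTimeoutError("Sensor timed out while deferred")
    raise SensorFailedError(f"Unexpected trigger event: {event!r}")
```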

nathadfield, Jan 11 '24 11:01

@hussein-awala

From the behavior you mentioned, it seems that whether I use execution_timeout or timeout to set _timeout_sec, both lead to the same termination conditions. Upon further reflection, I'm questioning the need to explicitly check whether the dag_run is in a running state using _timeout_sec. This check appears to produce an outcome that overlaps with what the existing timeout parameter of ExternalTaskSensor already achieves.

I want to change the internal loop logic of the async def run method as follows.

FROM

```python
while True:
    delta = utcnow() - self.trigger_start_time
    if delta.total_seconds() < self._timeout_sec:
        # mypy confuses typing here
        if await self.count_running_dags() == 0:  # type: ignore[call-arg]
            self.log.info("Waiting for DAG to start execution...")
            await asyncio.sleep(self.poll_interval)
    else:
        yield TriggerEvent({"status": "timeout"})
        return
    # mypy confuses typing here
    if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
        yield TriggerEvent({"status": "success"})
        return
    self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
    await asyncio.sleep(self.poll_interval)
```

TO

```python
while True:
    # mypy confuses typing here
    if await self.count_running_dags() == 0:  # type: ignore[call-arg]
        self.log.info("Waiting for DAG to start execution...")
        await asyncio.sleep(self.poll_interval)
    # mypy confuses typing here
    if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
        yield TriggerEvent({"status": "success"})
        return
    self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
    await asyncio.sleep(self.poll_interval)
```
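To check that the proposed loop still finishes once the external tasks are done, here is a self-contained simulation. FakeTrigger, its scripted count_running_dags/count_tasks answers and the zero poll_interval are assumptions for the sketch, and TriggerEvent is replaced by a plain dict. Note that nothing inside the proposed loop ends the wait if the DAG run never starts; it relies on the sensor's timeout to cut the trigger off.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator
from typing import Any


class FakeTrigger:
    """Hypothetical stand-in for TaskStateTrigger with scripted DB answers."""

    def __init__(self, running_dags: list[int], task_counts: list[int], execution_dates: list[str]) -> None:
        self._running_dags = iter(running_dags)
        self._task_counts = iter(task_counts)
        self.execution_dates = execution_dates
        self.poll_interval = 0  # no real waiting in the simulation
        self.waits = 0  # number of "Waiting for DAG to start" sleeps

    async def count_running_dags(self) -> int:
        return next(self._running_dags)

    async def count_tasks(self) -> int:
        return next(self._task_counts)

    async def run(self) -> AsyncGenerator[dict[str, Any], None]:
        # Proposed loop: no internal _timeout_sec check.
        while True:
            if await self.count_running_dags() == 0:
                self.waits += 1
                await asyncio.sleep(self.poll_interval)
            if await self.count_tasks() == len(self.execution_dates):
                yield {"status": "success"}
                return
            await asyncio.sleep(self.poll_interval)


async def first_event(trigger: FakeTrigger) -> dict[str, Any]:
    """Return the first event the trigger yields, then close the generator."""
    events = trigger.run()
    try:
        return await events.__anext__()
    finally:
        await events.aclose()


trigger = FakeTrigger(running_dags=[0, 1, 1], task_counts=[0, 0, 1], execution_dates=["2024-03-19"])
event = asyncio.run(first_event(trigger))
print(event, trigger.waits)  # {'status': 'success'} 1
```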

Instead of making this change immediately, I'm adding a deprecation message in this PR. I believe this approach will allow for a smoother transition and give users adequate notice before the functionality is eventually removed.
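A minimal sketch of such a deprecation notice. The comment does not say exactly which argument or behaviour is deprecated, so this assumes, for illustration only, a trigger argument controlling the DAG-run wait (called deferrable_timeout here, after the earlier comment). It uses the standard DeprecationWarning; an Airflow provider would more likely use one of Airflow's own deprecation warning classes.

```python
from __future__ import annotations

import warnings


class SketchTrigger:
    """Hypothetical trigger that still honours, but deprecates, deferrable_timeout."""

    def __init__(self, deferrable_timeout: float | None = None) -> None:
        if deferrable_timeout is not None:
            warnings.warn(
                "deferrable_timeout is deprecated and will be removed in a future release; "
                "the sensor's timeout already limits how long the trigger runs.",
                DeprecationWarning,
                stacklevel=2,
            )
        # Keep the current behaviour until the parameter is actually removed.
        self._timeout_sec = 60.0 if deferrable_timeout is None else float(deferrable_timeout)
```

Warning only when the argument is passed explicitly means users who never set it see no noise, while those who rely on it get notice before removal.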

If there's anything I've misunderstood, please let me know.

Xiroo, Mar 19 '24 16:03

@hussein-awala Hello, I'm waiting for your answer. Could you recommend how to deal with this? Or, if this issue has been resolved by another PR, let me know so I can close this PR.

Xiroo, Mar 30 '24 06:03

> @hussein-awala
>
> Hello, I'm waiting for your answer.
>
> Could you recommend how to deal with this?
>
> Or, if this issue has been resolved by another PR, let me know so I can close this PR.

I will take a look tonight.

hussein-awala, Mar 30 '24 18:03

@hussein-awala Hello,

I understand you have a busy schedule, and I just wanted to gently remind you about the review of my pull request. You mentioned planning to review it last night, and I was wondering if you had a chance to look at it. Your guidance is greatly appreciated, and I’m more than willing to make any necessary adjustments.

Thank you very much for your time and support.

Xiroo, Apr 02 '24 05:04

@hussein-awala Hello, I hope this message finds you well. I wanted to follow up on the pull request I submitted. It has been a few weeks since I last mentioned it, and I would appreciate your feedback on how to proceed.

Thank you for your time.

Xiroo, Apr 22 '24 07:04

There has been no feedback or response for several weeks, so I will close this PR. This content may be addressed in a different PR. Thank you.

Xiroo, May 31 '24 06:05