EmrServerlessStartJobOperator does not cancel EMR Serverless job when waiter_max_attempts is reached
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.8.1
What happened?
When using the EmrServerlessStartJobOperator with wait_for_completion=True, and specifying waiter_delay and waiter_max_attempts, the EMR Serverless job is not canceled when the maximum waiter attempts are reached. Instead, the Airflow task fails and a new task instance is started due to retries being configured. This results in multiple EMR Serverless jobs running concurrently, as the original job continues to run even after the Airflow task has failed and retried.
What you think should happen instead?
When the waiter_max_attempts limit is reached, the EMR Serverless job should be automatically canceled as a result of this event, before the Airflow task proceeds to a retry. This ensures that upon retrying, Airflow starts a new EMR Serverless job, and only one job is active at any given time
How to reproduce
-
Create an Airflow DAG with a task using EmrServerlessStartJobOperator
-
Configure the operator with wait_for_completion=True, and set waiter_delay and waiter_max_attempts to values that will cause a timeout before the job completes
-
Use a dummy Spark job that runs longer than the total wait time (waiter_delay * waiter_max_attempts)
-
Configure the Airflow task to have retries (e.g., retries=2)
-
Run the DAG
-
Observe that when the waiter_max_attempts limit is reached, the Airflow task fails and retries, starting a new EMR Serverless job while the previous job continues to run
Operating System
Amazon Linux 2023
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.16.0
Deployment
Amazon (AWS) MWAA
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Do you ever see https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/operators/emr.py#L1272 ("Unable to request query cancel on EMR Serverless. Exiting") in the logs?
Yes this is the current behaviour of the operator. you may add support for this :)
The issue persists in:
- Apache Airflow 2.10.1
- apache-airflow-providers-amazon==8.28.0
I'll have look at it