AIP-61 - Scheduler job: main loop and event processing for multi executors
This change delivers the main portion of hybrid executors to the Airflow scheduler. On each loop, the query for task instances (TIs) from the DB may return TIs tagged with any executor, or with no specific executor, in which case they run on the default executor. We now sort those TIs by executor after the query and queue tasks only for the executors that have space to take on more TIs.
We now also heartbeat all executors and process their events each scheduler loop.
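To make the flow above concrete, here is a minimal, self-contained sketch of one scheduler loop iteration. It is not the code from this PR: `FakeTaskInstance`, `FakeExecutor`, `group_by_executor` and `scheduler_loop_once` are hypothetical names used only for illustration, and the real scheduler does considerably more (DB state changes, event processing, and so on).

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a TI; not Airflow's TaskInstance."""
    task_id: str
    executor: Optional[str] = None  # None means "run on the default executor"


@dataclass
class FakeExecutor:
    """Hypothetical stand-in for an executor with a fixed number of free slots."""
    name: str
    slots_available: int
    queued: list = field(default_factory=list)
    heartbeats: int = 0

    def queue(self, ti):
        self.queued.append(ti.task_id)
        self.slots_available -= 1

    def heartbeat(self):
        self.heartbeats += 1


def group_by_executor(tis, default_name):
    """Group TIs by executor name; untagged TIs go to the default executor."""
    grouped = defaultdict(list)
    for ti in tis:
        grouped[ti.executor or default_name].append(ti)
    return grouped


def scheduler_loop_once(tis, executors, default_name):
    """Queue TIs on executors that have space, then heartbeat every executor.

    Returns the TIs that could not be queued in this iteration.
    """
    by_name = {executor.name: executor for executor in executors}
    leftover = []
    for name, group in group_by_executor(tis, default_name).items():
        executor = by_name[name]
        free = max(executor.slots_available, 0)
        for ti in group[:free]:
            executor.queue(ti)
        leftover.extend(group[free:])
    # Every executor is heartbeated each loop, whether or not it received work.
    for executor in executors:
        executor.heartbeat()
    return leftover
```

In this sketch, TIs that do not fit are simply returned to the caller; in the real scheduler they would be picked up again by the query on a later loop.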
This is a second version of these changes, and the source code diff is fairly concise. Most of the PR consists of test changes and additions.
Benchmarking
Here is some benchmarking to show the performance of the scheduler with these changes. Please refer to the orange (90th quantile) and yellow/green (50th quantile) plots; blue is the 99th quantile, which produces very spiky data.
Important plots include the bottom two (which are the critical sections of the main loop), as well as the loop duration as a whole.
Baseline from main
Hybrid Scheduler
Method:
This data was collected while running the DAG below. It uses the standard Airflow metrics along with some metric math (for example, the scheduler loop time minus the executor heartbeat time, shown in the middle-left plot), with statsd and Grafana running within the same breeze environment for all data collection.
More samples were taken than just the runs included above, but a representative selection was chosen.
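For readers who want to reproduce the metric math offline, the following is a rough sketch of the same idea in plain Python. The plots in this PR were produced in Grafana, not with this code; the function names and the per-loop sample lists are assumptions made for illustration.

```python
from statistics import quantiles


def loop_time_excluding_heartbeat(loop_ms, heartbeat_ms):
    """Per-loop scheduler time with the executor heartbeat time subtracted.

    Both inputs are assumed to be equal-length lists of per-loop timings.
    """
    return [loop - beat for loop, beat in zip(loop_ms, heartbeat_ms)]


def p50_p90(samples):
    """Return the 50th and 90th quantiles of the samples."""
    # n=10 yields nine cut points: index 4 is the 50th, index 8 the 90th.
    cuts = quantiles(samples, n=10, method="inclusive")
    return cuts[4], cuts[8]
```

The 50th and 90th quantiles are used here because, as noted above, the 99th quantile is too spiky to compare runs reliably.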
DAG code:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

initial_scale = 4
max_scale = 5
scaling_factor = 2

dag_id = "for_loop_tasks_80"

with DAG(
    dag_id=dag_id,
    catchup=False,
    schedule_interval="@once",
    start_date=datetime(2023, 1, 1),
    max_active_runs=1,
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # Fan out 80 independent tasks between the start and end markers.
    for i in range(80):
        task = BashOperator(task_id=f"task_{i}", bash_command="date")
        start >> task >> end
Do we still need slots_available after this? We should remove or deprecate it if not.
Nope, no need to deprecate! We still use this field in places, even after these changes :)
@potiuk @uranusjr any time to have a/another look? @dstandish maybe?
ship it @o-nikolas
@o-nikolas Can you merge this one? -- looks like we are good to go
@kaxil Yupp, I plan to merge both PRs tomorrow morning!