RFC: Add execution concurrency
Related Issues
https://github.com/flyteorg/flyte/issues/5125 https://github.com/flyteorg/flyte/issues/420 https://github.com/flyteorg/flyte/issues/267
Docs link
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 58.05%. Comparing base (
b779bed) to head (ef71289). Report is 69 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #5659 +/- ##
==========================================
- Coverage 58.49% 58.05% -0.45%
==========================================
Files 937 915 -22
Lines 71088 69350 -1738
==========================================
- Hits 41583 40260 -1323
+ Misses 26353 26018 -335
+ Partials 3152 3072 -80
| Flag | Coverage Δ | |
|---|---|---|
| unittests-datacatalog | 59.06% <ø> (ø) |
|
| unittests-flyteadmin | 56.30% <ø> (ø) |
|
| unittests-flytecopilot | 30.99% <ø> (ø) |
|
| unittests-flytectl | 64.70% <ø> (ø) |
|
| unittests-flyteidl | ? |
|
| unittests-flyteplugins | 61.00% <ø> (ø) |
|
| unittests-flytepropeller | 54.79% <ø> (ø) |
|
| unittests-flytestdlib | 64.04% <ø> (ø) |
Flags with carried forward coverage won't be shown. Click here to find out more.
:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.
:rocket: New features to boost your workflow:
- :snowflake: Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
I'd feel more comfortable if we fleshed out the implementation a bit more, but otherwise, I feel like we're on the same page.
Sounds good, just wanted overall alignment before diving into the implementation. Will do that next and thank you already for all the feedback
added some more implementation details, mind taking another look @eapolinario
I'd add one thing, possibly out of scope for this RFC: it would be really nice to be able to define a "max execution concurrency" on the backend, either propeller-wide or per project/domain. Flyte would benefit from more controls that allow operators to protect quality of service and aren't dependent on workflow authors to set reasonable limits.
hi @corleyma thanks for reviewing! re your comment on platform-max execution concurrency, that's really intriguing - would you want to start a separate discussion on that here: https://github.com/flyteorg/flyte/discussions so we don't lose track of the suggestion?
execution namespace quota is meant to help address quality of service and fairness in a multitenant system but it would be cool to flesh out other mechanisms for managing overall executions
execution namespace quota is meant to help address quality of service and fairness in a multitenant system but it would be cool to flesh out other mechanisms for managing overall executions
execution namespace quota can help protect against workloads that would otherwise utilize too many cluster resources, but it doesn't really help protect e.g. flyte propeller from too many concurrent executions.
I am happy to start a separate conversation though!
09/26/2024 Contributors sync notes: no updates
I would not implement this with too many db calls. I would make the decision simple and completely in memory using Singletons like the scheduler. This is because
- We can easily update the algorithm of how to order, probably support backfilling etc
- Create priorities in the future
- Alleviate DB pressure and make it really scalable
It was suggested I add a use case to this PR -- slack thread
I have a cron scheduled workflow (wf) that should not execute if the wf is already executing (from a previous schedule).
Why? The wf executes a legacy app that checks if new files are available on S3. If so, then the app begins processing, which involves other resources that allow only single use. Processing time can vary from minutes to hours, depending on the data received. If another process is started, both will eventually fail due to resource contention.
Considerations:
- It is impractical to change the legacy app.
- It is desired to set cron to a rather small period. So several wf's can start while the original is running.
- It is not desirable to have tasks stack up waiting for the first to finish (i.e. cache serialize)
Hello, I would like to know if this feature is under development? We really need this feature!
Hello, I would like to know if this feature is under development? We really need this feature!
We are in progress! Still cleaning up final parts of the RFC and I think implementation can be underway soon.
Met with met up with @ ZhouGongZi_yizluo and @thomasjhuang yesterday to discuss this RFC. We just went over the doc in detail to make sure we were all on the same page. Have a series of clarifications we wanted more information/questions that we weren’t able to reach a conclusion on, detailed below.
- Reiterate we’re not going to do concurrency per version. Need a compelling user story here to justify this work.
- We asked/discussed at the end of whether there was talk ever of having an Abort threshold - like the behavior should be to wait, until there’s a backup of N executions all in Pending state. At that point, the N + 1 execution should fail immediately. Definitely also not doing this unless there is a compelling user requirement… it adds a lot of complexity to the project unless we can really justify it.
- In “Creating an execution” item 2 - two questions here: a) what’s the scenario in which the process tries to write QUEUED and the database already has QUEUED ? and b) by “conditionally mark” you just mean a sql statement that checks the current value first right? (compare-and-swap semantics).
- In “Creating an execution” item 3 - from “and swallow any errors…” onwards we didn’t really understand. Could you clarify this point please?
- one scenario we wanted clarity on: let’s say step 1 completes (“create the workflow CRD”). Before step 2 can run, the process crashes, the node running the concurrency manager goes away completely. When it comes up again, it’ll again query the Admin database and see that the execution is still Pending (let’s say propeller hasn’t picked up the crd yet). So we enter the execution create cycle again, only this time let’s say there’s a network partition, and it fails, the kube api is down. But in the background propeller now starts working on the CRD and updates admin to Running. At this point we should move to step 3 right, attempt to mark the execution as Failed?
- related question due to my lack of understanding of the K8s workqueue library - can the same execution be in the workqueue twice? That is, in between steps 1 and 2, is there a chance that another goroutine comes up and tries to run step 1 (‘create the workflow CRD’) again?
- one scenario we wanted clarity on: let’s say step 1 completes (“create the workflow CRD”). Before step 2 can run, the process crashes, the node running the concurrency manager goes away completely. When it comes up again, it’ll again query the Admin database and see that the execution is still Pending (let’s say propeller hasn’t picked up the crd yet). So we enter the execution create cycle again, only this time let’s say there’s a network partition, and it fails, the kube api is down. But in the background propeller now starts working on the CRD and updates admin to Running. At this point we should move to step 3 right, attempt to mark the execution as Failed?
- In “Launch Plan informer” section, the last paragraph discusses the launch plan informer object running queries in the background to get updated launch plan concurrency information. The last sentence though is: “If an execution has terminated since the last time the query ran, it won’t be in the result set and we’ll want to update the in memory map to remove the execution.” What does executions terminating have anything to do with launch plan table update times?
- In “Introduce the Concurrency Controller” - item 2. ii. - “update the map…“. By ‘update’ we mean replace right? Otherwise the Sets will just grow forever.
- In “Introduce the Concurrency Controller” - item 4. i. - “Check a separate in-memory map…“. Shouldn’t this just be the launch plan informer? Or are there two of these informers?
Other points:
- Was there some consideration given to the ‘active’ launch plan marker? One core use-case for this concurrency feature is to make sure only one scheduled workflow is running at a given time. Given that schedules key off of the active launch plan, would it make sense that the concurrency setting on the active launch plan be the one that’s used rather than the latest version by time?
- We should have some sort of a CLI that can show the current status of pending executions. What this API looks like will is to be determined by Thomas, Yizhou et. al. in the course of implementing this feature - they’ll learn what is helpful to see/debug.
@wild-endeavor, @ZhouGongZiBBS and @thomasjhuang thank you for the very detailed feedback, I've updated the RFC in response but wanted to address individual points in a reply as well
Reiterate we’re not going to do concurrency per version. Need a compelling user story here to justify this work.
Done, called this out explicitly as a non-goal although included a potential design approach in case we decide to pursue this in the future to ensure this is option isn't closed off to us
We asked/discussed at the end of whether there was talk ever of having an Abort threshold - like the behavior should be to wait, until there’s a backup of N executions all in Pending state. At that point, the N + 1 execution should fail immediately. Definitely also not doing this unless there is a compelling user requirement… it adds a lot of complexity to the project unless we can really justify it.
Really interesting point, we could always extend the SchedulerPolicy with the abort threshold and have the concurrency controller manage the auto-aborts.
In “Creating an execution” item 2 - two questions here: a) what’s the scenario in which the process tries to write QUEUED and the database already has QUEUED ? and b) by “conditionally mark” you just mean a sql statement that checks the current value first right? (compare-and-swap semantics).
a) For the state machine I envisioned something that could recover at any failure point. So if for whatever reason the process stops after we write a QUEUED event and we reboot the concurrency controller, we shouldn't undo the execution progress we've already recorded. b) yes exactly! clarified in the doc
In “Creating an execution” item 3 - from “and swallow any errors…” onwards we didn’t really understand. Could you clarify this point please?
- one scenario we wanted clarity on: let’s say step 1 completes (“create the workflow CRD”). Before step 2 can run, the process crashes, the node running the concurrency manager goes away completely. When it comes up again, it’ll again query the Admin database and see that the execution is still Pending (let’s say propeller hasn’t picked up the crd yet). So we enter the execution create cycle again, only this time let’s say there’s a network partition, and it fails, the kube api is down. But in the background propeller now starts working on the CRD and updates admin to Running. At this point we should move to step 3 right, attempt to mark the execution as Failed?
- related question due to my lack of understanding of the K8s workqueue library - can the same execution be in the workqueue twice? That is, in between steps 1 and 2, is there a chance that another goroutine comes up and tries to run step 1 (‘create the workflow CRD’) again?
Great points, updated this so we only overwrite for 'PENDING' or 'QUEUED' statuses if the execution progresses due to network partition. Re: same execution, this will have an identical execution identifier, correct? We upstreamed https://github.com/flyteorg/flyte/pull/6025 which gracefully handles re-enqueing the same item.
In “Launch Plan informer” section, the last paragraph discusses the launch plan informer object running queries in the background to get updated launch plan concurrency information. The last sentence though is: “If an execution has terminated since the last time the query ran, it won’t be in the result set and we’ll want to update the in memory map to remove the execution.” What does executions terminating have anything to do with launch plan table update times?
I have no idea, I'm sorry. I updated this section.
In “Introduce the Concurrency Controller” - item 2. ii. - “update the map…“. By ‘update’ we mean replace right? Otherwise the Sets will just grow forever.
Yes, just overwriting the map entry. Clarified in the doc.
In “Introduce the Concurrency Controller” - item 4. i. - “Check a separate in-memory map…“. Shouldn’t this just be the launch plan informer? Or are there two of these informers?
Yes clarified, thank you!
Was there some consideration given to the ‘active’ launch plan marker? One core use-case for this concurrency feature is to make sure only one scheduled workflow is running at a given time. Given that schedules key off of the active launch plan, would it make sense that the concurrency setting on the active launch plan be the one that’s used rather than the latest version by time?
Yes this is exactly what we should use to get the active Scheduler Policy! I clarified the proposal but let me know if this is still ambiguous.
We should have some sort of a CLI that can show the current status of pending executions. What this API looks like will is to be determined by Thomas, Yizhou et. al. in the course of implementing this feature - they’ll learn what is helpful to see/debug.
This sounds like an excellent plan!
Code Review Agent Run Status
- Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].
We have a rather complex scheduling requirement that could be solved by this issue, but only when we can limit concurrency on an arbitrary label (a resource id?), with potentially having thousands of labels in use at the same time.
A bit more details:
- We need to run a hundred to a few thousands tasks, each task updates around 10 partitions of our database.
- The list of partitions (they have a clear id) a task updates is known when the task is added to the task graph (which is not at the start but later).
- A partition may only be updated by 1 task at a time, even across launch plans.
- Tasks are not short lived, they take 30 to 90 minutes.
- We want to run as many tasks in parallel as possible.
Mapping our use case to the ideas in this RFC:
- a task declares which resources it needs (where resource = partition id).
- for every used resource (partition id used in the launch plan) we declare the maximum concurrency to be 1.
Bonus
With smart scheduling the total run time of the launch graph can be greatly reduced. We do this by unblocking as many tasks as possible by first running tasks that have the most resources in common with other tasks. (To prevent launch plans from never completing because of other launch plans, this optimization should only be used within a launch plan. In other words, tasks to schedule for running should be ordered by launch plan start time first, and from no. of resources in common second.)
Discussion
This all sounds pretty complex, but when there is a singleton scheduler, the basic algorithm could be implemented in 20 lines of e.g. python. (Of course other constraints could make this harder.)
This RFC points to https://github.com/flyteorg/flyte/issues/267. However, this RFC does not fully cover the use cases of that issue. This RFC does not mention something like a resource/lock/mutex-id. The result is that this RFC only works at the task level and it does not cover use cases where we need to restrict access to the same resource from different tasks (even across launch plans).
As mentioned in my previous comment, locking multiple resources brings even more use cases in scope.
Work is currently in progress - #6309 6309 . This isn't fully flushed out yet, but not new adds to the RFC, it is mostly following the defined outline in the RFC so far. Will add more details + docs in the PR as we make progress.
@eapolinario since it sounds like we've settled on a design here, should we merge the RFC?
@erikvanoosten , let's take this idea of providing a lock per-resource into a separate discussion.
It's not too hard to extrapolate this idea to also cover the proposal in this RFC (which only covers workflow execution concurrency) with a more general resource concurrency, but it's worth separating these two out as there are implications to both the SDK and how this is implemented in the backend.
Code Review Agent Run Status
- Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].
the basic algorithm could be implemented in 20 lines of e.g. python
Because someone asked for it, the '20 lines' are:
import enum
from dataclasses import dataclass
class TaskState(enum.Enum):
NEW = "new"
RUNNING = "started"
SUCCESS = "success"
FAILED = "failed"
@dataclass
class Task:
task_id: str
lock_ids: list[str]
state: SchedulerTaskState
def tasks_to_start(tasks: list[Task], capacity: int) -> list[Task]:
# Collect locks held by a running task
active_lock_ids: set[str] = set()
for task in tasks:
if task.state == TaskState.RUNNING:
active_lock_ids |= set(task.lock_ids)
startable_tasks: list[Task] = []
for task in tasks:
if (
task.state == TaskState.NEW and
all(lock_id not in active_lock_ids for lock_id in task.lock_ids)
):
startable_tasks.append(task)
# Build a dict from lock_id to number of tasks that require the lock
task_count_by_lock_id = defaultdict(int)
for task in startable_tasks:
for lock_id in task.lock_ids:
task_count_by_lock_id[lock_id] += 1
task_count_by_lock_id: dict[str, int] = dict(task_count_by_lock_id)
# For each task calculate the number of tasks that have a lock in common
common_lock_count_per_task: dict[str, int] = defaultdict(int)
for task in startable_tasks:
for lock_id in task.lock_ids:
common_lock_count_per_task[task.task_id] += task_count_by_lock_id[lock_id]
# Order the tasks by:
# - the number of locks it has in common with other tasks (most first)
# - task id
ordered_tasks = sorted(
startable_tasks,
key=lambda task: (-common_lock_count_per_task[task.task_id], task.task_id)
)
# Select the highest ranked tasks without locking conflicts
selected_tasks: list[Task] = []
selected_task_counts: int = 0
selected_lock_ids: set[str] = set()
for task in ordered_tasks:
task_lock_ids = set(task.lock_ids)
if (
selected_task_counts < capacity and
len(selected_lock_ids.intersection(task_lock_ids)) == 0
):
selected_task_counts += 1
selected_lock_ids = selected_lock_ids.union(task_lock_ids)
selected_tasks.append(task)
return selected_tasks