Worker saturation settings are not respected when using unique keys
Describe the issue:
When each task is given a unique key, all tasks appear to be sent to workers immediately, regardless of the worker saturation config. The behaviour matches distributed.scheduler.worker-saturation set to inf, even when a different value is configured.
Additionally, in some cases idle workers don't steal tasks from a worker that is computing a long task until that long task completes. Whether this happens appears to depend on the distribution of task times when running with infinite saturation (either because of the bug above or because it is set in config). It doesn't happen when tasks are grouped by key prefix.
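Since the stealing behaviour seems to depend on how task durations are distributed, it can help to try several randomly generated sets of durations. The following is a minimal sketch, using only the standard library, of drawing task times of the form exp(U * n) with U uniform on [0, 1). The scale n = 3 and the fixed seed are assumptions for illustration and are not values taken from this report.

```python
import math
import random

rng = random.Random(0)  # fixed seed (assumption) so repeated runs match
n = 3  # assumed scale; the report does not state the value that was used

# exp(U * n) with U uniform on [0, 1) gives durations in [1, e**n) seconds,
# with many short tasks and a few long ones.
task_times = [round(math.exp(rng.random() * n), 2) for _ in range(20)]
print(task_times)
```

The resulting list can be substituted for the hard-coded task_times in the example to check how often the blocking pattern appears.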
Minimal Complete Verifiable Example:
import dask
# Ensure config is set as we expect defaults
dask.config.set({
    'distributed.scheduler.work-stealing': True,
    'distributed.scheduler.worker-saturation': 1.,
})
from dask.distributed import LocalCluster, Client
from time import sleep
from functools import partial
def func(t):
    sleep(t)
# I generated these task times randomly while testing but this provides a less stochastic example
# Task times generated by np.exp(random()*n) appear to display this behaviour often
task_times = [8.53, 2.41, 4.11, 1.21, 4.74, 2.71, 14.46, 4.57, 5.11, 2.86, 4.15, 12.17, 1.13, 3.28, 18.66, 5.14, 10.21, 13.28, 17.69, 1.0]
# Changing the '_' to '-' here to group the tasks shows proper queuing behaviour
tasks = [(f"task_{i}", partial(func, t)) for i, t in enumerate(task_times)]
with Client(n_workers=4, threads_per_worker=1) as client:
    futures = [client.submit(func, key=key, pure=False) for key, func in tasks]
    for future, result in dask.distributed.as_completed(futures, with_results=True):
        future.release()
Watching the dashboard while this runs shows that all tasks are distributed to workers immediately, and often tasks build up on workers without being redistributed until a task finishes. Example of a task stream showing this:
Changing the task keys so that they share a prefix (e.g. task-0 instead of task_0) results in the tasks being queued on the scheduler instead. Setting worker saturation to inf distributes all tasks to workers as expected but doesn't appear to have the same blocking problem. Example run of the above where tasks have the same task/prefix group and inf worker saturation:
Tasks are trivial and fast in this example, but I've also seen this in workloads where every task runs for more than an hour.
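One plausible explanation, not confirmed in this report, is that the '_' and '-' separators lead to different task groupings: keys such as task_0 may each end up in their own group, while task-0, task-1, ... share the group "task". The sketch below illustrates that difference with a hypothetical helper, approx_prefix, which is a simplified stand-in written for this example and is not the scheduler's actual implementation. It assumes the prefix is the text before the first '-' that is followed by a non-alphabetic word.

```python
from collections import Counter


def approx_prefix(key: str) -> str:
    """Hypothetical, simplified stand-in for a key-prefix rule.

    Assumption: the prefix is the leading part of the key, extended across
    '-' separators only while the next word is purely alphabetic.
    """
    words = key.split("-")
    prefix = words[0]
    for word in words[1:]:
        if not word.isalpha():
            break
        prefix += "-" + word
    return prefix


underscore_keys = [f"task_{i}" for i in range(20)]
hyphen_keys = [f"task-{i}" for i in range(20)]

# Count how many tasks fall under each prefix for both naming schemes.
underscore_groups = Counter(approx_prefix(k) for k in underscore_keys)
hyphen_groups = Counter(approx_prefix(k) for k in hyphen_keys)

print(len(underscore_groups))  # 20 separate groups, one task each
print(dict(hyphen_groups))     # {'task': 20}
```

If queuing decisions are made per group, single-task groups like those produced by the underscore keys could plausibly bypass queuing, which would be consistent with the behaviour described above.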
Anything else we need to know?:
I can take a look at possible causes for this in more detail when I have some time.
Environment:
- Dask version: 2023.6.0
- Python version: 3.10
- Operating System: Linux (RHEL)
- Install method (conda, pip, source): conda