Adaptive target duration is scaled by `threads_per_worker`
Adaptive clusters with `threads_per_worker > 1` scale to reach a duration of `target_duration / threads_per_worker` rather than `target_duration`:
```python
from dask.distributed import LocalCluster, Client, Adaptive
from dask import bag as db
from time import time, sleep

# One worker with 2 threads; adaptive scaling targets a 20 s runtime
cluster = LocalCluster(n_workers=1, threads_per_worker=2, memory_limit=1e9)
ca = cluster.adapt(minimum=1, maximum=10, target_duration="20s", scale_factor=1)
client = Client(cluster)

# 100 tasks of 0.5 s each, one per partition
start = time()
db.from_sequence((0.5 for n in range(100)), npartitions=100).map(sleep).compute()
print(time() - start, "seconds with", len(cluster.workers), "workers")
```

Output:

```
9.84409236907959 seconds with 3 workers
```
(Note that I set scale_factor=1 to prevent anything but the target duration from scaling up the cluster.)
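A back-of-the-envelope check (my arithmetic, not part of the original report) of why this run ends up with 3 workers and finishes in roughly 10 s rather than 20 s:

```python
import math

total_work = 100 * 0.5      # 100 tasks x 0.5 s each = 50 s of thread time
target = 20                 # target_duration in seconds
threads_per_worker = 2

workers = math.ceil(total_work / target)               # 3 workers requested
runtime = total_work / (workers * threads_per_worker)  # ~8.3 s across 6 threads
print(workers, round(runtime, 1))                      # 3 8.3 -- close to the ~10 s observed
```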
With single-threaded workers, we get what we desired (approx. 20s):
```python
[...]
cluster = LocalCluster(n_workers=1, threads_per_worker=1, memory_limit=1e9)
ca = cluster.adapt(minimum=1, maximum=10, target_duration="20s", scale_factor=1)
[...]
```

Output:

```
18.264188766479492 seconds with 3 workers
```
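(Again my arithmetic: ceil(50 s / 20 s) = 3 single-threaded workers gives 50 / 3 ≈ 16.7 s, consistent with the ~18 s observed.)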
- Should this be documented?
- Or should the behaviour of `Adaptive` be corrected to take the number of threads per worker into account when estimating the expected duration?
Yep, the number of cores per worker is not taken into account here: https://github.com/dask/distributed/blob/7ebe65980e7fef90fd25cc0d35e2fcfc0c266881/distributed/deploy/adaptive.py#L295.
Not sure we can reliably know the number of cores of new workers ahead of time, but we can probably make a good estimate from the current number of workers and the total number of cores; see the sketch below.
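A rough sketch of that idea (my own illustration, not the actual adaptive code; the function name and signature below are hypothetical): estimate the average threads per worker from the workers we already have and divide the CPU-based worker target by it.

```python
import math
from dask.utils import parse_timedelta

def estimate_cpu_target(total_occupancy, worker_nthreads, target_duration="5s"):
    """Estimate how many workers adaptive scaling should request.

    total_occupancy: total seconds of outstanding work (as tracked by the scheduler)
    worker_nthreads: list of thread counts for the currently running workers
    """
    duration = parse_timedelta(target_duration)

    # Current behaviour (roughly): one worker per target_duration of work,
    # regardless of how many threads each worker runs.
    naive = math.ceil(total_occupancy / duration)

    # Adjusted: assume new workers look like the ones we already have.
    avg_threads = sum(worker_nthreads) / max(1, len(worker_nthreads))
    adjusted = math.ceil(total_occupancy / duration / max(1, avg_threads))
    return naive, adjusted

# The example above: 50 s of work, 20 s target, workers with 2 threads each.
print(estimate_cpu_target(50, [2], "20s"))   # (3, 2): 3 workers today, 2 with the adjustment
```

In the scheduler's adaptive target the same adjustment would presumably mean dividing the occupancy-based CPU estimate by something like the average thread count of the existing workers.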
I think the current behavior is not the one intended.
https://github.com/dask/distributed/blob/4115f55c655000b04a8132c9ca62fa07bd6df6f5/distributed/scheduler.py#L5357-L5360
This has started to hit us heavily. Can you point me in the right direction for implementing this? Any suggestions?