Add new param to allow multiple workers per VM in `VMCluster` / `WorkerMixin`
Intended to address #173
## Overview

The goal is to add the ability to spin up multiple workers on VM clusters (like `EC2Cluster`).
- A new param `num_workers_per_vm` is added to the `VMCluster` and `WorkerMixin` classes.
- This param defaults to 1 in both classes (preserving the current behaviour).
- `num_workers_per_vm` worker classes are added to the spec (which is passed to `SpecCluster`) to initialize multiple workers.
- To keep changes minimal, there is no validation that `num_workers_per_vm` is less than the number of cores available.
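The idea in the list above can be sketched as follows. Note this is illustrative only: `build_worker_spec` is a hypothetical helper, not the actual dask-cloudprovider internals; the point is that the spec handed to `SpecCluster` gains `num_workers_per_vm` entries per VM, so the total worker count becomes `n_workers * num_workers_per_vm`.

```python
# Hypothetical sketch (not the real implementation): expand the worker spec
# so each VM contributes num_workers_per_vm worker entries.

def build_worker_spec(n_workers, num_workers_per_vm=1, worker_options=None):
    """Build a SpecCluster-style mapping with one entry per worker process."""
    worker_options = worker_options or {}
    spec = {}
    for vm in range(n_workers):                  # one VM per n_workers
        for proc in range(num_workers_per_vm):   # several workers per VM
            spec[f"vm-{vm}-worker-{proc}"] = {
                "cls": "distributed.nanny.Nanny",
                "options": dict(worker_options),
            }
    return spec

spec = build_worker_spec(n_workers=10, num_workers_per_vm=4)
print(len(spec))  # 40 worker entries in total
```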
## Testing

I confirmed this works by creating an `EC2Cluster` as follows:
```python
ec2_cluster = EC2Cluster(
    env_vars=env_vars,
    extra_bootstrap=EC2_BOOTSTRAP_COMMANDS,
    filesystem_size=cluster_config["volume_size"],
    instance_tags=cluster_config["ec2_instance_tags"],
    n_workers=10,
    worker_class="distributed.nanny.Nanny",
    worker_options={"nthreads": 2, "memory_limit": "7GiB"},
    num_workers_per_vm=4,
    scheduler_instance_type=cluster_config["scheduler_instance_type"],
    auto_shutdown=False,
    shutdown_on_close=False,
    security=False,  # https://github.com/dask/dask-cloudprovider/issues/249
    volume_tags=cluster_config["ec2_instance_tags"],
    worker_instance_type="m5.2xlarge",
)
```
This creates an `EC2Cluster` with 10 `m5.2xlarge` worker machines and a total of 40 workers (as `num_workers_per_vm=4`).
@jacobtomlinson Thanks for the quick review. Sure, I can add tests and update docstrings. I do have a question about the tests: from a quick look at the existing tests, it seems like the right place for this test would be the `EC2Cluster` tests rather than the `VMCluster` tests. Is that correct?
Yeah that sounds good.
Have you made any progress on this pull request recently?
This option would also be very helpful for `GCPCluster`. Most machine types on GCP have more than one CPU, so dask-cloudprovider currently wastes resources.
I tested this new option `num_workers_per_vm` and it does not seem to work for `GCPCluster`. This code reproduces the problem:
```python
import time

from dask_cloudprovider.gcp import GCPCluster
from dask.distributed import Client

cluster = GCPCluster(n_workers=1, num_workers_per_vm=2, worker_options={'nthreads': 1}, projectid='the-project-id')
time.sleep(180)
client = Client(cluster)
print(client)
cluster.close()
```
The printout is `<Client: 'tls://10.142.0.11:8786' processes=1 threads=1, memory=3.59 GiB>`, i.e. only one worker got created.
> Most machine types on GCP have more than one CPU, so dask-cloudprovider currently wastes resources.
I'm not sure this is true. The default behaviour should start one thread per CPU core.
Maybe, but I have an application that is not thread-safe due to the underlying FORTRAN code being used.
Ah, fair enough. I'm not sure I understand why the following isn't suitable though:
```python
cluster = GCPCluster(n_workers=1, worker_options={'nthreads': 1, 'nworkers': 2}, projectid='the-project-id')
```
According to the documentation, this option does not exist for workers, only threads: https://distributed.dask.org/en/latest/worker.html#distributed.worker.Worker
Trying it also fails.
Ah yeah, I remember: in dask-cloudprovider we use the dask spec CLI to start the worker instead of `dask worker`, which is why we can't do that.
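For context, a rough sketch of what starting workers via the dask spec CLI could look like with this PR's parameter. The helper name and the exact spec layout accepted by `distributed.cli.dask_spec` are assumptions here, not the actual implementation: the idea is one numbered `Nanny` entry per worker process to run on the VM.

```python
import json

def build_spec_cli_arg(num_workers_per_vm, worker_options):
    """Hypothetical helper: JSON for a dask spec CLI --spec argument,
    with one numbered worker entry per process to run on the VM."""
    spec = {
        str(i): {"cls": "distributed.Nanny", "opts": dict(worker_options)}
        for i in range(num_workers_per_vm)
    }
    return json.dumps(spec)

# Roughly: python -m distributed.cli.dask_spec tls://scheduler:8786 --spec '<json>'
print(build_spec_cli_arg(2, {"nthreads": 1}))
```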
To get this PR over the line we need:
- Tests
- Docs
- An update to use the Dask config system
- Maybe a rename of `num_workers_per_vm` to something more consistent with other options, like `n_worker_procs`
Given that this seems abandoned @erl987 do you have any interest in pushing it forwards?
:wave: Hi @jacobtomlinson! I indeed have interest in continuing this PR, since it is something we are needing at https://github.com/CaviDBOrg/.
After struggling for a few days last week with the same problem, I found this PR and I am currently testing those changes on AWS. In parallel, I have created a new branch based on this one - https://github.com/flbulgarelli/dask-cloudprovider/commits/feature-allow-multiple-workers-in-vm-cluster - that addresses some of your requests, but I am still a bit unsure about your expectations for testing it.
As far as I understand, some of the tests here are smoke tests, while others actually run distributed computations at some point. I am thinking of starting a cluster and checking the actual number of workers using `client.scheduler_info()['workers']`. Do you have any other recommendations?
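The worker-count check described above could look roughly like this. The payload below is a fake stand-in for what `client.scheduler_info()` returns, so only the counting logic is exercised, not a real cluster; in an actual test the dict would come from a live client.

```python
def count_workers(scheduler_info):
    """Number of workers reported by client.scheduler_info()."""
    return len(scheduler_info["workers"])

# Fake scheduler_info-shaped payload: 2 VMs x num_workers_per_vm=2
fake_info = {"workers": {f"tls://10.0.0.{i}:9000": {} for i in range(4)}}
assert count_workers(fake_info) == 4
print("workers:", count_workers(fake_info))
```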
Thanks!
That sounds great @flbulgarelli. That's pretty much what I was hoping for in terms of tests.
There has been no activity here for a long time, so I'm going to close it out. Thanks for the effort here, sorry it never got merged.