dask-cloudprovider icon indicating copy to clipboard operation
dask-cloudprovider copied to clipboard

Support multiple workers per VM

Open eric-czech opened this issue 5 years ago • 33 comments

It may be more efficient to run many workers on larger VMs as opposed to larger numbers of worker processes on small VMs, presumably to avoid inter-VM communication. This was a suggestion that resulted from this thread on optimizing some dask array workflows: https://github.com/pystatgen/sgkit/issues/390.

@quasiben mentioned that threads per worker can be controlled with worker_options={"nthreads": 2 }, but there appears to be no way to run more than one worker on a single VM.

eric-czech avatar Nov 17 '20 22:11 eric-czech

I think for this to work we need to change:

https://github.com/dask/dask-cloudprovider/blob/d5dfd99e0d29b1c6b03cb291633484fdd367ed4d/dask_cloudprovider/gcp/instances.py#L318-L330

To handle a spec which looks like the following:

workers_options = {
    'worker-1': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 1}},
    'worker-2': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 2}},
}

quasiben avatar Nov 17 '20 22:11 quasiben

When running on a local machine LocalCluster optimises this for you right? On my laptop with 12 cores it runs three workers with four threads each.

Perhaps we can reuse this logic?

jacobtomlinson avatar Nov 18 '20 09:11 jacobtomlinson

You mean, have dask-cloudprovider inspect the system and assign worker/threads accordingly ?

quasiben avatar Nov 18 '20 15:11 quasiben

Yeah. We do this for LocalCluster here.

jacobtomlinson avatar Nov 18 '20 15:11 jacobtomlinson

dask/distributed#4377 allows us to detect and create worker/threads automatically.

dask-worker --nprocs=auto

Once a release is out we can update things here to use this new option.

jacobtomlinson avatar Jan 12 '21 16:01 jacobtomlinson

any progress here?

eric-valente avatar May 03 '21 04:05 eric-valente

@eric-valente with the latest release of dask and distributed you should be able to set worker_options={"nprocs": "auto"}. Although I haven't managed to come back here and test this yet.

If you could give it a go and report back it would be much appreciated!

jacobtomlinson avatar May 04 '21 09:05 jacobtomlinson

Hi @jacobtomlinson Tried it with packages and no luck. The workers will turn on in EC2 and won't connect to the scheduler and then proceed to die. When I remove the nprocs it works fine but again only has 1 worker per VM with threads = # of cores

worker_options={'nprocs':'auto'}

dask 2021.4.1 pyhd8ed1ab_0 conda-forge dask-cloudprovider 2021.3.1 pyhd8ed1ab_0 conda-forge dask-core 2021.4.1 pyhd8ed1ab_0 conda-forge distributed 2021.4.1 py38h578d9bd_0 conda-forge

eric-valente avatar May 05 '21 00:05 eric-valente

Strange! Are you able to get any logs from the EC2 instances? They tend to get dumped in var/log/cloud-init-output.log.

jacobtomlinson avatar May 05 '21 14:05 jacobtomlinson

@jacobtomlinson

This error then worker dies: image

Here are my settings: image

Note, this works if i simply remove nprocs from the worker_options. Thanks for your help!

eric-valente avatar May 05 '21 15:05 eric-valente

Thanks @eric-valente. You also need to set the worker_class kwarg to "dask.distributed.Nanny".

The traceback you got above looks like a bug, I'll raise that back in distributed.

@quasiben what do you think about making dask.distributed.Nanny the default instead of dask.distributed.Worker?

jacobtomlinson avatar May 06 '21 13:05 jacobtomlinson

Thanks again for your help here @jacobtomlinson

Tried setting the worker class to Nanny but still same issue:

image

image

eric-valente avatar May 06 '21 17:05 eric-valente

https://github.com/dask/distributed/issues/4640 seems maybe related

eric-valente avatar May 06 '21 18:05 eric-valente

Seems dask.distributed.Nanny does not accept nprocs either https://distributed.dask.org/en/latest/_modules/distributed/nanny.html#Nanny

Fails with above init error: --spec '{"cls": "dask.distributed.Nanny", "opts": {"nprocs": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}'

Works: --spec '{"cls": "dask.distributed.Nanny", "opts": {"ncores": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}'

eric-valente avatar May 06 '21 19:05 eric-valente

Seems like the cloud-init to add a worker uses this style of starting a worker: python -m distributed.cli.dask_spec tcp://x.x.x.x:8786 --spec {"cls": "distributed.nanny.Nanny", "opts": {"nprocs": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}

vs. this style: dask-worker --nprocs=auto

eric-valente avatar May 06 '21 20:05 eric-valente

+1 to making dask.distributed.Nanny the default.

quasiben avatar May 06 '21 20:05 quasiben

@jacobtomlinson Yeah it seems like EC2Cluster uses python -m distributed.cli.dask_spec and passing in worker_class

image

As suggested above, I think you might need to accept multiiple workers defined in worker_options and skip nprocs?

workers_options = {
    'worker-1': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 1}},
    'worker-2': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 2}},
}

I think I could use worker_module but it is not a valid parameter for EC2Cluster: image This would allow me to use dask-worker with --nprocs=x

eric-valente avatar May 06 '21 23:05 eric-valente

I think I could use worker_module but it is not a valid parameter for EC2Cluster

It should be, have you tried it?

jacobtomlinson avatar May 07 '21 08:05 jacobtomlinson

Hello Is there an update on this issue? I am trying to do something similar. Currently I am creating my cluster manually where I start the scheduler on one EC2 and then on another EC2 I create the workers like this:

dask-worker  tcp://XX.X.X.XXX:8786 --nprocs n --nthreads 1

Where n is the number of CPU's on that EC2.

It would be great if this can be done by passing nprocs and nthreads as worker_options to EC2Cluster. If I can help in anyway please let me know.

Thank you

shireenrao avatar Oct 19 '21 17:10 shireenrao

@shireenrao have you tried it?

I would expect the following to work.

cluster = EC2Cluster(..., worker_class="distributed.nanny.Nanny", worker_options={"nprocs": "n"})

jacobtomlinson avatar Oct 21 '21 08:10 jacobtomlinson

@jacobtomlinson - I tried that and it fails. This is the stack trace I see

python -m distributed.cli.dask_spec tcp://XXX.X.X.XXX:8786 --spec '{"cls": "distributed.nanny.Nanny", "opts": {"nprocs":"4", "name": "dask-44bad6fe-worker-ecaffb24"}}'
no environment.yml
distributed.nanny - INFO -         Start Nanny at: 'tcp://XXX.17.0.X:41233'
distributed.nanny - ERROR - Failed to initialize Worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 844, in _run
    worker = Worker(**worker_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 733, in __init__
    super().__init__(
TypeError: __init__() got an unexpected keyword argument 'nprocs'
distributed.nanny - ERROR - Failed while trying to start worker process: __init__() got an unexpected keyword argument 'nprocs'
distributed.nanny - INFO - Closing Nanny at 'tcp://XXX.17.0.X:41233'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/conda/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 43, in <module>
    main()
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 39, in main
    asyncio.get_event_loop().run_until_complete(run())
  File "/opt/conda/lib/python3.8/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 33, in run
    servers = await run_spec(_spec, *args)
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 659, in run_spec
    await asyncio.gather(*workers.values())
  File "/opt/conda/lib/python3.8/asyncio/tasks.py", line 690, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 335, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 418, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 692, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 811, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 844, in _run
    worker = Worker(**worker_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 733, in __init__
    super().__init__(
TypeError: __init__() got an unexpected keyword argument 'nprocs'

shireenrao avatar Oct 21 '21 11:10 shireenrao

Sorry my bad looks like you should be setting nthreads and ncores.

https://github.com/dask/distributed/blob/3afc6703fb480b9fe5c983c84d6ea115810dd1ff/distributed/nanny.py#L88-L89

jacobtomlinson avatar Oct 21 '21 11:10 jacobtomlinson

@jacobtomlinson - I am trying this on a ec2 instance with 36 vCPU's. Setting ncores and nthreads only starts 1 Nanny process with 36 threads. Here are the logs from the worker

python -m distributed.cli.dask_spec tcp://XX.XXX.XX.XX:8786 --spec '{"cls": "distributed.nanny.Nanny", "opts": {"ncores":36, "nthreads":1, "name": "dask-44bad6fe-worker-ecaffb24"}}'
/opt/conda/lib/python3.8/site-packages/distributed/nanny.py:172: UserWarning: the ncores= parameter has moved to nthreads=
  warnings.warn("the ncores= parameter has moved to nthreads=")
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.17.0.2:45419'
distributed.worker - INFO -       Start worker at:     tcp://172.17.0.2:42719
distributed.worker - INFO -          Listening to:     tcp://172.17.0.2:42719
distributed.worker - INFO -          dashboard at:           172.17.0.2:45887
distributed.worker - INFO - Waiting to connect to:    tcp://XX.XXX.XX.XX:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         36
distributed.worker - INFO -                Memory:                  68.59 GiB
distributed.worker - INFO -       Local Directory: /dask-worker-space/worker-2ywv7v5h
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:    tcp://XX.XXX.XX.XX:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

From a jupyter notebook session this is what one sees for the cluster

<Client: 'tcp://172.17.0.2:8786' processes=1 threads=36, memory=68.59 GiB>

Where as when I start the worker manually with

dask-worker tcp://XX.XXX.XX.XX:8786 --nprocs=36

Jupyter Notebooks shows the cluster to be

<Client: 'tcp://172.17.0.2:8786' processes=36 threads=36, memory=68.59 GiB>

All the CPU's are not being utilized.

shireenrao avatar Oct 21 '21 12:10 shireenrao

Right I'm with you sorry. It seems that dask_spec doesn't support that today. Implementing that would have to be done upstream in distributed.

jacobtomlinson avatar Oct 21 '21 14:10 jacobtomlinson

Could we please request / create issue in distributed upstream to integrate this work.

For my use case it makes more sense to have a few very large machines than lots of small VMs.

ansar-sa avatar Dec 02 '21 01:12 ansar-sa

The spec in distributes now accepts a list of specs. This does assume that nodes are regular as we would need to specify number of workers and cores/memory for each worker. But we should be able to implement this here now without needing any upstream changes.

jacobtomlinson avatar Dec 02 '21 11:12 jacobtomlinson

Has anybody been able to achieve this?

cspagemarine avatar Jun 10 '22 15:06 cspagemarine

Are there any news on this issue?

emkademy avatar Oct 05 '22 09:10 emkademy

@jacobtomlinson I tried this on EC2Cluster like this:

ec2_cluster = EC2Cluster(
    env_vars=env_vars,
    extra_bootstrap=EC2_BOOTSTRAP_COMMANDS,
    filesystem_size=cluster_config["volume_size"],
    instance_tags=cluster_config["ec2_instance_tags"],
    n_workers=cluster_config["n_workers"],
    #worker_class="distributed.nanny.Nanny", 
    worker_options={i: {"cls": Nanny, "options": {"nthreads": 2}} for i in range(4)},
    scheduler_instance_type=cluster_config["scheduler_instance_type"],
    auto_shutdown=False,
    shutdown_on_close=False,
    security=False,  # https://github.com/dask/dask-cloudprovider/issues/249,
    volume_tags=cluster_config["ec2_instance_tags"],
    worker_instance_type=cluster_config["worker_instance_type"],
)

but this errors out with a json encode error in VMCluster

~/miniconda3/envs/pricing/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py in __init__(self, scheduler, worker_module, worker_class, worker_options, *args, **kwargs)
    138                             "opts": {
    139                                 **worker_options,
--> 140                                 "name": self.name,
    141                             },
    142                         }

~/miniconda3/envs/pricing/lib/python3.7/json/__init__.py in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    229         cls is None and indent is None and separators is None and
    230         default is None and not sort_keys and not kw):
--> 231         return _default_encoder.encode(obj)
    232     if cls is None:
    233         cls = JSONEncoder

Any ideas? Will this require some changes in cloudprovider?

kumarprabhu1988 avatar Nov 04 '22 05:11 kumarprabhu1988

@kumarprabhu1988 I think the cls need to be a string, and it probably needs to be "distributed.nanny.Nanny".

jacobtomlinson avatar Nov 04 '22 15:11 jacobtomlinson