dask-yarn

distributed 2022.3.0 is no longer compatible with dask-yarn because of a missing "status" attribute in YarnCluster

[Open] NHanser opened this issue 3 years ago • 7 comments

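What happened: an AttributeError ('YarnCluster' object has no attribute 'status') is raised while creating a Client object from a YarnCluster.

What you expected to happen: the Client initializes correctly and connects to the cluster.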

Minimal Complete Verifiable Example:

from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster
cluster = YarnCluster()
# Connect to the cluster
client = Client(cluster)

Anything else we need to know?:

Environment:

  • Dask version: 2022.3.0
  • Python version: 3.8.X
  • Operating System: Linux
  • Install method (conda, pip, source): pip

NHanser commented on Mar 30 '22

Can you provide the traceback of the error you saw?

jcrist commented on Mar 30 '22

AttributeError                            Traceback (most recent call last)
Input In [2], in <cell line: 7>()
      3 from dask import dataframe as dd
      6 cluster = YarnCluster()
----> 7 client = Client(cluster)
      8 cluster

File ~/dask_tests/daskenv/lib64/python3.8/site-packages/distributed/client.py:834, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    831 elif isinstance(getattr(address, "scheduler_address", None), str):
    832     # It's a LocalCluster or LocalCluster-compatible object
    833     self.cluster = address
--> 834     status = getattr(self.cluster, "status")
    835     if status and status in [Status.closed, Status.closing]:
    836         raise RuntimeError(
    837             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    838         )

AttributeError: 'YarnCluster' object has no attribute 'status'
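The failure comes from the check in client.py (lines 831 to 838 in the traceback): because the YarnCluster object exposes a string scheduler_address, Client treats it as a cluster object and calls getattr(self.cluster, "status") with no default, which raises for any cluster lacking a status attribute. The stand-in below reproduces that logic in plain Python so it runs without YARN; FakeYarnCluster, connect_check, and the local Status enum are hypothetical names for illustration, not part of distributed or dask-yarn.

```python
from enum import Enum


class Status(Enum):
    # Hypothetical stand-in for distributed's Status enum; only the members used here.
    running = "running"
    closed = "closed"
    closing = "closing"


class FakeYarnCluster:
    # Hypothetical stand-in: a string scheduler_address but no status attribute,
    # like the YarnCluster object in the traceback.
    scheduler_address = "tcp://10.0.0.1:8786"


def connect_check(address, default_missing=False):
    """Mimic the cluster-object branch shown in the traceback.

    default_missing=False reproduces the strict getattr call;
    default_missing=True uses getattr(..., None), a tolerant variant.
    """
    if isinstance(getattr(address, "scheduler_address", None), str):
        if default_missing:
            status = getattr(address, "status", None)
        else:
            status = getattr(address, "status")  # raises AttributeError if missing
        if status and status in [Status.closed, Status.closing]:
            raise RuntimeError("Trying to connect to an already closed or closing Cluster")
        return address.scheduler_address
    # A plain address string never enters the cluster-object branch.
    return address


cluster = FakeYarnCluster()

try:
    connect_check(cluster)
except AttributeError as exc:
    print(exc)  # 'FakeYarnCluster' object has no attribute 'status'

print(connect_check(cluster, default_missing=True))  # tcp://10.0.0.1:8786
print(connect_check(cluster.scheduler_address))      # tcp://10.0.0.1:8786
```

The last line suggests why passing the address string, as in Client(cluster.scheduler_address), should sidestep this particular check with the real libraries; that is an inference from the traceback and has not been tested here.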

NHanser commented on Mar 30 '22

I've noticed this error popping up in other places too.

jacobtomlinson commented on Mar 31 '22

I have the same issue.

yup111 commented on May 09 '22

Same issue using dask/distributed 2022.6.1 with dask-yarn 0.9.

jtrive84 commented on Jun 29 '22

With newer releases of distributed, we need to extend SpecCluster to add support for resource managers like YARN.

Minimal example I hacked together:

import skein
from distributed import Client, SpecCluster
from distributed.deploy import ProcessInterface


class YarnProcess(ProcessInterface):
    def __init__(self, **kwargs):
        super().__init__()
        self.service_name = None
        self.cli: skein.ApplicationClient = skein.ApplicationClient.from_current()
        self.container = None
        _ = kwargs

    async def close(self):
        # Only kill a container if one was actually started.
        if self.container is not None:
            self.cli.kill_container(self.container)
        await super().close()


class DaskYarnScheduler(YarnProcess):
    def __init__(self, **kwargs):
        super().__init__()
        self.service_name: str = "dask.scheduler"
        self.container = None
        _ = kwargs

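    async def start(self):
        # Ask YARN for exactly one scheduler container.
        self.cli.scale(self.service_name, count=1)
        # Block until the scheduler publishes its address in skein's key-value store.
        self.address = self.cli.kv.wait("dask.scheduler").decode()
        self.container = self.cli.get_containers(services=[self.service_name])[0].id
        await super().start()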


class DaskYarnWorker(YarnProcess):
    def __init__(self, address, **kwargs):
        super().__init__()
        self.service_name: str = "dask.worker"
        _ = kwargs, address

    async def start(self):
        # Each worker object owns one newly added "dask.worker" container.
        self.container = self.cli.add_container(self.service_name).id
        await super().start()


class YarnCluster(SpecCluster):
    def __init__(self, security=None):
        super().__init__(
            scheduler={"cls": DaskYarnScheduler, "options": {}},
            worker={"cls": DaskYarnWorker, "options": {}},
            security=security,
        )
        self.spec = skein.ApplicationClient.from_current().get_specification()

# Must run inside a running skein application (ApplicationClient.from_current()).
cluster = YarnCluster()
client = Client(cluster.scheduler_address)
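SpecCluster is driven by small spec dictionaries of the form {"cls": ..., "options": {...}}, as in the example above. The sketch below is a simplified, hypothetical model of that pattern in plain asyncio, not distributed's implementation. It assumes, roughly as SpecCluster does, that the scheduler class is built from its options alone and that each worker class receives the scheduler address as its first positional argument plus a name keyword (which is why DaskYarnWorker.__init__ takes address and **kwargs). MiniSpecCluster, FakeScheduler, and FakeWorker are illustrative names; scale-down and error handling are not modeled.

```python
import asyncio


class FakeScheduler:
    """Stand-in for DaskYarnScheduler: start() sets an address."""

    def __init__(self, **options):
        self.address = None
        self.options = options

    async def start(self):
        # The real class waits on skein's key-value store for this value.
        self.address = "tcp://scheduler:8786"

    async def close(self):
        self.address = None


class FakeWorker:
    """Stand-in for DaskYarnWorker: built with the scheduler address first."""

    def __init__(self, address, **options):
        self.address = address
        self.name = options.get("name")
        self.running = False

    async def start(self):
        self.running = True

    async def close(self):
        self.running = False


class MiniSpecCluster:
    """Hypothetical, simplified model of a spec-driven cluster."""

    def __init__(self, scheduler, worker):
        self.scheduler_spec = scheduler
        self.worker_template = worker
        self.scheduler = None
        self.workers = {}

    async def start(self):
        # Scheduler: instantiate from its options only, then start it.
        cls = self.scheduler_spec["cls"]
        self.scheduler = cls(**self.scheduler_spec.get("options", {}))
        await self.scheduler.start()

    async def scale(self, n):
        # Add workers until there are n, passing the scheduler address positionally.
        while len(self.workers) < n:
            name = len(self.workers)
            cls = self.worker_template["cls"]
            opts = {**self.worker_template.get("options", {}), "name": name}
            worker = cls(self.scheduler.address, **opts)
            await worker.start()
            self.workers[name] = worker

    async def close(self):
        for worker in self.workers.values():
            await worker.close()
        await self.scheduler.close()


async def main():
    cluster = MiniSpecCluster(
        scheduler={"cls": FakeScheduler, "options": {}},
        worker={"cls": FakeWorker, "options": {}},
    )
    await cluster.start()
    await cluster.scale(2)
    print(cluster.scheduler.address)                      # tcp://scheduler:8786
    print([w.address for w in cluster.workers.values()])  # ['tcp://scheduler:8786', 'tcp://scheduler:8786']
    await cluster.close()
    return cluster


asyncio.run(main())
```

Keeping the YARN-specific work (skein calls) inside the per-process start/close hooks is what lets the generic cluster logic stay unaware of YARN.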

santosh-d3vpl3x commented on Aug 30 '22

@santosh-d3vpl3x, can you please review #162 to make sure I'm implementing this correctly?

@bradmiro, I could use some help running the tests and sanity-checking these changes.

cjac commented on Aug 10 '24