
asyncio exception when monitoring distributed compute

data2code opened this issue 2 years ago • 3 comments

Describe the issue: In the example code below, I try to monitor the progress of a computation on a cluster. It returns the right results, but an exception is always logged as shown below, and it cannot be suppressed by the try/except wrapper I add.

(1, 1, 1)
2023-11-30 21:27:09,396 - distributed.scheduler - ERROR -
Traceback (most recent call last):
  File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/scheduler.py", line 7246, in feed
    await asyncio.sleep(interval)
  File "/miniconda3/envs/X/lib/python3.9/asyncio/tasks.py", line 652, in sleep
    return await future
asyncio.exceptions.CancelledError
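
If it helps to narrow things down: the message above comes from the distributed.scheduler logger rather than from an exception raised in my own code, which I assume is why the try/except in the example below never fires. For anyone who only wants to hide the message, something along these lines (an untested sketch using the standard logging module; the logger name is taken from the output above) should silence it:

import logging

# Hide ERROR-level records from the scheduler logger seen in the traceback.
# This only suppresses the log output; it does not change scheduler behavior.
logging.getLogger("distributed.scheduler").setLevel(logging.CRITICAL)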

Minimal Complete Verifiable Example:

from time import sleep

import dask
from dask import compute, persist
from dask.distributed import Client, progress
from dask_jobqueue import SGECluster

@dask.delayed
def f():
    sleep(1)
    return 1

cluster = SGECluster(cores=1, memory='1GB')
cluster.scale(1)
c = Client(cluster)
out = [f()] * 3

try:
    x = persist(*out)
    progress(x)          # monitor the running tasks
    out = compute(x)
except:
    # not triggered; the CancelledError is only logged by the scheduler
    print("error")

c.close()
print(out[0])

Anything else we need to know?:

Environment:

  • Dask version: 2023.11.0
  • Python version: 3.9.18
  • Operating System: Red Hat Enterprise Linux release 8.5 (Ootpa)
  • Install method (conda, pip, source): mamba install -c conda-forge dask; mamba install -c conda-forge dask-jobqueue

data2code · Dec 01 '23 05:12

I tried this bare-bones version and still got the exception, which I cannot catch or suppress. I have also reinstalled Dask with pip3 install "dask[complete]" --upgrade --force-reinstall.

Really appreciate your insights.

from time import sleep

from dask.distributed import Client, progress
from dask_jobqueue import SGECluster

def f(x):
    sleep(1)
    return 1

cluster = SGECluster(cores=1, memory='1GB')
cluster.scale(3)
c = Client(cluster)

x = c.map(f, range(3))
progress(x)
out = c.gather(x)

c.close()
print(out)

Output:

[1, 1, 1]
2023-12-01 14:14:52,476 - distributed.scheduler - ERROR -
Traceback (most recent call last):
  File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/scheduler.py", line 7246, in feed
    await asyncio.sleep(interval)
  File "/miniconda3/envs/X/lib/python3.9/asyncio/tasks.py", line 652, in sleep
    return await future
asyncio.exceptions.CancelledError
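
In case the shutdown order matters, here is the same bare-bones example rewritten so that both the cluster and the client are closed by context managers rather than only c.close() (just a sketch, assuming SGECluster can be used in a with statement like other Dask cluster objects; I have not checked whether it changes the logged error):

from time import sleep
from dask.distributed import Client, progress
from dask_jobqueue import SGECluster

def f(x):
    sleep(1)
    return 1

# Exiting the with block closes the client first and then the cluster.
with SGECluster(cores=1, memory='1GB') as cluster, Client(cluster) as c:
    cluster.scale(3)
    x = c.map(f, range(3))
    progress(x)
    out = c.gather(x)

print(out)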

data2code · Dec 01 '23 22:12

I just gave this a go with the latest version of distributed and a slightly simplified reproducer; things are different but still very noisy.

# test.py
from time import sleep
from dask.distributed import Client


def f(x):
    sleep(1)
    return x


if __name__ == "__main__":
    with Client() as c:
        x = c.map(f, range(3))
        out = c.gather(x)

    print(out)

$ python test.py
2025-02-07 10:30:06,069 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jtomlinson/Projects/dask/distributed/distributed/worker.py", line 1269, in heartbeat
    response = await retry_operation(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/utils_comm.py", line 416, in retry_operation
    return await retry(
           ^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/utils_comm.py", line 395, in retry
    return await coro()
           ^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/core.py", line 1259, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/core.py", line 1018, in send_recv
    response = await comm.read(deserializers=deserializers)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 237, in read
    convert_stream_closed_error(self, e)
  File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 137, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:32998 remote=tcp://127.0.0.1:38203>: Stream is closed
2025-02-07 10:30:06,070 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jtomlinson/Projects/dask/distributed/distributed/worker.py", line 1269, in heartbeat
    response = await retry_operation(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/utils_comm.py", line 416, in retry_operation
    return await retry(
           ^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/utils_comm.py", line 395, in retry
    return await coro()
           ^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/core.py", line 1259, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/core.py", line 1018, in send_recv
    response = await comm.read(deserializers=deserializers)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 237, in read
    convert_stream_closed_error(self, e)
  File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 137, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:32982 remote=tcp://127.0.0.1:38203>: Stream is closed
[1, 1, 1]

jacobtomlinson · Feb 07 '25 10:02

Reducing or removing the sleep inside f() makes the problem go away. Replacing the sleep with a slow computation, such as a for loop with 10M iterations, also reproduces the problem.
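
A variant along those lines (a sketch of the CPU-bound version just described, not taken verbatim from any test):

def f(x):
    # Busy loop instead of sleep(1); this CPU-bound version also triggers
    # the noisy shutdown errors shown above.
    total = 0
    for _ in range(10_000_000):
        total += 1
    return x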

jacobtomlinson · Feb 07 '25 10:02