asyncio exception when monitoring distributed compute
Describe the issue: In the example code below, I try to monitor the progress of a cluster computation. It returns the right results, but there is always an exception, as shown below. The exception cannot be suppressed by the try/except wrapper I added.
(1, 1, 1)
2023-11-30 21:27:09,396 - distributed.scheduler - ERROR -
Traceback (most recent call last):
File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/utils.py", line 832, in wrapper
return await func(*args, **kwargs)
File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/scheduler.py", line 7246, in feed
await asyncio.sleep(interval)
File "/miniconda3/envs/X/lib/python3.9/asyncio/tasks.py", line 652, in sleep
return await future
asyncio.exceptions.CancelledError
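Since the error is logged from inside the scheduler rather than raised in my script, I assume a try/except around the compute call can never see it. The only thing I can think of is hiding it at the logging level; a minimal, untested sketch (the logger name is taken from the output above):

import logging

# Raise the threshold for the scheduler logger so the CancelledError report above
# is no longer printed. This only hides the message; it does not address the cause.
logging.getLogger("distributed.scheduler").setLevel(logging.CRITICAL)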
Minimal Complete Verifiable Example:
from time import sleep

import dask
from dask.distributed import Client, progress
from dask_jobqueue import SGECluster
from dask import compute, persist, delayed


@dask.delayed
def f():
    sleep(1)
    return 1


cluster = SGECluster(cores=1, memory='1GB')
cluster.scale(1)
c = Client(cluster)

# Three references to the same delayed task.
out = [f()] * 3
try:
    x = persist(*out)
    progress(x)
    out = compute(x)
except:
    print("error")
c.close()
print(out[0])
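For comparison, the same workflow written with context managers so that the client and cluster are shut down in a defined order when the blocks exit. This is only a sketch: I am assuming SGECluster can be used as a context manager here, and I have not checked whether it changes the shutdown noise.

from time import sleep

import dask
from dask.distributed import Client, progress
from dask_jobqueue import SGECluster
from dask import compute, persist


@dask.delayed
def f():
    sleep(1)
    return 1


# Assumption: SGECluster and Client both support the context-manager protocol,
# so the client closes first and the cluster afterwards when the blocks exit.
with SGECluster(cores=1, memory='1GB') as cluster:
    cluster.scale(1)
    with Client(cluster) as c:
        x = persist(*([f()] * 3))
        progress(x)
        out = compute(x)

print(out[0])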
Anything else we need to know?:
Environment:
- Dask version: 2023.11.0
- Python version: 3.9.18
- Operating System: Red Hat Enterprise Linux release 8.5 (Ootpa)
- Install method (conda, pip, source): mamba install -c conda-forge dask, then mamba install -c conda-forge dask-jobqueue
I tried this barebones version and still got the exception, which I cannot catch or suppress. I have also reinstalled dask with pip3 install "dask[complete]" --upgrade --force-reinstall.
I would really appreciate your insights.
from time import sleep

from dask.distributed import Client, progress
from dask_jobqueue import SGECluster


def f(x):
    sleep(1)
    return 1


cluster = SGECluster(cores=1, memory='1GB')
cluster.scale(3)
c = Client(cluster)

x = c.map(f, range(3))
progress(x)
out = c.gather(x)
c.close()
print(out)
Output:
[1, 1, 1]
2023-12-01 14:14:52,476 - distributed.scheduler - ERROR -
Traceback (most recent call last):
File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/utils.py", line 832, in wrapper
return await func(*args, **kwargs)
File "/miniconda3/envs/X/lib/python3.9/site-packages/distributed/scheduler.py", line 7246, in feed
await asyncio.sleep(interval)
File "/miniconda3/envs/X/lib/python3.9/asyncio/tasks.py", line 652, in sleep
return await future
asyncio.exceptions.CancelledError
I just gave this a go with the latest version of distributed and a slightly simplified reproducer. Things are different, but still very noisy.
# test.py
from time import sleep

from dask.distributed import Client


def f(x):
    sleep(1)
    return x


if __name__ == "__main__":
    with Client() as c:
        x = c.map(f, range(3))
        out = c.gather(x)
        print(out)
$ python test.py
2025-02-07 10:30:06,069 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 226, in read
frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/jtomlinson/Projects/dask/distributed/distributed/worker.py", line 1269, in heartbeat
response = await retry_operation(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/utils_comm.py", line 416, in retry_operation
return await retry(
^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/utils_comm.py", line 395, in retry
return await coro()
^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/core.py", line 1259, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/core.py", line 1018, in send_recv
response = await comm.read(deserializers=deserializers)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 237, in read
convert_stream_closed_error(self, e)
File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 137, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:32998 remote=tcp://127.0.0.1:38203>: Stream is closed
2025-02-07 10:30:06,070 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 226, in read
frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/jtomlinson/Projects/dask/distributed/distributed/worker.py", line 1269, in heartbeat
response = await retry_operation(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/utils_comm.py", line 416, in retry_operation
return await retry(
^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/utils_comm.py", line 395, in retry
return await coro()
^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/core.py", line 1259, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/core.py", line 1018, in send_recv
response = await comm.read(deserializers=deserializers)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 237, in read
convert_stream_closed_error(self, e)
File "/home/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 137, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:32982 remote=tcp://127.0.0.1:38203>: Stream is closed
[1, 1, 1]
Reducing or removing the sleep inside f() makes the problem go away. Replacing the sleep with a slow computation, such as a for loop with 10M iterations, also reproduces the problem.
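For reference, a sketch of the CPU-bound variant described above; only the roughly 10M-iteration busy loop in place of sleep(1) comes from the report, the exact loop body is made up:

def f(x):
    # Busy loop standing in for sleep(1): roughly 10M iterations of trivial work.
    total = 0
    for _ in range(10_000_000):
        total += 1
    return x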