StreamClosedError when calling client.shutdown()
Describe the issue:
I'm trying to run Dask in a SageMaker training/processing job. Everything works fine, except that the scheduler and workers raise an error when I call client.shutdown().
Worker Error:
2025-03-27 00:16:54,367 - distributed.worker - INFO - Stopping worker at tcp://10.0.146.252:43319. Reason: scheduler-close
2025-03-27 00:16:54,367 - distributed.worker - INFO - Removing Worker plugin shuffle
2025-03-27 00:16:54,368 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.0.146.252:32892 remote=tcp://algo-2:8786>
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 298, in write
raise StreamClosedError()
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
value = future.result()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 308, in write
convert_stream_closed_error(self, e)
File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 137, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Worker->Scheduler local=tcp://10.0.146.252:32892 remote=tcp://algo-2:8786>: Stream is closed
2025-03-27 00:16:54,372 - distributed.core - INFO - Received 'close-stream' from tcp://algo-2:8786; closing.
Scheduler Error:
2025-03-27 00:16:54,365 - distributed.scheduler - INFO - Closing scheduler. Reason: unknown
2025-03-27 00:16:54,366 - distributed.scheduler - INFO - Scheduler closing all comms
2025-03-27 00:16:54,367 - distributed.core - INFO - Connection to tcp://10.0.172.211:47714 has been closed.
2025-03-27 00:16:54,367 - distributed.scheduler - INFO - Remove worker addr: tcp://10.0.172.211:46717 name: tcp://10.0.172.211:46717 (stimulus_id='handle-worker-cleanup-1743034614.3673875')
2025-03-27 00:16:54,367 - distributed.scheduler - WARNING - Removing worker 'tcp://10.0.172.211:46717' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'compute-370249c7007c98cd91d71a31c040470f', 'compute-09b07314d50d8280d4975166465796dd', 'compute-3e7069bd725a0d319b3c4f74619d1a4c', 'compute-a026990cf4b6bd2562eb91ae24fd9257', 'compute-634bdcb70319d9d55105495cf828dc9f'} (stimulus_id='handle-worker-cleanup-1743034614.3673875')
2025-03-27 00:16:54,368 - distributed.core - INFO - Connection to tcp://10.0.146.252:32892 has been closed.
2025-03-27 00:16:54,368 - distributed.scheduler - INFO - Remove worker addr: tcp://10.0.146.252:43319 name: tcp://10.0.146.252:43319 (stimulus_id='handle-worker-cleanup-1743034614.3685637')
2025-03-27 00:16:54,368 - distributed.scheduler - WARNING - Removing worker 'tcp://10.0.146.252:43319' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'compute-a298a11d0634ccbe5bd1502400ad2a94', 'compute-e5c305c5a3f20519438085130e73ff4d', 'compute-ab4f324c0dbfa011c899b004ba886942', 'compute-85c7289d7ac8c7d7dfaf09bc8546e9ef', 'compute-2de9831faeee24c2827286de6550358d'} (stimulus_id='handle-worker-cleanup-1743034614.3685637')
2025-03-27 00:16:54,369 - distributed.scheduler - INFO - Lost all workers
2025-03-27 00:16:54,370 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Scheduler->Client local=tcp://10.0.185.14:8786 remote=tcp://10.0.162.85:34804>
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
value = future.result()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 263, in write
raise CommClosedError()
distributed.comm.core.CommClosedError
Minimal Complete Verifiable Example:
import asyncio
import json
import os
import time
from typing import Optional

from dask.distributed import Client
from distributed import Scheduler, Worker

SCHEDULER_PORT = 8786


def setup_dask_cluster() -> Optional[Client]:
    hosts = json.loads(os.environ["SM_HOSTS"])
    current_host = os.environ["SM_CURRENT_HOST"]
    client_host, scheduler_host = hosts[0], hosts[1]
    scheduler_address = f"tcp://{scheduler_host}:{SCHEDULER_PORT}"

    if current_host == client_host:
        client = Client(scheduler_address)
        client.wait_for_workers(len(hosts) - 2, timeout=30)
        return client

    async def run_scheduler():
        async with Scheduler(host=scheduler_host, port=SCHEDULER_PORT) as scheduler:
            await scheduler.finished()

    async def run_worker():
        async with Worker(scheduler_address) as worker:
            await worker.finished()

    asyncio.run(run_scheduler() if current_host == scheduler_host else run_worker())


def main():
    client = setup_dask_cluster()
    if not client:
        return  # Scheduler or worker node

    def compute(x):
        time.sleep(1)
        return 2 * x

    futures = client.map(compute, range(10))
    results = client.gather(futures)
    print(sum(results))
    client.shutdown()


if __name__ == '__main__':
    main()
Anything else we need to know?:
Environment: AWS SageMaker processing job
- Dask version: 2025.3.0
- Python version: 3.12
- Operating System: Linux
- Install method (conda, pip, source): pip
I'm curious why you are using Scheduler and Worker directly; this is typically an advanced use case. For the use case you shared above, I would recommend using LocalCluster instead.
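Something along these lines is what I had in mind (a rough sketch of the single-machine pattern; the worker/thread counts are just placeholders):

from dask.distributed import Client, LocalCluster

# LocalCluster starts the scheduler and workers as local processes on one
# machine; the Client talks to it the same way it would to a remote scheduler.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

futures = client.map(lambda x: 2 * x, range(10))
print(sum(client.gather(futures)))

client.shutdown()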
@jacobtomlinson I need to be able to scale these jobs horizontally. The docs indicate LocalCluster is only intended to run on a single machine?
Ah sure, your example doesn't make it clear that you're running on multiple nodes.
Looking at your logs it seems the worker is being killed. You may want to try using the Nanny class instead of the Worker class; the Nanny handles the lifecycle of the worker process. But it looks like something is shutting your worker down; perhaps your job is completing when you don't expect it to?
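For reference, a minimal sketch of what the worker side might look like with Nanny, assuming it accepts the scheduler address the same way Worker does and keeping the structure of your MCVE (the address below is just the one from your logs):

import asyncio
from distributed import Nanny

async def run_worker(scheduler_address: str) -> None:
    # The Nanny spawns the actual Worker in a subprocess and supervises it,
    # restarting the worker if the process dies unexpectedly.
    async with Nanny(scheduler_address) as nanny:
        await nanny.finished()

asyncio.run(run_worker("tcp://algo-2:8786"))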
There is also a case where a DataFrame is persisted on the workers with .persist(), and when .close() or .scale(1) is called, some workers exit earlier than others. As a result, I get the following error: distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat. Deleting the persisted frame first (del df0) helps.
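Roughly what I mean, as a sketch (the scheduler address, input path, and df0 are placeholders for my actual job):

import dask.dataframe as dd
from dask.distributed import Client, wait

client = Client("tcp://scheduler:8786")  # placeholder scheduler address

df0 = dd.read_parquet("s3://bucket/data/")  # placeholder input
df0 = df0.persist()  # keep results in worker memory
wait(df0)

# ... computations on df0 ...

# Dropping the last reference lets the scheduler release the persisted
# tasks before workers go away; without this, workers that exit early
# still hold computed pieces and I see the heartbeat/CommClosed errors.
del df0

client.close()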