StreamClosedError when calling client.shutdown()

Open kevinphs opened this issue 10 months ago • 4 comments

Describe the issue:

I'm trying to run Dask in a SageMaker training/processing job. Everything works fine, except that the scheduler and workers log an error when I call client.shutdown().

Worker Error:

2025-03-27 00:16:54,367 - distributed.worker - INFO - Stopping worker at tcp://10.0.146.252:43319. Reason: scheduler-close
2025-03-27 00:16:54,367 - distributed.worker - INFO - Removing Worker plugin shuffle
2025-03-27 00:16:54,368 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.0.146.252:32892 remote=tcp://algo-2:8786>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 298, in write
    raise StreamClosedError()
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 308, in write
    convert_stream_closed_error(self, e)
  File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 137, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Worker->Scheduler local=tcp://10.0.146.252:32892 remote=tcp://algo-2:8786>: Stream is closed
2025-03-27 00:16:54,372 - distributed.core - INFO - Received 'close-stream' from tcp://algo-2:8786; closing.

Scheduler Error:

2025-03-27 00:16:54,365 - distributed.scheduler - INFO - Closing scheduler. Reason: unknown
2025-03-27 00:16:54,366 - distributed.scheduler - INFO - Scheduler closing all comms
2025-03-27 00:16:54,367 - distributed.core - INFO - Connection to tcp://10.0.172.211:47714 has been closed.
2025-03-27 00:16:54,367 - distributed.scheduler - INFO - Remove worker addr: tcp://10.0.172.211:46717 name: tcp://10.0.172.211:46717 (stimulus_id='handle-worker-cleanup-1743034614.3673875')
2025-03-27 00:16:54,367 - distributed.scheduler - WARNING - Removing worker 'tcp://10.0.172.211:46717' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'compute-370249c7007c98cd91d71a31c040470f', 'compute-09b07314d50d8280d4975166465796dd', 'compute-3e7069bd725a0d319b3c4f74619d1a4c', 'compute-a026990cf4b6bd2562eb91ae24fd9257', 'compute-634bdcb70319d9d55105495cf828dc9f'} (stimulus_id='handle-worker-cleanup-1743034614.3673875')
2025-03-27 00:16:54,368 - distributed.core - INFO - Connection to tcp://10.0.146.252:32892 has been closed.
2025-03-27 00:16:54,368 - distributed.scheduler - INFO - Remove worker addr: tcp://10.0.146.252:43319 name: tcp://10.0.146.252:43319 (stimulus_id='handle-worker-cleanup-1743034614.3685637')
2025-03-27 00:16:54,368 - distributed.scheduler - WARNING - Removing worker 'tcp://10.0.146.252:43319' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'compute-a298a11d0634ccbe5bd1502400ad2a94', 'compute-e5c305c5a3f20519438085130e73ff4d', 'compute-ab4f324c0dbfa011c899b004ba886942', 'compute-85c7289d7ac8c7d7dfaf09bc8546e9ef', 'compute-2de9831faeee24c2827286de6550358d'} (stimulus_id='handle-worker-cleanup-1743034614.3685637')
2025-03-27 00:16:54,369 - distributed.scheduler - INFO - Lost all workers
2025-03-27 00:16:54,370 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Scheduler->Client local=tcp://10.0.185.14:8786 remote=tcp://10.0.162.85:34804>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 263, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

Minimal Complete Verifiable Example:

import asyncio
import json
import os
import time
from typing import Optional

from dask.distributed import Client
from distributed import Scheduler, Worker

SCHEDULER_PORT = 8786


def setup_dask_cluster() -> Optional[Client]:
    hosts = json.loads(os.environ["SM_HOSTS"])
    current_host = os.environ["SM_CURRENT_HOST"]

    client_host, scheduler_host = hosts[0], hosts[1]
    scheduler_address = f"tcp://{scheduler_host}:{SCHEDULER_PORT}"

    if current_host == client_host:
        client = Client(scheduler_address)
        client.wait_for_workers(len(hosts) - 2, timeout=30)
        return client

    async def run_scheduler():
        async with Scheduler(host=scheduler_host, port=SCHEDULER_PORT) as scheduler:
            await scheduler.finished()

    async def run_worker():
        async with Worker(scheduler_address) as worker:
            await worker.finished()

    asyncio.run(run_scheduler() if current_host == scheduler_host else run_worker())


def main():
    client = setup_dask_cluster()
    if not client:
        return  # Scheduler or worker node

    def compute(x):
        time.sleep(1)
        return 2 * x

    futures = client.map(compute, range(10))
    results = client.gather(futures)
    print(sum(results))

    client.shutdown()


if __name__ == '__main__':
    main()

Anything else we need to know?:

Environment: AWS SageMaker processing job

  • Dask version: 2025.3.0
  • Python version: 3.12
  • Operating System: Linux
  • Install method (conda, pip, source): pip

kevinphs Mar 27 '25 00:03

I'm curious why you are using Scheduler and Worker directly; this is typically an advanced use case. For the use case you shared above, I would recommend using LocalCluster instead.

jacobtomlinson Mar 31 '25 10:03
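
For comparison with the manual Scheduler/Worker setup, a minimal single-machine sketch of the LocalCluster approach recommended above might look like this. The worker and thread counts are illustrative assumptions, not values from this issue, and LocalCluster only covers the case where everything runs on one host.

```python
def compute(x):
    return 2 * x


def main():
    # Imported inside main() so that this sketch can be loaded without
    # distributed installed (a choice made for this example only).
    from dask.distributed import Client, LocalCluster

    # n_workers and threads_per_worker are illustrative assumptions.
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            futures = client.map(compute, range(10))
            results = client.gather(futures)
            print(sum(results))
    # Leaving the with-blocks closes the client first, then the cluster.
```

Call main() from under an `if __name__ == "__main__":` guard, since LocalCluster starts worker processes by default.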

@jacobtomlinson I need to be able to scale these jobs horizontally. The docs indicate LocalCluster is only intended to run on a single machine?

kevinphs Apr 11 '25 22:04

Ah sure, your example doesn't make it clear that you're running on multiple nodes.

Looking at your logs, it seems the worker is being killed. You may want to try using the Nanny class instead of the Worker class; it handles the lifecycle of worker processes. But it looks like something is shutting your worker down. Perhaps your job is completing when you don't expect it to?

jacobtomlinson Apr 14 '25 10:04
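
A minimal sketch of the Nanny suggestion above, written as a possible replacement for run_worker() in the original example. The helper names make_scheduler_address and worker_main are hypothetical. Nanny is imported from distributed and, like Worker, can be used with `async with` and `finished()`. Whether this removes the CommClosedError at shutdown is not confirmed anywhere in this thread.

```python
import asyncio

SCHEDULER_PORT = 8786  # same port as the original example


def make_scheduler_address(host: str, port: int = SCHEDULER_PORT) -> str:
    """Hypothetical helper: build the tcp:// address workers connect to."""
    return f"tcp://{host}:{port}"


async def run_worker(scheduler_address: str) -> None:
    # Imported here so this sketch can be loaded without distributed installed.
    from distributed import Nanny

    # The Nanny starts the worker in a child process and supervises it.
    async with Nanny(scheduler_address) as nanny:
        # Wait until the nanny (and with it the worker) has shut down.
        await nanny.finished()


def worker_main(scheduler_host: str) -> None:
    """Hypothetical entry point for a worker node."""
    asyncio.run(run_worker(make_scheduler_address(scheduler_host)))
```

On a worker node this would be started with worker_main(scheduler_host), kept under the existing `if __name__ == '__main__':` guard because the Nanny launches a separate worker process.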

There is a related case: a DataFrame is persisted on the workers using .persist(), and when .close() or .scale(1) is called, some workers exit earlier than others. As a result, I get the following error: distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat. Running del df0 helps.

noreentry May 04 '25 22:05