Work stealing causes a sudden death of workers without any informative traceback.
Describe the issue:
I'm sorry I can't share the code (it is confidential) but I'll try to describe the 'crash' scenario as best I can.
I'll start with brief data description:
Data description:
- ~10k parquet files
- ~10k records per file (this results in ~100e6 records in total)
- each file has 10 columns tops, at most 6 columns are loaded at once
- files are stored in S3
100e6 records might seem like quite small data set but our process is a bit RAM hungry, e.g. to process 2e6 records we need 2 x ml.m5.xlarge SageMaker machines.
We first faced this problem when we tries to process all records at once (100e6 records stored across ~10k partitions). We used following cluster setups (we also start a worker on the node which runs the scheduler):
- 10 x ml.m5.4xlarge (640 GBs of RAM and additional 100 GBs of disk space per worker)
- 4 x ml.m4.16xlarge (1280 GBs of RAM and additional 100 GBs of disk space per worker)
Brief process description:
- we spawn a SageMaker cluster with custom image and fully replicated data on each worker
- we load parquet data -> we use delayed API for this so dask will decide which files to load on which worker and files will be loaded 'locally' without pushing data through master. This results in the same number of partitions as files
- we compute some stats on the input data needed for preprocessing
- we perform preprocessing with stats from 3.
- we train XGBoost model on preprocessed data
For loading, stats computation and preprocessing DataFrame and Bag APIs are used. Somewhere during stages 3. and 4. workers start dying without any traceback other than asyncio or TCP connection timeouts / errors if work stealing is enabled:
Example logs from 4 x ml.m4.16xlarge cluster:
Logs from master:
2023-01-31T03:23:58.374+01:00
2023-01-31 02:23:58,347 - distributed.worker - ERROR - failed during get data with tcp://10.0.101.87:42561 -> tcp://10.0.90.223:34087
2023-01-31 02:23:58,347 - distributed.worker - ERROR - failed during get data with tcp://10.0.101.87:42561 -> tcp://10.0.90.223:34087
2023-01-31T03:23:58.374+01:00
Traceback (most recent call last):
File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer
bytes_read = self.read_from_fd(buf)
File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd
return self.socket.recv_into(buf, len(buf))
Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd return self.socket.recv_into(buf, len(buf))
2023-01-31T03:23:58.374+01:00
TimeoutError: [Errno 110] Connection timed out
TimeoutError: [Errno 110] Connection timed out
2023-01-31T03:23:58.374+01:00
The above exception was the direct cause of the following exception:
The above exception was the direct cause of the following exception:
2023-01-31T03:23:58.374+01:00
Traceback (most recent call last):
File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data
response = await comm.read(deserializers=serializers)
File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read
convert_stream_closed_error(self, e)
File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data response = await comm.read(deserializers=serializers) File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read convert_stream_closed_error(self, e) File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
2023-01-31T03:23:58.374+01:00
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out
2023-01-31T03:23:58.374+01:00
2023-01-31 02:23:58,348 - distributed.core - INFO - Lost connection to 'tcp://10.0.90.223:42906'
2023-01-31 02:23:58,348 - distributed.core - INFO - Lost connection to 'tcp://10.0.90.223:42906'
2023-01-31T03:23:58.374+01:00
Traceback (most recent call last):
File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer
bytes_read = self.read_from_fd(buf)
File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd
return self.socket.recv_into(buf, len(buf))
Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd return self.socket.recv_into(buf, len(buf))
2023-01-31T03:23:58.374+01:00
TimeoutError: [Errno 110] Connection timed out
TimeoutError: [Errno 110] Connection timed out
2023-01-31T03:23:58.374+01:00
The above exception was the direct cause of the following exception:
The above exception was the direct cause of the following exception:
2023-01-31T03:23:58.374+01:00
Traceback (most recent call last):
File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm
result = await result
File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data
response = await comm.read(deserializers=serializers)
File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read
convert_stream_closed_error(self, e)
File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm result = await result File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data response = await comm.read(deserializers=serializers) File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read convert_stream_closed_error(self, e) File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
2023-01-31T03:23:58.375+01:00
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out
Log from worker:
2023-01-31 02:23:58,002 - distributed.utils_comm - INFO - Retrying get_data_from_worker after exception in attempt 0/10: in <TCP (closed) Ephemeral Worker->Worker for gather local=tcp://10.0.90.223:42906 remote=tcp://10.0.101.87:42561>: ConnectionResetError: [Errno 104] Connection reset by peer
Briefly after such message worker dies. Sometimes it is restarted and killed again by scheduler, sometimes it just dies. Container is stopped once worker is dead.
With work stealing disabled the problem never occurs. Increasing timeouts to 120 seconds or retry attempts to 10 does not help. Along with 'sudden death' problem an increased disk usage can be observed on other workers which attempt to complete the task (?). After first death other workers also die one after another with same errors.
I tried a couple of dask versions (including latest) - error always prevailed.
This seems to be a problem for large clusters and long running jobs / tasks (?) because it makes long runs which fail after quite some time e.g. 4 hours, costly.
Anything else we need to know?:
Unfortunately I can't provide any additional info. I tried to create a self-contained example of this behavior with small synthetic data set but i was unable to reproduce this crash.
Environment:
- Dask version: 2023.1.1
- Python version: 3.8
- Operating System: public.ecr.aws/lambda/python:3.8
- Install method (conda, pip, source): pip install during docker image build
Logs from master:
What is the "master" here? Are these logs from the scheduler? The logs suggest this is another worker.
With work stealing disabled the problem never occurs.
I strongly suspect that work stealing is just a red herring. the error you are seeing is a very generic connection failure. depending on where this connection failure occurs, we're closing the worker. That's intended behavior, the question is rather why the connection drops.
Are you executing anything that can hold the GIL? It may also be the case that one of your workers just runs out of memory. this would match the observation that after the first one died, others die as well because they are running OOM
Thank you for your fast response.
- Sorry for not being precise enough. By 'master' I mean a SageMaker node which runs scheduler. We also start a worker on this node. So the logs come from 'master`s' worker not the scheduler?
- I'm still trying to prepare a self contained, SageMaker based example. During the jobs we run the GIL is often held for quite some time (I've seen warnings stating that event loop is unresponsive for ~60 seconds)
Short GIL holding summary:
- work stealing disabled (successful run), 3 x 16xlarge -> worker event loop was unresponsive for 55 seconds at most
- work stealing enabled (failed run) 5 x 16xlarge -> worker event loop was unresponsive for 27 seconds at most
I am 100% sure we are not running out of RAM (each ml.m4.16xlarge has 64 vcores and 256 GB of RAM) - please find RAM and Disk usage charts attached (on the first 2 charts disk usage stays around 0%):
- for successful 3 x 16xlarge run:

- for failed 4 x 16xlarge runs: 2.1. Same code as 1. I terminated job manually once I saw one of the workers died (I pasted errors in the latter part of this message):

2.2. Slightly different code than 1. I terminated job manually once I saw one of the workers died (this job encountered TCP errors described in previous message):

I managed to dig up some more logs of most recent large cluster (4 x 16xlarge) failures. Logs come from the 'master' node (nose which runs both scheduler and a worker):
2023-01-30T13:30:39.729+01:00 2023-01-30 12:30:39,635 - distributed.scheduler - ERROR - Error transitioning "('truediv-fa20ba92c80874b53a919740e793cb6e', 0)" from 'processing' to 'memory'
2023-01-30T13:30:39.729+01:00
Traceback (most recent call last):
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 1849, in _transition
recommendations, client_msgs, worker_msgs = func(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 2311, in transition_processing_memory
steal.recalculate_cost(tts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 212, in recalculate_cost
self.put_key_in_stealable(ts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 215, in put_key_in_stealable
cost_multiplier, level = self.steal_time_ratio(ts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 257, in steal_time_ratio
assert ts in ts.processing_on.long_running
Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 1849, in _transition recommendations, client_msgs, worker_msgs = func( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 2311, in transition_processing_memory steal.recalculate_cost(tts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 212, in recalculate_cost self.put_key_in_stealable(ts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 215, in put_key_in_stealable cost_multiplier, level = self.steal_time_ratio(ts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 257, in steal_time_ratio assert ts in ts.processing_on.long_running
2023-01-30T13:30:39.729+01:00 AssertionError
2023-01-30T13:30:39.729+01:00 2023-01-30 12:30:39,636 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://10.0.248.78:43691', status: running, memory: 2051, processing: 1>
2023-01-30T13:30:39.730+01:00 2023-01-30 12:30:39,636 - distributed.core - INFO - Removing comms to tcp://10.0.248.78:43691
2023-01-30T13:30:42.731+01:00 2023-01-30 12:30:42,205 - distributed.core - ERROR -
2023-01-30T13:30:42.731+01:00
Traceback (most recent call last):
File "/code/venv/lib/python3.8/site-packages/distributed/utils.py", line 741, in wrapper
return await func(*args, **kwargs)
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4231, in add_worker
await self.handle_worker(comm, address)
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5470, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream
handler(**merge(extra, msg))
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5329, in handle_task_finished
r: tuple = self.stimulus_task_finished(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4673, in stimulus_task_finished
r: tuple = self._transition(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 1849, in _transition
recommendations, client_msgs, worker_msgs = func(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 2311, in transition_processing_memory
steal.recalculate_cost(tts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 212, in recalculate_cost
self.put_key_in_stealable(ts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 215, in put_key_in_stealable
cost_multiplier, level = self.steal_time_ratio(ts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 257, in steal_time_ratio
assert ts in ts.processing_on.long_running
Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/distributed/utils.py", line 741, in wrapper return await func(*args, **kwargs) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4231, in add_worker await self.handle_worker(comm, address) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5470, in handle_worker await self.handle_stream(comm=comm, extra={"worker": worker}) File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream handler(**merge(extra, msg)) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5329, in handle_task_finished r: tuple = self.stimulus_task_finished( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4673, in stimulus_task_finished r: tuple = self._transition( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 1849, in _transition recommendations, client_msgs, worker_msgs = func( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 2311, in transition_processing_memory steal.recalculate_cost(tts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 212, in recalculate_cost self.put_key_in_stealable(ts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 215, in put_key_in_stealable cost_multiplier, level = self.steal_time_ratio(ts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 257, in steal_time_ratio assert ts in ts.processing_on.long_running
2023-01-30T13:30:42.731+01:00 AssertionError
2023-01-30T13:30:42.731+01:00 2023-01-30 12:30:42,205 - distributed.core - ERROR - Exception while handling op register-worker
2023-01-30T13:30:42.731+01:00
Traceback (most recent call last):
File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm
result = await result
File "/code/venv/lib/python3.8/site-packages/distributed/utils.py", line 741, in wrapper
return await func(*args, **kwargs)
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4231, in add_worker
await self.handle_worker(comm, address)
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5470, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream
handler(**merge(extra, msg))
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5329, in handle_task_finished
r: tuple = self.stimulus_task_finished(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4673, in stimulus_task_finished
r: tuple = self._transition(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 1849, in _transition
recommendations, client_msgs, worker_msgs = func(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 2311, in transition_processing_memory
steal.recalculate_cost(tts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 212, in recalculate_cost
self.put_key_in_stealable(ts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 215, in put_key_in_stealable
cost_multiplier, level = self.steal_time_ratio(ts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 257, in steal_time_ratio
assert ts in ts.processing_on.long_running
Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm result = await result File "/code/venv/lib/python3.8/site-packages/distributed/utils.py", line 741, in wrapper return await func(*args, **kwargs) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4231, in add_worker await self.handle_worker(comm, address) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5470, in handle_worker await self.handle_stream(comm=comm, extra={"worker": worker}) File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream handler(**merge(extra, msg)) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5329, in handle_task_finished r: tuple = self.stimulus_task_finished( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4673, in stimulus_task_finished r: tuple = self._transition( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 1849, in _transition recommendations, client_msgs, worker_msgs = func( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 2311, in transition_processing_memory steal.recalculate_cost(tts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 212, in recalculate_cost self.put_key_in_stealable(ts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 215, in put_key_in_stealable cost_multiplier, level = self.steal_time_ratio(ts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 257, in steal_time_ratio assert ts in ts.processing_on.long_running
2023-01-30T13:30:42.731+01:00 AssertionError
2023-01-30T13:30:42.731+01:00 Task exception was never retrieved
2023-01-30T13:30:42.731+01:00 future: <Task finished name='Task-424' coro=<Server._handle_comm() done, defined at /code/venv/lib/python3.8/site-packages/distributed/core.py:726> exception=AssertionError()>
2023-01-30T13:30:42.732+01:00
Traceback (most recent call last):
File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm
result = await result
File "/code/venv/lib/python3.8/site-packages/distributed/utils.py", line 741, in wrapper
return await func(*args, **kwargs)
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4231, in add_worker
await self.handle_worker(comm, address)
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5470, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream
handler(**merge(extra, msg))
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5329, in handle_task_finished
r: tuple = self.stimulus_task_finished(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4673, in stimulus_task_finished
r: tuple = self._transition(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 1849, in _transition
recommendations, client_msgs, worker_msgs = func(
File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 2311, in transition_processing_memory
steal.recalculate_cost(tts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 212, in recalculate_cost
self.put_key_in_stealable(ts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 215, in put_key_in_stealable
cost_multiplier, level = self.steal_time_ratio(ts)
File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 257, in steal_time_ratio
assert ts in ts.processing_on.long_running
Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm result = await result File "/code/venv/lib/python3.8/site-packages/distributed/utils.py", line 741, in wrapper return await func(*args, **kwargs) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4231, in add_worker await self.handle_worker(comm, address) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5470, in handle_worker await self.handle_stream(comm=comm, extra={"worker": worker}) File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream handler(**merge(extra, msg)) File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 5329, in handle_task_finished r: tuple = self.stimulus_task_finished( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 4673, in stimulus_task_finished r: tuple = self._transition( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 1849, in _transition recommendations, client_msgs, worker_msgs = func( File "/code/venv/lib/python3.8/site-packages/distributed/scheduler.py", line 2311, in transition_processing_memory steal.recalculate_cost(tts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 212, in recalculate_cost self.put_key_in_stealable(ts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 215, in put_key_in_stealable cost_multiplier, level = self.steal_time_ratio(ts) File "/code/venv/lib/python3.8/site-packages/distributed/stealing.py", line 257, in steal_time_ratio assert ts in ts.processing_on.long_running
2023-01-30T13:30:42.732+01:00 AssertionError
2023-01-30T13:31:54.754+01:00 2023-01-30 12:31:54,534 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.28s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:32:02.757+01:00 2023-01-30 12:32:02,295 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.80s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:32:08.759+01:00 2023-01-30 12:32:08,721 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.29s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
For work stealing enabled I got multiple failures even for large clusters (for which available resources were far from being saturated). The only successful runs I got had work stealing disabled - this is why I thought that work stealing might be responsible.
assert ts in ts.processing_on.long_running
This assertionerror is interesting. Can you share what code you are running? You seem to be using a worker_client or secede and something appears to not work as intended.
This
orker event loop was unresponsive for
This message does not necessarily indicate that the GIL was held. It could also be a blocker event loop which is also bad but not as severe.
can you share logs of the dying workers?
Thank you for your question and sorry for my late response.
Here are logs of workers which suddenly died from 2 separate jobs:
Job 1:
2023-01-30T13:18:32.522+01:00 starting worker
2023-01-30T13:18:33.523+01:00 /code/venv/lib/python3.8/site-packages/distributed/cli/dask_worker.py:264: FutureWarning: dask-worker is deprecated and will be removed in a future release; use `dask worker` instead warnings.warn(
2023-01-30T13:18:33.523+01:00 2023-01-30 12:18:33,090 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.0.248.78:43645'
2023-01-30T13:18:34.523+01:00 2023-01-30 12:18:33,611 - distributed.worker - INFO - Start worker at: tcp://10.0.248.78:43691
2023-01-30T13:18:34.523+01:00 2023-01-30 12:18:33,611 - distributed.worker - INFO - Listening to: tcp://10.0.248.78:43691
2023-01-30T13:18:34.523+01:00 2023-01-30 12:18:33,611 - distributed.worker - INFO - dashboard at: 10.0.248.78:33441
2023-01-30T13:18:34.523+01:00 2023-01-30 12:18:33,611 - distributed.worker - INFO - Waiting to connect to: tcp://10.0.194.243:8786
2023-01-30T13:18:34.523+01:00 2023-01-30 12:18:33,612 - distributed.worker - INFO - -------------------------------------------------
2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,612 - distributed.worker - INFO - Threads: 64
2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,612 - distributed.worker - INFO - Memory: 244.58 GiB
2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,612 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-pe29tk0m
2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,612 - distributed.worker - INFO - -------------------------------------------------
2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,833 - distributed.worker - INFO - Registered to: tcp://10.0.194.243:8786
2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,833 - distributed.worker - INFO - -------------------------------------------------
2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,834 - distributed.core - INFO - Starting established connection to tcp://10.0.194.243:8786
2023-01-30T13:23:04.605+01:00 2023-01-30 12:23:03,699 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.18s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:23:14.609+01:00 2023-01-30 12:23:13,989 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.09s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:23:52.621+01:00 2023-01-30 12:23:51,801 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.92s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:23:55.622+01:00 2023-01-30 12:23:55,077 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.28s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:24:03.625+01:00 2023-01-30 12:24:03,563 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.88s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:24:46.637+01:00 2023-01-30 12:24:46,073 - distributed.utils_perf - INFO - full garbage collection released 16.59 MiB from 136 reference cycles (threshold: 9.54 MiB)
2023-01-30T13:25:10.645+01:00 2023-01-30 12:25:10,352 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.95s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:25:20.648+01:00 2023-01-30 12:25:20,322 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.49s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:25:46.657+01:00 2023-01-30 12:25:46,242 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.41s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:26:15.666+01:00 2023-01-30 12:26:15,389 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.72s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:26:19.667+01:00 2023-01-30 12:26:19,148 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.76s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:26:30.671+01:00 2023-01-30 12:26:29,703 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.03s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:26:47.676+01:00 2023-01-30 12:26:47,462 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.19s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:27:09.684+01:00 2023-01-30 12:27:09,195 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.22s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:27:18.687+01:00 2023-01-30 12:27:18,358 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.00s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:27:32.692+01:00 2023-01-30 12:27:32,222 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.21s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:27:46.696+01:00 2023-01-30 12:27:46,356 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.83s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:27:58.700+01:00 2023-01-30 12:27:58,497 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.43s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:28:32.711+01:00 2023-01-30 12:28:32,152 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.87s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:29:06.722+01:00 2023-01-30 12:29:06,225 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.09s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:29:14.725+01:00 2023-01-30 12:29:14,605 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.42s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T13:30:39.751+01:00 2023-01-30 12:30:39,636 - distributed.core - INFO - Connection to tcp://10.0.194.243:8786 has been closed.
2023-01-30T13:30:39.752+01:00 2023-01-30 12:30:39,638 - distributed.worker - INFO - Stopping worker at tcp://10.0.248.78:43691. Reason: worker-handle-scheduler-connection-broken
2023-01-30T13:30:39.752+01:00 2023-01-30 12:30:39,641 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://10.0.248.78:43645'. Reason: worker-handle-scheduler-connection-broken
2023-01-30T13:30:39.752+01:00 2023-01-30 12:30:39,653 - distributed.nanny - INFO - Worker closed
2023-01-30T13:30:41.752+01:00 2023-01-30 12:30:41,656 - distributed.nanny - ERROR - Worker process died unexpectedly
2023-01-30T13:30:49.755+01:00 2023-01-30 12:30:49,453 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.0.248.78:43645'. Reason: nanny-close-gracefully
2023-01-30T13:30:49.755+01:00 2023-01-30 12:30:49,454 - distributed.dask_worker - INFO - End worker
2023-01-30T13:30:53.756+01:00 Got: 0 status from Dask worker: algo-4 -> worker process has been stopped
2023-01-30T13:30:53.756+01:00 Ending entrypoint on node: algo-4
Job 2:
2023-01-30T14:43:59.680+01:00 starting worker
2023-01-30T14:44:00.681+01:00 /code/venv/lib/python3.8/site-packages/distributed/cli/dask_worker.py:264: FutureWarning: dask-worker is deprecated and will be removed in a future release; use `dask worker` instead warnings.warn(
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,002 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.0.241.195:34377'
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - Start worker at: tcp://10.0.241.195:36595
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - Listening to: tcp://10.0.241.195:36595
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - dashboard at: 10.0.241.195:40329
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - Waiting to connect to: tcp://10.0.230.135:8786
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - -------------------------------------------------
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - Threads: 64
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - Memory: 244.61 GiB
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-5wfgbr8h
2023-01-30T14:44:00.681+01:00 2023-01-30 13:44:00,517 - distributed.worker - INFO - -------------------------------------------------
2023-01-30T14:44:01.682+01:00 2023-01-30 13:44:00,736 - distributed.worker - INFO - Registered to: tcp://10.0.230.135:8786
2023-01-30T14:44:01.682+01:00 2023-01-30 13:44:00,736 - distributed.worker - INFO - -------------------------------------------------
2023-01-30T14:44:01.682+01:00 2023-01-30 13:44:00,737 - distributed.core - INFO - Starting established connection to tcp://10.0.230.135:8786
2023-01-30T14:49:23.785+01:00 2023-01-30 13:49:23,132 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.92s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:49:30.787+01:00 2023-01-30 13:49:30,509 - distributed.core - INFO - Event loop was unresponsive in Worker for 6.63s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:49:35.789+01:00 2023-01-30 13:49:34,803 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.06s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:49:42.791+01:00 2023-01-30 13:49:42,292 - distributed.utils_perf - INFO - full garbage collection released 10.07 MiB from 521 reference cycles (threshold: 9.54 MiB)
2023-01-30T14:50:07.798+01:00 2023-01-30 13:50:07,378 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.14s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:52:13.835+01:00 2023-01-30 13:52:12,881 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.88s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:52:25.839+01:00 2023-01-30 13:52:25,709 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.25s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:52:42.844+01:00 2023-01-30 13:52:42,813 - distributed.utils_perf - INFO - full garbage collection released 38.47 MiB from 92 reference cycles (threshold: 9.54 MiB)
2023-01-30T14:52:50.847+01:00 2023-01-30 13:52:49,955 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.99s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:52:59.850+01:00 2023-01-30 13:52:59,633 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.39s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:53:03.851+01:00 2023-01-30 13:53:03,597 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.05s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:53:30.859+01:00 2023-01-30 13:53:30,596 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.34s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:53:46.864+01:00 2023-01-30 13:53:46,558 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.58s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:55:33.896+01:00 2023-01-30 13:55:33,275 - distributed.utils_perf - INFO - full garbage collection released 9.55 MiB from 267 reference cycles (threshold: 9.54 MiB)
2023-01-30T14:55:55.902+01:00 2023-01-30 13:55:55,539 - distributed.utils_perf - INFO - full garbage collection released 14.38 MiB from 323 reference cycles (threshold: 9.54 MiB)
2023-01-30T14:56:49.918+01:00 2023-01-30 13:56:49,781 - distributed.utils_perf - INFO - full garbage collection released 22.49 MiB from 183 reference cycles (threshold: 9.54 MiB)
2023-01-30T14:56:51.919+01:00 2023-01-30 13:56:51,012 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.26s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-01-30T14:57:03.923+01:00 2023-01-30 13:57:03,285 - distributed.core - INFO - Connection to tcp://10.0.230.135:8786 has been closed.
2023-01-30T14:57:03.923+01:00 2023-01-30 13:57:03,285 - distributed.worker - INFO - Stopping worker at tcp://10.0.241.195:36595. Reason: worker-handle-scheduler-connection-broken
2023-01-30T14:57:03.923+01:00 2023-01-30 13:57:03,288 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://10.0.241.195:34377'. Reason: worker-handle-scheduler-connection-broken
2023-01-30T14:57:03.923+01:00 2023-01-30 13:57:03,296 - distributed.nanny - INFO - Worker closed
2023-01-30T14:57:05.923+01:00 2023-01-30 13:57:05,298 - distributed.nanny - ERROR - Worker process died unexpectedly
2023-01-30T14:57:12.925+01:00 2023-01-30 13:57:12,795 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.0.241.195:34377'. Reason: nanny-close-gracefully
2023-01-30T14:57:12.925+01:00 2023-01-30 13:57:12,796 - distributed.dask_worker - INFO - End worker
2023-01-30T14:57:15.926+01:00 Got: 0 status from Dask worker: algo-4 -> worker process has been stopped
2023-01-30T14:57:15.926+01:00 Ending entrypoint on node: algo-4
Logs from node which runs scheduler always look like those posted in my previous post.
I can't share code but here is a brief summary of how our dask flow looks like:
- We use
client.submit()in a loop to create futures for parquet files loading - We use
dask.DataFrameAPI to concatenate and process loaded files - Later on we use
dask.BagAPI (we persist it once withclient.persist()) anddask.DataFrameAPI - For xgboost training we use XGBoost's dask API
We start scheduler with dask-scheduler command and worker with dask-worker.
2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,612 - distributed.worker - INFO - Threads: 64 2023-01-30T13:18:34.524+01:00 2023-01-30 12:18:33,612 - distributed.worker - INFO - Memory: 244.58 GiB
I see you are launching a single worker with 64 threads. This is likely not going to end well. While many scientific compute libraries (pandas, numpy, etc.) are releasing the GIL, there is still some python code running and the likelihood to cause congestion when running 64 workers is still very high.
You can launch multiple workers with dask worker --nworkers=16 <scheduler_address>. This would spawn 16 workers where every worker has 4 threads. This will likely run much smoother.
This doesn't explain the AssertionError but should help with the event loop/GIL warnings and make everything run a bit smoother. The ideal number of threads per worker is depending on your application so you might be better of with more/fewer workers on the same machine.
IIRC memory usage for dask-xboost is also correlated with 1 / #workers, i.e. having more workers should reduce the memory requirement of the xgboost step.
Thank you for the multiple workers advice - it helped with multiple things, including runtime!