Network transfer of objects with circular recursion hangs
Describe the issue: I have Dask submitting jobs to condor. The jobs seem to run fine and produce their output; however, they crash at the end with the following errors:
```
2023-11-29 14:21:19,635 - distributed.sizeof - WARNING - Sizeof calculation failed. Defaulting to -1 B
Traceback (most recent call last):
  File "xxx/python3.10/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "xxx/python3.10/site-packages/dask/utils.py", line 642, in __call__
    return meth(arg, *args, **kwargs)
  File "xxx/python3.10/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "xxx/python3.10/site-packages/dask/utils.py", line 642, in __call__
    return meth(arg, *args, **kwargs)
  File "xxx/python3.10/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(sizeof, seq))
  File "xxx/python3.10/site-packages/dask/utils.py", line 642, in __call__
  --> The last two frames repeat in a cycle <--
RecursionError: maximum recursion depth exceeded
```
Workers crash after exceeding the recursion depth. The problem seems to lie in the `safe_sizeof()` method, or in the `meth` it dispatches to:

```python
meth = self.dispatch(type(arg))
return meth(arg, *args, **kwargs)
```
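For context, a sizeof that tracks visited object ids would terminate on self-referential containers. This is purely an illustrative sketch of the idea, not Dask's dispatch-based `sizeof` implementation:

```python
import sys

def sizeof_cycle_safe(obj, _seen=None):
    # Illustrative sketch only -- NOT dask.sizeof. Tracks visited object ids
    # so self-referential containers terminate instead of recursing forever.
    if _seen is None:
        _seen = set()
    if id(obj) in _seen:
        return 0  # already counted; break the cycle
    _seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for k, v in obj.items():
            size += sizeof_cycle_safe(k, _seen) + sizeof_cycle_safe(v, _seen)
    elif isinstance(obj, (list, tuple, set, frozenset)):
        for item in obj:
            size += sizeof_cycle_safe(item, _seen)
    return size

d = {}
d[0] = d
print(sizeof_cycle_safe(d))  # terminates instead of raising RecursionError
```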
Minimal Complete Verifiable Example: None
Environment:
- Dask version: 2023.11.0
- Python version: 3.10.11
- Operating System: AlmaLinux9
- Install method (conda, pip, source): pip
Do you have a reproducer for this error? Even if `safe_sizeof` fails, it should not fail your computation.
cc @crusaderky
Reproduced. Fairly sure it's not safe_sizeof though.
```python
import distributed

client = distributed.Client(n_workers=1)

def f():
    d = {}
    d[0] = d
    return d

fut = client.submit(f, key="x")
distributed.wait(fut)
# So far so good - infinite recursion is handled gracefully:
# 2023-11-30 14:23:48,192 - distributed.sizeof - WARNING - Sizeof calculation failed. Defaulting to 0.95 MiB
# Traceback (most recent call last):
#   ...
# RecursionError: maximum recursion depth exceeded while calling a Python object

# The task finishes successfully and its output is stored on the worker:
client.run(lambda dask_worker: str(dask_worker.data["x"]))
# {'tcp://127.0.0.1:35311': '{0: {...}}'}

# However, network transfer hangs:
fut.result()
```
gather_dep from one worker to another also hangs.
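Until the hang is addressed, one possible workaround (my own suggestion, not an official recommendation) is to return the cyclic structure inside an opaque wrapper such as `collections.UserDict`, so that distributed falls back to plain pickle, whose memo table handles self-references:

```python
from collections import UserDict

def f():
    d = {}
    d[0] = d  # self-referential dict
    # Wrapping in UserDict makes the object opaque to the traversal-based
    # serializers, so it is pickled whole; pickle memoizes objects by id
    # and therefore round-trips cycles without infinite recursion.
    return UserDict(d)
```

On the client side the original dict is then available as the wrapper's `.data` attribute.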
I worked on this fairly recently (#8214). Investigating.
FWIW If #8214 is the cause, this has already been released
Reproduced with dask=2023.9.3 msgpack=1.0.5 (before #8214). This is not a recent regression.
Just to mention that the issue is still there after upgrading to dask/distributed=2023.12.1.
Even more minimal reproducer:
```python
>>> from distributed.protocol import serialize
>>> d = {}
>>> d[0] = d
>>> serialize(d)
RecursionError: maximum recursion depth exceeded

>>> from collections import UserDict
>>> d2 = UserDict(d)  # Wrap in an opaque object to use plain pickle
>>> serialize(d2)
({'serializer': 'pickle', 'writeable': ()},
 [b'\x80\x05\x956\x00\x00\x00\x00\x00\x00\x00\x8c\x0bcollections\x94\x8c\x08UserDict\x94\x93\x94)\x81\x94}\x94\x8c\x04data\x94}\x94K\x00}\x94K\x00h\x07sssb.'])
```
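The `UserDict` wrapper works because pickle itself handles self-references via its memo table; a quick standard-library check:

```python
import pickle

d = {}
d[0] = d                    # circular reference

blob = pickle.dumps(d)      # pickle memoizes each object by id, so the
                            # second encounter emits a back-reference
restored = pickle.loads(blob)
assert restored[0] is restored  # the cycle survives the round trip
```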
I am experiencing the same issue when attempting to use performance_report on the execution of some Futures.