Serializer not passed to workers

Open AlecThomson opened this issue 3 years ago • 1 comments

Describe the issue: Possibly related to #5561. I'm attempting to find a fix for issues related to https://github.com/dask/dask-mpi/issues/76 and https://github.com/astropy/astropy/issues/11317. As part of this, I tried changing the serializers/deserializers options in my Client (e.g. to ["pickle"] for both). However, this produced the following error:

2022-12-09 13:30:31,195 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/protocol/core.py", line 138, in _decode_default
    return merge_and_deserialize(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 497, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 421, in deserialize
    raise TypeError(
TypeError: Data serialized with dask but only able to deserialize data with ['pickle']

After reading #5561, it looks like a similar issue is taking place, but with the predefined serializers instead.
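The error message can be read as a mismatch between the serializer name recorded with the data and the list of deserializers the receiving side accepts. The sketch below is a toy model of that failure mode only. The functions `toy_serialize` and `toy_deserialize` are hypothetical and are not the code in distributed/protocol/serialize.py; the assumption is simply that each payload carries a header naming the serializer that produced it.

```python
import pickle


def toy_serialize(obj, serializer):
    # Tag the payload with the name of the serializer that produced it.
    # (Hypothetical helper; the payload itself is just pickled here.)
    header = {"serializer": serializer}
    return header, pickle.dumps(obj)


def toy_deserialize(header, payload, deserializers):
    # Refuse payloads whose tag is not in the receiver's allowed list.
    name = header["serializer"]
    if deserializers is not None and name not in deserializers:
        raise TypeError(
            f"Data serialized with {name} but only able to "
            f"deserialize data with {deserializers}"
        )
    return pickle.loads(payload)


# The sender tags its data as "dask"; the receiver only accepts "pickle".
header, payload = toy_serialize({"x": 1}, serializer="dask")
try:
    toy_deserialize(header, payload, deserializers=["pickle"])
except TypeError as exc:
    message = str(exc)
print(message)
```

In this toy model the error goes away only when both sides agree on the list, which is consistent with the symptom reported here: one side restricted to ["pickle"] while the other still uses its defaults.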

Minimal Complete Verifiable Example:

from dask.distributed import Client, get_client
from dask import delayed


@delayed()
def worker(i):
    client = get_client()
    print(f"Worker -- {client._serializers=}")
    print(f"Worker -- {client._deserializers=}")
    return i

def main():
    with Client(serializers=["pickle"], deserializers=["pickle"]) as client:
        print(f"Main -- {client._serializers=}")
        print(f"Main -- {client._deserializers=}")

        tests = []
        for i in range(10):
            tests.append(worker(i))
        _ = client.compute(tests)


if __name__ == "__main__":
    main()

Output:

Main -- client._serializers=['pickle']
Main -- client._deserializers=['pickle']
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None

Anything else we need to know?: As a slight aside, I think this lies at the heart of the compatibility issues with astropy.coordinates and astropy.units, since serialization of objects from these modules appears to be breaking.

Environment:

  • Dask version: 2022.9.2
  • Python version: Python 3.8.13
  • Operating System: SUSE Linux Enterprise Server 12 SP3
  • Install method (conda, pip, source): conda

AlecThomson · Dec 09 '22 05:12

Hi @AlecThomson, my apologies for the long delay in getting back to you.

You're correct: serializers passed on the client side don't reach the workers. The primary application for custom serialization that we've seen has been security, for folks who don't want to allow arbitrary pickle between potentially unsafe clients and the scheduler/workers. Because of that, we don't allow the client to determine the serialization to be used. It has to be determined ahead of time. Typically we leave this to the system setting up Dask.

Does that make sense? I apologize if this isn't the functionality that you expected.

Most folks looking for custom serialization are better served by leveraging pickle's extensibility with methods like __setstate__ or __reduce__. Do those work for you?
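For reference, a minimal sketch of what that kind of pickle extensibility can look like, using only the standard library. The classes `Angle` and `Point` are hypothetical stand-ins invented for illustration; they are not astropy or Dask classes, and whether this approach fixes the astropy case in this issue is not established here.

```python
import pickle


class Angle:
    """Hypothetical unit-carrying object (not an astropy class)."""

    def __init__(self, value, unit):
        self.value = value
        self.unit = unit
        # Derived cache that should not travel over the wire.
        self._cache = {"radians": None}

    def __getstate__(self):
        # Send only the fields needed to rebuild the object.
        return {"value": self.value, "unit": self.unit}

    def __setstate__(self, state):
        # Restore the fields and recreate the cache on the receiving side.
        self.value = state["value"]
        self.unit = state["unit"]
        self._cache = {"radians": None}


class Point:
    """Hypothetical class that rebuilds itself through its constructor."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __reduce__(self):
        # Tell pickle to call Point(x, y) when loading.
        return (Point, (self.x, self.y))


# Round-trip both objects through pickle, as a worker transfer would.
angle = pickle.loads(pickle.dumps(Angle(90.0, "deg")))
point = pickle.loads(pickle.dumps(Point(1, 2)))
print(angle.value, angle.unit, point.x, point.y)
```

`__getstate__`/`__setstate__` suit objects that carry state which should be dropped or rebuilt on arrival, while `__reduce__` suits objects that are easiest to recreate by calling a constructor or factory with a few arguments.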

mrocklin · Dec 20 '22 19:12