Autocompression
Fixes: https://github.com/dask/distributed/issues/3656
- Add TypeCompression class
- Added setitem and getitem
- Need to get rid of maybe_compress in distributed/protocol/core.py and distributed/protocol/serialize.py (currently trying to understand headers and comm)
cc: @mrocklin
Thanks for starting on this, @prasunanand. I've added more context to the original issue. I hope that helps to clarify the objectives: https://github.com/dask/distributed/issues/3656
Thanks @mrocklin :)
@mrocklin, could you please review this again?
Thanks @mrocklin. I have a question. Do serialization and compression mean the same thing for "large results or Python specific data, like NumPy arrays or Pandas"? If so, should I call the serialize() function on Python-specific data, which looks up the registered serializers?
    if isinstance(key, (np.ndarray, pd.DataFrame)):
        compress_func = serialize
        decompress_func = deserialize
Sorry for the late reply.
Looking at the code in dumps, are both serialize() and compress() called on the data?
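For readers following along, the two terms can be kept apart with a small sketch. It is not Dask's implementation: `pickle` and `zlib` from the standard library stand in for Dask's serializers and compressors, and the function names are hypothetical. Serialization turns an object into bytes; compression then makes those bytes smaller.

```python
import pickle
import zlib


def serialize_then_compress(obj):
    """Run the two steps separately and return both intermediate results."""
    # Step 1, serialization: Python object -> bytes (pickle is a stand-in here).
    raw = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    # Step 2, compression: bytes -> (hopefully) fewer bytes (zlib is a stand-in).
    packed = zlib.compress(raw)
    return raw, packed


def decompress_then_deserialize(packed):
    """Undo the two steps in reverse order."""
    return pickle.loads(zlib.decompress(packed))


data = [1.0] * 100_000  # highly repetitive, so it should compress well
raw, packed = serialize_then_compress(data)
restored = decompress_then_deserialize(packed)
print(len(raw), len(packed))
```

The order matters on the way back: the bytes have to be decompressed before they can be deserialized.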
@prasunanand I would have a look at https://github.com/dask/distributed/blob/6db09f32d8ca58a56a50385a49150ef16a5d51b0/distributed/worker.py#L233-L234.
I think the idea is to have Worker.data be the mutable mapping that implements automatic compression.
Compressing data prior to communication (in distributed/protocol) is also an interesting idea, but might be more involved.
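A minimal sketch of what such a mapping could look like, assuming `pickle` plus `zlib` as placeholders for Dask's own serialization and compression. The class name `CompressedMapping` and the `.storage` attribute are hypothetical and are not the API proposed in this PR.

```python
import pickle
import zlib
from collections.abc import MutableMapping


class CompressedMapping(MutableMapping):
    """Hypothetical mapping that stores every value as compressed bytes."""

    def __init__(self):
        # Compressed payloads live here, keyed by the original key.
        self.storage = {}

    def __setitem__(self, key, value):
        # Serialize the object to bytes, then compress before storing.
        self.storage[key] = zlib.compress(pickle.dumps(value))

    def __getitem__(self, key):
        # Raises KeyError for missing keys, like a regular dict.
        return pickle.loads(zlib.decompress(self.storage[key]))

    def __delitem__(self, key):
        del self.storage[key]

    def __iter__(self):
        return iter(self.storage)

    def __len__(self):
        return len(self.storage)


d = CompressedMapping()
d["x"] = bytes(1_000_000)  # a large but compressible piece of data
d["z"] = 3  # small values round-trip too
```

Because it implements the `MutableMapping` interface, an object like this could in principle be passed as the `data=` argument of a `Worker`, which is how the test later in this thread uses `TypeCompressor`.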
It should work on any python object that is passed in. To do this well you will need to understand more about how dask handles serialization and compression.
In general, I get the sense that this issue may be beyond your current understanding of Dask. I was probably wrong to mark this as a good first issue.
Apologies, @mrocklin. I could not work out what this feature required, so I studied the zict codebase and worker.py. I now have a better understanding, so I have tried once more and hope this solves the issue (or is at least close to a solution).
If not, I will close this PR and work on other issues.
cc @madsbk (as this may be of interest 🙂)
Any thoughts or questions on the feedback so far @prasunanand ? 🙂
Sorry, I have been busy at work. Over the last week I have been trying to measure performance with different workloads. I found that NumPy arrays were not being deserialized properly and corrected that. To measure performance across workloads, I have been trying to use Dask diagnostics.
Please let me know if this is the right approach. In the following code, I compute over an ndarray of shape (1000, 1000).
import asyncio


async def test_compression():
    import numpy as np
    import dask.array as da
    from distributed.protocol import TypeCompressor
    from distributed import Scheduler, Worker, Client, wait, performance_report

    x = np.ones(1000000)  # a large but compressible piece of data
    d = TypeCompressor()
    d["x"] = x  # put Python object into d
    out = d["x"]  # get the object back out
    d["z"] = 3
    assert str(out) == str(x)
    np.testing.assert_allclose(
        np.frombuffer(out, x.dtype), x
    )  # the two objects should match
    # assuming here that the underlying bytes are
    # stored in something like a `.storage` attribute, but this isn't required
    # we check that the amount of actual data stored is small
    assert sum(map(len, d.storage.values())) < x.nbytes
    async with Scheduler() as s:
        async with Worker(s.address, data=d) as worker:
            async with Client(s.address, asynchronous=True) as c:
                async with performance_report(filename="dask-report.html"):  # measure performance
                    x = da.ones((1000, 1000))
                    y = await x.persist()  # put data in memory
                    y = await (x + x.T).mean().persist()  # do some work
                    future = c.compute(y)
                    await wait(future)
                assert sum(map(len, worker.data.storage.values())) < x.nbytes


asyncio.run(test_compression())
Attached is the HTML report I got as a result (to view it, change the extension from .txt to .html): dask-report.txt
Apart from this, which other data types should I investigate (numpy.ndarray, string, pandas.Series)?
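One way to compare data types is to measure how well each one compresses. The sketch below uses only the standard library, with `pickle` and `zlib` standing in for Dask's serializers and compressors. The helper `compression_ratio` and the sample payloads are hypothetical, chosen only to show compressible and incompressible inputs side by side.

```python
import os
import pickle
import zlib
from array import array


def compression_ratio(obj):
    """Return compressed size divided by serialized size (lower is better)."""
    raw = pickle.dumps(obj)
    return len(zlib.compress(raw)) / len(raw)


# Illustrative payloads: two highly regular, one essentially incompressible.
payloads = {
    "constant floats": array("d", [1.0]) * 100_000,
    "repetitive text": "dask " * 100_000,
    "random bytes": os.urandom(500_000),
}

ratios = {name: compression_ratio(obj) for name, obj in payloads.items()}
for name, ratio in ratios.items():
    print(f"{name}: {ratio:.3f}")
```

Regular data such as constant arrays or repeated strings should shrink dramatically, while random bytes should stay roughly the same size, so a benchmark probably wants at least one payload of each kind.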
I tried another approach to this problem (hope that is ok 🙂) in PR ( https://github.com/dask/distributed/pull/3968 ). We need not go with that. Just trying to see if we are able to get the behavior we want out of Zict alone. Would be curious to hear what people think of it.