
Autocompression

Open prasunanand opened this issue 5 years ago • 13 comments

Fixes: https://github.com/dask/distributed/issues/3656

  • Add TypeCompression class
  • Added setitem and getitem
  • Need to get rid of maybe_compress in distributed/protocol/core.py and distributed/protocol/serialize.py (currently trying to understand headers and comm)

cc: @mrocklin

prasunanand avatar Apr 12 '20 15:04 prasunanand

Thanks for starting on this @prasunanand . I've added more context to the original issue. I hope that that helps to clarify the objectives: https://github.com/dask/distributed/issues/3656

mrocklin avatar Apr 12 '20 16:04 mrocklin

Thanks @mrocklin :) .

prasunanand avatar Apr 12 '20 17:04 prasunanand

@mrocklin could you please review this again?

prasunanand avatar Apr 13 '20 16:04 prasunanand

Thanks @mrocklin. I have a question. Do serialization and compression mean the same thing for "large results or Python specific data, like NumPy arrays or Pandas"? If yes, should I call the serialize() function, which looks up the registered serializers, on Python-specific datasets?

if isinstance(key, (np.ndarray, pd.DataFrame)):
    compress_func = serialize
    decompress_func = deserialize

Sorry for the late reply.

prasunanand avatar Apr 23 '20 16:04 prasunanand

Looking at the code in dumps, are both serialize() and compress() called on the data?

prasunanand avatar Apr 23 '20 17:04 prasunanand

@prasunanand I would have a look at https://github.com/dask/distributed/blob/6db09f32d8ca58a56a50385a49150ef16a5d51b0/distributed/worker.py#L233-L234.

I think the idea is to have Worker.data be the mutable mapping that implements automatic compression.

Compressing data prior to communication (in distributed/protocol) is also an interesting idea, but might be more involved.
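To make the idea of a compressing mutable mapping concrete, here is a minimal sketch. It assumes plain pickle for serialization and zlib for compression, which is not necessarily what Dask would use; the class name `CompressedMapping` and its `storage` attribute are hypothetical and chosen only for illustration.

```python
import pickle
import zlib
from collections.abc import MutableMapping


class CompressedMapping(MutableMapping):
    """Hypothetical mapping that keeps each value as a zlib-compressed pickle."""

    def __init__(self):
        self.storage = {}  # key -> compressed bytes (assumed attribute name)

    def __setitem__(self, key, value):
        # Serialize first, then compress the resulting bytes.
        raw = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
        self.storage[key] = zlib.compress(raw)

    def __getitem__(self, key):
        # Reverse the two steps: decompress, then deserialize.
        return pickle.loads(zlib.decompress(self.storage[key]))

    def __delitem__(self, key):
        del self.storage[key]

    def __iter__(self):
        return iter(self.storage)

    def __len__(self):
        return len(self.storage)
```

Because it implements the full `MutableMapping` interface, an object like this could in principle be passed wherever a dict-like store is accepted, and callers only ever see the original Python objects.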

TomAugspurger avatar Apr 23 '20 21:04 TomAugspurger

Thanks @mrocklin. I have a question. Do serialization and compression mean the same thing for "large results or Python specific data, like NumPy arrays or Pandas"? If yes, should I call the serialize() function, which looks up the registered serializers, on Python-specific datasets?

if isinstance(key, (np.ndarray, pd.DataFrame)):
    compress_func = serialize
    decompress_func = deserialize

It should work on any Python object that is passed in. To do this well you will need to understand more about how Dask handles serialization and compression.
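As a rough illustration of why serialization and compression are separate steps, the sketch below first turns an arbitrary Python object into bytes and only then decides whether compressing those bytes is worthwhile. It uses pickle and zlib from the standard library as stand-ins; the function names `dumps` and `loads`, the `min_size` parameter, and the 0.9 threshold are assumptions made for this example and are not Dask's actual implementation.

```python
import pickle
import zlib


def dumps(obj, min_size=1_000):
    """Serialize obj, then compress the bytes only if that saves space."""
    # Step 1: serialization turns the object into bytes.
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    header = {"compression": None}
    # Step 2: compression shrinks those bytes, when it is worth the effort.
    if len(payload) >= min_size:
        compressed = zlib.compress(payload)
        if len(compressed) < 0.9 * len(payload):
            header["compression"] = "zlib"
            payload = compressed
    return header, payload


def loads(header, payload):
    """Undo dumps: decompress if the header says so, then deserialize."""
    if header["compression"] == "zlib":
        payload = zlib.decompress(payload)
    return pickle.loads(payload)
```

The header records what was done to the payload, so the reader can reverse exactly those steps for any object, whether or not compression was applied.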

In general, I get the sense that this issue may be beyond your current understanding of Dask. I was probably wrong to mark this as a good first issue.

mrocklin avatar Apr 27 '20 18:04 mrocklin

Apologies @mrocklin. I studied the zict codebase and worker.py, as I was unable to get my head around what this feature demanded. Now I have a better understanding, so I have tried once more and hope it solves the issue (or that the solution is close).

If not, I will close this PR and work on other issues.

prasunanand avatar May 01 '20 14:05 prasunanand

cc @madsbk (as this may be of interest 🙂)

jakirkham avatar Jun 23 '20 21:06 jakirkham

Any thoughts or questions on the feedback so far @prasunanand ? 🙂

jakirkham avatar Jul 14 '20 11:07 jakirkham

Sorry, I have been busy at work. Over the last week I have been trying to measure performance with different workloads. I found that NumPy arrays were not being deserialized properly and corrected that. To measure performance for different workloads, I have been trying to use Dask diagnostics.

Please let me know if it's the right approach. In the following code, I am trying to compute over an ndarray of shape (1000, 1000).

import asyncio


async def test_compression():
    import numpy as np
    import dask.array as da
    from distributed.protocol import TypeCompressor
    from distributed import Scheduler, Worker, Client, wait, performance_report

    x = np.ones(1000000)  # a large but compressible piece of data

    d = TypeCompressor()

    d["x"] = x  # put Python object into d
    out = d["x"]  # get the object back out
    d["z"] = 3

    assert str(out) == str(x)
    np.testing.assert_allclose(
        np.frombuffer(out, x.dtype), x
    )  # the two objects should match

    # assuming here that the underlying bytes are
    # stored in something like a `.storage` attribute, but this isn't required
    # we check that the amount of actual data stored is small
    assert sum(map(len, d.storage.values())) < x.nbytes

    async with Scheduler() as s:
        async with Worker(s.address, data=d) as worker:
            async with Client(s.address, asynchronous=True) as c:
                # measure performance
                async with performance_report(filename="dask-report.html"):
                    x = da.ones((1000, 1000))
                    y = await x.persist()  # put data in memory
                    y = await (x + x.T).mean().persist()  # do some work
                    future = c.compute(y)
                    await wait(future)
                    assert sum(map(len, worker.data.storage.values())) < x.nbytes


asyncio.run(test_compression())

Attached is the HTML file that I got as a result (to view it, change the extension from .txt to .html): dask-report.txt

prasunanand avatar Jul 14 '20 17:07 prasunanand

Apart from this, which other data types should I investigate (numpy.ndarray, string, pandas.Series)?
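One possible way to explore this is to compare how well different kinds of values compress before choosing workloads. The sketch below uses only the standard library (pickle plus zlib) as a stand-in for whatever the compressor in this PR does; the helper `compression_ratio` and the sample values are hypothetical, and NumPy or Pandas objects could be added to `samples` in the same way.

```python
import os
import pickle
import zlib


def compression_ratio(obj):
    """Return compressed size divided by serialized size (lower is better)."""
    raw = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    return len(zlib.compress(raw)) / len(raw)


# Illustrative inputs: two highly repetitive values and one incompressible one.
samples = {
    "zero bytes": bytes(1_000_000),
    "repeated text": "dask " * 200_000,
    "random bytes": os.urandom(100_000),
}

ratios = {name: compression_ratio(value) for name, value in samples.items()}
for name, ratio in ratios.items():
    print(f"{name}: {ratio:.4f}")
```

Repetitive data should give a ratio close to zero, while random data stays close to one, which suggests that benchmarks ought to include both kinds of input.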

prasunanand avatar Jul 14 '20 17:07 prasunanand

I tried another approach to this problem (hope that is ok 🙂) in PR ( https://github.com/dask/distributed/pull/3968 ). We need not go with that. Just trying to see if we are able to get the behavior we want out of Zict alone. Would be curious to hear what people think of it.

jakirkham avatar Jul 19 '20 00:07 jakirkham