Generalizing pandas serialization methods

Open charlesbluca opened this issue 3 years ago • 0 comments

Shuffle operations on non-pandas dataframes fail in partd because its serialization logic is pandas-specific. For example, a groupby.apply operation with dask-cudf raises an AttributeError because the cuDF index does not implement the internal pandas method _get_attributes_dict:

import cudf
import dask_cudf

df = cudf.DataFrame()
df['key'] = [0, 0, 1, 1, 1]
df['val'] = range(5)

ddf = dask_cudf.from_cudf(df, npartitions=1)

ddf.groupby("key").val.apply(lambda x: x.sum()).compute()
/tmp/ipykernel_61671/1646734033.py:10: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  ddf.groupby("key").val.apply(lambda x: x.sum()).compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_61671/1646734033.py in <module>
      8 ddf = dask_cudf.from_cudf(df, npartitions=1)
      9 
---> 10 ddf.groupby("key").val.apply(lambda x: x.sum()).compute()

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    286         dask.base.compute
    287         """
--> 288         (result,) = compute(self, traverse=False, **kwargs)
    289         return result
    290 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    569         postcomputes.append(x.__dask_postcompute__())
    570 
--> 571     results = schedule(dsk, keys, **kwargs)
    572     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    573 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    551     """
    552     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 553     return get_async(
    554         synchronous_executor.submit,
    555         synchronous_executor._max_workers,

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    494             while state["waiting"] or state["ready"] or state["running"]:
    495                 fire_tasks(chunksize)
--> 496                 for key, res_info, failed in queue_get(queue).result():
    497                     if failed:
    498                         exc, tb = loads(res_info)

~/conda/envs/rapids-21.12/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438 
    439                 self._condition.wait(timeout)

~/conda/envs/rapids-21.12/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    536         fut = Future()
    537         try:
--> 538             fut.set_result(fn(*args, **kwargs))
    539         except BaseException as e:
    540             fut.set_exception(e)

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]
    235 
    236 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]
    235 
    236 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223         failed = False
    224     except BaseException as e:
--> 225         result = pack_exception(e, dumps)
    226         failed = True
    227     return key, result, failed

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218     try:
    219         task, data = loads(task_info)
--> 220         result = _execute_task(task, data)
    221         id = get_id()
    222         result = dumps((result, id))

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/dataframe/shuffle.py in shuffle_group_3(df, col, npartitions, p)
    916         g = df.groupby(col)
    917         d = {i: g.get_group(i) for i in g.groups}
--> 918         p.append(d, fsync=True)
    919 
    920 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/encode.py in append(self, data, **kwargs)
     21 
     22     def append(self, data, **kwargs):
---> 23         data = valmap(self.encode, data)
     24         data = valmap(frame, data)
     25         self.partd.append(data, **kwargs)

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/toolz/dicttoolz.py in valmap(func, d, factory)
     81     """
     82     rv = factory()
---> 83     rv.update(zip(d.keys(), map(func, d.values())))
     84     return rv
     85 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/pandas.py in serialize(df)
    179     """
    180     col_header, col_bytes = index_to_header_bytes(df.columns)
--> 181     ind_header, ind_bytes = index_to_header_bytes(df.index)
    182     headers = [col_header, ind_header]
    183     bytes = [col_bytes, ind_bytes]

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/pandas.py in index_to_header_bytes(ind)
    111         values = ind.values
    112 
--> 113     header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
    114     bytes = pnp.compress(pnp.serialize(values), values.dtype)
    115     return header, bytes

AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'

The same pandas-specific logic also appears to be why pandas DataFrame subclasses are not preserved through serialization round trips, as noted in #52.

Looking through the serialization code itself, it seems it hasn't been modified significantly in a few years, which raises the question of whether we can do things in a different (ideally more flexible) way today. For example, we currently use pickle.dumps for only a small subset of pandas Index subclasses:

https://github.com/dask/partd/blob/236a44bda3a35d153f24d97e880a9a37d941203f/partd/pandas.py#L98-L102

Yet it appears that all Index subclasses should support serialization through pickle. Additionally, we manually construct the header and bytes from pandas-like objects during serialization, even though many of those objects already implement serialization/deserialization functions that could be used for this purpose.
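As a quick check of the pickle claim, here is a minimal sketch that round-trips a few Index types through pickle. It assumes pandas is installed; the selection of index types and the helper name roundtrip are chosen for illustration and are not exhaustive or taken from partd.

```python
import pickle

import pandas as pd

# Illustrative selection of Index types; not an exhaustive list.
indexes = [
    pd.Index([0, 0, 1, 1, 1], name="key"),
    pd.RangeIndex(5),
    pd.date_range("2022-08-02", periods=3),
    pd.CategoricalIndex(["a", "b", "a"]),
    pd.MultiIndex.from_tuples([(0, "x"), (1, "y")], names=["key", "val"]),
]


def roundtrip(ind):
    """Serialize and deserialize an index with plain pickle (hypothetical helper)."""
    return pickle.loads(pickle.dumps(ind, protocol=pickle.HIGHEST_PROTOCOL))


results = [roundtrip(ind) for ind in indexes]

for before, after in zip(indexes, results):
    # The concrete type, the values, and the names should all survive.
    assert type(after) is type(before)
    assert after.equals(before)
    assert list(after.names) == list(before.names)
```

If this holds for the index types partd needs to support, the special-casing at the linked lines might be replaceable by a single pickle path, though the size and speed of the pickled payload would need to be compared against the current manual encoding.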

Essentially, I'm wondering if we could refactor the serialization methods here to:

  • check for serialize/deserialize methods and use them if available
  • if not, fall back on pickle.dumps if __reduce__ is available
  • fall back on manual serialization as a last resort
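The three-step fallback above could be sketched roughly as follows. This is not partd's API: the names REGISTRY, register, dumps, loads, manual_dumps and the toy Frame class are all hypothetical, and the sketch assumes a simplified protocol in which serialize() returns bytes and a deserialize() classmethod rebuilds the object. Real libraries may use a different signature (for example, a header plus a list of frames).

```python
import pickle

# Hypothetical registry of classes that provide serialize()/deserialize().
REGISTRY = {}


def register(cls):
    """Record a class so loads() can find its deserialize() by name."""
    REGISTRY[cls.__name__] = cls
    return cls


def manual_dumps(obj):
    """Placeholder for the existing hand-written header/bytes logic."""
    raise TypeError(f"no serialization path for {type(obj).__name__}")


def dumps(obj):
    cls = type(obj)
    # 1. Prefer the object's own serialize/deserialize pair if it has one.
    if cls.__name__ in REGISTRY and callable(getattr(obj, "serialize", None)):
        name = cls.__name__.encode()
        return b"C" + len(name).to_bytes(2, "big") + name + obj.serialize()
    # 2. Otherwise try pickle.
    try:
        return b"P" + pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    except (pickle.PicklingError, TypeError, AttributeError):
        # 3. Manual serialization as a last resort.
        return b"M" + manual_dumps(obj)


def loads(blob):
    tag, body = blob[:1], blob[1:]
    if tag == b"C":
        n = int.from_bytes(body[:2], "big")
        name = body[2:2 + n].decode()
        return REGISTRY[name].deserialize(body[2 + n:])
    if tag == b"P":
        return pickle.loads(body)
    # The manual path is not implemented in this sketch.
    raise ValueError(f"unknown tag {tag!r}")


@register
class Frame:
    """Toy stand-in for a non-pandas dataframe holding small integers."""

    def __init__(self, values):
        self.values = list(values)

    def serialize(self):
        return bytes(self.values)  # only valid for ints in range(256)

    @classmethod
    def deserialize(cls, payload):
        return cls(payload)
```

With this sketch, loads(dumps(Frame([0, 0, 1, 1, 1]))) goes through the custom path and a plain dict goes through pickle. One design note: every Python object inherits __reduce__ from object, so checking for that attribute alone would not filter anything out; the sketch instead attempts pickle.dumps and falls through to the manual path when pickling fails.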

Some related goals could be to examine whether something similar can be done for the NumPy serialization methods, and potentially to add tests for non-pandas dataframes and pandas DataFrame subclasses.

cc @quasiben @jakirkham

charlesbluca, Aug 02 '22 13:08