ray icon indicating copy to clipboard operation
ray copied to clipboard

[Data] Sorting on grouped data not working with pandas format

Open tanzeyy opened this issue 1 year ago • 13 comments

What happened + What you expected to happen

I am working on handling grouped data and sorting on certain column, and I want to use pandas operations to process the grouped data. However, it seems sort operation doesn't work with map_groups(..., batch_format='pandas').

Reproduction process:

import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
    .sort("number", descending=True)
    .to_pandas()
)
print(results)

Run above script will produce AttributeError: 'DataFrame' object has no attribute 'num_rows':

...
ray.exceptions.RayTaskError(AttributeError): ray::reduce() (pid=1919807, ip=10.208.49.99)
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/sort_task_spec.py", line 143, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 554, in merge_sorted_blocks
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 554, in <listcomp>
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/.../venvs/dev/lib/python3.10/site-packages/pandas/core/generic.py", line 6299, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'num_rows'

After changing batch_format to pyarrow, above code would work properly: image

Moreover, remove sort and it will also work:

image

Versions / Dependencies

  • Python 3.10.12
  • ray 2.24.0

Reproduction script

import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
    .sort("number", descending=True)
    .to_pandas()
)
print(results)

Issue Severity

Medium: It is a significant difficulty but I can work around it.

tanzeyy avatar Jul 23 '24 03:07 tanzeyy

Hey, I am trying to contribute to Ray and I am willing to try this as my first issue. I wanted to understand what you exactly want with the output or what exactly you are looking to solve from this issue. As far as I understand you want Karen to group by and then provide a sorted list? Correct me if I am wrong

import ray
import pandas as pd

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

def process_data(df):
    return (df.groupby("tag")
              .agg({"number": "sum"}) 
              .reset_index()
              .sort_values("number", ascending=False))

results = (
    ds.to_pandas()
    .pipe(process_data)
)

print(results)

This code should give you the output of groupby as well as sorted

PranitKatwe avatar Jul 23 '24 23:07 PranitKatwe

Hey, I am trying to contribute to Ray and I am willing to try this as my first issue. I wanted to understand what you exactly want with the output or what exactly you are looking to solve from this issue. As far as I understand you want Karen to group by and then provide a sorted list? Correct me if I am wrong

import ray
import pandas as pd

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

def process_data(df):
    return (df.groupby("tag")
              .agg({"number": "sum"}) 
              .reset_index()
              .sort_values("number", ascending=False))

results = (
    ds.to_pandas()
    .pipe(process_data)
)

print(results)

This code should give you the output of groupby as well as sorted

Thank you for your reply!

I need to process data at a scale of hundreds of gigabytes, so the initial ds.to_pandas() approach might not be feasible for me. Also, my workflow involves complex processing on the grouped data, which is why I intended to use map_groups.

Additionally, I managed to work around this issue by returning an arrow table in the function provided to map_groups, but it should be a bug related to the data structures used in sort operation I think.

tanzeyy avatar Jul 25 '24 03:07 tanzeyy

Thank you for the acknowledgment. I am glad you found a workaround. If you are working on a large dataset instead of pandas you can use Modin as well to scale the processing. https://modin.readthedocs.io/en/latest/

PranitKatwe avatar Jul 25 '24 14:07 PranitKatwe

Hi @scottjlee, could I take this one? I think we probably we miss some code to handle panda.Dataframe in sort function

xingyu-long avatar Sep 12 '24 02:09 xingyu-long

I did some initial research on this, here are my thoughts:

  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/planner/exchange/sort_task_spec.py", line 145, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 602, in merge_sorted_blocks
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 602, in <listcomp>
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/pandas/core/generic.py", line 5902, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'num_rows'

mapper_outputs is the list of pyarrow.Table blocks (which is why we use arrow_block.py afterwards) and somehow, the inside data was DataFrame and block type was pyarrow.Table, so we called arrow_block.py.

Instead, we should invoke code at here https://github.com/ray-project/ray/blob/a1be06346e532e656c128ffd2230f06a4d72679e/python/ray/data/_internal/pandas_block.py#L504-L518

I have a few questions:

  1. what's the data format would be with following code
results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
  1. if we add sort, do we have data format info along the way? (I didn't find it in codebase)

@scottjlee could you give some thoughts on this?

xingyu-long avatar Sep 12 '24 04:09 xingyu-long

mapper_outputs is the list of pyarrow.Table blocks (which is why we use arrow_block.py afterwards) and somehow, the inside data was DataFrame and block type was pyarrow.Table, so we called arrow_block.py.

Yes, this sounds like the right analysis of the bug here. The core problem is that in this line, we assume that all of the block types are the same, and we grab the first block to get its corresponding BlockAccessor.

My initial thoughts for the fix is to add a block normalization call like:

blocks = TableBlockAccessor.normalize_block_types(blocks, format)

to the beginning of SortTaskSpec, where format would be the batch/block format from its previous operator.

  1. what's the data format would be with following code results = ( ds.groupby("tag") .map_groups(lambda g: g, batch_format="pandas")

In this case, the output block format would be pandas DataFrames.

  1. if we add sort, do we have data format info along the way? (I didn't find it in codebase)

Sort method shouldn't be specifying any other specific data type, it should use the data type from previous operator.

Thanks @xingyu-long !

scottjlee avatar Sep 12 '24 23:09 scottjlee

@scottjlee Thanks for the clarification! Could you assign this to me? I can give it a try. Thanks!

xingyu-long avatar Sep 12 '24 23:09 xingyu-long

Hi @scottjlee, after spending more time on this issue, I found out after map_groups, we will create empty pyarrow tables and later we use apply udf (https://github.com/ray-project/ray/blob/ee320aa6670514b85e77fce8d6903affcf883cc4/python/ray/data/grouped_data.py#L223) with batch data (so we don't update empty table). so one place we may update beforehand is the beginning of the map_groups(...) function with TableBlockAccessor.normalize_block_types(blocks, format) https://github.com/ray-project/ray/blob/ee320aa6670514b85e77fce8d6903affcf883cc4/python/ray/data/grouped_data.py#L192

However, I have trouble to get all blocks there, and don't know how to replace the blocks within the Dataset.

Could you give some hints on this? or if it's not the place you wanted, could you elaborate it a little more? Thanks!

xingyu-long avatar Sep 17 '24 04:09 xingyu-long

Hi! I have a sequence of the following steps on my dataset: flatmap().groupby().map_groups().

I was facing the same error as above.

When I repartitioned my dataset before calling flatmap, my job succeeded.

I am not sure what happened with my dataset and I will try investigating and posting here, but just wanted to share that there may be something unintentional happening in the map_groups code as described by @xingyu-long which got corrected for me when I used repartition.

dhananjaisharma10 avatar Sep 24 '24 04:09 dhananjaisharma10

I think this solution might works. #39960. It convert every block to arrow type first. But it is not merged, i'm not sure if I can use it. @xingyu-long @scottjlee

wxy117 avatar Oct 11 '24 10:10 wxy117

Thanks @wxy117, it do looks similar to the same issue. Maybe we can wait @scottjlee to decide.

xingyu-long avatar Oct 14 '24 02:10 xingyu-long

Hi! I have a sequence of the following steps on my dataset: flatmap().groupby().map_groups().

I was facing the same error as above.

When I repartitioned my dataset before calling flatmap, my job succeeded.

I am not sure what happened with my dataset and I will try investigating and posting here, but just wanted to share that there may be something unintentional happening in the map_groups code as described by @xingyu-long which got corrected for me when I used repartition.

This worked for me, nice!

mritterfigma avatar Oct 17 '24 19:10 mritterfigma

Sorry for the long delay here. After some deeper investigation, I confirmed that the issue specifically comes up with the case where there is a key specified in the groupby, which will trigger a sort(). (This is also the reason why removing sort() in the example will actually have it succeed. And removing the key in the groupby will also have the example succeed.)

I think the best generalization for fixing the bug here is to modify ExchangeTaskScheduler.execute(), where we should normalize the input blocks. For example, in PullBasedShuffleTaskScheduler.execute(), we can call TableBlockAccessor.normalize_block_types(blocks, format) here. You'll also need to pass the batch_format to ExchangeTaskSpec so you can access it here.

@xingyu-long let me know if this makes sense, or if you have other suggested approaches. thanks!

scottjlee avatar Oct 18 '24 22:10 scottjlee