[Data] Sorting on grouped data not working with pandas format
What happened + What you expected to happen
I am working with grouped data and sorting on a certain column, and I want to use pandas operations to process the grouped data. However, the sort operation doesn't seem to work with map_groups(..., batch_format='pandas').
Reproduction process:
```python
import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]
ds = ray.data.from_items(data)
results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
    .sort("number", descending=True)
    .to_pandas()
)
print(results)
```
Running the above script produces `AttributeError: 'DataFrame' object has no attribute 'num_rows'`:
```
...
ray.exceptions.RayTaskError(AttributeError): ray::reduce() (pid=1919807, ip=10.208.49.99)
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/sort_task_spec.py", line 143, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 554, in merge_sorted_blocks
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 554, in <listcomp>
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/.../venvs/dev/lib/python3.10/site-packages/pandas/core/generic.py", line 6299, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'num_rows'
```
After changing batch_format to pyarrow, the above code works properly. Moreover, removing the sort also makes it work.
Versions / Dependencies
- Python 3.10.12
- ray 2.24.0
Reproduction script
```python
import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]
ds = ray.data.from_items(data)
results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
    .sort("number", descending=True)
    .to_pandas()
)
print(results)
```
Issue Severity
Medium: It is a significant difficulty but I can work around it.
Hey, I am trying to contribute to Ray and would like to take this as my first issue. I want to understand what output you expect and what exactly this issue should solve. As far as I understand, you want to group by the tag (e.g. "karen") and then get a sorted result? Correct me if I am wrong.
```python
import ray
import pandas as pd

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]
ds = ray.data.from_items(data)

def process_data(df):
    return (df.groupby("tag")
            .agg({"number": "sum"})
            .reset_index()
            .sort_values("number", ascending=False))

results = ds.to_pandas().pipe(process_data)
print(results)
```
This code should give you the grouped as well as sorted output.
> Hey, I am trying to contribute to Ray and I am willing to try this as my first issue. [...] As far as I understand you want Karen to group by and then provide a sorted list? Correct me if I am wrong
Thank you for your reply!
I need to process data at a scale of hundreds of gigabytes, so the initial `ds.to_pandas()` approach might not be feasible for me. Also, my workflow involves complex processing on the grouped data, which is why I intended to use `map_groups`.
Additionally, I managed to work around this issue by returning an Arrow table from the function passed to `map_groups`, but I believe this is a bug related to the data structures used in the sort operation.
Thank you for the acknowledgment. I am glad you found a workaround. If you are working on a large dataset, you can also use Modin instead of pandas to scale the processing: https://modin.readthedocs.io/en/latest/
Hi @scottjlee, could I take this one? I think we are probably missing some code to handle pandas.DataFrame in the sort function.
I did some initial research on this, here are my thoughts:
```
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/planner/exchange/sort_task_spec.py", line 145, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 602, in merge_sorted_blocks
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 602, in <listcomp>
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/pandas/core/generic.py", line 5902, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'num_rows'
```
`mapper_outputs` is expected to be a list of pyarrow.Table blocks (which is why arrow_block.py is used afterwards), but somehow the underlying data was a pandas DataFrame while the block type was assumed to be pyarrow.Table, so arrow_block.py was called on a DataFrame.
Instead, we should invoke the code here: https://github.com/ray-project/ray/blob/a1be06346e532e656c128ffd2230f06a4d72679e/python/ray/data/_internal/pandas_block.py#L504-L518
I have a few questions:
- What would the data format be with the following code?

```python
results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
```

- If we add sort, do we have data format info along the way? (I didn't find it in the codebase.)
@scottjlee could you give some thoughts on this?
> mapper_outputs is the list of pyarrow.Table blocks (which is why we use arrow_block.py afterwards) and somehow, the inside data was DataFrame and block type was pyarrow.Table, so we called arrow_block.py.
Yes, this sounds like the right analysis of the bug here. The core problem is that in this line, we assume that all of the block types are the same, and we grab the first block to get its corresponding BlockAccessor.
My initial thought for the fix is to add a block normalization call like:

```python
blocks = TableBlockAccessor.normalize_block_types(blocks, format)
```

at the beginning of SortTaskSpec, where `format` would be the batch/block format from its previous operator.
> what's the data format would be with following code
>
> `results = (ds.groupby("tag").map_groups(lambda g: g, batch_format="pandas")`
In this case, the output block format would be pandas DataFrames.
> if we add sort, do we have data format info along the way? (I didn't find it in codebase)
The sort method shouldn't specify any other data type; it should use the data type from the previous operator.
Thanks @xingyu-long !
@scottjlee Thanks for the clarification! Could you assign this to me? I can give it a try. Thanks!
Hi @scottjlee, after spending more time on this issue, I found that after map_groups we create empty pyarrow tables, and later we apply the UDF (https://github.com/ray-project/ray/blob/ee320aa6670514b85e77fce8d6903affcf883cc4/python/ray/data/grouped_data.py#L223) to batched data (so we never update the empty table). So one place we could normalize beforehand is the beginning of the map_groups(...) function, with `TableBlockAccessor.normalize_block_types(blocks, format)`: https://github.com/ray-project/ray/blob/ee320aa6670514b85e77fce8d6903affcf883cc4/python/ray/data/grouped_data.py#L192
However, I am having trouble getting all the blocks there, and I don't know how to replace the blocks within the Dataset.
Could you give some hints on this? Or, if that's not the place you intended, could you elaborate a little more? Thanks!
Hi! I have a sequence of the following steps on my dataset: flatmap().groupby().map_groups().
I was facing the same error as above.
When I repartitioned my dataset before calling flatmap, my job succeeded.
I am not sure what happened with my dataset, and I will try investigating and posting here, but I just wanted to share that there may be something unintentional happening in the map_groups code, as described by @xingyu-long, which got corrected for me when I used repartition.
I think this solution might work: #39960. It converts every block to the Arrow type first. But it is not merged, so I'm not sure if I can use it. @xingyu-long @scottjlee
Thanks @wxy117, it does look like the same issue. Maybe we can wait for @scottjlee to decide.
> Hi! I have a sequence of the following steps on my dataset: `flatmap().groupby().map_groups()`. I was facing the same error as above. When I repartitioned my dataset before calling `flatmap`, my job succeeded. I am not sure what happened with my dataset and I will try investigating and posting here, but just wanted to share that there may be something unintentional happening in the `map_groups` code as described by @xingyu-long which got corrected for me when I used `repartition`.
This worked for me, nice!
Sorry for the long delay here. After some deeper investigation, I confirmed that the issue specifically comes up with the case where there is a key specified in the groupby, which will trigger a sort(). (This is also the reason why removing sort() in the example will actually have it succeed. And removing the key in the groupby will also have the example succeed.)
I think the best generalization for fixing the bug here is to modify ExchangeTaskScheduler.execute(), where we should normalize the input blocks. For example, in PullBasedShuffleTaskScheduler.execute(), we can call TableBlockAccessor.normalize_block_types(blocks, format) here. You'll also need to pass the batch_format to ExchangeTaskSpec so you can access it here.
@xingyu-long let me know if this makes sense, or if you have other suggested approaches. thanks!