Aggregating data every n elements
Hi,
I would like to calculate the .value_counts() from a pandas dataframe in chunks of n elements and output the aggregated result.
For example, say I have 1000 elements. Calculate the value_counts() for the first 100 elements and output the result, then aggregate that result with the next 100 elements and output it again.
I tried
def func(df):
    return df

stream.map(df["OFFENSE_CODE"]).partition(100).to_batch().map_partitions(func)
but I get
TypeError Traceback (most recent call last)
<ipython-input-291-e68a39a9e8bb> in <module>
1 def func(df):
2 return df
----> 3 stream.map(df["OFFENSE_CODE"]).partition(100).to_batch().map_partitions(func)
4
5 # a.value_counts()
~\Anaconda3\lib\site-packages\streamz\collection.py in map_partitions(func, *args, **kwargs)
19 example = kwargs.pop('example', None)
20 if example is None:
---> 21 example = func(*[getattr(arg, 'example', arg) for arg in args], **kwargs)
22
23 streams = [arg for arg in args if isinstance(arg, Streaming)]
TypeError: func() missing 1 required positional argument: 'df'
BTW, I am not sure this is the best approach. Any hints?
This does not seem to be a streaming problem; you should be able to achieve what you want with pandas alone.
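For example, something along these lines should work (just a sketch, assuming the df and its OFFENSE_CODE column from your snippet):

import pandas as pd

s = df["OFFENSE_CODE"]          # the column of interest, from your example
running = pd.Series(dtype=int)  # aggregated counts seen so far

for start in range(0, len(s), 100):
    chunk = s.iloc[start:start + 100]
    # add this chunk's counts to the running total and output it
    running = running.add(chunk.value_counts(), fill_value=0).astype(int)
    print(running)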
Assuming that you have a stream of pandas dataframe rows (so, pandas Series) and not just a single dataframe (because in that case @martindurant is right: you do not need streamz), you could do something like this:
import pandas as pd
from operator import add
from streamz import Stream

stream = Stream()
stream.partition(100).to_batch(example=pd.DataFrame(columns=["my_column"])) \
      .to_dataframe()["my_column"] \
      .value_counts() \
      .accumulate_partitions(add)
For example, if you feed it with your series:
import pandas as pd
from time import sleep

for i in range(200):
    stream.emit(pd.Series({"my_column": i % 16}))
    sleep(0.001)
This is to make it "real-time"ish :)
Thinking about it, you could ~aggregate~ accumulate with a collections.Counter; you don't really need dataframes at all here.
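A minimal sketch of that idea (assuming the raw values are emitted directly rather than wrapped in a Series):

from collections import Counter
from operator import add
from streamz import Stream

stream = Stream()
# count each chunk of 100 values, then keep a running total of the counts
stream.partition(100).map(Counter).accumulate(add).sink(print)

for i in range(1000):
    stream.emit(i % 16)

Here accumulate(add) keeps summing the per-chunk Counters, so each printed result is the aggregated count over everything seen so far.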