Live OHLCV resampling

Open femtotrader opened this issue 8 years ago • 7 comments

Hello,

Let's imagine you are receiving trades from an exchange (buy/sell, price, volume).

Streamz should provide a way to resample this kind of data live.

Price could be resampled using OHLC (Open, High, Low, Close), and volume could be resampled using sum.

Ideally, Pandas offset aliases should be supported: https://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases

Kind regards

femtotrader avatar Nov 02 '17 09:11 femtotrader

@mrocklin I noticed your comment http://matthewrocklin.com/blog/work/2017/10/16/streaming-dataframes-1#comment-3599845311

Pull requests welcome.

A first step could be to define an API for this streaming approach to resampling, given that resampling a Pandas Series with ohlc() returns a DataFrame, whereas resampling a Pandas DataFrame returns a DataFrame with MultiIndex columns.

Resampling a Series

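import pandas_datareader as pdr

# Daily data for one ticker; the "google" source was usable in 2017 and may no longer be.
df = pdr.DataReader("IBM", "google")
prices = df["Close"]
volumes = df["Volume"]
offset = "M"  # month-end offset alias
prices_monthly = prices.resample(offset).ohlc()   # DataFrame with open/high/low/close columns
volumes_monthly = volumes.resample(offset).sum()  # Series of monthly totals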

Resampling a DataFrame

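import pandas_datareader as pdr

# Same request for two tickers at once.
data = pdr.DataReader(["IBM", "MSFT"], "google")
prices = data["Close"]
volumes = data["Volume"].fillna(0).astype(int)
offset = "M"
prices_monthly = prices.resample(offset).ohlc()   # columns are a MultiIndex: (ticker, open/high/low/close)
volumes_monthly = volumes.resample(offset).sum()  # one column of monthly totals per ticker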
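
For anyone who wants to see the two return shapes without downloading anything, here is a minimal self-contained sketch. It uses synthetic prices, the ticker names "AAA" and "BBB" are made up, and a fixed "30D" offset stands in for the monthly one above.

```python
import numpy as np
import pandas as pd

# Synthetic daily closes for two made-up tickers (not real market data).
index = pd.date_range("2017-01-01", periods=90, freq="D")
rng = np.random.default_rng(0)
closes = pd.DataFrame(
    100 + rng.standard_normal((90, 2)).cumsum(axis=0),
    index=index,
    columns=["AAA", "BBB"],
)

# Series in: the result has flat columns open, high, low, close.
series_ohlc = closes["AAA"].resample("30D").ohlc()
print(list(series_ohlc.columns))

# DataFrame in: the result has two-level columns (ticker, field).
frame_ohlc = closes.resample("30D").ohlc()
print(frame_ohlc.columns.nlevels)
print(list(frame_ohlc["BBB"].columns))
```

Selecting one ticker from the second result (frame_ohlc["BBB"]) gives back the flat four-column layout, so a streaming API could arguably settle on either shape.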

femtotrader avatar Nov 04 '17 19:11 femtotrader

My guess is that resample could be done in a similar way to how we handle rolling currently. We maintain enough of a history of recent data to fill in history. Whenever new data comes in we concatenate it to this history, perform the pandas operation, slice off the historical bit, emit that data and then keep enough of the new data to serve the next data that comes through. If you wanted to take a look at the Rolling implementation I suspect that that would be a decent start.
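
One possible reading of the approach described above, written as a plain-pandas sketch rather than as streamz code. LiveResampler is a hypothetical name and is not taken from the streamz Rolling implementation. The sketch assumes non-empty batches with a DatetimeIndex, "price" and "volume" columns, and a fixed-frequency offset such as "5min" whose bars are labelled by their left edge.

```python
import pandas as pd


class LiveResampler:
    """Hypothetical helper (not part of streamz): buffers trades, emits closed bars."""

    def __init__(self, offset):
        self.offset = offset
        self.pending = None  # trades that belong to the still-open bar

    def update(self, trades):
        # Concatenate the new batch onto the retained history.
        if self.pending is not None:
            trades = pd.concat([self.pending, trades])
        resampler = trades.resample(self.offset)
        bars = resampler["price"].ohlc()
        bars["volume"] = resampler["volume"].sum()
        # The last bar may still receive trades, so hold it back and
        # keep only the rows that fall inside it for the next call.
        last_start = bars.index[-1]
        self.pending = trades[trades.index >= last_start]
        return bars.iloc[:-1]


# Twelve one-minute trades with made-up prices, fed in two batches.
index = pd.date_range("2017-01-01", periods=12, freq="1min")
trades = pd.DataFrame(
    {"price": [float(p) for p in range(100, 112)], "volume": [10] * 12},
    index=index,
)
live = LiveResampler("5min")
first = live.update(trades.iloc[:7])   # closes the 00:00 bar, 00:05 stays open
second = live.update(trades.iloc[7:])  # closes the 00:05 bar, 00:10 stays open
print(first)
print(second)
```

Holding back the last bar means each bar is emitted exactly once, at the cost of a delay until the first trade of the following bar arrives. A timer-based flush would be an alternative, but that is outside this sketch.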

mrocklin avatar Nov 04 '17 22:11 mrocklin

I'm not sure I will have enough time, or enough knowledge of how streamz works, to help with this. Really sorry, but it could be a very nice feature.

femtotrader avatar Dec 01 '17 14:12 femtotrader

Here is some sample code to generate fake trade-like data:

from streamz import Stream

import pandas as pd
import numpy as np

index = pd.date_range("2017-1-1", "2018-1-1", freq="1Min")[:-1]
n = len(index)
df = pd.DataFrame(index=index)
np.random.seed(123)
df["direction"] = np.random.choice([1,-1], n)
df["price"] = 100 + pd.Series(np.random.rand(n)-0.5, index=index).cumsum()
df["volume"] = (pd.Series(np.random.rand(n) * 1000, index=index)).astype(int)

print(df)

for row in df.iterrows():
    print(row)

for t in df.itertuples():
    print(t)

Or tick-like data:

import pandas as pd
import numpy as np

index = pd.date_range("2017-1-1", "2018-1-1", freq="1Min")[:-1]
n = len(index)
df = pd.DataFrame(index=index)
np.random.seed(123)
df["bid"] = 100 + pd.Series(np.random.rand(n)-0.5, index=index).cumsum()
df["spread"] = pd.Series(np.random.rand(n), index=index)
df["ask"] = df["bid"] + df["spread"] 
df["volume"] = (pd.Series(np.random.rand(n) * 1000, index=index)).astype(int)
df = df[["ask", "bid", "spread", "volume"]]

femtotrader avatar Dec 27 '17 09:12 femtotrader

Resampling a DataFrame that's already OHLCV in pandas is problematic, since pandas will take each column and resample it into OHLC. So you get OHLC for the O, for the H, for the L and for the C. The whole MultiIndex mess... To avoid this, you gotta resample each series inside the df by its own rule.

Something like this:

pd.DataFrame({
    "O": ohlc_prices.O.resample(offset).first(),
    "H": ohlc_prices.H.resample(offset).max(),
    "L": ohlc_prices.L.resample(offset).min(),
    "C": ohlc_prices.C.resample(offset).last(),  # close comes from the C column
    "V": volumes.resample(offset).sum(),
})

PabTorre avatar Feb 03 '18 00:02 PabTorre

Alternatively, resample can be used with the agg function: df.resample(offset).agg({'Open': 'first', 'High': 'max', 'Low': 'min', 'Close': 'last', 'Volume': 'sum'})
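
A small worked example of that agg call, as a sketch. The six one-minute bars below are made-up values, and the column names are assumed to match the ones in the dictionary.

```python
import pandas as pd

# Six hypothetical one-minute bars (illustrative values, not real market data).
index = pd.date_range("2021-01-01 09:30", periods=6, freq="1min")
bars = pd.DataFrame(
    {
        "Open": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0],
        "High": [11.5, 12.5, 13.5, 14.5, 15.5, 16.5],
        "Low": [9.5, 10.5, 11.5, 12.5, 13.5, 14.5],
        "Close": [11.0, 12.0, 13.0, 14.0, 15.0, 16.0],
        "Volume": [100, 200, 300, 400, 500, 600],
    },
    index=index,
)

# One aggregation rule per column keeps the result flat (no MultiIndex).
rules = {"Open": "first", "High": "max", "Low": "min", "Close": "last", "Volume": "sum"}
three_minute = bars.resample("3min").agg(rules)
print(three_minute)
```

The result has two rows (09:30 and 09:33) and the same five flat columns as the input.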

ajakaiye33 avatar Nov 15 '21 22:11 ajakaiye33

In related news, @shwina / the rapids/cudf folks have merged groupby frequency and resampling functionality into cudf v21.12: https://github.com/rapidsai/cudf/pull/9178

jwest75674 avatar Dec 03 '21 13:12 jwest75674