Suggestions for improving code performance
This is code for calculating, in real time, the rolling average of (future price / spot price) - 1. Since a large amount of data can stream in from the websocket every second (about 100-200 data points), I'd like to know if you have any suggestions to improve performance.
import asyncio

import numpy as np
from streamz import Stream

from tradebot.exchange import BinanceWebsocketManager
from tradebot.entity import log_register
from tradebot.constants import MARKET_URLS

# ratio description
# 1. calculate the ratio of future price to spot price
# 2. add the ratio to a rolling window of size 20
# 3. calculate the mean of the rolling window

log = log_register.get_logger("BTCUSDT", level="INFO", flush=False)

spot_stream = Stream()
future_stream = Stream()
window_size = 20


def cb_future(msg):
    if "e" in msg:
        future_stream.emit(msg)


def cb_spot(msg):
    if "e" in msg:
        spot_stream.emit(msg)


async def main():
    try:
        ws_spot_client = BinanceWebsocketManager(base_url="wss://stream.binance.com:9443/ws")
        ws_um_client = BinanceWebsocketManager(base_url="wss://fstream.binance.com/ws")
        await ws_um_client.subscribe_trade("BTCUSDT", callback=cb_future)
        await ws_spot_client.subscribe_trade("BTCUSDT", callback=cb_spot)

        # future price / spot price - 1, then a rolling mean over the last 20 ratios
        ratio = spot_stream.combine_latest(future_stream).map(lambda x: float(x[1]['p']) / float(x[0]['p']) - 1)
        ratio.sliding_window(window_size).map(lambda window: np.mean(window)).sink(lambda x: print(f"Ratio Mean: {x:.8f}"))

        # await ws_client.subscribe_book_ticker("ETHUSDT", callback=cb)
        # await ws_client.subscribe_agg_trades(["BTCUSDT", "ETHUSDT"], callback=cb)

        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        await ws_spot_client.close()
        await ws_um_client.close()
        print("Websocket closed")


if __name__ == "__main__":
    asyncio.run(main())
Here is the implementation of BinanceWebsocketManager:
import asyncio
import json
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Callable, Dict, List

import orjson
import websockets

from tradebot.entity import log_register


class WebsocketManager(ABC):
    def __init__(
        self,
        base_url: str,
        ping_interval: int = 5,
        ping_timeout: int = 5,
        close_timeout: int = 1,
        max_queue: int = 12,
    ):
        self._base_url = base_url
        self._ping_interval = ping_interval
        self._ping_timeout = ping_timeout
        self._close_timeout = close_timeout
        self._max_queue = max_queue
        self._tasks: List[asyncio.Task] = []
        self._subscriptions = defaultdict(asyncio.Queue)
        self._log = log_register.get_logger(name=type(self).__name__, level="INFO", flush=True)

    async def _consume(self, subscription_id: str, callback: Callable[..., Any] = None, *args, **kwargs):
        # drain the per-subscription queue and hand each message to the callback
        while True:
            msg = await self._subscriptions[subscription_id].get()
            if asyncio.iscoroutinefunction(callback):
                await callback(msg, *args, **kwargs)
            else:
                callback(msg, *args, **kwargs)
            self._subscriptions[subscription_id].task_done()

    @abstractmethod
    async def _subscribe(self, symbol: str, typ: str, channel: str, queue_id: str):
        pass

    async def close(self):
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._log.info("All WebSocket connections closed.")


class BinanceWebsocketManager(WebsocketManager):
    def __init__(self, base_url: str):
        super().__init__(
            base_url=base_url,
            ping_interval=5,
            ping_timeout=5,
            close_timeout=1,
            max_queue=12,
        )

    async def _subscribe(self, payload: Dict[str, Any], subscription_id: str):
        # async-for over websockets.connect reconnects automatically when the connection drops
        async for websocket in websockets.connect(
            uri=self._base_url,
            ping_interval=self._ping_interval,
            ping_timeout=self._ping_timeout,
            close_timeout=self._close_timeout,
            max_queue=self._max_queue,
        ):
            try:
                payload = json.dumps(payload)
                await websocket.send(payload)
                async for msg in websocket:
                    msg = orjson.loads(msg)
                    await self._subscriptions[subscription_id].put(msg)
            except websockets.ConnectionClosed:
                self._log.error("Connection closed, reconnecting...")

    async def subscribe_book_ticker(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        subscription_id = f"book_ticker.{symbol}"
        id = int(time.time() * 1000)
        payload = {
            "method": "SUBSCRIBE",
            "params": [f"{symbol.lower()}@bookTicker"],
            "id": id
        }
        if subscription_id not in self._subscriptions:
            self._tasks.append(asyncio.create_task(self._consume(subscription_id, callback=callback, *args, **kwargs)))
            self._tasks.append(asyncio.create_task(self._subscribe(payload, subscription_id)))
        else:
            self._log.info(f"Already subscribed to {subscription_id}")

    async def subscribe_book_tickers(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        for symbol in symbols:
            await self.subscribe_book_ticker(symbol, callback=callback, *args, **kwargs)

    async def subscribe_trade(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        subscription_id = f"trade.{symbol}"
        id = int(time.time() * 1000)
        payload = {
            "method": "SUBSCRIBE",
            "params": [f"{symbol.lower()}@trade"],
            "id": id
        }
        if subscription_id not in self._subscriptions:
            self._tasks.append(asyncio.create_task(self._consume(subscription_id, callback=callback, *args, **kwargs)))
            self._tasks.append(asyncio.create_task(self._subscribe(payload, subscription_id)))
        else:
            self._log.info(f"Already subscribed to {subscription_id}")

    async def subscribe_trades(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        for symbol in symbols:
            await self.subscribe_trade(symbol, callback=callback, *args, **kwargs)

    async def subscribe_agg_trade(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        subscription_id = f"agg_trade.{symbol}"
        id = int(time.time() * 1000)
        payload = {
            "method": "SUBSCRIBE",
            "params": [f"{symbol.lower()}@aggTrade"],
            "id": id
        }
        if subscription_id not in self._subscriptions:
            self._tasks.append(asyncio.create_task(self._consume(subscription_id, callback=callback, *args, **kwargs)))
            self._tasks.append(asyncio.create_task(self._subscribe(payload, subscription_id)))
        else:
            self._log.info(f"Already subscribed to {subscription_id}")

    async def subscribe_agg_trades(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        for symbol in symbols:
            await self.subscribe_agg_trade(symbol, callback=callback, *args, **kwargs)
Do you actually find the performance lacking? 100 events per second is not a lot for streamz. However, the exchange library you are using and websocket latency are separate factors, as is any CPU time the calculation itself needs - I can't tell that from your code.
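If you want to check whether streamz itself keeps up, here is a rough offline throughput sketch of my own (synthetic trade messages instead of the live websocket feed) that pushes events through the same combine_latest / sliding_window / mean chain and times it:

import time
import numpy as np
from streamz import Stream

# rebuild the ratio pipeline offline, with a no-op sink so printing doesn't dominate the timing
spot = Stream()
future = Stream()
ratio = spot.combine_latest(future).map(lambda x: float(x[1]['p']) / float(x[0]['p']) - 1)
ratio.sliding_window(20).map(np.mean).sink(lambda x: None)

n = 10_000
start = time.perf_counter()
for _ in range(n):
    spot.emit({'p': '50000.0'})     # synthetic spot trade
    future.emit({'p': '50010.0'})   # synthetic futures trade
elapsed = time.perf_counter() - start
print(f"{2 * n / elapsed:,.0f} events/s through the streamz pipeline")

If the printed rate is comfortably above your real 100-200 messages per second, the pipeline is not the bottleneck, and websocket I/O or JSON decoding is where to look instead.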
Thanks for helping. I have another question:
from streamz import Stream
import asyncio
import time


def increment(x):
    time.sleep(0.1)
    return x + 1


async def write(x):
    await asyncio.sleep(0.2)
    print(x)


async def f():
    source = Stream(asynchronous=True)
    source.map(increment).rate_limit(0.500).sink(write)
    for x in range(10):
        await source.emit(x)


if __name__ == "__main__":
    asyncio.run(f())
from tornado import gen
import time
from streamz import Stream
from tornado.ioloop import IOLoop


def increment(x):
    """ A blocking increment function

    Simulates a computational function that was not designed to work
    asynchronously
    """
    time.sleep(0.1)
    return x + 1


@gen.coroutine
def write(x):
    """ A non-blocking write function

    Simulates writing to a database asynchronously
    """
    yield gen.sleep(0.2)
    print(x)


@gen.coroutine
def f():
    source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
    source.map(increment).rate_limit(0.500).sink(write)
    for x in range(10):
        yield source.emit(x)


IOLoop().run_sync(f)
What is the difference between these two examples? I don't think there's any difference in performance between using async and sync here; await source.emit(x) still blocks the subsequent processing. I tried using asyncio.create_task(source.emit(x)), but that just threw an error.
I think there is no difference compared with:
from streamz import Stream
import asyncio
import time


def increment(x):
    time.sleep(0.1)
    return x + 1


def write(x):
    time.sleep(0.2)
    print(x)


def f():
    source = Stream()
    source.map(increment).rate_limit(0.500).sink(write)
    for x in range(10):
        source.emit(x)


if __name__ == "__main__":
    f()
Correct, there will be no difference for a linear chain of event processing. The point of await is that other async things can be happening at the same time ("concurrently"). In this case, there are no other things to process while waiting.
Can you give me some examples of concurrent or non-linear chains of event processing? I'm really struggling to think of any applications. I am trying to emit concurrently, but it causes an error.
from streamz import Stream
import asyncio
import time


def increment(x):
    time.sleep(0.1)
    return x + 1


async def write(x):
    await asyncio.sleep(0.2)
    print(x)


async def f():
    source = Stream(asynchronous=True)
    source.map(increment).rate_limit(0.500).sink(write)
    for x in range(10):
        asyncio.create_task(source.emit(x))  # raises an error


if __name__ == "__main__":
    asyncio.run(f())
for x in range(10):
    asyncio.create_task(source.emit(x))  # raises an error

This does indeed kick off all the coroutines, but each of them first hits the blocking wait in increment, and then waits again before producing output.
Consider:
async def write(x):
    print(x)
    await asyncio.sleep(0.2)


async def f():
    source = Stream(asynchronous=True)
    source.sink(write)
    await asyncio.gather(*[source.emit(x) for x in range(10)])


if __name__ == "__main__":
    asyncio.run(f())
Here, all the values print immediately, and the whole run takes 0.2 s.
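Another way to see "other async things happening at the same time", as a sketch of my own rather than anything from the original post: a heartbeat task keeps running while each awaited emit waits on the slow sink.

import asyncio
from streamz import Stream

async def write(x):
    await asyncio.sleep(0.2)      # simulated slow async sink (e.g. a database write)
    print("wrote", x)

async def heartbeat():
    for _ in range(5):
        print("heartbeat")        # keeps firing while the emits below are awaited
        await asyncio.sleep(0.1)

async def f():
    source = Stream(asynchronous=True)
    source.sink(write)
    hb = asyncio.create_task(heartbeat())
    for x in range(3):
        await source.emit(x)      # while this waits, the heartbeat task gets the loop
    await hb

if __name__ == "__main__":
    asyncio.run(f())

There is deliberately no blocking map(increment) stage here; a time.sleep inside the pipeline would stall the event loop and the heartbeat along with it.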
Regarding .emit(msg): .emit() blocks on each message, whereas ._emit() does not (it returns a list of futures). You could instead buffer a set number of futures before calling await asyncio.gather(*futures) on them.
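As a rough sketch of that batching idea (the batch size and names are my own, building on the gather example above):

import asyncio
from streamz import Stream

async def write(x):
    await asyncio.sleep(0.2)   # simulated async sink
    print(x)

async def f(batch_size: int = 5):
    source = Stream(asynchronous=True)
    source.sink(write)
    pending = []
    for x in range(10):
        pending.append(source.emit(x))       # collect instead of awaiting one by one
        if len(pending) >= batch_size:
            await asyncio.gather(*pending)   # flush the batch concurrently
            pending = []
    if pending:
        await asyncio.gather(*pending)       # flush whatever is left

if __name__ == "__main__":
    asyncio.run(f())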
Also, asynchronous=True will launch the ioloop in the current thread, whereas asynchronous=False will launch it on a separate thread. I've had issues in the past where I could not use asynchronous=True because the event loop was already running in a .py script.
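For completeness, a sketch of the asynchronous=False variant (mirroring the synchronous example in the streamz docs; increment and write are the same toy functions as above): the IOLoop runs on a background thread and you drive the stream from plain synchronous code.

import asyncio
import time
from streamz import Stream

def increment(x):
    time.sleep(0.1)            # blocking work
    return x + 1

async def write(x):
    await asyncio.sleep(0.2)   # non-blocking write handled on the stream's IOLoop thread
    print(x)

def f():
    source = Stream(asynchronous=False)   # IOLoop is started on a separate thread
    source.map(increment).rate_limit(0.500).sink(write)
    for x in range(10):
        source.emit(x)                    # plain synchronous emit from this thread

if __name__ == "__main__":
    f()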