Websocket doesn't re-subscribe upon disconnect
Thanks for your great work on this.
The websocket re-connects on a disconnect but doesn't seem to re-subscribe to channels.
2024-02-14 04:48:48.092 | ERROR | pybitget.stream:__on_error:235 - Connection to remote host was lost. 2024-02-14 04:48:48.093 | INFO | pybitget.stream:__re_connect:252 - start reconnection ... 2024-02-14 04:48:48.095 | INFO | pybitget.stream:__on_close:241 - ws is closeing ......close_status:None,close_msg:None 2024-02-14 04:48:49.505 | INFO | pybitget.stream:__on_open:179 - connection is success.... 2024-02-14 04:48:49.566 | INFO | pybitget.stream:__on_open:179 - connection is success....
I have edited stream.py as below in to close the connection and then wait in case it needs time until properly logged in before trying to re-subscribe
def __re_connect(self):
if not self.__keyboard_interrupt_flag:
self.__reconnect_status = True
logger.info("start reconnection ...")
# Close the current connection
self.__close()
# Rebuild the connection
self.build()
# Wait for the connection to be established before resubscribing
while not self.has_connect():
time.sleep(1)
for channel in self.__all_suribe:
self.subscribe([channel])
#pass
logger.info("Resubscribed to channels: {}".format(self.__all_suribe))
Could ideally do with the script catching KeyboardException (I have tried to implement within stream.py but isn't error free) and a close and reconnect if no messages received for a period of time.
Thanks
Have changed the coding to use Websockets and Asyncio so this is not needed now. If you know how to get the V2 websocket working that would be good - can get it to connect and login but get an error when trying to subscribe even though followed the subscription format exactly.
Have changed the coding to use Websockets and Asyncio so this is not needed now. If you know how to get the V2 websocket working that would be good - can get it to connect and login but get an error when trying to subscribe even though followed the subscription format exactly.
It would be nice to show how you changed the Websocket code, since I also have the problem, that once the connection is lost because the webserver of Bitget are overloaded, it is not re-connecting anymore.
This is how it looks when entered the death spiral, not recovering anymore.
2024-03-08 18:25:27.124 | INFO | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.125 | DEBUG | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "ordersAlgo", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "positions", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "orders", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "account", "inst_id": "default"}]}
2024-03-08 18:25:27.127 | INFO | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.127 | INFO | pybitget.stream:__re_connect:226 - start reconnection ...
2024-03-08 18:25:27.128 | INFO | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream
2024-03-08 18:25:27.132 | ERROR | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.205 | ERROR | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.206 | ERROR | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.207 | ERROR | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.208 | ERROR | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.208 | ERROR | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.209 | ERROR | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.209 | ERROR | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.251 | ERROR | pybitget.stream:__on_error:213 - Handshake status 400 Bad Request -+-+- {'server': 'CloudFront', 'date': 'Fri, 08 Mar 2024 17:25:27 GMT', 'content-type': 'text/html', 'content-length': '915', 'connection': 'close', 'x-cache': 'Error from cloudfront', 'via': '1.1 9a97e41242551c9a56be1311e4d3db70.cloudfront.net (CloudFront)', 'x-amz-cf-pop': 'FRA60-P10', 'x-amz-cf-id': 'CKYJJoFQ0beJIu6F-g_leOeV3Z8aTxEAA=='} -+-+- b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">\n<HTML><HEAD><META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">\n<TITLE>ERROR: The request could not be satisfied</TITLE>\n</HEAD><BODY>\n<H1>400 ERROR</H1>\n<H2>The request could not be satisfied.</H2>\n<HR noshade size="1px">\nBad request.\nWe can\'t connect to the server for this app or website at this time. There might be too much traffic or a configuration error. Try again later, or contact the app or website owner.\n<BR clear="all">\nIf you provide content to customers through CloudFront, you can find steps to troubleshoot and help prevent this error by reviewing the CloudFront documentation.\n<BR clear="all">\n<HR noshade size="1px">\n<PRE>\nGenerated by cloudfront (CloudFront)\nRequest ID: CKYJJoFQ0beJIu6F-g_leOeV3Z8aTxEjvxQ5vf-piwvAA==\n</PRE>\n<ADDRESS>\n</ADDRESS>\n</BODY></HTML>'
2024-03-08 18:25:27.251 | INFO | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.253 | INFO | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.389 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.390 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.391 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.626 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.636 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.705 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.726 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.972 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.972 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.976 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.986 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.084 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.133 | INFO | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream
2024-03-08 18:25:28.151 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.458 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.514 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.517 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.639 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.698 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.704 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.713 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.721 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.724 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.824 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.824 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.864 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.064 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.068 | ERROR | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.136 | INFO | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream
Hi
Builds on what you had already done but I am not trading using the websocket so not logging in.
Thanks
class BitgetWsClient: def init(self, api_key: Optional[str] = None, api_secret: Optional[str] = None, passphrase: Optional[str] = None, channels: Optional[list[SubscribeReq]] = None, # Add channels parameter ping_interval: Optional[int] = 20, ws_url: Optional[str] = None, verbose: Optional[str] = False, handle=None, handle_error=None, ): self.api_key = api_key self.api_secret = api_secret self.passphrase = passphrase self.channels = channels self.ws_url = ws_url or CONTRACT_WS_URL self.verbose = verbose self.connection = False self.login_status = False self.subscribe_status = False self.all_subscribe = set() self.listener = self.handle if handle is None else handle self.error_listener = self.handle_error if handle_error is None else handle_error self.scribe_map = {} self.ws_client = None self.keyboard_interrupt_flag = False self.last_receive_time = self.lastrestarttime = self.last_ping_time = time.time() self.errormonitor = 'unset' self.inerror = False self.reconnect_interval = 60 # 2 minutes in seconds self.ping_interval = ping_interval self.logwrites = 0 self.restarts = 0 # Initialize the websocket_lock attribute self.websocket_lock = asyncio.Lock()
async def connect(self):
# signal.signal(signal.SIGINT, lambda signal, frame: asyncio.create_task(self.handle_keyboard_interrupt(signal, frame)))
signal.signal(signal.SIGINT, self.handle_keyboard_interrupt)
while True:
if self.keyboard_interrupt_flag:
await self.unsubscribe(list(self.all_subscribe))
await asyncio.sleep(2)
await self.close()
await asyncio.sleep(2)
break
try:
if self.inerror: await asyncio.sleep(20) # if in error wait 20 seconds before trying to connect again
logger.info("Trying to connect to WebSocket server.")
async with websockets.connect(self.ws_url) as websocket:
logger.info("Connected to WebSocket server.")
self.websocket = websocket
self.connection, self.inerror, self.subscribe_status, self.errormonitor = True, False, False, 'unset'
self.restarts = self.restarts + 1
if self.restarts > 10: self.update_log("Websocket has restarted 10 times, resetting to zero")
# don't need to login for public information
# await self.login()
await asyncio.sleep(2)
await self.subscribe(self.channels)
while True:
if self.keyboard_interrupt_flag: break
elif time.time() > self.last_receive_time + 600: # if no message received for 10 minutes
self.update_log("No data received for 10 minutes")
self.connection, self.inerror, self.last_receive_time = False, True, time.time()
break
elif not self.connection:
try:
await self.unsubscribe(list(self.all_subscribe))
await asyncio.sleep(2)
await self.close()
await asyncio.sleep(2)
break
except: break
elif time.time() - self.last_ping_time > self.ping_interval: await self.send_ping()
else:
#async with self.websocket_lock: message = await self.websocket.recv()
await self.on_message()
except websockets.WebSocketException as e:
logger.error(f"WebSocket error: {e}")
self.update_log("websockets.WebsocketException in connect, error is "+str(e))
await asyncio.sleep(20) # wait 20 seconds before trying to reconnect
continue
except KeyboardInterrupt:
print("\nKeyboardInterrupt in main while loop: Closing WebSocket connection.")
self.keyboard_interrupt_flag = True
break
except Exception as e:
logger.error(f"Error connecting to WebSocket server: {e}")
self.update_log("Exception in connect, error is "+str(e))
await asyncio.sleep(20) # wait 20 seconds before trying to reconnect
continue
sys.exit()
def update_log(self, msg):
if self.logwrites < 50:
updatelogfile(msg,logfile)
self.logwrites = self.logwrites + 1
if "restarted" in msg: self.restarts = 0
return
def handle_keyboard_interrupt(self, signal, frame):
print("\nKeyboardInterrupt: Closing WebSocket connection.")
self.keyboard_interrupt_flag = True
return
async def send_ping(self):
try:
self.last_ping_time = time.time()
await self.websocket.send("ping")
except Exception as e:
logger.error(f"Error sending ping: {e}")
if self.errormonitor == 'unset': self.errormonitor = time.time()
return
async def on_message(self):
try:
try: message = await asyncio.wait_for(self.websocket.recv(), timeout=120) # Set timeout to 1 minute
except asyncio.TimeoutError: return
#message = await self.websocket.recv()
if message == 'pong': return
json_obj = json.loads(message)
listener = None
if "data" in str(json_obj):
if not self.__check_sum(json_obj):
return
listener = self.get_listener(json_obj)
# Update the last receive time whenever data is received
self.last_receive_time = time.time()
self.subscribe_status, self.inerror, self.errormonitor = True, False, 'unset'
elif "connection close" in str(json_obj):
self.update_log("Connection is closed, error is "+str(json_obj))
self.connection, self.inerror = False, True
return
elif "code" in str(json_obj) and json_obj.get("code") != 0:
self.update_log("Error message received, error is "+str(json_obj))
if self.errormonitor == 'unset': self.errormonitor = time.time()
elif time.time() > self.errormonitor + 300: # if not connected for 5 minutes try to reconnect to server
self.connection, self.inerror = False, True
return
elif "subscribe" in str(json_obj):
logger.info(f"Subscribed to: {json_obj}")
return
elif "login" in str(json_obj):
if self.verbose: print("login msg: %s" % message)
self.login_status = True
return
else:
self.update_log("Unknown message received: "+(json_obj))
print("unknown message is: "+str(json_obj))
return
if listener:
print("in listener")
listener(message)
return
try: self.listener(json_obj)
except:
self.listener(message)
pass
except KeyboardInterrupt:
print("\nKeyboardInterrupt in on_message: Closing WebSocket connection.")
self.keyboard_interrupt_flag = True
return
except websockets.ConnectionClosedOK:
logger.error("WebSocket connection closed gracefully.")
self.connection, self.inerror = False, True
except websockets.ConnectionClosedError as e:
logger.error(f"WebSocket connection closed unexpectedly: {e}")
self.update_log("Connection Closeed Error: "+str(e))
self.connection, self.inerror = False, True
except Exception as e:
logger.error(f"Error in on message: {e}")
self.update_log("Error in on_message: "+str(e))
if "sent 1000 (OK)" in str(e) or "sent 1011 (unexpected error)" in str(e): self.connection, self.inerror = False, True
elif self.errormonitor == 'unset': self.errormonitor = time.time()
elif time.time() > self.errormonitor + 300: # if not connected for 5 minutes try to reconnect to server
self.connection, self.inerror = False, True
return