Websocket doesn't re-subscribe upon disconnect

Open novicetopython opened this issue 1 year ago • 3 comments

Thanks for your great work on this.

The websocket re-connects on a disconnect but doesn't seem to re-subscribe to channels.

2024-02-14 04:48:48.092 | ERROR | pybitget.stream:__on_error:235 - Connection to remote host was lost.
2024-02-14 04:48:48.093 | INFO | pybitget.stream:__re_connect:252 - start reconnection ...
2024-02-14 04:48:48.095 | INFO | pybitget.stream:__on_close:241 - ws is closeing ......close_status:None,close_msg:None
2024-02-14 04:48:49.505 | INFO | pybitget.stream:__on_open:179 - connection is success....
2024-02-14 04:48:49.566 | INFO | pybitget.stream:__on_open:179 - connection is success....

I have edited __re_connect in stream.py as below to close the connection, rebuild it, and then wait until it is established (in case it needs time to log in properly) before trying to re-subscribe:

def __re_connect(self):
    # Requires "import time" at the top of stream.py.
    if not self.__keyboard_interrupt_flag:
        self.__reconnect_status = True
        logger.info("start reconnection ...")
        # Close the current connection
        self.__close()
        # Rebuild the connection
        self.build()
        # Wait for the connection to be established before resubscribing
        while not self.has_connect():
            time.sleep(1)
        # Iterate over a copy in case subscribe() modifies the set
        for channel in list(self.__all_suribe):
            self.subscribe([channel])
        logger.info("Resubscribed to channels: {}".format(self.__all_suribe))
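
The wait loop above never gives up if the connection does not come back. A minimal sketch of a bounded wait, assuming only that the client exposes has_connect() and subscribe() as in the snippet; wait_until and resubscribe_all are hypothetical helper names, not part of pybitget:

```python
import time
from typing import Callable, Iterable


def wait_until(predicate: Callable[[], bool], timeout: float = 30.0,
               interval: float = 0.5) -> bool:
    """Poll predicate until it returns True or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        # Never sleep past the deadline
        time.sleep(min(interval, max(0.0, deadline - time.monotonic())))
    return predicate()  # one last check at the deadline


def resubscribe_all(client, channels: Iterable, timeout: float = 30.0) -> bool:
    """Resubscribe every channel once the client reports it is connected.

    client is any object with has_connect() and subscribe(list) methods
    (an assumption modelled on the snippet above).
    Returns False, without subscribing, if the connection never came up.
    """
    if not wait_until(client.has_connect, timeout=timeout):
        return False
    for channel in list(channels):  # copy: subscribe() may mutate the collection
        client.subscribe([channel])
    return True
```

When resubscribe_all returns False, the caller can log the failure and schedule another reconnect instead of blocking forever.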

Ideally the script would also catch KeyboardInterrupt (I have tried to implement this within stream.py, but it isn't error-free) and close and reconnect if no messages are received for a period of time.
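
The "reconnect if no messages are received for a period of time" idea can be sketched as a small watchdog. This is an illustration under assumptions, not pybitget code: IdleWatchdog and its method names are hypothetical, and the clock is injectable only so the logic can be tested without sleeping.

```python
import time
from typing import Callable


class IdleWatchdog:
    """Tracks when the last message arrived and reports staleness."""

    def __init__(self, max_idle: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_idle = max_idle
        self._clock = clock
        self._last_seen = clock()

    def touch(self) -> None:
        """Call from the message handler whenever any message arrives."""
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        """True if no message has arrived for more than max_idle seconds."""
        return self._clock() - self._last_seen > self.max_idle
```

A supervising loop could call is_stale() periodically and, when it returns True, close and rebuild the connection and then call touch() to restart the timer.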

Thanks

novicetopython avatar Feb 14 '24 20:02 novicetopython

I have changed the code to use websockets and asyncio, so this is no longer needed. If you know how to get the V2 websocket working, that would be good: I can get it to connect and log in, but I get an error when trying to subscribe, even though I followed the subscription format exactly.

novicetopython avatar Feb 23 '24 17:02 novicetopython

> I have changed the code to use websockets and asyncio, so this is no longer needed. If you know how to get the V2 websocket working, that would be good: I can get it to connect and log in, but I get an error when trying to subscribe, even though I followed the subscription format exactly.

It would be nice if you could show how you changed the websocket code, since I have the same problem: once the connection is lost because Bitget's web servers are overloaded, it does not reconnect anymore.

This is how it looks once it has entered the death spiral and no longer recovers:

2024-03-08 18:25:27.124 | INFO     | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.125 | DEBUG    | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "ordersAlgo", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG    | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "positions", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG    | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "orders", "inst_id": "default"}]}
2024-03-08 18:25:27.126 | DEBUG    | pybitget.stream:send_message:131 - {"op": "subscribe", "args": [{"inst_type": "dmcbl", "channel": "account", "inst_id": "default"}]}
2024-03-08 18:25:27.127 | INFO     | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.127 | INFO     | pybitget.stream:__re_connect:226 - start reconnection ...
2024-03-08 18:25:27.128 | INFO     | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream
2024-03-08 18:25:27.132 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.205 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.206 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.207 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.208 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.208 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.209 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.209 | ERROR    | pybitget.stream:__keep_connected:126 - socket is already closed.
2024-03-08 18:25:27.251 | ERROR    | pybitget.stream:__on_error:213 - Handshake status 400 Bad Request -+-+- {'server': 'CloudFront', 'date': 'Fri, 08 Mar 2024 17:25:27 GMT', 'content-type': 'text/html', 'content-length': '915', 'connection': 'close', 'x-cache': 'Error from cloudfront', 'via': '1.1 9a97e41242551c9a56be1311e4d3db70.cloudfront.net (CloudFront)', 'x-amz-cf-pop': 'FRA60-P10', 'x-amz-cf-id': 'CKYJJoFQ0beJIu6F-g_leOeV3Z8aTxEAA=='} -+-+- b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">\n<HTML><HEAD><META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">\n<TITLE>ERROR: The request could not be satisfied</TITLE>\n</HEAD><BODY>\n<H1>400 ERROR</H1>\n<H2>The request could not be satisfied.</H2>\n<HR noshade size="1px">\nBad request.\nWe can\'t connect to the server for this app or website at this time. There might be too much traffic or a configuration error. Try again later, or contact the app or website owner.\n<BR clear="all">\nIf you provide content to customers through CloudFront, you can find steps to troubleshoot and help prevent this error by reviewing the CloudFront documentation.\n<BR clear="all">\n<HR noshade size="1px">\n<PRE>\nGenerated by cloudfront (CloudFront)\nRequest ID: CKYJJoFQ0beJIu6F-g_leOeV3Z8aTxEjvxQ5vf-piwvAA==\n</PRE>\n<ADDRESS>\n</ADDRESS>\n</BODY></HTML>'
2024-03-08 18:25:27.251 | INFO     | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.253 | INFO     | pybitget.stream:__on_close:219 - ws is closeing ......close_status:None,close_msg:None
2024-03-08 18:25:27.389 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.390 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.391 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.626 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.636 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.705 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.726 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.972 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.972 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.976 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:27.986 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.084 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.133 | INFO     | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream
2024-03-08 18:25:28.151 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.458 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.514 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.517 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.639 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.698 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.704 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.713 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.721 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.724 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.824 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.824 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:28.864 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.064 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.068 | ERROR    | pybitget.stream:__keep_connected:126 - Connection is already closed.
2024-03-08 18:25:29.136 | INFO     | pybitget.stream:build:73 - start connecting...wss://ws.bitget.com/mix/v1/stream
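
In the log above, build retries about once per second while CloudFront answers with 400 Bad Request. One common mitigation is to space reconnect attempts out with capped exponential backoff and jitter. A minimal sketch; backoff_delay and its parameters are hypothetical and not part of pybitget:

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Delay in seconds before reconnect attempt number `attempt` (0-based).

    Uses "full jitter": a uniform draw from [0, min(cap, base * 2**attempt)].
    """
    if attempt < 0:
        raise ValueError("attempt must be >= 0")
    # Clamp the exponent so very large attempt counts stay cheap to compute
    ceiling = min(cap, base * (2 ** min(attempt, 32)))
    source = rng or random  # fall back to the module-level generator
    return source.uniform(0.0, ceiling)
```

A reconnect loop would sleep for backoff_delay(attempt) before each new connection attempt, increment attempt after every failure, and reset it to 0 once the connection opens successfully.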

erichegit avatar Mar 09 '24 13:03 erichegit

Hi

This builds on what you had already done, but I am not trading through the websocket, so it does not log in.

Thanks

import asyncio
import json
import signal
import sys
import time
from typing import Optional

import websockets
from loguru import logger  # assumed: the same logger pybitget uses

# SubscribeReq and CONTRACT_WS_URL are assumed to come from pybitget,
# as in the original stream.py.


class BitgetWsClient:
    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        passphrase: Optional[str] = None,
        channels: Optional[list[SubscribeReq]] = None,  # Add channels parameter
        ping_interval: Optional[int] = 20,
        ws_url: Optional[str] = None,
        verbose: Optional[bool] = False,
        handle=None,
        handle_error=None,
    ):
        self.api_key = api_key
        self.api_secret = api_secret
        self.passphrase = passphrase
        self.channels = channels
        self.ws_url = ws_url or CONTRACT_WS_URL
        self.verbose = verbose
        self.connection = False
        self.login_status = False
        self.subscribe_status = False
        self.all_subscribe = set()
        self.listener = self.handle if handle is None else handle
        self.error_listener = self.handle_error if handle_error is None else handle_error
        self.scribe_map = {}
        self.ws_client = None
        self.keyboard_interrupt_flag = False
        self.last_receive_time = self.lastrestarttime = self.last_ping_time = time.time()
        self.errormonitor = 'unset'
        self.inerror = False
        self.reconnect_interval = 60  # seconds (1 minute)
        self.ping_interval = ping_interval
        self.logwrites = 0
        self.restarts = 0
        # Initialize the websocket_lock attribute
        self.websocket_lock = asyncio.Lock()

async def connect(self):
    # signal.signal(signal.SIGINT, lambda signal, frame: asyncio.create_task(self.handle_keyboard_interrupt(signal, frame)))
    signal.signal(signal.SIGINT, self.handle_keyboard_interrupt)
    while True:
        if self.keyboard_interrupt_flag:
            await self.unsubscribe(list(self.all_subscribe))
            await asyncio.sleep(2)
            await self.close()
            await asyncio.sleep(2)
            break
        try:
            if self.inerror: await asyncio.sleep(20) # if in error wait 20 seconds before trying to connect again
            logger.info("Trying to connect to WebSocket server.")
            async with websockets.connect(self.ws_url) as websocket:
                logger.info("Connected to WebSocket server.")
                self.websocket = websocket
                self.connection, self.inerror, self.subscribe_status, self.errormonitor = True, False, False, 'unset'
                self.restarts = self.restarts + 1
                if self.restarts > 10: self.update_log("Websocket has restarted 10 times, resetting to zero")
                # don't need to login for public information
                # await self.login()
                await asyncio.sleep(2)
                await self.subscribe(self.channels)
                while True:
                    if self.keyboard_interrupt_flag: break
                    elif time.time() > self.last_receive_time + 600: # if no message received for 10 minutes
                        self.update_log("No data received for 10 minutes")
                        self.connection, self.inerror, self.last_receive_time = False, True, time.time()
                        break
                    elif not self.connection:
                        try:
                            await self.unsubscribe(list(self.all_subscribe))
                            await asyncio.sleep(2)
                            await self.close()
                            await asyncio.sleep(2)
                            break
                        except Exception: break
                    elif time.time() - self.last_ping_time > self.ping_interval: await self.send_ping()
                    else:
                        #async with self.websocket_lock: message = await self.websocket.recv()
                        await self.on_message()
        except websockets.WebSocketException as e:
            logger.error(f"WebSocket error: {e}")
            self.update_log("websockets.WebsocketException in connect, error is "+str(e))
            await asyncio.sleep(20) # wait 20 seconds before trying to reconnect
            continue
        except KeyboardInterrupt:
            print("\nKeyboardInterrupt in main while loop: Closing WebSocket connection.")
            self.keyboard_interrupt_flag = True
            break
        except Exception as e:
            logger.error(f"Error connecting to WebSocket server: {e}")
            self.update_log("Exception in connect, error is "+str(e))
            await asyncio.sleep(20) # wait 20 seconds before trying to reconnect
            continue
    sys.exit()

def update_log(self, msg):
    if self.logwrites < 50:
        updatelogfile(msg,logfile)
        self.logwrites = self.logwrites + 1
        if "restarted" in msg: self.restarts = 0
    return

def handle_keyboard_interrupt(self, signal, frame):
    print("\nKeyboardInterrupt: Closing WebSocket connection.")
    self.keyboard_interrupt_flag = True
    return

async def send_ping(self):
    try:
        self.last_ping_time = time.time()
        await self.websocket.send("ping")
    except Exception as e: 
        logger.error(f"Error sending ping: {e}")
        if self.errormonitor == 'unset': self.errormonitor = time.time()
    return

async def on_message(self):
    try:
        try: message = await asyncio.wait_for(self.websocket.recv(), timeout=120)  # Set timeout to 2 minutes
        except asyncio.TimeoutError: return
        #message = await self.websocket.recv()
        if message == 'pong': return
        json_obj = json.loads(message)
        listener = None
        if "data" in str(json_obj):
            if not self.__check_sum(json_obj):
                return
            listener = self.get_listener(json_obj)
            # Update the last receive time whenever data is received
            self.last_receive_time = time.time()
            self.subscribe_status, self.inerror, self.errormonitor = True, False, 'unset'
        elif "connection close" in str(json_obj):
            self.update_log("Connection is closed, error is "+str(json_obj))
            self.connection, self.inerror = False, True
            return
        elif "code" in str(json_obj) and json_obj.get("code") != 0:
            self.update_log("Error message received, error is "+str(json_obj))
            if self.errormonitor == 'unset': self.errormonitor = time.time()
            elif time.time() > self.errormonitor + 300: # if not connected for 5 minutes try to reconnect to server
                self.connection, self.inerror = False, True
            return
        elif "subscribe" in str(json_obj):
            logger.info(f"Subscribed to: {json_obj}")
            return
        elif "login" in str(json_obj):
            if self.verbose: print("login msg: %s" % message)
            self.login_status = True
            return
        else:
            self.update_log("Unknown message received: "+str(json_obj))
            print("unknown message is: "+str(json_obj))
            return
        if listener:
            print("in listener")
            listener(message)
            return
        try: self.listener(json_obj)
        except Exception:
            # Fall back to passing the raw message if the parsed object is rejected
            self.listener(message)
    except KeyboardInterrupt:
        print("\nKeyboardInterrupt in on_message: Closing WebSocket connection.")
        self.keyboard_interrupt_flag = True
        return
    except websockets.ConnectionClosedOK:
        logger.error("WebSocket connection closed gracefully.")
        self.connection, self.inerror = False, True
    except websockets.ConnectionClosedError as e:
        logger.error(f"WebSocket connection closed unexpectedly: {e}")
        self.update_log("Connection Closed Error: "+str(e))
        self.connection, self.inerror = False, True
    except Exception as e:
        logger.error(f"Error in on message: {e}")
        self.update_log("Error in on_message: "+str(e))
        if "sent 1000 (OK)" in str(e) or "sent 1011 (unexpected error)" in str(e): self.connection, self.inerror = False, True
        elif self.errormonitor == 'unset': self.errormonitor = time.time()
        elif time.time() > self.errormonitor + 300: # if not connected for 5 minutes try to reconnect to server
            self.connection, self.inerror = False, True
        return
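
The handler above classifies messages by searching str(json_obj) for substrings, so, for example, an error whose text contains the word "data" would be treated as a data message. A sketch of classifying by top-level keys instead. The event field and the message shapes are assumptions inferred from the checks above, not confirmed against the Bitget documentation, and classify_message is a hypothetical helper:

```python
import json


def classify_message(raw: str) -> str:
    """Return one of: 'pong', 'data', 'error', 'subscribe', 'login', 'unknown'."""
    if raw == "pong":
        return "pong"
    try:
        obj = json.loads(raw)
    except json.JSONDecodeError:
        return "unknown"
    if not isinstance(obj, dict):
        return "unknown"
    if "data" in obj:  # top-level key, not a substring match
        return "data"
    code = obj.get("code")
    if code not in (None, 0, "0"):  # a non-zero code is treated as an error
        return "error"
    event = obj.get("event")  # assumed field name
    if event in ("subscribe", "login"):
        return event
    return "unknown"
```

on_message could then branch on the returned label, which keeps the routing logic separate from the reconnect bookkeeping.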

novicetopython avatar Mar 11 '24 11:03 novicetopython