
Offline behaviour: trouble reconnecting the client properly

Open ferfersan6 opened this issue 5 years ago • 4 comments

Hello, for some time I have been trying to implement the Offline behaviour described in the specification. When the server or the connection is unavailable, the client is supposed to try to reconnect while also supporting offline transactions. I catch the ConnectionClosed exception but, the way it is done, the class is re-instantiated and the tasks are started again (the old tasks are not stopped). It also re-sends the BootNotification, which it should not. Here is my idea, although it is not fully working:

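import asyncio
import websockets
# ChargePoint is my own subclass of the library's ChargePoint
# (it defines send_boot_notification() and wait_transaction())
async def main(server_ip):
    print("Connecting...")
    try:
        async with websockets.connect(
                f'ws://{server_ip}:9000/CP_1',
                subprotocols=['ocpp2.0.1']
        ) as ws:
            cp = ChargePoint('CP_1', ws)
            try:
                await asyncio.gather(cp.start(), cp.send_boot_notification(), cp.wait_transaction())
            except websockets.exceptions.ConnectionClosedError as e:
                print(e)
                # Retrying by recursion creates a new ChargePoint and sends
                # BootNotification again; the other gathered tasks keep running.
                await main(server_ip)
    except ConnectionRefusedError as e:
        print(e)
        await main(server_ip)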

cp.wait_transaction() is just a routine that waits for the EV to be plugged in or for authorization.
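The "old tasks are not closed" part comes from asyncio.gather() itself. With the default return_exceptions=False, it re-raises the first exception but leaves the other awaitables running. The sketch below cancels the surviving tasks when one of them fails. It is a minimal stdlib sketch under that assumption, and run_until_first_failure is a hypothetical helper name, not part of the ocpp library.

```python
import asyncio


async def run_until_first_failure(*coros):
    """Run coroutines concurrently and stop all of them as soon as one raises.

    A bare asyncio.gather() re-raises the first exception but leaves the other
    awaitables running; here the survivors are cancelled and awaited.
    """
    tasks = [asyncio.create_task(c) for c in coros]
    try:
        # Returns when any task raises, or when all complete normally.
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
        # Let the cancelled tasks run their cleanup; collect results, don't raise.
        await asyncio.gather(*tasks, return_exceptions=True)
    # Propagate the failure (e.g. a ConnectionClosed) to the reconnect logic.
    for task in done:
        if not task.cancelled() and task.exception() is not None:
            raise task.exception()
```

In the snippet above, this call would take the place of asyncio.gather(cp.start(), ...). It only deals with the leftover tasks. On its own it would still re-create the ChargePoint on every retry.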

So, my main objectives are:

  1. Reconnect the client while keeping the same class instance.
  2. Reuse the already started tasks, if possible.
  3. Do not send the BootNotification again.
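Goals 1 and 3 amount to keeping all state on one long-lived object and swapping only the connection underneath it. The subclass in the reply below does this by reassigning self._connection. The following is a minimal stdlib sketch of that idea. ReconnectingClient, on_connect and the connect callable are hypothetical names, and the boot step is only a placeholder for the real OCPP call.

```python
import asyncio


class ReconnectingClient:
    """Hypothetical client whose instance and state outlive any one connection.

    Only self.connection is replaced on reconnection; `booted` remembers that
    BootNotification was already sent, so it is not repeated.
    """

    def __init__(self, connect):
        self._connect = connect   # assumed: async callable returning a new connection
        self.connection = None
        self.booted = False
        self.boot_count = 0       # for illustration only

    async def send_boot_notification(self):
        # Placeholder: a real client would send the OCPP BootNotification here.
        self.boot_count += 1
        self.booted = True

    async def on_connect(self):
        """Open a fresh connection on the same instance."""
        self.connection = await self._connect()
        if not self.booted:
            await self.send_boot_notification()
```

Calling on_connect() a second time after a simulated drop leaves the instance in place, points it at the new connection, and keeps boot_count at 1.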

I think resolving this issue would be very useful for a future implementation of offline behaviour in the library. Has anyone implemented it? Thank you!

ferfersan6, Jan 11 '21

#119 and #103 are relevant to this situation

I am working on a subclass of ChargePoint for the client end, based on my suggestions in #119. It is still a work in progress, but it does manage to reconnect without re-instantiating the ChargePoint class every time.

My business logic is based on queues, so I have also changed the class interface to use queues rather than the @xxx methods. While disconnected, messages are held in a queue and sent once the client reconnects. This is not correct, but I plan to change the behaviour as I implement the business logic.
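The "hold while disconnected, send on reconnect" behaviour can be sketched as an outbox. A single task drains the outbox, waits until the connection is up, and keeps a message for retry if sending it fails. This is a stdlib sketch only. Outbox and the transmit callable are hypothetical names, and deciding which messages belong in the outbox at all (the reply itself says queuing everything is not correct) is left to the caller.

```python
import asyncio


class Outbox:
    """Hypothetical offline outbox: messages queued while disconnected are
    sent, in order, once the connection is back."""

    def __init__(self):
        self._pending = asyncio.Queue()
        self.connected = asyncio.Event()   # set/cleared by the reconnect logic

    async def send(self, message):
        # Never blocks on the network; the message just joins the queue.
        await self._pending.put(message)

    async def flush_forever(self, transmit):
        """transmit: assumed async callable that sends one message and raises
        ConnectionError if the link drops mid-send."""
        while True:
            message = await self._pending.get()
            while True:
                await self.connected.wait()
                try:
                    await transmit(message)
                    break
                except ConnectionError:
                    # Keep the same message and retry after reconnection.
                    self.connected.clear()
```

Because one task does all the sending, messages go out in the order they were queued, including after a reconnection.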

I dropped the context manager approach and instead implemented a reconnect coroutine that runs whenever the client is disconnected from the server, along with support for closing cleanly.
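A reconnect coroutine of that kind could look roughly like the sketch below. It adds the exponential backoff that the "should be a backoff" comments in the code ask for, plus a clean stop. It is a stdlib sketch under stated assumptions. keep_connected, backoff_delays and the connect callable are hypothetical. The stop and disconnected events play the roles of must_close_event and disconnected_event, except that here both are asyncio.Event objects used from a single thread.

```python
import asyncio
import random


def backoff_delays(base=1.0, cap=60.0):
    """Exponential backoff with full jitter: attempt n waits U(0, min(cap, base * 2**n)).

    The exponent is clamped so the delay computation can never overflow.
    """
    attempt = 0
    while True:
        yield random.uniform(0, min(cap, base * 2 ** min(attempt, 32)))
        attempt += 1


async def keep_connected(connect, disconnected, stop, base=1.0, cap=60.0):
    """Reconnect each time `disconnected` is set, until `stop` is set.

    connect: assumed async callable that opens the connection or raises OSError.
    disconnected, stop: asyncio.Event objects; to shut down, set stop and then
    disconnected so this coroutine wakes up and returns.
    """
    while True:
        await disconnected.wait()
        if stop.is_set():
            return
        for delay in backoff_delays(base, cap):
            try:
                await connect()
            except OSError:  # includes ConnectionRefusedError
                try:
                    # Sleep for `delay`, but return early if asked to stop.
                    await asyncio.wait_for(stop.wait(), timeout=delay)
                    return
                except asyncio.TimeoutError:
                    continue
            else:
                disconnected.clear()
                break
```

The jitter spreads out reconnection attempts when many charge points lose the same server at once. The cap bounds the wait between attempts.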

The current state of the work is pasted below. Some of it is specific to my application, but it might be helpful.

Note: it is unfinished.

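import asyncio
import logging
import time

import websockets
from ocpp.v20 import ChargePoint  # assumed: the library's OCPP 2.0 ChargePoint

logger = logging.getLogger(__name__)


class _ChargePoint(ChargePoint):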
    """subclass to over-ride some methods to provide
        a) queues for call requests and responses
        b) automatic (re-)connection to the server

    Args:
        body_in_q: queue feeding responses to the body
        client_in_q: queue to receive call requests
        server_uri: convenience for server uri
    """

    def __init__(self, **kwargs):
        """instantiate a chargepoint subclass

        Args:
            charger_id (str): ID of the charger.
            connection: Connection to CP (ignored).

        Kwargs:
            body_in_q: q to feed messages to the body
            client_in_q: q to feed messages to the client
            server_uri: str uri
            must_close_event: threading.Event to signal to close the socket
        """
        self.body_in_q = kwargs.pop('body_in_q')
        self.client_in_q = kwargs.pop('client_in_q')
        self.server_uri = kwargs.pop('server_uri')
        # threading.Event signalling when to close the socket
        self.must_close_event = kwargs.pop('must_close_event', None)
        # event signalling when (dis-)connected to server
        # note this can be changed by various coroutines so the sequence
        # if not disconnected_event.is_set():
        # does not guarantee that the following code will find the server there
        self.disconnected_event = asyncio.Event()
        self.disconnected_event.set()
        # now can instantiate super
        charger_id = kwargs.pop('charger_id')
        super().__init__(charger_id, connection=None, **kwargs)

    async def receive_calls(self):
        """replacement for the original start method to handle disconnection
        and remove the call to logging"""
        while True:
            if self.disconnected_event.is_set():
                await asyncio.sleep(0.01)
                continue
            # if the server closes or goes away this raises
            try:
                message = await self._connection.recv()
            except ConnectionRefusedError:
                logger.info('got ConnectionRefusedError')
                self.disconnected_event.set()
            except websockets.exceptions.ConnectionClosed: # closed OK or Error
                logger.warning('got ConnectionClosed from websockets.connect')
                self.disconnected_event.set()
            else:  # no exception
                await self.route_message(message)

    async def _send(self, message):
        """override _send to remove the log"""
        if not self.disconnected_event.is_set():
            await self._connection.send(message)
        else:
            # TODO how should this be handled?
            print('unable to send')

    async def handle_call_requests(self):
        """handle call requests from the sync code in the body

            These become calls to the csms server which should send a response

            Waits on a queue for a request then sends that to the remote server
            and returns the response to the body

            The body should apply a timeout that matches the max allowed
            response time from the server while waiting for the response.

            The hope is that, if the connection to the server is lost at any
            point, the body will timeout.

            The body cannot tell if the request was sent to the server before
            the connection was lost.

            The request could remain in the queue and would be sent when the
            connection is restored. The ChargePoint code should notice that
            the response UUID does not match the new request and drop the
            response.
        """
        while True:
            print(time.time(), 'ocpp handle_call_requests loops')
            # if connection is lost while async_get() this instance closes and
            # no message is lost, but instead remains in the queue.
            # The body is not aware that the call is failing until it fails to
            # get a response on its input queue and decides to time-out.
            # The body could check the qsize to see if the request has been
            # removed.
            # The message will remain on the queue and will be sent when the
            # connection is restored. - confirmed with refactored chargepoint
            request = await self.client_in_q.async_get()
            print(time.time(), f'handle_call_requests sending {request}')
            # if connection closes now the call will fail and the request is
            # lost and there will be no response put on the queue
            # The get will timeout. The body cannot know about this
            #
            # This call can fail if the connection is lost or absent.
            # Cannot catch the exception and report it back to the body as this
            # instance will be deleted.
            # the call can also raise a timeout if the server fails to respond
            # or if it disconnects after we sent the request
            # should be able to catch a timeout, but a disconnect will close
            # this instance.
            # On a timeout just absorb it and allow the body to timeout on the
            # get. This assumes there is no need to close the websocket if the
            # server is slow to respond.
            # OCPP requires that certain requests are handled differently if
            # there is a timeout
            #
            # jan21 if the server goes, the call times-out after 30s even though
            # re-connected and only then continues with the new connection
            try:
                response = await self.call(request)
            except asyncio.TimeoutError:
                logger.warning(
                    'csms timeout responding to %r', request, exc_info=True
                )
                # if raised then run_asyn_main gather exits so absorb it
            else:
                print(f'handle_call_requests got response {response}')
                self.body_in_q.put(response)

    async def ensure_connected_to_server(self):
        """handle reconnections to the server whenever needed"""
        while True:
            # wait on an event
            await self.disconnected_event.wait()
            if self.must_close_event is not None and self.must_close_event.is_set():
                # closing down so do not try to reconnect, wait to die
                logger.info('ensure_connected_to_server waiting to die')
                await asyncio.sleep(5)
                continue
            # now do the connection bit
            try:
                logger.info('trying to connect to %s', self.server_uri)
                web_sock = await websockets.connect(
                    self.server_uri,
                    subprotocols=['ocpp2.0'],
                    # ping_timeout=40,
                    # close_timeout=20,
                )
                logger.info('connected to %r', web_sock)
                self._connection = web_sock
                self.disconnected_event.clear()
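            except (OSError, asyncio.TimeoutError):
                # ConnectionRefusedError is a subclass of OSError; unreachable
                # hosts and connect timeouts end up here too
                logger.info('could not connect to %s', self.server_uri)
                await asyncio.sleep(1)  # should be a backoff
            except (websockets.exceptions.InvalidHandshake,
                    websockets.exceptions.ConnectionClosed):
                logger.warning('handshake failed or connection closed while connecting')
                await asyncio.sleep(1)  # should be a backoff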

    async def close_socket(self):
        """attempt to close the socket"""
        if self._connection is not None:
            await self._connection.close()
        # TODO set disconnected but also do not reconnect
        self.disconnected_event.set()

    async def monitor_for_close_event(self):
        """wait on a threading.Event signal to close the socket"""
        # must_close_event is a threading.Event set from another thread.
        # Poll it with is_set() rather than wait(), which would block the
        # event loop; the asyncio.sleep() lets the other coroutines run.
        while not self.must_close_event.is_set():
            await asyncio.sleep(0.5)
        logger.info('monitor_for_close_event is closing socket')
        await self.close_socket()

jr-tactiq, Jan 12 '21
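The "jan21" comment in handle_call_requests notes a limitation: after the server goes away, a pending call only gives up once the 30 s response timeout expires. The docstring also relies on mismatched response ids to drop stale responses. One way to get both behaviours is to keep one future per outstanding CALL, keyed by its unique id, and fail all of them as soon as the connection drops. This is a minimal stdlib sketch. PendingCalls and its methods are hypothetical names, not how the ocpp library implements call(), and the reconnect code would have to call fail_all() when it detects the disconnect.

```python
import asyncio
import uuid


class PendingCalls:
    """Hypothetical registry of outstanding CALLs, keyed by their unique id."""

    def __init__(self):
        self._futures = {}

    def register(self):
        """Create a future for a new request and return its unique id."""
        unique_id = str(uuid.uuid4())
        self._futures[unique_id] = asyncio.get_running_loop().create_future()
        return unique_id

    async def wait(self, unique_id, timeout):
        """Wait for the matching response, a disconnect, or the timeout."""
        try:
            return await asyncio.wait_for(self._futures[unique_id], timeout)
        finally:
            self._futures.pop(unique_id, None)

    def resolve(self, unique_id, payload):
        """Deliver a response; returns False for unknown or stale ids (dropped)."""
        future = self._futures.get(unique_id)
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True

    def fail_all(self, exc):
        """On disconnect, fail every waiting caller now instead of at timeout."""
        for future in self._futures.values():
            if not future.done():
                future.set_exception(exc)
```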

Seems a nice approach! I adapted my code with some ideas taken from yours. For now, it does what I want. I will be refining the code for cleaner results. At least the class is not re-instantiated and my routines keep working. I will post further progress here.

I am very very grateful for your response! Thank you!!

ferfersan6, Jan 12 '21

I realize that the current API of this library is not well suited to the use case in the opening post. There are some ideas here: https://github.com/mobilityhouse/ocpp/discussions/165

@jr-tactiq Your code sample suggests that you mix async code with threaded code. Note that several asyncio constructs, like asyncio.Event and asyncio.Queue, are not thread-safe.

OrangeTux, Jan 12 '21
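The thread-safety point above has a standard remedy. Synchronous code running in another thread should not touch asyncio.Event or asyncio.Queue directly. It should instead go through the loop's thread-safe entry points, loop.call_soon_threadsafe() and asyncio.run_coroutine_threadsafe(). The sketch below is a minimal stdlib example of that pattern. bridge_demo and the request strings are hypothetical, and body() stands in for the synchronous application code.

```python
import asyncio
import threading


async def bridge_demo():
    """Hand work from a plain thread to asyncio objects without touching them
    directly from that thread."""
    loop = asyncio.get_running_loop()
    must_close = asyncio.Event()   # only ever mutated on the loop's thread
    requests = asyncio.Queue()     # likewise

    def body():
        # Runs in a separate thread, standing in for synchronous application code.
        for i in range(3):
            # Schedule requests.put() on the loop; block this thread until done.
            asyncio.run_coroutine_threadsafe(requests.put(f"req-{i}"), loop).result(timeout=1)
        # Ask the loop to set the event; calling must_close.set() here is not safe.
        loop.call_soon_threadsafe(must_close.set)

    thread = threading.Thread(target=body)
    thread.start()
    await must_close.wait()
    received = []
    while not requests.empty():
        received.append(requests.get_nowait())
    # Join the thread without blocking the event loop.
    await loop.run_in_executor(None, thread.join)
    return received
```

Compared with polling a threading.Event with wait(timeout=...) inside a coroutine, the other thread here wakes the loop directly, and nothing blocks the loop.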


Link not working

Aashutosh3804, Mar 27 '21

As this has gone stale, I'll close this (Although this is still valid and is referenced by discussion https://github.com/mobilityhouse/ocpp/discussions/165)

Jared-Newell-Mobility, Dec 18 '23