Offline behaviour. Reconnecting client properly troubles
Hello, it has been a time since I am trying to implement Offline behaviour as described on the specification. When the server or connection is not available, the client is supposed to reconnect (or try it) but also offer the possibility of offline transactions. I have wrapped the ConnectionClosed exception but, the way it is done, it reinstantiates the class and starts the tasks again (the old tasks are not closed). Also, it resends boot notification (which should not). Here it is my idea to do this, although it is not fully working:
async def main(server_ip):
print("Connecting...")
try:
async with websockets.connect(
'ws://' + server_ip + ':9000/CP_1',
subprotocols=['ocpp2.0.1']
) as ws:
cp = ChargePoint('CP_1', ws)
try:
await asyncio.gather(cp.start(), cp.send_boot_notification(), cp.wait_transaction())
except websockets.exceptions.ConnectionClosedError as e:
print(e)
await main(server_ip)
except ConnectionRefusedError as e:
print(e)
await main(server_ip)
cp.wait_transaction() is just a routine to wait for the ev plug in or authorization.
So, my main objective is:
- Reconnect client maintaining the same class instantiation
- Use already started tasks if possible.
- Not to send boot notification again.
I think this issue would be very useful for the future implementation of the offline behaviour of the library when resolved. Has anyone implemented it?. Thank you!
#119 and #103 are relevant to this situation
I am working on implementing a subclass of ChargePoint for the client end based on my suggestions in 119. It is still a work in progress but does manage to reconnect without re-instantiating the ChargePoint class every time.
My business logic is based on queues so I have also changed the interface to the class to use queues rather than the @xxx methods. When disconnected messages are held in a queue and are sent when reconnected. This is not correct but I plan to change the behaviour as I implement the business logic.
I dropped the context manager approach and instead implemented a reconnect coroutine that runs whenever disconnected from the server, along with support for closing cleanly.
The current state of the work is pasted below. Some of this will be specific to my particular application but it might be helpful.
Note it is unfinished
class _ChargePoint(ChargePoint):
"""subclass to over-ride some methods to provide
a) queues for call requests and responses
b) automatic (re-)connection to the server
Args:
body_in_q: queue feeding responses to the body
client_in_q: queue to receive call requests
server_uri: convenience for server uri
"""
def __init__(self, **kwargs):
"""instantiate a chargepoint subclass
Args:
charger_id (str): ID of the charger.
connection: Connection to CP (ignored).
Kwargs:
body_in_q: q to feed messages to the body
client_in_q: q to feed messages to the client
server_uri: str uri
must_close_event: threading.Event to signal to close the socket
"""
self.body_in_q = kwargs.pop('body_in_q')
self.client_in_q = kwargs.pop('client_in_q')
self.server_uri = kwargs.pop('server_uri')
# threading.Event signalling when to close the socket
self.must_close_event = kwargs.pop('must_close_event', None)
# event signalling when (dis-)connected to server
# note this can be changed by various coroutines so the sequence
# if not disconnected_event.is_set():
# does not guarantee that the following code will find the server there
self.disconnected_event = asyncio.Event()
self.disconnected_event.set()
# now can instantiate super
charger_id = kwargs.pop('charger_id')
super().__init__(charger_id, connection=None, **kwargs)
async def receive_calls(self):
"""replacement for the original start method to handle disconnection
and remove the call to logging"""
while True:
if self.disconnected_event.is_set():
await asyncio.sleep(0.01)
continue
# if the server closes or goes away this raises
try:
message = await self._connection.recv()
except ConnectionRefusedError:
logger.info('got ConnectionRefusedError')
self.disconnected_event.set()
except websockets.exceptions.ConnectionClosed: # closed OK or Error
logger.warning('got ConnectionClosed from websockets.connect')
self.disconnected_event.set()
else: # no exception
await self.route_message(message)
async def _send(self, message):
"""override _send to remove the log"""
if not self.disconnected_event.is_set():
await self._connection.send(message)
else:
# TODO how should this be handled?
print('unable to send')
async def handle_call_requests(self):
"""handle call requests from the sync code in the body
These become calls to the csms server which should send a response
Waits on a queue for a request then sends that to the remote server
and returns the response to the body
The body should apply a timeout that matches the max allowed
response time from the server while waiting for the response.
The hope is that, if the connection to the server is lost at any
point, the body will timeout.
The body cannot tell if the request was sent to the server before
the connection was lost.
The request could remain in the queue and would be sent when the
connection is restored. The ChargePoint code should notice that
the response UUID does not match the new request and drop the
response.
"""
while True:
print(time.time(), 'ocpp handle_call_requests loops')
# if connection is lost while async_get() this instance closes and
# no message is lost, but instead remains in the queue.
# The body is not aware that the call is failing until it fails to
# get a response on its input queue and decides to time-out.
# The body could check the qsize to see if the request has been
# removed.
# The message will remain on the queue and will be sent when the
# connection is restored. - confirmed with refactored chargepoint
request = await self.client_in_q.async_get()
print(time.time(), f'handle_call_requests sending {request}')
# if connection closes now the call will fail and the request is
# lost and there will be no response put on the queue
# The get will timeout. The body cannot know about this
#
# This call can fail if the connection is lost or absent.
# Cannot catch the exception and report it back to the body as this
# instance will be deleted.
# the call can also raise a timeout if the server fails to respond
# or if it disconnects after we sent the request
# should be able to catch a timeout, but a disconnect will close
# this instance.
# On a timeout just absorb it and allow the body to timeout on the
# get. This assumes there is no need to close the websocket if the
# server is slow to respond.
# OCPP requires that certain request are handled differently if
# there is a timeout
#
# jan21 if the server goes, the call times-out after 30s even though
# re-connected and only then continues with the new connection
try:
response = await self.call(request)
except asyncio.TimeoutError:
logger.warning(
'csms timeout responding to %r', request, exc_info=True
)
# if raised then run_asyn_main gather exits so absorb it
else:
print(f'handle_call_requests got response {response}')
self.body_in_q.put(response)
async def ensure_connected_to_server(self):
"""handle reconnections to the server whenever needed"""
while True:
# wait on an event
await self.disconnected_event.wait()
if self.must_close_event.is_set():
# closing down so do not try to reconnect, wait to die
logger.info('ensure_connected_to_server waiting to die')
await asyncio.sleep(5)
# now do the connection bit
try:
logger.info('trying to connect to %s', self.server_uri)
web_sock = await websockets.connect(
self.server_uri,
subprotocols=['ocpp2.0'],
# ping_timeout=40,
# close_timeout=20,
)
logger.info('connected to %r', web_sock)
self._connection = web_sock
self.disconnected_event.clear()
except ConnectionRefusedError:
logger.info('got ConnectionRefusedError')
await asyncio.sleep(1) # should be a backoff
except websockets.exceptions.ConnectionClosed: # closed OK or Error
logger.warning('got ConnectionClosed from websockets.connect')
await asyncio.sleep(1) # should be a backoff
async def close_socket(self):
"""attempt to close the socket"""
# how's about this for a bit of functional code?
_ = self._connection and await self._connection.close()
# TODO set disconnected but also do not reconnect
self.disconnected_event.set()
async def monitor_for_close_event(self):
"""wait on a threading.Event signal to close the socket"""
# expecting a threading event here which returns false on timeout
while not self.must_close_event.wait(timeout=0.001):
# have to put an ansync wait here or the other coroutines do not
# ever run. could probably make this a thread etc but left for now
await asyncio.sleep(0.5)
logger.info('monitor_for_close_event is closing socket')
await self.close_socket()
Seems a nice approach! I adapted my code with some ideas extracted from yours. For now, it does the things I want. I will be refinishing the code for cleaner results. At least, the class is not reinstantiated and I keep my routines working fine. Future advances will be communicated.
I am very very grateful for your response! Thank you!!
I realize that the current API of this library is not well suited to support the use case of the opening post. There are some ideas see here: https://github.com/mobilityhouse/ocpp/discussions/165
@jr-tactiq You code sample suggests that you mix async code with threaded code. Note that several asyncio constructs like asyncio.Event and asyncio.Queue are not thread-safe.
I realize that the current API of this library is not well suited to support the use case of the opening post. There are some ideas see here: https://github.com/mobilityhouse/ocpp/discussions/165
@jr-tactiq You code sample suggests that you mix async code with threaded code. Note that several asyncio constructs like
asyncio.Eventandasyncio.Queueare not thread-safe.
Link not working
As this has gone stale, I'll close this (Although this is still valid and is referenced by discussion https://github.com/mobilityhouse/ocpp/discussions/165)