BusABC.recv: keep calling _recv_internal until it returns None
Even if recv() is called with timeout=0, the caller's intention is probably for recv() to check all of the messages that have already arrived at the interface until one of them matches the filters.
This is already the way recv() behaves for interface drivers that take advantage of hardware or OS-level filtering, but those that use BusABC's default software-based filtering might return None even if a matching message has already arrived.
Sorry, this PR is still buggy. I'm going to build out the unit tests a little bit to test recv() more thoroughly and push an updated version in a little while.
This PR should be good to go now. It simplifies the logic in recv() by leveraging the fact that _recv_internal() already takes care of determining whether an rx timeout has occurred and returns None if it has. Therefore recv() doesn't ever need to check whether time_left > 0 or not, only whether _recv_internal() has returned None or not. Eventually, _recv_internal() will get called with timeout=0, and if there are no messages already in the underlying interface's receive buffer, recv() will return None.
The only way I can think of to guarantee both
- this function terminates regardless of bus or CPU speed, and
- that it will return messages that have been received and processed by the system if they are available
is to call recv_internal() and do the filtering in a separate thread, and for recv() to be a wrapper around Queue.get() instead of _recv_internal(). I don't know what that would do to the performance recv() though -- I could see it going either way.
Is this a strategy you'd be open to merging if I coded it up?
Something like this (just a sketch, not ready to call this a complete implementation yet)
diff --git a/can/bus.py b/can/bus.py
index af517e9d..55d8abac 100644
--- a/can/bus.py
+++ b/can/bus.py
@@ -4,6 +4,7 @@
import contextlib
import logging
+import queue
import threading
from abc import ABC, ABCMeta, abstractmethod
from enum import Enum, auto
@@ -97,10 +98,33 @@ def __init__(
"""
self._periodic_tasks: List[_SelfRemovingCyclicTask] = []
self.set_filters(can_filters)
+ self._recv_queue = queue.Queue(maxsize=100)
+ self._recv_thread = threading.Thread(target=self._recv_task,
+ name='CAN rx filtering',
+ daemon=True)
+ self._recv_thread.start()
def __str__(self) -> str:
return self.channel_info
+ def _recv_task(self):
+ while not self._is_shutdown:
+ try:
+ msg, already_filtered = self._recv_internal(timeout=0.1)
+ if msg and (already_filtered or self._matches_filters(msg)):
+ self._recv_put(msg)
+ except Exception as ex:
+ self._recv_put(ex)
+
+ def _recv_put(self, *args, **kwargs):
+ while not self._is_shutdown:
+ try:
+ self._recv_queue.put(timeout=0.1, *args, **kwargs)
+ except queue.Full:
+ continue
+ else:
+ break
+
def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
"""Block waiting for a message from the Bus.
@@ -113,25 +137,14 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
:raises ~can.exceptions.CanOperationError:
If an error occurred while reading
"""
- start = time()
- time_left = timeout
-
- while True:
- # try to get a message
- msg, already_filtered = self._recv_internal(timeout=time_left)
-
- # propagate timeouts from _recv_internal()
- if not msg:
- return None
-
- # return it, if it matches
- if already_filtered or self._matches_filters(msg):
- LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
- return msg
-
- # try again with reduced timeout
- if timeout is not None:
- time_left = max(0, timeout - (time() - start))
+ try:
+ msg = self._recv_queue.get(timeout=timeout)
+ except queue.Empty:
+ return None
+ if isinstance(msg, Exception):
+ raise msg
+ LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
+ return msg
def _recv_internal(
self, timeout: Optional[float]
@@ -457,6 +470,7 @@ def shutdown(self) -> None:
self._is_shutdown = True
self.stop_all_periodic_tasks()
+ self._recv_thread.join()
def __enter__(self) -> Self:
return self