python-can icon indicating copy to clipboard operation
python-can copied to clipboard

BusABC.recv: keep calling _recv_internal until it returns None

Open malsyned opened this issue 2 years ago • 4 comments

Even if recv() is called with timeout=0, the caller's intention is probably for recv() to check all of the messages that have already arrived at the interface until one of them matches the filters.

This is already the way recv() behaves for interface drivers that take advantage of hardware or OS-level filtering, but those that use BusABC's default software-based filtering might return None even if a matching message has already arrived.

malsyned avatar Oct 26 '23 21:10 malsyned

Sorry, this PR is still buggy. I'm going to build out the unit tests a little bit to test recv() more thoroughly and push an updated version in a little while.

malsyned avatar Oct 27 '23 16:10 malsyned

This PR should be good to go now. It simplifies the logic in recv() by leveraging the fact that _recv_internal() already takes care of determining whether an rx timeout has occurred and returns None if it has. Therefore recv() doesn't ever need to check whether time_left > 0 or not, only whether _recv_internal() has returned None or not. Eventually, _recv_internal() will get called with timeout=0, and if there are no messages already in the underlying interface's receive buffer, recv() will return None.

malsyned avatar Oct 27 '23 18:10 malsyned

The only way I can think of to guarantee both

  1. this function terminates regardless of bus or CPU speed, and
  2. that it will return messages that have been received and processed by the system if they are available

is to call recv_internal() and do the filtering in a separate thread, and for recv() to be a wrapper around Queue.get() instead of _recv_internal(). I don't know what that would do to the performance recv() though -- I could see it going either way.

Is this a strategy you'd be open to merging if I coded it up?

malsyned avatar Nov 02 '23 14:11 malsyned

Something like this (just a sketch, not ready to call this a complete implementation yet)

diff --git a/can/bus.py b/can/bus.py
index af517e9d..55d8abac 100644
--- a/can/bus.py
+++ b/can/bus.py
@@ -4,6 +4,7 @@

 import contextlib
 import logging
+import queue
 import threading
 from abc import ABC, ABCMeta, abstractmethod
 from enum import Enum, auto
@@ -97,10 +98,33 @@ def __init__(
         """
         self._periodic_tasks: List[_SelfRemovingCyclicTask] = []
         self.set_filters(can_filters)
+        self._recv_queue = queue.Queue(maxsize=100)
+        self._recv_thread = threading.Thread(target=self._recv_task,
+                                             name='CAN rx filtering',
+                                             daemon=True)
+        self._recv_thread.start()

     def __str__(self) -> str:
         return self.channel_info

+    def _recv_task(self):
+        while not self._is_shutdown:
+            try:
+                msg, already_filtered = self._recv_internal(timeout=0.1)
+                if msg and (already_filtered or self._matches_filters(msg)):
+                    self._recv_put(msg)
+            except Exception as ex:
+                self._recv_put(ex)
+
+    def _recv_put(self, *args, **kwargs):
+        while not self._is_shutdown:
+            try:
+                self._recv_queue.put(timeout=0.1, *args, **kwargs)
+            except queue.Full:
+                continue
+            else:
+                break
+
     def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
         """Block waiting for a message from the Bus.

@@ -113,25 +137,14 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
         :raises ~can.exceptions.CanOperationError:
             If an error occurred while reading
         """
-        start = time()
-        time_left = timeout
-
-        while True:
-            # try to get a message
-            msg, already_filtered = self._recv_internal(timeout=time_left)
-
-            # propagate timeouts from _recv_internal()
-            if not msg:
-                return None
-
-            # return it, if it matches
-            if already_filtered or self._matches_filters(msg):
-                LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
-                return msg
-
-            # try again with reduced timeout
-            if timeout is not None:
-                time_left = max(0, timeout - (time() - start))
+        try:
+            msg = self._recv_queue.get(timeout=timeout)
+        except queue.Empty:
+            return None
+        if isinstance(msg, Exception):
+            raise msg
+        LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
+        return msg

     def _recv_internal(
         self, timeout: Optional[float]
@@ -457,6 +470,7 @@ def shutdown(self) -> None:

         self._is_shutdown = True
         self.stop_all_periodic_tasks()
+        self._recv_thread.join()

     def __enter__(self) -> Self:
         return self

malsyned avatar Nov 15 '23 20:11 malsyned