BufferedReader returns oldest messages first
Describe the bug
BufferedReader.get is documented as returning the latest message: "Attempts to retrieve the latest message received by the instance." In reality, BufferedReader.get returns the oldest message in its queue. This is because the queue is implemented using a SimpleQueue, which is a FIFO queue.
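For anyone who wants to see the ordering difference in isolation, here is a minimal sketch using only the standard library `queue` module. The integers are just stand-ins for received messages, not real `can.Message` objects.

```python
from queue import LifoQueue, SimpleQueue

fifo = SimpleQueue()
lifo = LifoQueue()
for seed in (1, 2, 3):  # stand-ins for three messages received in this order
    fifo.put(seed)
    lifo.put(seed)

oldest = fifo.get()  # SimpleQueue hands back the first item put in
newest = lifo.get()  # LifoQueue hands back the last item put in
print(oldest, newest)
```

With a FIFO queue the first `get` yields the oldest entry, which matches the behavior described above.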
To Reproduce
Receive CAN messages and compare the order in which BufferedReader.get returns them to the output of something like candump.
Expected behavior
BufferedReader.get should always return the latest message received. If no new message has been received, the most recent item remaining in the queue should be returned.
Additional context
OS and version:
Python version: 3.8
python-can version: 3.3.4
python-can interface/s (if applicable):
I implemented a basic LIFO reader using LifoQueue. Ultimately I believe a circular LIFO queue would be the best implementation for BufferedReader. My very basic LIFO queue implementation is shown below. Note the hacky way I allow new messages when the queue is full by creating a new queue and overwriting the old one.
    from queue import Empty, Full, LifoQueue

    from can import Listener


    class LIFOReader(Listener):
        """
        A BufferedReader is a subclass of :class:`~can.Listener` which implements a
        **message buffer**: that is, when the :class:`can.BufferedReader` instance is
        notified of a new message it pushes it into a queue of messages waiting to
        be serviced. The messages can then be fetched with
        :meth:`~can.BufferedReader.get_message`.

        Putting in messages after :meth:`~can.BufferedReader.stop` has been called will raise
        an exception, see :meth:`~can.BufferedReader.on_message_received`.

        :attr bool is_stopped: ``True`` iff the reader has been stopped
        """

        def __init__(self, maxsize=128):
            # bounded buffer; a maxsize of 0 would make it "infinite"
            self.buffer = LifoQueue(maxsize=maxsize)
            self.is_stopped = False

        def on_message_received(self, msg):
            """Append a message to the buffer.

            :raises: RuntimeError
                if the reader has already been stopped
            """
            if self.is_stopped:
                raise RuntimeError("reader has already been stopped")
            try:
                # non-blocking put, so a full buffer raises Full instead of waiting
                self.buffer.put(msg, block=False)
            except Full:
                # hack: throw away the backlog by swapping in a fresh queue,
                # then store the new message in it
                self.buffer = LifoQueue(maxsize=self.buffer.maxsize)
                self.buffer.put(msg, block=False)

        def get_message(self, timeout=0.5):
            """
            Attempts to retrieve the latest message received by the instance. If no message is
            available it blocks for given timeout or until a message is received, or else
            returns None (whichever is shorter). This method does not block after
            :meth:`can.BufferedReader.stop` has been called.

            :param float timeout: The number of seconds to wait for a new message.
            :rtype: can.Message or None
            :return: the message if there is one, or None if there is not.
            """
            try:
                return self.buffer.get(block=not self.is_stopped, timeout=timeout)
            except Empty:
                return None

        def stop(self):
            """Prohibits any more additions to this reader."""
            self.is_stopped = True
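The circular LIFO queue mentioned above could be sketched roughly as follows. This is only an illustration, not anything that exists in python-can: `CircularLifoBuffer` is a hypothetical name, it is built on `collections.deque` with `maxlen`, and it leaves out the blocking and timeout behavior that `get_message` has.

```python
from collections import deque


class CircularLifoBuffer:
    """Hypothetical helper: keeps the newest `maxsize` items, newest out first."""

    def __init__(self, maxsize=128):
        # a deque with maxlen silently drops the oldest item once it is full
        self._items = deque(maxlen=maxsize)

    def put(self, item):
        self._items.append(item)

    def get(self):
        """Return the newest item, or None if the buffer is empty."""
        try:
            return self._items.pop()
        except IndexError:
            return None


buf = CircularLifoBuffer(maxsize=3)
for seed in range(5):  # 0, 1, 2, 3, 4; only 2, 3, 4 are kept
    buf.put(seed)
drained = [buf.get() for _ in range(4)]
```

Compared with swapping in a fresh queue when full, this only discards the single oldest entry per overflow, so the most recent messages always survive.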
I think FIFO is the expected behavior of a buffer. The docstring is incorrect.
But we could add a use_lifo keyword argument in the __init__() method to support both FIFO and LIFO.
I agree.
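A rough sketch of what the proposed `use_lifo` flag might look like. To keep it runnable without python-can installed, `SketchReader` is a hypothetical stand-alone class rather than a subclass of `can.Listener`, and it uses `queue.Queue` for the FIFO case; neither the class nor the flag exists in the library.

```python
from queue import Empty, LifoQueue, Queue


class SketchReader:
    """Stand-in for BufferedReader; `use_lifo` is the proposed, hypothetical flag."""

    def __init__(self, use_lifo=False):
        # FIFO stays the default so existing callers see no change
        self.buffer = LifoQueue() if use_lifo else Queue()
        self.is_stopped = False

    def on_message_received(self, msg):
        if self.is_stopped:
            raise RuntimeError("reader has already been stopped")
        self.buffer.put(msg)

    def get_message(self, timeout=0.5):
        try:
            return self.buffer.get(block=not self.is_stopped, timeout=timeout)
        except Empty:
            return None

    def stop(self):
        self.is_stopped = True


fifo_reader = SketchReader()
lifo_reader = SketchReader(use_lifo=True)
for msg in ("first", "second", "third"):  # strings stand in for CAN messages
    fifo_reader.on_message_received(msg)
    lifo_reader.on_message_received(msg)

first_out_fifo = fifo_reader.get_message()
first_out_lifo = lifo_reader.get_message()
```

Only the queue type changes between the two modes, so the rest of the reader logic could stay as it is.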
I think it really depends on what kind of message/data/protocol is being used on the CAN bus. If I've got a message that just has sensor data on it, I want the latest data. If there's an underlying protocol like ISO 15765-2, then I want the messages in order.
I use it as it is (FIFO behaviour) and it always made sense to me. I believe other people use it like that too so I hope this fix will not cause much trouble.
I also agree that the most common use for a buffered reader is FIFO, not LIFO. I would suggest improving the docstring instead of the implementation.
@trey-sygnalauto Can you give an example where you need a last in first out feature?
I'm good with just changing the docstring.
I have a seed key exchange mechanism in an external device that is used to reset certain states. Seed messages are sent out periodically, so if I get the oldest message I end up generating a key with a seed that is no longer valid. I'm more of an embedded C guy, so my initial solution after discovering the BufferedReader wasn't behaving as documented was to change the underlying queue structure. Since I filed this bug, I ended up directly registering my rx handling code with the package so it behaves as a real-time callback without a buffer.