EventsExecutor with an overrunning timer can lead to a burst of timer events
Hi,
I recently found an issue with the EventsExecutor relating to timers. If a timer callback is running slower than its period then it will create multiple timer events inside the EventsExecutor. This leads to 2 issues:
- Other events will not run untill all queued timers events finish
- If the timer callback starts running faster than its period, a sudden burst of timer events will run.
Example:
The code to reproduce the following is given at the end. I am running ROS2 rolling (commit: fa63fcf) on Ubuntu 24.04. The first node is running a timer at 1Hz with a callback that publishes a message before sleeping for 2 seconds. A second node listens and measures the time between messages. The sleep time can be changed via:
ros2 topic pub /sleep_time std_msgs/msg/Float32 data:\ 0.01 -1
| Timer | Listener |
|---|---|
|
|
Why does this happen?
From my understanding of the EventsExecutor, it will add a timer event after each period of the timer regardless of if the timer has finished running or if there are other timer events in the queue. This leads to a build up of timer events. This backlog is the underlying root cause for both 1 and 2. The backlog will delay other incoming events (such as messages) and lead to the burst of timer events when the running time drops.
Observed behavioral differences to other executors:
SingleThreadedExecutor The SingleThreadedExecutor does not display either of the behaviors. If you publish a new message, its callback will be executed after the current timer. If the running time suddenly changes, the timer doesn't run faster than its period.
| Timer | Listener |
|---|---|
|
|
MultiThreadedExecutor The MultiThreadedExecutor can suffer from 1 (solvable by changing the setup of the callback groups) but should not suffer from 2. The issue with 1 has already been raised by this other issue https://github.com/ros2/rclcpp/issues/2402.
| Timer | Listener |
|---|---|
|
|
Code to reproduce
Timer
#include <chrono>
#include <memory>
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
#include "std_msgs/msg/float32.hpp"
#include "std_msgs/msg/bool.hpp"
using namespace std::chrono_literals;
class Timer : public rclcpp::Node
{
public:
Timer()
: Node("timer")
{
auto callback_group = create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
rclcpp::SubscriptionOptions options;
options.callback_group = callback_group;
timer_ = this->create_wall_timer(1000ms, std::bind(&Timer::timer_callback, this));
sleep_time_sub_ = this->create_subscription<std_msgs::msg::Float32>(
"sleep_time", 10, std::bind(&Timer::sleep_time_callback, this, std::placeholders::_1), options);
publisher_ = this->create_publisher<std_msgs::msg::Bool>("msg", 10);
}
private:
void timer_callback()
{
rclcpp::Rate wait_time(1.0/sleep_time_);
RCLCPP_INFO_STREAM(get_logger(), "Sleeping for " << sleep_time_ << "s");
publisher_->publish(std_msgs::msg::Bool());
wait_time.sleep();
}
void sleep_time_callback(const std_msgs::msg::Float32::SharedPtr msg)
{
sleep_time_ = msg->data;
RCLCPP_INFO_STREAM(get_logger(), "Sleep changed to " << msg->data << "s");
}
private:
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Subscription<std_msgs::msg::Float32>::SharedPtr sleep_time_sub_;
rclcpp::Publisher<std_msgs::msg::Bool>::SharedPtr publisher_;
float sleep_time_{2.0f};
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
auto node = std::make_shared<Timer>();
rclcpp::experimental::executors::EventsExecutor executor;
// rclcpp::executors::SingleThreadedExecutor executor;
// rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}
Listener
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/bool.hpp"
class Listener : public rclcpp::Node
{
public:
Listener()
: Node("listener")
{
subscription_ =
this->create_subscription<std_msgs::msg::Bool>("msg", 10, std::bind(&Listener::callback, this, std::placeholders::_1));
}
void callback(std_msgs::msg::Bool::UniquePtr msg)
{
auto current_time = std::chrono::steady_clock::now();
// Calculate the time difference between the current time and the last received time
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(current_time - last_time_);
// Print the time difference in milliseconds
RCLCPP_INFO(get_logger(), "%.2fs since last msg", duration.count());
// Update the last time to the current time
last_time_ = current_time;
}
private:
std::chrono::steady_clock::time_point last_time_;
rclcpp::Subscription<std_msgs::msg::Bool>::SharedPtr subscription_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<Listener>());
rclcpp::shutdown();
return 0;
}
FYI @alsora to discuss at Client WG meeting. Also @jmachowinski does this impact your implementation?
From my understanding of the EventsExecutor, it will add a timer event after each period of the timer regardless of if the timer has finished running or if there are other timer events in the queue.
Yes, your understanding is correct and this is the expected behavior. Events are generated whenever an entity is ready, and they are not related to how much time it takes to execute that entity (you made an example with a timer, but the same could be said of a subscription that receives a message every second and takes 2 seconds in the subscription callback).
Other events will not run untill all queued timers events finish
yes and no, but this is again expected behavior. in the events executor implementation that you find in rclcpp, events are only sorted by their "arrival time".
let's consider your example where you have a timer with a 1s period and a 2s duration. let's also assume that at T=4s we receive a message
T=1s -> timer event `timer_1` is ready
-> timer event `timer_1` starts processing
-> events queue is empty
T=2s -> timer event `timer_2` is ready
-> timer event `timer_1` is still processing
-> events queue = [`timer_2`]
T=3s -> timer event `timer_3` is ready
-> timer event `timer_1` is done
-> timer event `timer_2` starts processing
-> events queue = [`timer_3`]
T=4s -> timer event `timer_4` is ready
-> subscription event `sub_1` is ready (note that whether this goes before or after `timer_4` is not relevant
-> timer event `timer_2` is still processing
-> events queue = [`timer_3`, `timer_4`, `sub_1`]
So yes, sub_1 will only be executed when previous events are done (assuming a single thread implementation).
This will happen at T=9s.
In my opinion this is the expected and correct behavior.
It has nothing to do with timers, but with the fact that the system is not correclty designed.
Again, what would you do if this wasn't a timer, but it was one or more subscriptions where the callback duration was longer than the period? why would you want to invalidate the ordering of the events?
If the timer callback starts running faster than its period, a sudden burst of timer events will run.
This is the same as before. your timer has a period of 0.01s, but each callback takes 2seconds to run... your queue will grow a lot. The same happens with subscriptions (you receive a message every 0.01s but it takes you 2s to process it).
Having said that... Are there "work-arounds"? Yes! The events executor is designed to allow you to implement your custom queue logic. If you want to have a custom pruning or sorting of the queue you are welcome to do it.
@mattbooker you can try this implementation : https://github.com/cellumation/cm_executors
This one will not suffer from the timer problem, as no new timer events are generated, while a timer is being executed or in the ready to be executed list. The problem still exists for subscriptions, and services though. In the case of a message callback taking too long, you would start the queue up events. I am wondering though, if there is a limit to the number of 'backlog events' for messages. Does the RMW still produce events if the max number of messages specified in the QoS is reached ? @alsora @mjcarroll does any one of you two know this by chance ?
Yes, you will still produce events, but if you have a history qos of let's say 10, only 10 events will be valid. The invalid events will return a "false" when trying to be executed, and the executor will move on to the next one.
Enforcing the qos in the events queue (so that you only have 10 events in this case) is not trivial.
It's not sufficient to stop pushing events, you also need to "re-order" the queue.
Note that even if you don't try to handle this, whenever a history qos is overrun the std::queue implementation that currently is in rclcpp will give you some messages out of order.
Let's say that you have two subscriptions (S1 and S2), S1 has a history QoS of 2. We get a queue like this: S1_1, S2_1, S1_2, S1_3 (where S1_1 is the first event for sub S1). S1 is overrun. To get the correct behavior, you should discard S1_1 and process S2_1, S1_2, S1_3. This way you are preserving the order. However what you currently get is S1_1, S2_1, S1_2 and then S1_3 will fail (because only 2 events can be retrieved from S1). When you process S1_1 you are actually processing the message that arrived with S1_2 and that was supposed to be after S2_1.
I hope this makes sense =)
Enforcing the qos in the events queue (so that you only have 10 events in this case) is not trivial.
Hmmm I think it's quite trivial to support. Pass some shared object to the callback from the rmw with a counter. If you enque an event increment. If you execute an event decrement. If you get an rmw callback, and the counter is at the qos limit don't increment / enque. This would get rid of the potential memory leak issue with the events executor. Not to blow my own horn here, but I like this idea very much... Sounds like my next Friday project....
Yes, that's relatively easy to implement. I think that somewhere we had an event queue implementation that did that internally.
The tricky part is to also re-order the events in the queue when an overrun happens so that the ordering of the events remains valid. Usually you will get an overrun when the system is under heavy load, so this operation must be done efficiently, otherwise you'll cause more overruns by trying to handle them.
This issue has been mentioned on ROS Discourse. There might be relevant details there:
https://discourse.ros.org/t/next-client-library-wg-meeting-friday-21st-march-8am-pst/42590/1