MultiThreadedExecutor in ROS2ARIConnector slows things down 100 times
related: https://github.com/ros2/rclpy/issues/1223
Describe the bug I was reviewing this lovely PR which transforms our old openset detection and segmentation solution into agents. I've noticed a critically bad performance and decided to dive into debugging. Tried few things:
- substitution wait_for_shutdown with time.sleep (script below)
- Debugging weights loading (GroundedSamAgent, GroundingDinoAgent)
- Finally decided to change the executor in ROS2ARIConnector SingleThreaded. The speed. The performance. The speed. The power.
To Reproduce Steps to reproduce the behavior:
- Have RAI set up (colcon build + poetry install --with openset + source setup_shell.sh)
- git checkout 9cf286f
- Run the following script
from rai_open_set_vision.agents import GroundingDinoAgent, GroundedSamAgent
from rai.utils import wait_for_shutdown
import rclpy
def main():
rclpy.init()
agent1 = GroundingDinoAgent()
agent2 = GroundedSamAgent()
agent1.run()
agent2.run()
wait_for_shutdown([agent1, agent2])
rclpy.shutdown()
if __name__ == "__main__":
main()
- Run these tests
from rai.communication.ros2 import ROS2ARIConnector, ROS2ARIMessage
from rai.utils import ROS2Context
from rai_interfaces.srv import RAIGroundingDino, RAIGroundedSam
import cv2
import cv_bridge
from rai.tools.ros2.utils import ros2_message_to_dict
@ROS2Context()
def test_grounding_dino_agent():
connector = ROS2ARIConnector()
bridge = cv_bridge.CvBridge()
image = bridge.cv2_to_imgmsg(cv2.imread("docs/imgs/o3deSimulation.png"))
msg = RAIGroundingDino.Request()
msg.source_img = image
msg.classes = ["chair"]
msg.box_threshold = 0.5
msg.text_threshold = 0.5
ari_msg = ROS2ARIMessage(payload=ros2_message_to_dict(msg))
response = connector.service_call(target="/grounding_dino_classify", message=ari_msg, msg_type="rai_interfaces/srv/RAIGroundingDino", timeout_sec=10)
print(response.payload)
@ROS2Context()
def test_grounded_sam_agent():
connector = ROS2ARIConnector()
bridge = cv_bridge.CvBridge()
image = bridge.cv2_to_imgmsg(cv2.imread("docs/imgs/o3deSimulation.png"), encoding="rgb8")
msg = RAIGroundedSam.Request()
msg.source_img = image
msg.detections = {}
ari_msg = ROS2ARIMessage(payload=ros2_message_to_dict(msg))
response = connector.service_call(target="/grounded_sam_segment", message=ari_msg, msg_type="rai_interfaces/srv/RAIGroundedSam", timeout_sec=10)
print(response.payload)
- Note the time it took pytest to finish
- Change the following line to SingleThreadedExecutor (remember to update import) https://github.com/RobotecAI/rai/blob/0675ff6036bbd1450f34426f10d64f999ab91777/src/rai_core/rai/communication/ros2/connectors/ari_connector.py#L106
- Run the script and pytest again, note the time.
- Should be 100 times faster.
Changing the num_threads parameter in MultiThreadedExecutor did not result in speed up.
Expected behavior A clear and concise description of what you expected to happen.
Screenshots If applicable, add screenshots to help explain your problem.
Platform
- OS: 24.04
- ROS 2 Jazzy
@rachwalk I think you've mentioned it working fairly well on ubuntu 22.04 with ROS 2 humble. Can you please run the pytest to check if changing the executor speeds things up?
Did you investigate why this is happening? Is there excessive synchronization? Does it boil down to executor internals? There has been some fairly recent work on rclpy executors that could be an improvement once we get to Kilted: https://discourse.ros.org/t/faster-rclpy-executor-now-in-rolling/42852
I will delve into the executor in my free time. Unfortunately, the aforementioned executor treats everything as mutually exclusive, which is unsuitable for our connectors :(
I've tested the problem on a different Ubuntu 24.04 machine and I couldn't reproduce the problem. We need to find a way to detect if such bottleneck exists as it directly impacts the swiftness of RAI.
@maciejmajek Couldn't reproduce on Ubuntu 22.04. The two tests described in issue took on average 2.2s using SingleThreadedExecutor and 1.5s using the MultiThreadedExecutor. I found no difference between using the two executors, and couldn't reproduce this issue.
@rachwalk through some additional testing: it seems that ubuntu 22.04 does not have this problem
Terminal 1:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
# benchmark
class StringPublisher(Node):
def __init__(self):
super().__init__("string_publisher")
self.publisher = self.create_publisher(String, "test_topic", 10)
self.timer = self.create_timer(0.00001, self.publish_message)
def publish_message(self):
msg = String()
msg.data = "Hello, ROS2!"
self.publisher.publish(msg)
if __name__ == "__main__":
rclpy.init()
publisher = StringPublisher()
rclpy.spin(publisher)
rclpy.shutdown()
terminal 2:
import threading
import time
from typing import Literal
import rclpy
import tabulate
from rai.communication.ros2 import ROS2Connector, ROS2Context, wait_for_ros2_topics
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.node import Node
from std_msgs.msg import String
TEST_TIME = 15
@ROS2Context()
def test_rai(executor_type: Literal["single_threaded", "multi_threaded"]):
msgs: list[str] = []
connector = ROS2Connector(executor_type=executor_type)
wait_for_ros2_topics(connector, ["test_topic"])
connector.register_callback("test_topic", lambda msg: msgs.append(msg))
time.sleep(TEST_TIME)
connector.shutdown()
return len(msgs)
def test_rclpy(executor_type: Literal["single_threaded", "multi_threaded"]):
class StringSubscriber(Node):
def __init__(self):
super().__init__("string_subscriber")
self.msgs: list[str] = []
self.subscriber = self.create_subscription(
String, "test_topic", self.callback, 10
)
def callback(self, msg):
self.msgs.append(msg)
try:
rclpy.init()
subscriber = StringSubscriber()
if executor_type == "single_threaded":
executor = SingleThreadedExecutor()
elif executor_type == "multi_threaded":
executor = MultiThreadedExecutor()
else:
raise ValueError(f"Invalid executor type: {executor_type}")
executor.add_node(subscriber)
thread = threading.Thread(target=executor.spin)
thread.start()
time.sleep(TEST_TIME)
executor.shutdown()
thread.join()
subscriber.destroy_node()
rclpy.shutdown()
except KeyboardInterrupt:
pass
return len(subscriber.msgs)
if __name__ == "__main__":
# columns framework, executor type, number of messages received
results = []
results.append(
["rai", "single_threaded", test_rai(executor_type="single_threaded")]
)
results.append(["rai", "multi_threaded", test_rai(executor_type="multi_threaded")])
results.append(
["rclpy", "single_threaded", test_rclpy(executor_type="single_threaded")]
)
results.append(
["rclpy", "multi_threaded", test_rclpy(executor_type="multi_threaded")]
)
print(
tabulate.tabulate(
results,
headers=["framework", "executor type", "number of messages received"],
tablefmt="grid",
)
)
+-------------+-----------------+-------------------------------+
| framework | executor type | number of messages received |
+=============+=================+===============================+
| rai | single_threaded | 140750 |
+-------------+-----------------+-------------------------------+
| rai | multi_threaded | 78 |
+-------------+-----------------+-------------------------------+
| rclpy | single_threaded | 146118 |
+-------------+-----------------+-------------------------------+
| rclpy | multi_threaded | 71 |
+-------------+-----------------+-------------------------------+