rai icon indicating copy to clipboard operation
rai copied to clipboard

MultiThreadedExecutor in ROS2ARIConnector slows things down 100 times

Open maciejmajek opened this issue 10 months ago • 7 comments

related: https://github.com/ros2/rclpy/issues/1223

Describe the bug I was reviewing this lovely PR which transforms our old openset detection and segmentation solution into agents. I've noticed a critically bad performance and decided to dive into debugging. Tried few things:

  1. substitution wait_for_shutdown with time.sleep (script below)
  2. Debugging weights loading (GroundedSamAgent, GroundingDinoAgent)
  3. Finally decided to change the executor in ROS2ARIConnector SingleThreaded. The speed. The performance. The speed. The power.

Image

To Reproduce Steps to reproduce the behavior:

  1. Have RAI set up (colcon build + poetry install --with openset + source setup_shell.sh)
  2. git checkout 9cf286f
  3. Run the following script
from rai_open_set_vision.agents import GroundingDinoAgent, GroundedSamAgent
from rai.utils import wait_for_shutdown
import rclpy


def main():
    rclpy.init()
    agent1 = GroundingDinoAgent()
    agent2 = GroundedSamAgent()
    agent1.run()
    agent2.run()
    wait_for_shutdown([agent1, agent2])
    rclpy.shutdown()


if __name__ == "__main__":
    main()
  1. Run these tests

from rai.communication.ros2 import ROS2ARIConnector, ROS2ARIMessage
from rai.utils import ROS2Context
from rai_interfaces.srv import RAIGroundingDino, RAIGroundedSam
import cv2
import cv_bridge

from rai.tools.ros2.utils import ros2_message_to_dict

@ROS2Context()
def test_grounding_dino_agent():
    connector = ROS2ARIConnector()
    bridge = cv_bridge.CvBridge()

    image = bridge.cv2_to_imgmsg(cv2.imread("docs/imgs/o3deSimulation.png"))

    msg = RAIGroundingDino.Request()
    msg.source_img = image
    msg.classes = ["chair"]
    msg.box_threshold = 0.5
    msg.text_threshold = 0.5
    
    ari_msg = ROS2ARIMessage(payload=ros2_message_to_dict(msg))
    response = connector.service_call(target="/grounding_dino_classify", message=ari_msg, msg_type="rai_interfaces/srv/RAIGroundingDino", timeout_sec=10)
    print(response.payload)

@ROS2Context()
def test_grounded_sam_agent():
    connector = ROS2ARIConnector()
    bridge = cv_bridge.CvBridge()

    image = bridge.cv2_to_imgmsg(cv2.imread("docs/imgs/o3deSimulation.png"), encoding="rgb8")

    msg = RAIGroundedSam.Request()
    msg.source_img = image
    msg.detections = {}

    ari_msg = ROS2ARIMessage(payload=ros2_message_to_dict(msg))
    response = connector.service_call(target="/grounded_sam_segment", message=ari_msg, msg_type="rai_interfaces/srv/RAIGroundedSam", timeout_sec=10)
    print(response.payload)
  1. Note the time it took pytest to finish
  2. Change the following line to SingleThreadedExecutor (remember to update import) https://github.com/RobotecAI/rai/blob/0675ff6036bbd1450f34426f10d64f999ab91777/src/rai_core/rai/communication/ros2/connectors/ari_connector.py#L106
  3. Run the script and pytest again, note the time.
  4. Should be 100 times faster.

Changing the num_threads parameter in MultiThreadedExecutor did not result in speed up.

Expected behavior A clear and concise description of what you expected to happen.

Screenshots If applicable, add screenshots to help explain your problem.

Platform

  • OS: 24.04
  • ROS 2 Jazzy

maciejmajek avatar Apr 07 '25 12:04 maciejmajek

@rachwalk I think you've mentioned it working fairly well on ubuntu 22.04 with ROS 2 humble. Can you please run the pytest to check if changing the executor speeds things up?

maciejmajek avatar Apr 07 '25 12:04 maciejmajek

Did you investigate why this is happening? Is there excessive synchronization? Does it boil down to executor internals? There has been some fairly recent work on rclpy executors that could be an improvement once we get to Kilted: https://discourse.ros.org/t/faster-rclpy-executor-now-in-rolling/42852

adamdbrw avatar Apr 07 '25 13:04 adamdbrw

I will delve into the executor in my free time. Unfortunately, the aforementioned executor treats everything as mutually exclusive, which is unsuitable for our connectors :(

maciejmajek avatar Apr 07 '25 14:04 maciejmajek

I've tested the problem on a different Ubuntu 24.04 machine and I couldn't reproduce the problem. We need to find a way to detect if such bottleneck exists as it directly impacts the swiftness of RAI.

maciejmajek avatar Apr 07 '25 14:04 maciejmajek

@maciejmajek Couldn't reproduce on Ubuntu 22.04. The two tests described in issue took on average 2.2s using SingleThreadedExecutor and 1.5s using the MultiThreadedExecutor. I found no difference between using the two executors, and couldn't reproduce this issue.

rachwalk avatar Apr 09 '25 08:04 rachwalk

@rachwalk through some additional testing: it seems that ubuntu 22.04 does not have this problem

maciejmajek avatar Sep 26 '25 14:09 maciejmajek

Terminal 1:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String


# benchmark
class StringPublisher(Node):
    def __init__(self):
        super().__init__("string_publisher")
        self.publisher = self.create_publisher(String, "test_topic", 10)
        self.timer = self.create_timer(0.00001, self.publish_message)

    def publish_message(self):
        msg = String()
        msg.data = "Hello, ROS2!"
        self.publisher.publish(msg)


if __name__ == "__main__":
    rclpy.init()
    publisher = StringPublisher()
    rclpy.spin(publisher)
    rclpy.shutdown()

terminal 2:

import threading
import time
from typing import Literal

import rclpy
import tabulate
from rai.communication.ros2 import ROS2Connector, ROS2Context, wait_for_ros2_topics
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.node import Node
from std_msgs.msg import String

TEST_TIME = 15


@ROS2Context()
def test_rai(executor_type: Literal["single_threaded", "multi_threaded"]):
    msgs: list[str] = []
    connector = ROS2Connector(executor_type=executor_type)
    wait_for_ros2_topics(connector, ["test_topic"])
    connector.register_callback("test_topic", lambda msg: msgs.append(msg))
    time.sleep(TEST_TIME)
    connector.shutdown()
    return len(msgs)


def test_rclpy(executor_type: Literal["single_threaded", "multi_threaded"]):
    class StringSubscriber(Node):
        def __init__(self):
            super().__init__("string_subscriber")
            self.msgs: list[str] = []
            self.subscriber = self.create_subscription(
                String, "test_topic", self.callback, 10
            )

        def callback(self, msg):
            self.msgs.append(msg)

    try:
        rclpy.init()
        subscriber = StringSubscriber()
        if executor_type == "single_threaded":
            executor = SingleThreadedExecutor()
        elif executor_type == "multi_threaded":
            executor = MultiThreadedExecutor()
        else:
            raise ValueError(f"Invalid executor type: {executor_type}")
        executor.add_node(subscriber)
        thread = threading.Thread(target=executor.spin)
        thread.start()
        time.sleep(TEST_TIME)
        executor.shutdown()
        thread.join()
        subscriber.destroy_node()
        rclpy.shutdown()
    except KeyboardInterrupt:
        pass
    return len(subscriber.msgs)


if __name__ == "__main__":
    # columns framework, executor type, number of messages received
    results = []
    results.append(
        ["rai", "single_threaded", test_rai(executor_type="single_threaded")]
    )
    results.append(["rai", "multi_threaded", test_rai(executor_type="multi_threaded")])
    results.append(
        ["rclpy", "single_threaded", test_rclpy(executor_type="single_threaded")]
    )
    results.append(
        ["rclpy", "multi_threaded", test_rclpy(executor_type="multi_threaded")]
    )
    print(
        tabulate.tabulate(
            results,
            headers=["framework", "executor type", "number of messages received"],
            tablefmt="grid",
        )
    )
+-------------+-----------------+-------------------------------+
| framework   | executor type   |   number of messages received |
+=============+=================+===============================+
| rai         | single_threaded |                        140750 |
+-------------+-----------------+-------------------------------+
| rai         | multi_threaded  |                            78 |
+-------------+-----------------+-------------------------------+
| rclpy       | single_threaded |                        146118 |
+-------------+-----------------+-------------------------------+
| rclpy       | multi_threaded  |                            71 |
+-------------+-----------------+-------------------------------+

maciejmajek avatar Nov 15 '25 13:11 maciejmajek