FastDDS High Latency using Large Data

Open andreclerigo opened this issue 11 months ago • 0 comments

When using FastDDS to transfer images (~4 MB) between participants, the measured latency (~60 ms) is worse than with other tools such as Zenoh or ZeroMQ (~40-50 ms). I would not expect this, since FastDDS can use shared memory (SHM) and the others do not.

Setup

I am using FastDDS Docker images, similar to #134. The image is built from eProsima_Fast-DDS-v2.13.3-Linux.tgz and https://raw.githubusercontent.com/eProsima/Fast-DDS-python/main/fastdds_python.repos. I deploy the containers as follows:

docker run -d --ipc shareable --name ipc_share_container alpine sleep infinity
docker run -itd --network host --ipc container:ipc_share_container --name app1
docker run -itd --network host --ipc container:ipc_share_container --name app2

Participants

The participants use the following XML file:

<?xml version="1.0" encoding="utf-8" ?>
<profiles xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
    <!-- Descriptors for new transports -->
    <transport_descriptors>
        <!-- UDP new transport -->
        <transport_descriptor>
            <transport_id>standard_udp_transport</transport_id>
            <type>UDPv4</type>
            <TTL>250</TTL>
        </transport_descriptor>
        <!-- Create a descriptor for the new transport -->
        <transport_descriptor>
            <transport_id>shm_transport</transport_id>
            <type>SHM</type> <!-- REQUIRED -->
            <maxMessageSize>524288</maxMessageSize> <!-- OPTIONAL uint32 valid of all transports-->
            <segment_size>1048576</segment_size> <!-- OPTIONAL uint32 SHM only-->
            <port_queue_capacity>1024</port_queue_capacity> <!-- OPTIONAL uint32 SHM only-->
            <healthy_check_timeout_ms>250</healthy_check_timeout_ms> <!-- OPTIONAL uint32 SHM only-->
            <default_reception_threads> <!-- OPTIONAL -->
                <scheduling_policy>-1</scheduling_policy>
                <priority>0</priority>
                <affinity>0</affinity>
                <stack_size>-1</stack_size>
            </default_reception_threads>
            <reception_threads> <!-- OPTIONAL -->
                <reception_thread port="12345">
                    <scheduling_policy>-1</scheduling_policy>
                    <priority>0</priority>
                    <affinity>0</affinity>
                    <stack_size>-1</stack_size>
                </reception_thread>
            </reception_threads>
        </transport_descriptor>
    </transport_descriptors>

    <participant profile_name="large_data_builtin_transports_options" is_default_profile="true">
        <rtps>
            <sendSocketBufferSize>1048576</sendSocketBufferSize>
            <listenSocketBufferSize>4194304</listenSocketBufferSize>
            <builtinTransports max_msg_size="310KB" sockets_size="310KB" non_blocking="true" tcp_negotiation_timeout="50">LARGE_DATA</builtinTransports>
        </rtps>
    </participant>

    <participant profile_name="video_publisher_qos">
        <rtps>
            <!-- Link the Transport Layer to the Participant -->
            <userTransports>
                <transport_id>shm_transport</transport_id>
                <transport_id>standard_udp_transport</transport_id>
            </userTransports>
            <useBuiltinTransports>false</useBuiltinTransports>
        </rtps>
    </participant>
</profiles>

We have tested participants using either the large_data_builtin_transports_options or the video_publisher_qos profile, and the results were about the same.
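
For scale, the sizes in the XML above can be compared with the ~4 MB sample. The following is a rough back-of-the-envelope sketch, not a statement about Fast DDS internals: it assumes a sample is split into messages no larger than the configured maximum size and ignores header overhead, so real counts would be somewhat higher. The function `fragments_needed` and the constant names are introduced here for illustration only.

```python
import math


def fragments_needed(sample_bytes: int, max_message_bytes: int) -> int:
    """Lower bound on the number of messages needed to carry one sample."""
    return math.ceil(sample_bytes / max_message_bytes)


# Assumption: "~4MB" is taken as exactly 4 MiB.
SAMPLE_BYTES = 4 * 1024 * 1024

# Values copied from the shm_transport descriptor.
SHM_MAX_MESSAGE = 524288   # maxMessageSize
SHM_SEGMENT = 1048576      # segment_size

# Assumption: "310KB" is read as 310,000 bytes; reading it as
# 310 * 1024 bytes gives the same fragment count for this sample size.
LARGE_DATA_MAX_MESSAGE = 310 * 1000

shm_fragments = fragments_needed(SAMPLE_BYTES, SHM_MAX_MESSAGE)
large_data_fragments = fragments_needed(SAMPLE_BYTES, LARGE_DATA_MAX_MESSAGE)
segments_per_sample = SAMPLE_BYTES / SHM_SEGMENT

print(f"shm_transport: at least {shm_fragments} messages per image")
print(f"LARGE_DATA:    at least {large_data_fragments} messages per image")
print(f"one image is {segments_per_sample:.0f}x the configured segment_size")
```

Under these assumptions, one image needs at least 8 messages with shm_transport and at least 14 with the LARGE_DATA setting, and it is four times the configured segment_size. Whether this accounts for the observed latency is not established here.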

Sample code for the publisher:

class Writer:
    def __init__(self, profile: str, data_name: str, topic_name: str) -> None:
        self._matched_reader = 0
        self._cvDiscovery = Condition()
        self.participant_handles = fastdds.InstanceHandleVector()  # Store handles here

        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        participant = factory.create_participant_with_profile(profile)
        self.participant = participant
        
        # Register the DDS data type
        if data_name == "SensorMessage":
            self.topic_data_type = SensorMessage.SensorMessagePubSubType()
        else:
            raise ValueError("Invalid data name")
        
        self.topic_data_type.setName(data_name)
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)
        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        self.topic = self.participant.create_topic(topic_name, self.topic_data_type.getName(), self.topic_qos)
        self.publisher_qos = fastdds.PublisherQos()
        self.participant.get_default_publisher_qos(self.publisher_qos)
        self.publisher = self.participant.create_publisher(self.publisher_qos)
        self.listener = WriterListener(self)
        self.writer_qos = fastdds.DataWriterQos()
        self.publisher.get_default_datawriter_qos(self.writer_qos)

        self.writer = self.publisher.create_datawriter(self.topic, self.writer_qos, self.listener)
...

In the main code:

    def write(self, data: SensorFormat):
        super().write()

        # Build the DDS message from the application-level sensor data
        message: SensorMessage.SensorMessage = SensorMsgOperations.BuildSensorMessage(data)
        self.writer.write(message)

SensorMsgOperations uses this IDL file:

struct SensorMessage 
{
    string serializedObjectTypeDef;
    long serializedObjectID;
    sequence<octet> serializedObject;
    string uuid;
};

and is built like this:

fastddsgen -python SensorMessage.idl

if [ -d "build" ]; then
    cd build
else
    mkdir build && cd build
fi

cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local/ -DBUILD_SHARED_LIBS=ON .
cmake --build .
cd ..

Sample code for the subscriber:

class Reader:
    def __init__(self, profile: str, data_name: str, topic_name: str, listener: Optional[ReaderListener] = None) -> None:
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        participant = factory.create_participant_with_profile(profile)
        self.participant = participant

        # Register the DDS data type
        if data_name == "SensorMessage":
            self.topic_data_type = SensorMessage.SensorMessagePubSubType()
        else:
            raise ValueError("DDS Reader data name specified not supported")
        
        self.topic_data_type.setName(data_name)
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)

        # Create a DDS topic
        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        if data_name == "SensorMessage":
            self.topic = self.participant.create_topic(topic_name, self.topic_data_type.getName(), self.topic_qos)
        else:
            raise ValueError("DDS Reader data name specified not supported")

        self.subscriber_qos = fastdds.SubscriberQos()
        self.participant.get_default_subscriber_qos(self.subscriber_qos)
        self.subscriber = self.participant.create_subscriber(self.subscriber_qos)

        # Set a custom listener if provided
        if listener is None:
            self.listener = ReaderListener()
        else:
            self.listener = listener
        
        self.reader_qos = fastdds.DataReaderQos()
        self.subscriber.get_default_datareader_qos(self.reader_qos)
        self.reader = self.subscriber.create_datareader(self.topic, self.reader_qos, self.listener)
...

In the main code:

    def on_data_available(self, reader):
        super().on_data_available(reader)

        info = fastdds.SampleInfo()
        data = SensorMessage.SensorMessage()
        reader.take_next_sample(data, info)

        # Convert to SensorFormat
        sensor_data = SensorMsgOperations.ReadSensorMessage(data)
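
To narrow down where the ~60 ms goes, the conversion steps (BuildSensorMessage, ReadSensorMessage) could be timed separately from the write and take calls. Below is a minimal sketch of such a per-stage timer. `StageTimer` and the stage names are hypothetical, and the two timed bodies are plain byte copies standing in for the real calls so that the snippet runs without Fast DDS installed.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from statistics import mean


class StageTimer:
    """Collects wall-clock durations (in ms) per named stage."""

    def __init__(self) -> None:
        self.samples_ms = defaultdict(list)

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000.0
            self.samples_ms[name].append(elapsed_ms)

    def summary(self) -> dict:
        """Mean duration in ms for each stage seen so far."""
        return {name: mean(values) for name, values in self.samples_ms.items()}


timer = StageTimer()
payload = bytes(4 * 1024 * 1024)  # assumption: a 4 MiB image buffer

for _ in range(5):
    with timer.stage("build"):
        # Placeholder for SensorMsgOperations.BuildSensorMessage(data)
        message = bytearray(payload)
    with timer.stage("write"):
        # Placeholder for self.writer.write(message)
        copied = bytes(message)

report = timer.summary()
for name, avg_ms in report.items():
    print(f"{name}: {avg_ms:.3f} ms on average")
```

In the real application, the same `with timer.stage(...)` blocks would wrap the actual build, write, take, and read calls. A large "build" or "read" share would point at the Python-side conversion rather than the transport.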

Is there any kind of flag missing, or is there an issue when using FastDDS Python inside Docker containers? I would assume that these values are not normal, and I should be achieving much better latencies.

Thanks in advance for the help 😄

andreclerigo • Feb 26 '25 11:02