
The producer performance of pulsar-client v3.5.0 is much lower than that of v2.10.2

Open Jayer23 opened this issue 1 year ago • 3 comments

When sending the same batch of data (about 100 MB) with pulsar-client v3.5.0 and v2.10.2, v3.5.0 takes about 3.5 times longer than v2.10.2.

Core Code

import asyncio
import pulsar

# Note: pulsar_infos and send_callback are defined elsewhere in the script.
async def consume_pulsar(q: asyncio.Queue, url: str, size: int):
    try:
        cli, pro = None, None
        for info in pulsar_infos:
            cli = pulsar.Client(
                io_threads=4,
                service_url=info.service_url)

            pro = cli.create_producer(
                info.topic,
                schema=pulsar.schema.StringSchema(),
                compression_type=pulsar.CompressionType.LZ4,
                batching_enabled=True,
                batching_max_allowed_size_in_bytes=1048576,
                max_pending_messages=10000,
                max_pending_messages_across_partitions=50000,
                batching_max_publish_delay_ms=10,
                block_if_queue_full=True
            )

        while True:
            lines = await q.get()
            if lines is None:
                break

            for line in lines:
                pro.send_async(line, callback=send_callback)

        pro.flush()
        cli.close()

    except Exception:
        raise

3.5.0: [line_profiler screenshot]

2.10.2: [line_profiler screenshot]

The above shows the running-time statistics from the line_profiler tool. The main cost is pro.send_async(line, callback=send_callback), which accounts for more than 97% of the time: about 127s with pulsar-client v3.5.0 versus about 35.6s with v2.10.2.

Reproduce

Demo

import pulsar
import os

PULSAR_TOPIC="persistent://benchmark/test/test"
PULSAR_URL="pulsar://localhost:6650"


def send_callback(result, msg_id):
    return


def produce():
    try:
        cli = pulsar.Client(
            io_threads=4,
            service_url=PULSAR_URL)

        pro = cli.create_producer(
            PULSAR_TOPIC,
            schema=pulsar.schema.StringSchema(),
            compression_type=pulsar.CompressionType.LZ4,
            batching_enabled=True,
            batching_max_allowed_size_in_bytes=1048576,
            max_pending_messages=10000,
            max_pending_messages_across_partitions=50000,
            batching_max_publish_delay_ms=10,
            block_if_queue_full=True
        )
        for i in range(0, size):
            pro.send_async(msg, callback=send_callback)

        pro.flush()
        cli.close()

    except Exception:
        raise

if __name__ == "__main__":
    msg = os.urandom(100).hex()  # 200-character payload, read as a global by produce()
    size = 1000000
    produce()

Result

$ pip3 list | grep pulsar-client
pulsar-client       3.5.0
$ time python3 test_pulsar.py > 1

real	1m2.925s
user	1m1.735s
sys	1m1.803s
$ pip3 install pulsar-client==2.10.2
$ time python3 test_pulsar.py > 1
real	0m14.575s
user	0m16.140s
sys	0m7.578s
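From the real times in the transcript above, the slowdown of this demo can be worked out directly (a quick arithmetic check using only the numbers shown):

```python
# "real" wall-clock times from the runs above, in seconds
t_v350 = 1 * 60 + 2.925    # 1m2.925s with pulsar-client 3.5.0
t_v2102 = 0 * 60 + 14.575  # 0m14.575s with pulsar-client 2.10.2
print(f'{t_v350 / t_v2102:.2f}x slower')
```

That is roughly a 4.3x slowdown for this demo, even larger than the 3.5x figure from the original 100 MB test.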

Jayer23 avatar Oct 18 '24 11:10 Jayer23

It can be reproduced in my env (Python 3.8 on Ubuntu 20.04 under WSL2), but the gap is not as large as in your environment.

3.5.0:

  • send_async: 41.614 s
  • flush: 0.0178 s

2.10.2:

  • send_async: 35.452 s
  • flush: 0.0150 s

It's also weird that send_async takes so much time. I will take a further look soon.

BewareMyPower avatar Oct 26 '24 14:10 BewareMyPower

I rewrote the test script to avoid being affected by the pending queue (since send_async blocks when the queue is full) and to reduce the test time. I tested various client versions three times each, with Python 3.8 on Ubuntu 20.04 WSL, against a local Pulsar 4.0.0 standalone and the same topic.

from pulsar import Client, CompressionType, Result
import os
import time

def send_callback(i, result, msg_id):
    if result != Result.Ok:
        print(f'{i} failed: {result}')

if __name__ == "__main__":
    client = Client(service_url='pulsar://localhost:6650',
                    io_threads=4)
    msg = os.urandom(100).hex().encode()
    producer = client.create_producer(
        'test-topic',
        compression_type=CompressionType.LZ4,
        batching_enabled=True,
        batching_max_messages=1000, # batch size will be always 1000
        batching_max_allowed_size_in_bytes=10485760,
        batching_max_publish_delay_ms=10,
        max_pending_messages=0, # avoid send_async being blocked due to full queue
        block_if_queue_full=True)
    t1 = time.time()
    for i in range(0, 200000):
        producer.send_async(msg, lambda result, msg_id, i=i: send_callback(i, result, msg_id))
    producer.flush()
    t2 = time.time()
    print(f'send_async: {round(t2 - t1, 3)} s')
    client.close()
version 1st 2nd 3rd
2.10.2 7.169 7.211 7.083
3.1.0 6.768 6.788 6.943
3.2.0 7.445 7.336 7.427
3.3.0 7.451 7.454 7.435
3.4.0 8.955 9.369 8.256
3.5.0 9.676 9.653 10.097
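A side note on the callback in the script above: the i=i default argument is what captures the loop index by value; a plain closure would see only the final value of i after the loop ends. A minimal illustration, independent of Pulsar:

```python
# Late binding: every closure reads the same variable i after the loop ends.
late = [lambda: i for i in range(3)]
print([f() for f in late])   # [2, 2, 2]

# Default-argument trick: i=i binds the current value at definition time,
# which is why the benchmark's lambda passes i=i into send_callback.
bound = [lambda i=i: i for i in range(3)]
print([f() for f in bound])  # [0, 1, 2]
```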

P.S. 3.0.0 is not tested because it has a deadlock bug.

As we can see, 3.1.0 actually performs better than 2.10.2, but there are significant performance regressions from 3.1.0 -> 3.2.0, 3.3.0 -> 3.4.0, and 3.4.0 -> 3.5.0.
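Averaging the three runs and normalizing to 2.10.2 makes the regressions easier to see (a quick sketch; the timings are copied from the table above):

```python
# Per-version send_async timings (seconds) from the benchmark above.
runs = {
    '2.10.2': [7.169, 7.211, 7.083],
    '3.1.0':  [6.768, 6.788, 6.943],
    '3.2.0':  [7.445, 7.336, 7.427],
    '3.3.0':  [7.451, 7.454, 7.435],
    '3.4.0':  [8.955, 9.369, 8.256],
    '3.5.0':  [9.676, 9.653, 10.097],
}
baseline = sum(runs['2.10.2']) / len(runs['2.10.2'])
for version, times in runs.items():
    mean = sum(times) / len(times)
    print(f'{version}: mean {mean:.3f} s ({mean / baseline:.2f}x vs 2.10.2)')
```

In this setup 3.5.0 comes out about 1.37x slower than 2.10.2 on average, while 3.1.0 is slightly faster than 2.10.2.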

BewareMyPower avatar Oct 27 '24 04:10 BewareMyPower

I disabled compression and the test results are:

| version | 1st | 2nd | 3rd | 4th |
|---------|-----|-----|-----|-----|
| 2.10.2 | 7.355 | 7.190 | 7.221 | 7.408 |
| 3.1.0 | 6.397 | 6.548 | 6.256 | 6.077 |
| 3.2.0 | 8.261 | 6.900 | 6.695 | 6.742 |
| 3.3.0 | 6.847 | 7.121 | 6.979 | 7.213 |
| 3.4.0 | 7.706 | 7.276 | 8.067 | 7.688 |
| 3.5.0 | 6.398 | 9.835 | 7.196 | 7.022 |

I then increased the batch delay (batching_max_publish_delay_ms=3600000) to rule out any effect of the batch timer. The test results are:

| version | 1st | 2nd | 3rd |
|---------|-----|-----|-----|
| 2.10.2 | 6.865 | 6.872 | 6.819 |
| 3.1.0 | 6.521 | 6.706 | 6.602 |
| 3.2.0 | 8.027 | 8.232 | 7.822 |
| 3.3.0 | 8.231 | 8.254 | 8.196 |
| 3.4.0 | 7.767 | 7.478 | 8.161 |
| 3.5.0 | 7.673 | 7.955 | 7.443 |

NOTE:

  • 2.10.2 encounters a segmentation fault during the close phase.
  • 3.1.0 and 3.2.0 fail the last 1000 messages with AlreadyClosed in flush.

BewareMyPower avatar Oct 27 '24 05:10 BewareMyPower

Is this still an issue after https://github.com/apache/pulsar-client-python/pull/230?

akahn avatar Oct 02 '25 13:10 akahn

It should not be. Let me close this issue.

BewareMyPower avatar Oct 03 '25 10:10 BewareMyPower

Could you show how this benchmark performs on 3.8.0?

akahn avatar Oct 03 '25 16:10 akahn