The producer performance of pulsar-client v3.5.0 is much lower than that of v2.10.2
When sending the same batch of data (about 100 MB) with pulsar-client v3.5.0 and pulsar-client v2.10.2, v3.5.0 takes about 3.5 times longer than v2.10.2.
Core Code
async def consume_pulsar(q: asyncio.Queue, url: str, size: int):
    try:
        cli, pro = None, None
        for info in pulsar_infos:
            cli = pulsar.Client(
                io_threads=4,
                service_url=info.service_url)
            pro = cli.create_producer(
                info.topic,
                schema=pulsar.schema.StringSchema(),
                compression_type=pulsar.CompressionType.LZ4,
                batching_enabled=True,
                batching_max_allowed_size_in_bytes=1048576,
                max_pending_messages=10000,
                max_pending_messages_across_partitions=50000,
                batching_max_publish_delay_ms=10,
                block_if_queue_full=True
            )
        while True:
            lines = await q.get()
            if lines is None:
                break
            for line in lines:
                pro.send_async(line, callback=send_callback)
        pro.flush()
        cli.close()
    except Exception:
        raise
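The snippet refers to pulsar_infos and send_callback and expects a feeder task to put lists of lines on the queue, with None as a stop sentinel; none of these are shown. A minimal sketch of plausible stand-ins, purely hypothetical and not taken from the original code:

# Hypothetical stand-ins (not from the original code) for the names used above.
def send_callback(result, msg_id):
    # Called by the client from an internal thread once the broker acknowledges the message.
    pass

async def feed_queue(q: asyncio.Queue, chunks):
    # Each queue item is a list of string payloads; None tells consume_pulsar to stop.
    for lines in chunks:
        await q.put(lines)
    await q.put(None)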
[line_profiler output for 3.5.0: screenshot]
[line_profiler output for 2.10.2: screenshot]
The above shows the running-time statistics from the line_profiler tool. The main time consumption is in pro.send_async(line, callback=send_callback), which accounts for more than 97% of the total: pulsar-client v3.5.0 takes about 127 s, while v2.10.2 takes about 35.6 s.
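For reference, per-line timings like the ones in the screenshots above can be collected with line_profiler's Python API. The following is only a rough sketch against a local broker with a hypothetical produce_once helper, not the exact script that was profiled:

# Minimal line_profiler sketch (hypothetical; the broker URL and topic are placeholders).
from line_profiler import LineProfiler
import pulsar

def produce_once(producer, payloads):
    for line in payloads:
        producer.send_async(line, callback=lambda res, mid: None)
    producer.flush()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('profiling-topic', batching_enabled=True)

lp = LineProfiler()
lp(produce_once)(producer, [b'x' * 100] * 100000)  # run the wrapped function once
lp.print_stats()  # per-line hit counts and timings, like the screenshots above
client.close()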
Reproduce
Demo
import pulsar
import os

PULSAR_TOPIC = "persistent://benchmark/test/test"
PULSAR_URL = "pulsar://localhost:6650"

def send_callback(result, msg_id):
    return

def produce():
    try:
        cli = pulsar.Client(
            io_threads=4,
            service_url=PULSAR_URL)
        pro = cli.create_producer(
            PULSAR_TOPIC,
            schema=pulsar.schema.StringSchema(),
            compression_type=pulsar.CompressionType.LZ4,
            batching_enabled=True,
            batching_max_allowed_size_in_bytes=1048576,
            max_pending_messages=10000,
            max_pending_messages_across_partitions=50000,
            batching_max_publish_delay_ms=10,
            block_if_queue_full=True
        )
        for i in range(0, size):
            pro.send_async(msg, callback=send_callback)
        pro.flush()
        cli.close()
    except Exception:
        raise

if __name__ == "__main__":
    msg = os.urandom(100).hex()
    size = 1000000
    produce()
Result
$ pip3 list | grep pulsar-client
pulsar-client 3.5.0
$ time python3 test_pulsar.py > 1
real 1m2.925s
user 1m1.735s
sys 1m1.803s
$ pip3 install pulsar-client==2.10.2
$ time python3 test_pulsar.py > 1
real 0m14.575s
user 0m16.140s
sys 0m7.578s
It can be reproduced in my env (Python 3.8 and Ubuntu 20.04 WSL2), but the gap is not as large as in your env.
3.5.0:
- send_async: 41.614 s
- flush: 0.0178 s
2.10.2:
- send_async: 35.452 s
- flush: 0.0150 s
It's also weird that send_async takes so much time. I will take a further look soon.
I rewrote the test script to avoid being affected by the pending queue (send_async would block if the queue were full) and to reduce the test time. I tested various client versions three times each with Python 3.8 on Ubuntu 20.04 WSL, against a local Pulsar 4.0.0 standalone and the same topic.
from pulsar import Client, CompressionType, Result
import os
import time

def send_callback(i, result, msg_id):
    if result != Result.Ok:
        print(f'{i} failed: {result}')

if __name__ == "__main__":
    client = Client(service_url='pulsar://localhost:6650',
                    io_threads=4)
    msg = os.urandom(100).hex().encode()
    producer = client.create_producer(
        'test-topic',
        compression_type=CompressionType.LZ4,
        batching_enabled=True,
        batching_max_messages=1000,  # batch size will always be 1000
        batching_max_allowed_size_in_bytes=10485760,
        batching_max_publish_delay_ms=10,
        max_pending_messages=0,  # avoid send_async being blocked due to a full queue
        block_if_queue_full=True)
    t1 = time.time()
    for i in range(0, 200000):
        producer.send_async(msg, lambda result, msg_id, i=i: send_callback(i, result, msg_id))
    producer.flush()
    t2 = time.time()
    print(f'send_async: {round(t2 - t1, 3)} s')
    client.close()
| version | 1st (s) | 2nd (s) | 3rd (s) |
|---|---|---|---|
| 2.10.2 | 7.169 | 7.211 | 7.083 |
| 3.1.0 | 6.768 | 6.788 | 6.943 |
| 3.2.0 | 7.445 | 7.336 | 7.427 |
| 3.3.0 | 7.451 | 7.454 | 7.435 |
| 3.4.0 | 8.955 | 9.369 | 8.256 |
| 3.5.0 | 9.676 | 9.653 | 10.097 |
P.S. 3.0.0 is not tested because it has a deadlock bug.
As we can see, 3.1.0 actually performs better than 2.10.2, but there are significant performance regressions from 3.1.0 -> 3.2.0, 3.3.0 -> 3.4.0, and 3.4.0 -> 3.5.0.
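For anyone who wants to repeat the sweep, a small driver along these lines can install each version and rerun the benchmark. This is only a sketch: it assumes the timing script above is saved as benchmark.py (a hypothetical file name) and that a broker is running locally.

# Hypothetical driver: install each client version and run benchmark.py three times.
import subprocess
import sys

VERSIONS = ["2.10.2", "3.1.0", "3.2.0", "3.3.0", "3.4.0", "3.5.0"]

for version in VERSIONS:
    # Install the version under test into the current environment.
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "--quiet", f"pulsar-client=={version}"],
        check=True)
    for run in range(1, 4):
        # benchmark.py prints a line like "send_async: 7.169 s".
        out = subprocess.run(
            [sys.executable, "benchmark.py"],
            check=True, capture_output=True, text=True).stdout.strip()
        print(f"{version} run {run}: {out}")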
I disabled compression, and the test results are:
| version | 1st (s) | 2nd (s) | 3rd (s) | 4th (s) |
|---|---|---|---|---|
| 2.10.2 | 7.355 | 7.190 | 7.221 | 7.408 |
| 3.1.0 | 6.397 | 6.548 | 6.256 | 6.077 |
| 3.2.0 | 8.261 | 6.900 | 6.695 | 6.742 |
| 3.3.0 | 6.847 | 7.121 | 6.979 | 7.213 |
| 3.4.0 | 7.706 | 7.276 | 8.067 | 7.688 |
| 3.5.0 | 6.398 | 9.835 | 7.196 | 7.022 |
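For completeness, the only change from the script above is the compression setting; removing the compression_type argument or setting it to CompressionType.NONE has the same effect. A sketch of the modified producer creation, assuming everything else stays identical:

# Same producer configuration as above, with compression disabled for this run.
producer = client.create_producer(
    'test-topic',
    compression_type=CompressionType.NONE,
    batching_enabled=True,
    batching_max_messages=1000,
    batching_max_allowed_size_in_bytes=10485760,
    batching_max_publish_delay_ms=10,
    max_pending_messages=0,
    block_if_queue_full=True)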
I then increased the batch delay (batching_max_publish_delay_ms=3600000) to avoid being affected by the batch timer. The test results are:
| version | 1st (s) | 2nd (s) | 3rd (s) |
|---|---|---|---|
| 2.10.2 | 6.865 | 6.872 | 6.819 |
| 3.1.0 | 6.521 | 6.706 | 6.602 |
| 3.2.0 | 8.027 | 8.232 | 7.822 |
| 3.3.0 | 8.231 | 8.254 | 8.196 |
| 3.4.0 | 7.767 | 7.478 | 8.161 |
| 3.5.0 | 7.673 | 7.955 | 7.443 |
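Again, only the delay parameter changes relative to the script above (the thread does not say whether compression stayed disabled for this run). A sketch of the producer creation with the batch timer effectively disabled:

# Same producer configuration, with a delay so large that the batch timer never fires.
producer = client.create_producer(
    'test-topic',
    batching_enabled=True,
    batching_max_messages=1000,
    batching_max_allowed_size_in_bytes=10485760,
    batching_max_publish_delay_ms=3600000,  # 1 hour: batches close only on size limits
    max_pending_messages=0,
    block_if_queue_full=True)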
NOTE:
- 2.10.2 encounters a segmentation fault during the close phase
- 3.1.0 and 3.2.0 fail the last 1000 messages with AlreadyClosed in flush
Is this still an issue after https://github.com/apache/pulsar-client-python/pull/230?
It should not be. Let me close this issue.
Could you show how this benchmark performs on 3.8.0?