pulsar-client-cpp

[Feature Request] Support executing callbacks concurrently

Open BewareMyPower opened this issue 2 years ago • 0 comments

#include <pulsar/Client.h>

#include <chrono>
#include <string>
#include <thread>

#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()
using namespace pulsar;

int main(int argc, char *argv[]) {
    ClientConfiguration conf;
    conf.setIOThreads(8);
    Client client{"pulsar://localhost:6650", conf};
    for (int i = 0; i < 5; i++) {
        auto topic = "my-topic-" + std::to_string(i);
        LOG_INFO(" XYZ Before create producer for " << topic);
        client.createProducerAsync(topic, [topic](Result result, Producer producer) {
            LOG_INFO(" XYZ After create producer for " << topic << ": " << result);
            producer.sendAsync(MessageBuilder().setContent("msg").build(),
                               [](Result result, const MessageId &msgId) {
                                   LOG_INFO("XYZ send: " << result << ", " << msgId);
                               });
            // Deliberately block the thread that runs this callback.
            std::this_thread::sleep_for(std::chrono::hours(1));
        });
    }
    std::this_thread::sleep_for(std::chrono::hours(1));
}

Output:

2023-12-06 20:01:08.044 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-0
2023-12-06 20:01:08.130 INFO  [0x1fb6f1e00] ClientConnection:190 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-12-06 20:01:08.130 INFO  [0x1fb6f1e00] ConnectionPool:114 | Created connection for pulsar://localhost:6650-0
2023-12-06 20:01:08.143 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-1
2023-12-06 20:01:08.143 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-2
2023-12-06 20:01:08.143 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-3
2023-12-06 20:01:08.144 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-4
2023-12-06 20:01:08.163 INFO  [0x16ef87000] ClientConnection:404 | [127.0.0.1:54469 -> 127.0.0.1:6650] Connected to broker
2023-12-06 20:01:08.344 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-0, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-2, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-1, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-4, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-3, ] Getting connection from pool
2023-12-06 20:01:08.355 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-0, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.355 INFO  [0x16ef87000] ClientConnection:190 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-12-06 20:01:08.355 INFO  [0x16ef87000] ConnectionPool:114 | Created connection for pulsar://127.0.0.1:6650-0
2023-12-06 20:01:08.357 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-2, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.357 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.357 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-4, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.357 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-3, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.363 INFO  [0x16ef87000] ClientConnection:406 | [127.0.0.1:54470 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650
2023-12-06 20:01:08.427 INFO  [0x16ef87000] ProducerImpl:212 | [persistent://public/default/my-topic-2, ] Created producer on broker [127.0.0.1:54470 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.428 INFO  [0x16ef87000] SampleProducer:20 |  XYZ After create producer for my-topic-2: Ok

Only one producer (for my-topic-2) was created successfully. The other producers were blocked because all createProducerAsync callbacks were executed in the same I/O thread (0x16ef87000 in the log above), even though 8 I/O threads were configured.

That's because the event loop of each connection to the broker executes all of its callbacks in a single thread. So if one callback blocks, every other asynchronous operation on that connection is blocked as well. Increasing connectionsPerBroker allows more I/O threads to be used, but users still cannot control which thread a callback is executed in.
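As a rough illustration of this head-of-line blocking, the sketch below models one connection's event loop as a single worker thread draining a task queue. It does not use the Pulsar client at all: `SingleThreadLoop` and `measureStartDelays` are hypothetical names made up for this example, and the real event loop in the client is more involved.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for one connection's event loop: a single worker
// thread that runs posted callbacks strictly one after another.
class SingleThreadLoop {
   public:
    SingleThreadLoop() : worker_([this] { run(); }) {}

    ~SingleThreadLoop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cond_.notify_one();
        worker_.join();  // the worker drains the queue before exiting
    }

    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cond_.notify_one();
    }

   private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopped and nothing left to run
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // a blocking task delays everything queued behind it
        }
    }

    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<std::function<void()>> tasks_;
    bool stopped_ = false;
    std::thread worker_;  // declared last so the other members exist first
};

// Posts three callbacks, the first of which blocks for `blockMs`. Returns how
// many milliseconds passed before each callback started running.
std::vector<long long> measureStartDelays(int blockMs) {
    using Clock = std::chrono::steady_clock;
    std::vector<long long> delays(3, -1);
    const auto begin = Clock::now();
    {
        SingleThreadLoop loop;
        for (int i = 0; i < 3; i++) {
            loop.post([&delays, begin, i, blockMs] {
                delays[i] = std::chrono::duration_cast<std::chrono::milliseconds>(
                                Clock::now() - begin).count();
                if (i == 0) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(blockMs));
                }
            });
        }
    }  // the destructor waits until all three callbacks have run
    return delays;
}
```

Calling `measureStartDelays(200)` should show the second and third callbacks starting only after the first one has finished sleeping, which is the same pattern as in the log above, where the remaining "After create producer" lines never appear.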

In the Java client, the asynchronous API returns a CompletableFuture, whose thenXxxAsync methods can execute the callback in a user-provided executor.
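To make the comparison concrete, here is a minimal sketch of what executor-based dispatch could look like in C++. Everything in it is an assumption for illustration: `SimpleExecutor`, `callbackOn`, and `runDemo` are hypothetical and not part of pulsar-client-cpp. The idea is that the thread invoking the callback only enqueues the work and returns immediately, while the user-owned executor runs the actual callback body.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical user-owned executor: a fixed-size pool of worker threads.
class SimpleExecutor {
   public:
    explicit SimpleExecutor(std::size_t numThreads) {
        for (std::size_t i = 0; i < numThreads; i++) {
            workers_.emplace_back([this] { run(); });
        }
    }

    ~SimpleExecutor() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cond_.notify_all();
        for (auto& worker : workers_) worker.join();  // drains pending tasks
    }

    void execute(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cond_.notify_one();
    }

   private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
                if (tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<std::function<void()>> tasks_;
    bool stopped_ = false;
    std::vector<std::thread> workers_;
};

// Hypothetical helper: wraps `callback` so that invoking the result copies the
// arguments, enqueues the real call on `executor`, and returns immediately.
template <typename Callback>
auto callbackOn(SimpleExecutor& executor, Callback callback) {
    return [&executor, callback](auto... args) {
        executor.execute([callback, args...] { callback(args...); });
    };
}

struct DemoResult {
    int completed;                // callbacks that ran to completion
    int ranOnCallerThread;        // callbacks that ran on the invoking thread
    int othersDoneBeforeBlocker;  // callbacks finished while callback 0 slept
};

// The calling thread plays the role of the I/O thread and invokes five
// wrapped callbacks, the first of which blocks for a while.
DemoResult runDemo() {
    std::atomic<int> completed{0}, ranOnCaller{0}, snapshot{0};
    const auto callerId = std::this_thread::get_id();
    {
        SimpleExecutor executor(4);
        auto wrapped = callbackOn(executor, [&](int i) {
            if (std::this_thread::get_id() == callerId) ranOnCaller++;
            if (i == 0) {
                std::this_thread::sleep_for(std::chrono::milliseconds(300));
                snapshot = completed.load();
            }
            completed++;
        });
        for (int i = 0; i < 5; i++) wrapped(i);
    }  // the executor's destructor waits for all callbacks to finish
    return {completed.load(), ranOnCaller.load(), snapshot.load()};
}
```

With the real client, the same wrapper could in principle be passed directly, e.g. `client.createProducerAsync(topic, callbackOn(executor, [topic](Result result, Producer producer) { ... }))`, since the wrapped lambda takes its arguments by value. That usage is untested here and is only a workaround on the user side; the request is for the client itself to support this kind of dispatch.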

BewareMyPower, Dec 06 '23 12:12