pulsar-client-cpp
[Feature Request] Support executing callbacks concurrently
#include <pulsar/Client.h>

#include <chrono>
#include <string>
#include <thread>

#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

int main(int argc, char *argv[]) {
    ClientConfiguration conf;
    conf.setIOThreads(8);
    Client client{"pulsar://localhost:6650", conf};
    for (int i = 0; i < 5; i++) {
        auto topic = "my-topic-" + std::to_string(i);
        LOG_INFO(" XYZ Before create producer for " << topic);
        client.createProducerAsync(topic, [topic](Result result, Producer producer) {
            LOG_INFO(" XYZ After create producer for " << topic << ": " << result);
            producer.sendAsync(MessageBuilder().setContent("msg").build(),
                               [](Result result, const MessageId &msgId) {
                                   LOG_INFO("XYZ send: " << result << ", " << msgId);
                               });
            // Block inside the callback to show that it stalls the thread running it.
            std::this_thread::sleep_for(std::chrono::hours(1));
        });
    }
    // Keep the process alive so the asynchronous callbacks have time to run.
    std::this_thread::sleep_for(std::chrono::hours(1));
}
Output:
2023-12-06 20:01:08.044 INFO [0x1fb6f1e00] SampleProducer:18 | XYZ Before create producer for my-topic-0
2023-12-06 20:01:08.130 INFO [0x1fb6f1e00] ClientConnection:190 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-12-06 20:01:08.130 INFO [0x1fb6f1e00] ConnectionPool:114 | Created connection for pulsar://localhost:6650-0
2023-12-06 20:01:08.143 INFO [0x1fb6f1e00] SampleProducer:18 | XYZ Before create producer for my-topic-1
2023-12-06 20:01:08.143 INFO [0x1fb6f1e00] SampleProducer:18 | XYZ Before create producer for my-topic-2
2023-12-06 20:01:08.143 INFO [0x1fb6f1e00] SampleProducer:18 | XYZ Before create producer for my-topic-3
2023-12-06 20:01:08.144 INFO [0x1fb6f1e00] SampleProducer:18 | XYZ Before create producer for my-topic-4
2023-12-06 20:01:08.163 INFO [0x16ef87000] ClientConnection:404 | [127.0.0.1:54469 -> 127.0.0.1:6650] Connected to broker
2023-12-06 20:01:08.344 INFO [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-0, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-2, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-1, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-4, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-3, ] Getting connection from pool
2023-12-06 20:01:08.355 INFO [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-0, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650]
2023-12-06 20:01:08.355 INFO [0x16ef87000] ClientConnection:190 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-12-06 20:01:08.355 INFO [0x16ef87000] ConnectionPool:114 | Created connection for pulsar://127.0.0.1:6650-0
2023-12-06 20:01:08.357 INFO [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-2, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650]
2023-12-06 20:01:08.357 INFO [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650]
2023-12-06 20:01:08.357 INFO [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-4, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650]
2023-12-06 20:01:08.357 INFO [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-3, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650]
2023-12-06 20:01:08.363 INFO [0x16ef87000] ClientConnection:406 | [127.0.0.1:54470 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650
2023-12-06 20:01:08.427 INFO [0x16ef87000] ProducerImpl:212 | [persistent://public/default/my-topic-2, ] Created producer on broker [127.0.0.1:54470 -> 127.0.0.1:6650]
2023-12-06 20:01:08.428 INFO [0x16ef87000] SampleProducer:20 | XYZ After create producer for my-topic-2: Ok
Only one producer (my-topic-2) was created successfully. The other producers were blocked because all createProducerAsync callbacks were executed on the same I/O thread (0x16ef87000 in the log above), even though the number of I/O threads is 8.
That's because the event loop of each broker connection uses a single thread to execute callbacks, so if one callback blocks, the other asynchronous calls on that connection are blocked as well. Increasing connectionsPerBroker lets more I/O threads be used, but users still cannot control which thread a callback is executed on.
In the Java client, the asynchronous API returns a CompletableFuture whose thenXxxAsync methods can execute the callback in a user-provided executor.
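
Until something similar exists in the C++ client, one possible workaround is to hand the real work off to a user-owned thread pool from inside the callback. The sketch below is only an illustration and makes several assumptions: Executor, runOn and startedWhileBlocked are hypothetical names that are not part of pulsar-client-cpp, the code needs C++14 or later, and the single I/O thread is simulated with a one-thread Executor so that the example runs without a broker.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper (not part of pulsar-client-cpp): a fixed-size thread
// pool that runs posted tasks in FIFO order.
class Executor {
   public:
    explicit Executor(int numThreads) {
        for (int i = 0; i < numThreads; i++) workers_.emplace_back([this] { run(); });
    }
    // Lets the workers drain the queue, then joins them.
    ~Executor() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cond_.notify_all();
        for (auto &worker : workers_) worker.join();
    }
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cond_.notify_one();
    }

   private:
    void run() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopped and nothing left to run
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }
    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<std::function<void()>> tasks_;
    bool stopped_ = false;
    std::vector<std::thread> workers_;
};

// Wraps `callback` so that calling the wrapper only enqueues the real work on
// `executor` and returns at once. Arguments are copied because references may
// dangle after the calling thread returns. `executor` must outlive the wrapper.
template <typename Callback>
auto runOn(Executor &executor, Callback callback) {
    return [&executor, callback](auto... args) {
        executor.post([callback, args...]() mutable { callback(args...); });
    };
}

// Simulation of this issue without a broker: one "I/O" thread delivers
// `numCallbacks` callbacks that each block until released. Returns how many
// of them had started while every started callback was still blocked.
int startedWhileBlocked(bool offload, int numCallbacks) {
    assert(numCallbacks >= 1);
    std::mutex mutex;
    std::condition_variable cond;
    int started = 0;
    bool release = false;
    auto blockingCallback = [&](int /*topicIndex*/) {
        std::unique_lock<std::mutex> lock(mutex);
        started++;
        cond.notify_all();
        cond.wait(lock, [&] { return release; });
    };
    Executor userPool(numCallbacks);  // user-provided executor
    Executor ioThread(1);             // stands in for the single I/O thread
    for (int i = 0; i < numCallbacks; i++) {
        std::function<void(int)> callback = blockingCallback;
        if (offload) callback = runOn(userPool, blockingCallback);
        ioThread.post([callback, i] { callback(i); });
    }
    std::unique_lock<std::mutex> lock(mutex);
    cond.wait(lock, [&] { return started >= 1; });
    cond.wait_for(lock, std::chrono::milliseconds(500), [&] { return started == numCallbacks; });
    int observed = started;
    release = true;
    cond.notify_all();
    lock.unlock();
    return observed;  // ioThread, then userPool, drain and join on return
}
```

When the callbacks run directly on the simulated I/O thread, only the first one can start while it is blocked, which mirrors the log above. When they are wrapped with runOn, all of them start concurrently on the user pool. With the real client, the same wrapper could presumably be passed wherever a callback is expected, for example as the second argument of createProducerAsync, provided the pool outlives the client. This does not replace the requested feature: the user still has to wrap every callback by hand.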