Fix ack failure on message listener in multi topics consumer
Fixes #446
Motivation
We found an ack failure issue: #446
In "multi topics" cases (e.g. partitioned, list, regex), messageListeners can start processing messages before all child topics have been subscribed.
If acknowledge() is called in a messageListener (a very typical usage) before the subscription completes, it fails with an AlreadyClosed error because the state of the parent consumer is not Ready yet:
https://github.com/apache/pulsar-client-cpp/blob/8b2753a56579ea6cf11e26a0c5a160797518df63/lib/MultiTopicsConsumerImpl.cc#L643-L648
That results in ack holes and, eventually, a full backlog.
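The ordering described above can be replayed with a small toy model. This is only a sketch: `ToyMultiTopicsConsumer`, `Result`, `State`, and `replayRace()` are hypothetical names made up for illustration, not the pulsar-client-cpp types, and the model assumes nothing beyond what is stated above (acks are rejected until the parent reaches Ready).

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; these are NOT the pulsar-client-cpp types.
enum class Result { Ok, AlreadyClosed };
enum class State { Pending, Ready };

using Listener = std::function<void(const std::string&)>;

// Toy parent consumer over several child topics.
class ToyMultiTopicsConsumer {
   public:
    explicit ToyMultiTopicsConsumer(int numChildren) : remaining_(numChildren) {}

    void setListener(Listener listener) { listener_ = std::move(listener); }

    // Acks are rejected until the parent has reached Ready.
    Result acknowledge(const std::string& /*msgId*/) {
        return state_ == State::Ready ? Result::Ok : Result::AlreadyClosed;
    }

    // A child that is already subscribed hands a message to the listener right away.
    void onChildMessage(const std::string& msgId) {
        if (listener_) listener_(msgId);
    }

    // The parent becomes Ready only after the last child has subscribed.
    void onChildSubscribed() {
        assert(remaining_ > 0);
        if (--remaining_ == 0) state_ = State::Ready;
    }

   private:
    State state_ = State::Pending;
    int remaining_;
    Listener listener_;
};

// Replays the problematic ordering and returns the ack result of each message.
std::vector<Result> replayRace() {
    ToyMultiTopicsConsumer consumer(2);
    std::vector<Result> acks;
    consumer.setListener(
        [&](const std::string& id) { acks.push_back(consumer.acknowledge(id)); });

    consumer.onChildSubscribed();   // child 0 is subscribed
    consumer.onChildMessage("m0");  // listener runs while child 1 is still pending
    consumer.onChildSubscribed();   // child 1 is subscribed, parent is now Ready
    consumer.onChildMessage("m1");
    return acks;
}
```

In this model the ack for `m0` is rejected and is never retried, which corresponds to the ack hole described above.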
Modifications
- MultiTopicsConsumerImpl: Pause messageListeners at first by setting `startPaused` to true, and resume them after all child topics have been subscribed.
- ConsumerImpl: Delete unused code.
  - It tries to skip `sendFlowPermitsToBroker()` at the first connection.
    - The motivation seems to be the same as this PR's, i.e. to prevent messageListeners from starting before the subscription completes.
    - However, it does not actually work so far: the variable `firstTime` appears to always be false at https://github.com/apache/pulsar-client-cpp/blob/8b2753a56579ea6cf11e26a0c5a160797518df63/lib/ConsumerImpl.cc#L316 because it becomes false at https://github.com/apache/pulsar-client-cpp/blob/8b2753a56579ea6cf11e26a0c5a160797518df63/lib/ConsumerImpl.cc#L297-L299
      - Also, it is a static variable in a method that is shared by all `ConsumerImpl` instances, and it has no chance of returning to true once it becomes false.
  - It seems better that child consumers don't have to care about their parents and are completely independent of them.
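The start-paused idea in the first item can be sketched with a toy model. This is not the code in this PR: `StartPausedToyConsumer`, `AckResult`, `ParentState`, and `replayWithStartPaused()` are hypothetical names, and buffering messages in a `std::deque` while paused is an assumption made only to keep the example self-contained.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; these are NOT the pulsar-client-cpp types.
enum class AckResult { Ok, AlreadyClosed };
enum class ParentState { Pending, Ready };

using AckListener = std::function<void(const std::string&)>;

// Toy parent consumer whose listener can start in the paused state.
class StartPausedToyConsumer {
   public:
    StartPausedToyConsumer(int numChildren, bool startPaused)
        : remaining_(numChildren), paused_(startPaused) {}

    void setListener(AckListener listener) { listener_ = std::move(listener); }

    // Acks are rejected until the parent has reached Ready.
    AckResult acknowledge(const std::string& /*msgId*/) {
        return state_ == ParentState::Ready ? AckResult::Ok : AckResult::AlreadyClosed;
    }

    // While paused, messages are held back instead of reaching the listener.
    void onChildMessage(const std::string& msgId) {
        if (paused_) {
            buffered_.push_back(msgId);  // assumption: a simple in-memory buffer
            return;
        }
        if (listener_) listener_(msgId);
    }

    // Once the last child has subscribed, become Ready and resume the listener.
    void onChildSubscribed() {
        assert(remaining_ > 0);
        if (--remaining_ == 0) {
            state_ = ParentState::Ready;
            resume();
        }
    }

   private:
    // Unpause and deliver everything that arrived while paused, in order.
    void resume() {
        paused_ = false;
        while (!buffered_.empty()) {
            std::string id = std::move(buffered_.front());
            buffered_.pop_front();
            if (listener_) listener_(id);
        }
    }

    ParentState state_ = ParentState::Pending;
    int remaining_;
    bool paused_;
    std::deque<std::string> buffered_;
    AckListener listener_;
};

// Same ordering as the failing case, but with the listener started paused.
std::vector<AckResult> replayWithStartPaused() {
    StartPausedToyConsumer consumer(2, /*startPaused=*/true);
    std::vector<AckResult> acks;
    consumer.setListener(
        [&](const std::string& id) { acks.push_back(consumer.acknowledge(id)); });

    consumer.onChildSubscribed();   // child 0 is subscribed
    consumer.onChildMessage("m0");  // held back: child 1 is still pending
    consumer.onChildSubscribed();   // child 1 is subscribed: Ready, then resume
    consumer.onChildMessage("m1");
    return acks;
}
```

Because the listener only runs after the toy parent is Ready, every ack in this replay is accepted. Note that the state is switched to Ready before the held messages are delivered; resuming first would reintroduce the same failure.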
Verifying this change
- [x] Make sure that the change passes the CI checks.
A specific test for this issue is difficult to implement because the failure does not occur every time.
Documentation
- [ ] `doc-required` (Your PR needs to update docs and you will update later)
- [x] `doc-not-needed` (Please explain why)
- [ ] `doc` (Your PR contains doc changes)
- [ ] `doc-complete` (Docs have been already added)