
[Bug] Msg backlog & unack msg remains when using acknowledgeAsync

Open · pqab opened this issue 2 years ago · 12 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

3.0.0

Minimal reproduce step

  1. Publish 600k messages

  2. Start 2 consumers with different subscription names, both subscribing from Earliest

one with async ack

Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))

the other with sync ack

Mono.fromRunnable(() -> consumer.acknowledge(message))

  3. Wait until consumption has finished (a rough sketch of the async-ack consumer is shown below)
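
For illustration, a minimal sketch of the async-ack consumer loop described above; the service URL, topic, subscription name, and receive loop here are placeholder assumptions, not the exact test code:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import reactor.core.publisher.Mono;

public class AsyncAckConsumerSketch {
    public static void main(String[] args) throws PulsarClientException {
        // Placeholder service URL, topic and subscription; not the exact test setup
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://tenant1/namespace1/topic1")
                .subscriptionName("async-ack-sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        while (true) {
            Message<byte[]> message = consumer.receive();
            // process the message here, then acknowledge asynchronously
            Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))
                    .subscribe();
        }
    }
}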

What did you expect to see?

Msg backlog & unacked messages should be 0 for both the acknowledge and the acknowledgeAsync subscriptions.

What did you see instead?

A few messages are left in the backlog & unacked messages even though we received the ack callback when using acknowledgeAsync; acknowledge works fine.

Topic stats for the acknowledgeAsync subscription for reference

{
    "msgRateOut" : 382.4166635563445,
    "msgThroughputOut" : 1143831.4573635042,
    "bytesOutCounter" : 1590183426,
    "msgOutCounter" : 531437,
    "msgRateRedeliver" : 0.0,
    "messageAckRate" : 397.2166640450367,
    "chunkedMessageRate" : 0,
    "msgBacklog" : 4,
    "backlogSize" : 572603343,
    "earliestMsgPublishTimeInBacklog" : 0,
    "msgBacklogNoDelayed" : 4,
    "blockedSubscriptionOnUnackedMsgs" : false,
    "msgDelayed" : 0,
    "unackedMessages" : 7,
    "type" : "Key_Shared",
    "msgRateExpired" : 0.0,
    "totalMsgExpired" : 0,
    "lastExpireTimestamp" : 1706002395690,
    "lastConsumedFlowTimestamp" : 1706002431797,
    "lastConsumedTimestamp" : 1706002428241,
    "lastAckedTimestamp" : 1706002436543,
    "lastMarkDeleteAdvancedTimestamp" : 1706002000095,
    "consumers" : [ {
        "msgRateOut" : 382.4166635563445,
        "msgThroughputOut" : 1143831.4573635042,
        "bytesOutCounter" : 598198012,
        "msgOutCounter" : 200000,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 397.2166640450367,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "consumer",
        "availablePermits" : 1000,
        "unackedMessages" : 7,
        "avgMessagesPerEntry" : 15,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "1018:10411",
        "address" : "/100.96.65.127:20857",
        "connectedSince" : "2024-01-23T09:26:34.035732392Z",
        "clientVersion" : "Pulsar-Java-v3.0.0",
        "lastAckedTimestamp" : 1706002436543,
        "lastConsumedTimestamp" : 1706002428241,
        "lastConsumedFlowTimestamp" : 1706002431797,
        "keyHashRanges" : [ "[0, 6276111]", "[6276112, 13422723]", "[13422724, 52719097]", "[52719098, 60122502]", "[60122503, 74675312]", "[74675313, 161996019]", "[161996020, 207509307]", "[207509308, 220775154]", "[220775155, 229655103]", "[229655104, 234228609]", "[234228610, 276636200]", "[276636201, 324880668]", "[324880669, 369408646]", "[369408647, 374232013]", "[374232014, 379665156]", "[379665157, 380576699]", "[380576700, 405888400]", "[405888401, 428673014]", "[428673015, 453720349]", "[453720350, 488351370]", "[488351371, 496052795]", "[496052796, 504603928]", "[504603929, 508760821]", "[508760822, 526107528]", "[526107529, 576532446]", "[576532447, 580447007]", "[580447008, 587033352]", "[587033353, 604050605]", "[604050606, 607110270]", "[607110271, 611987246]", "[611987247, 627803480]", "[627803481, 628603516]", "[628603517, 643340895]", "[643340896, 649016535]", "[649016536, 682844752]", "[682844753, 723271437]", "[723271438, 725352428]", "[725352429, 753192194]", "[753192195, 798356347]", "[798356348, 824987130]", "[824987131, 838415369]", "[838415370, 853347508]", "[853347509, 869121139]", "[869121140, 937189723]", "[937189724, 1004046645]", "[1004046646, 1013552657]", "[1013552658, 1063116829]", "[1063116830, 1072226625]", "[1072226626, 1102842607]", "[1102842608, 1113396043]", "[1113396044, 1133270607]", "[1133270608, 1149712306]", "[1149712307, 1196163934]", "[1196163935, 1218114318]", "[1218114319, 1239267311]", "[1239267312, 1283886353]", "[1283886354, 1298017483]", "[1298017484, 1300597583]", "[1300597584, 1311995628]", "[1311995629, 1407745525]", "[1407745526, 1487107354]", "[1487107355, 1500070137]", "[1500070138, 1527269282]", "[1527269283, 1579052216]", "[1579052217, 1584997034]", "[1584997035, 1595017626]", "[1595017627, 1601176083]", "[1601176084, 1618519791]", "[1618519792, 1641494763]", "[1641494764, 1656777545]", "[1656777546, 1681398228]", "[1681398229, 1697816514]", "[1697816515, 1706859249]", "[1706859250, 1720068125]", "[1720068126, 1779743735]", "[1779743736, 1784442894]", "[1784442895, 1823221256]", "[1823221257, 1824702978]", "[1824702979, 1838089487]", "[1838089488, 1857634960]", "[1857634961, 1861247796]", "[1861247797, 1863792279]", "[1863792280, 1937071475]", "[1937071476, 1941970878]", "[1941970879, 1965632398]", "[1965632399, 1970489707]", "[1970489708, 1979412755]", "[1979412756, 1983921632]", "[1983921633, 2008961115]", "[2008961116, 2016328150]", "[2016328151, 2020236760]", "[2020236761, 2023857462]", "[2023857463, 2032948319]", "[2032948320, 2045854070]", "[2045854071, 2060460824]", "[2060460825, 2067248154]", "[2067248155, 2103376046]", "[2103376047, 2127999799]", "[2127999800, 2131945474]", "[2131945475, 2143021740]" ],
        "metadata" : { },
        "lastAckedTime" : "2024-01-23T09:33:56.543Z",
        "lastConsumedTime" : "2024-01-23T09:33:48.241Z"
    } ],
    "isDurable" : true,
    "isReplicated" : false,
    "allowOutOfOrderDelivery" : false,
    "keySharedMode" : "AUTO_SPLIT",
    "consumersAfterMarkDeletePosition" : { },
    "nonContiguousDeletedMessagesRanges" : 4,
    "nonContiguousDeletedMessagesRangesSerializedSize" : 71,
    "delayedMessageIndexSizeInBytes" : 0,
    "subscriptionProperties" : { },
    "filterProcessedMsgCount" : 0,
    "filterAcceptedMsgCount" : 0,
    "filterRejectedMsgCount" : 0,
    "filterRescheduledMsgCount" : 0,
    "durable" : true,
    "replicated" : false
}

Anything else?

We ran it multiple times, and every time a few backlog & unacked messages were left for the acknowledgeAsync subscription.

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

pqab avatar Jan 23 '24 09:01 pqab

@semistone could you share the code to reproduce the issue?

pqab avatar Jan 23 '24 09:01 pqab

Our test runs a batch receive of at most 1000 events, then processes those 1000 events concurrently and in parallel:

Flux.fromIterable(events).parallel().runOn(Schedulers.fromExecutor(this.executorService))
.flatMap(event -> { // handle event and ack }) 

Originally we used acknowledgeAsync, and it seemed to have an issue:

ackMono = Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))
        .doOnSuccess(v -> ackCount.incrementAndGet());

so we replaced it with:

@SneakyThrows
private void acknowledge(Message<?> message) {
    ackLock.lock();
    try {
        consumer.acknowledge(message);
        ackCount.incrementAndGet();
    } finally {
        ackLock.unlock();
    }
}

Mono<Void> runnable = Mono.fromRunnable(() -> this.acknowledge(message));
ackMono = runnable.subscribeOn(Schedulers.fromExecutor(this.executorService));

which forces all acknowledgements to be serialized through a ReentrantLock; with that change it seemed to work.

I could try to write test code later if needed.

semistone avatar Jan 23 '24 10:01 semistone

Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))

A Mono doesn't do anything unless it is subscribed. It would be useful to have a simple Java class or test case that runs the logic you are using.
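
For example (hypothetical wiring, not the actual application code), the ack Mono needs to be returned from the flatMap so that subscribing to the pipeline also subscribes to every ack:

// Hypothetical sketch: processMessage stands in for the application's Mono<Void>
// processing step; consumer and executorService are assumed to be fields.
// The ack Mono is part of the returned chain, so it actually gets subscribed.
Mono<Void> processAndAckAll(List<Message<byte[]>> messages) {
    return Flux.fromIterable(messages)
            .parallel()
            .runOn(Schedulers.fromExecutor(executorService))
            .flatMap(message -> processMessage(message)
                    .then(Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))))
            .then();
}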

lhotari avatar Jan 23 '24 12:01 lhotari

I'd recommend using pulsar-client-reactive with Project Reactor and other Reactive Streams implementations. It contains a proper solution for acknowledgements.

https://github.com/apache/pulsar-client-reactive/blob/f3b7a2425d532ae9c029c7130b76296481ee6c05/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumer.java#L37-L59

Acknowledgement / Negative Acknowledgement is handled as a value (instead of a side-effect):

https://github.com/apache/pulsar-client-reactive/blob/main/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MessageResult.java

example: https://github.com/apache/pulsar-client-reactive/tree/main?tab=readme-ov-file#consuming-messages
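
Roughly, adapted from the linked README example (the schema, topic, and subscription names here are placeholders, and builder method names may differ slightly between versions):

ReactivePulsarClient reactivePulsarClient = AdapterReactivePulsarClientFactory.create(pulsarClient);

ReactiveMessageConsumer<String> messageConsumer = reactivePulsarClient
        .messageConsumer(Schema.STRING)
        .topic("persistent://tenant1/namespace1/topic1") // placeholder topic
        .subscriptionName("sub1")                        // placeholder subscription
        .build();

// The acknowledgement is returned as a value (MessageResult) from the processing
// function, so it cannot be "lost" the way an unsubscribed side-effect Mono can.
messageConsumer.consumeMany(messageFlux ->
                messageFlux.map(message ->
                        MessageResult.acknowledge(message.getMessageId(), message.getValue())))
        .subscribe();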

lhotari avatar Jan 23 '24 12:01 lhotari

We published around 1M messages, and we were able to reproduce it with this code: https://github.com/apache/pulsar/compare/v3.1.2...semistone:pulsar:test/flux-test

bin/pulsar-perf consume persistent://tenant1/namespace1/topic1 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationTls --auth-params '{"tlsCertFile":"conf/superuser.cer","tlsKeyFile":"conf/superuser.key.pem"}' --test-reactor -sp Earliest -st Key_Shared -ss sub1

The unacked message count keeps increasing and the available permits go negative, so the consumer can't poll more events unless we restart it to trigger redelivery of the events.

pqab avatar Jan 24 '24 08:01 pqab

3.0.0

Is there a chance to use 3.0.2? A lot of bugs have been fixed in 3.0.1 and 3.0.2. This applies to both the broker and the client.

lhotari avatar Jan 24 '24 14:01 lhotari

We published around 1M messages, and we were able to reproduce it with this code: v3.1.2...semistone:pulsar:test/flux-test

Thanks for sharing the repro app. I'll give it a try soon. One question about the repro: you have -st Key_Shared. Does the problem reproduce with the Shared subscription type?

lhotari avatar Jan 24 '24 14:01 lhotari

Does the problem reproduce with the Shared subscription type?

Yes, I ran it again with the Shared subscription type and it also happens; I think the subscription type doesn't matter.

pqab avatar Jan 25 '24 00:01 pqab

Is there a chance to use 3.0.2?

The original report was from our application using the 3.0.0 client with a 3.1.2 broker; we are going to upgrade the client to 3.0.2 for now. The reproduce code above was run on the server, so both the client & broker are 3.1.2.

pqab avatar Jan 25 '24 00:01 pqab

This is possibly related to #22601 / #22810.

lhotari avatar May 31 '24 13:05 lhotari

We tested again and it still happens, but we found that by default the Pulsar client doesn't wait for the ack to be processed and returns a completed CompletableFuture immediately:

https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L264

So we turned on https://pulsar.apache.org/api/client/3.0.x/org/apache/pulsar/client/api/ConsumerBuilder.html#isAckReceiptEnabled(boolean),

because when that option is enabled it takes a read lock (https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L261),

so the concurrency issue disappears.

And if I test without enabling that option, since it doesn't wait for the acknowledgement, it looks like a memory leak in my performance test because too many currentIndividualAckFuture instances queue up.

So maybe the concurrency issue is still there, or maybe it's just too many currentIndividualAckFuture piling up.

But at least we can enable isAckReceiptEnabled to fix this issue. Or, I guess, if that read lock were always taken, with or without isAckReceiptEnabled, it would fix this issue as well.
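
A minimal sketch of enabling ack receipts on the consumer (the service URL, topic, and subscription names here are placeholders):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;

public class AckReceiptSketch {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder service URL
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://tenant1/namespace1/topic1") // placeholder topic
                .subscriptionName("sub1")                        // placeholder subscription
                .subscriptionType(SubscriptionType.Key_Shared)
                // With ack receipts enabled, acknowledgeAsync() only completes after
                // the broker has confirmed the acknowledgement.
                .isAckReceiptEnabled(true)
                .subscribe();

        consumer.close();
        client.close();
    }
}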

semistone avatar Jun 06 '24 08:06 semistone

Msg backlog & unacked messages should be 0 for both the acknowledge and the acknowledgeAsync subscriptions.

Regarding unacked message counts, #22657 is possibly related; see https://github.com/apache/pulsar/issues/22657#issuecomment-2150533309

lhotari avatar Jun 06 '24 09:06 lhotari