[Bug] Issue with setting setAwaitDuration below 20 seconds in RocketMQ SimpleConsumer mode?

Open CodingOX opened this issue 1 year ago • 4 comments

Before Creating the Bug Report

  • [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

Ubuntu 20.04

RocketMQ version

RocketMQ 5.2

JDK Version

JDK 17

Describe the Bug

Feedback for RocketMQ 5.2 version using SimpleConsumer mode:

I am encountering an issue when using the SimpleConsumer mode in RocketMQ version 5.2. Specifically, when I set setAwaitDuration to a value below 20 seconds, I receive the following error message:

Exception in thread "main" org.apache.rocketmq.client.java.exception.BadRequestException: [request-id=0bb55e31-e6ba-4539-8e23-296c6b35224f, response-code=40018] The deadline time remaining is not enough for polling, please check network condition

Here is the relevant Kotlin code snippet for reference:

val simpleConsumer = provider
    .newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    .setAwaitDuration(Duration.ofMillis(500)) // The issue occurs when this value is set below 20s
    .setSubscriptionExpressions(mapOf(topic to filterExpression))
    .build()

It seems that the error is related to the setAwaitDuration parameter needing to be greater than 20 seconds for proper functioning. However, I require more flexibility in setting this duration for my use case. Could you provide guidance on how to resolve this issue or suggest any workarounds that would allow me to set an AwaitDuration below 20 seconds without encountering this error?

Steps to Reproduce

Exception in thread "main" org.apache.rocketmq.client.java.exception.BadRequestException: [request-id=0bb55e31-e6ba-4539-8e23-296c6b35224f, response-code=40018] The deadline time remaining is not enough for polling, please check network condition
	at org.apache.rocketmq.client.java.exception.StatusChecker.check(StatusChecker.java:63)
	at org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl.lambda$receiveMessage$0(ConsumerImpl.java:114)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:221)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:208)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:122)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:783)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture.set(SettableFuture.java:49)
	at org.apache.rocketmq.client.java.rpc.RpcClientImpl$1.onCompleted(RpcClientImpl.java:168)
	at org.apache.rocketmq.shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:485)
	at org.apache.rocketmq.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	at org.apache.rocketmq.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.rocketmq.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

What Did You Expect to See?

The code should run correctly when the setAwaitDuration parameter is set below 20 seconds.

What Did You See Instead?

No

Additional Context

No response

CodingOX avatar Mar 29 '24 06:03 CodingOX

This exception indicates that the pollingTime (awaitDuration) set by the client is too small.

There are two proxy configs related to polling time: grpcClientConsumerMinLongPollingTimeoutMillis (default=5s) and grpcClientConsumerMaxLongPollingTimeoutMillis (default=20s). Setting the pollingTime within this range should work.

Related proxy code: https://github.com/apache/rocketmq/blob/develop/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java#L82
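
The range described above can be guarded on the client side before the value reaches setAwaitDuration. The following is a minimal sketch, not part of the RocketMQ client: the helper name clampAwaitDuration and the two constants are hypothetical, and the 5s/20s bounds are only the proxy defaults quoted in this comment, so they should be adjusted if your proxy overrides them.

```kotlin
import java.time.Duration

// Assumed bounds: the proxy defaults quoted in the comment above.
// These are NOT read from the proxy; change them if your deployment differs.
val MIN_LONG_POLLING: Duration = Duration.ofSeconds(5)
val MAX_LONG_POLLING: Duration = Duration.ofSeconds(20)

// Hypothetical helper: clamps a requested await duration into [min, max].
// Duration is Comparable<Duration>, so the standard coerceIn extension applies.
fun clampAwaitDuration(
    requested: Duration,
    min: Duration = MIN_LONG_POLLING,
    max: Duration = MAX_LONG_POLLING,
): Duration = requested.coerceIn(min, max)

fun main() {
    println(clampAwaitDuration(Duration.ofMillis(500)))  // PT5S (raised to the minimum)
    println(clampAwaitDuration(Duration.ofSeconds(10)))  // PT10S (already in range)
    println(clampAwaitDuration(Duration.ofSeconds(30)))  // PT20S (lowered to the maximum)
}
```

With such a helper, the builder call from the report would become .setAwaitDuration(clampAwaitDuration(Duration.ofMillis(500))). Whether this alone avoids response code 40018 depends on the proxy configuration and the request deadline, so treat it as a starting point rather than a confirmed fix.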

redlsz avatar Apr 01 '24 09:04 redlsz

Thanks. I consulted the relevant documentation and did not find this guidance, and the code comments do not mention this constraint either. To help future users, would you consider improving the documentation?

CodingOX avatar Apr 01 '24 09:04 CodingOX

Can you describe what scenario requires setting setAwaitDuration below 20 seconds?

lizhimins avatar Apr 06 '24 08:04 lizhimins

In my understanding, the awaitDuration parameter allows me to fetch messages more promptly. During periods of low business activity, a particular message may be highly important, yet the number of available messages may fall short of the maxMessageNum threshold (which comes from the org.apache.rocketmq.client.apis.consumer.SimpleConsumer#receive(maxMessageNum, duration) call). In that case, a short awaitDuration still ensures a swift return. Alternatively, I might need to fall back to setting maxMessageNum to 1, which could be less efficient.

CodingOX avatar Apr 07 '24 08:04 CodingOX

This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.

github-actions[bot] avatar Apr 08 '25 00:04 github-actions[bot]

This issue was closed because it has been inactive for 3 days since being marked as stale.

github-actions[bot] avatar Apr 11 '25 00:04 github-actions[bot]