[Bug] Issue with setting setAwaitDuration below 20 seconds in RocketMQ SimpleConsumer mode?
Before Creating the Bug Report
- [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.
- [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
Ubuntu 20.04
RocketMQ version
RocketMQ 5.2
JDK Version
JDK 17
Describe the Bug
Feedback on SimpleConsumer mode in RocketMQ 5.2:
When I use SimpleConsumer in RocketMQ 5.2 and pass setAwaitDuration a value of less than 20 seconds, I receive the following error message:
Exception in thread "main" org.apache.rocketmq.client.java.exception.BadRequestException: [request-id=0bb55e31-e6ba-4539-8e23-296c6b35224f, response-code=40018] The deadline time remaining is not enough for polling, please check network condition
Here is the relevant Kotlin code snippet for reference:
val simpleConsumer = provider
    .newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    .setAwaitDuration(Duration.ofMillis(500)) // The issue occurs when this value is set below 20s
    .setSubscriptionExpressions(mapOf(topic to filterExpression))
    .build()
It seems that the error occurs because the value passed to setAwaitDuration must be at least 20 seconds. However, my use case needs more flexibility in setting this duration. Could you explain how to resolve this issue, or suggest a workaround that would let me set an awaitDuration below 20 seconds without encountering this error?
Steps to Reproduce
Exception in thread "main" org.apache.rocketmq.client.java.exception.BadRequestException: [request-id=0bb55e31-e6ba-4539-8e23-296c6b35224f, response-code=40018] The deadline time remaining is not enough for polling, please check network condition
at org.apache.rocketmq.client.java.exception.StatusChecker.check(StatusChecker.java:63)
at org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl.lambda$receiveMessage$0(ConsumerImpl.java:114)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:221)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:208)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:122)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:783)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture.set(SettableFuture.java:49)
at org.apache.rocketmq.client.java.rpc.RpcClientImpl$1.onCompleted(RpcClientImpl.java:168)
at org.apache.rocketmq.shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:485)
at org.apache.rocketmq.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
at org.apache.rocketmq.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.rocketmq.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
What Did You Expect to See?
The code runs without error when the setAwaitDuration parameter is below 20 seconds.
What Did You See Instead?
No
Additional Context
No response
This exception indicates that the pollingTime (awaitDuration) set by the client is too small.
The proxy has two configs related to pollingTime: grpcClientConsumerMinLongPollingTimeoutMillis (default=5s) and grpcClientConsumerMaxLongPollingTimeoutMillis (default=20s). Setting the pollingTime within this range should work.
Related proxy code: https://github.com/apache/rocketmq/blob/develop/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java#L82
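As a rough client-side illustration of the range described above, the sketch below clamps a requested await duration to the two default values quoted in this comment (5s and 20s). The helper clampAwaitDuration and the two constants are hypothetical names, not part of the RocketMQ client API, and a deployed proxy may be configured with different limits. Keeping the value in this range does not by itself guarantee that the proxy accepts the request.

```kotlin
import java.time.Duration

// Defaults quoted in the comment above; a deployed proxy may override them.
val MIN_LONG_POLLING: Duration = Duration.ofSeconds(5)
val MAX_LONG_POLLING: Duration = Duration.ofSeconds(20)

// Hypothetical helper (not part of the RocketMQ client API):
// keeps the requested await duration inside [min, max].
fun clampAwaitDuration(
    requested: Duration,
    min: Duration = MIN_LONG_POLLING,
    max: Duration = MAX_LONG_POLLING,
): Duration {
    require(min <= max) { "min must not exceed max" }
    return requested.coerceIn(min, max)
}

fun main() {
    println(clampAwaitDuration(Duration.ofMillis(500)))  // PT5S
    println(clampAwaitDuration(Duration.ofSeconds(10)))  // PT10S
    println(clampAwaitDuration(Duration.ofMinutes(1)))   // PT20S
}
```

In the builder from the report, this would be used as .setAwaitDuration(clampAwaitDuration(Duration.ofMillis(500))), assuming the proxy still runs with the default limits.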
Thanks. I consulted the relevant documentation and did not find this guidance, and the code comments do not mention it either. To help future users, would you consider improving the documentation?
Can you describe the scenario that requires setting setAwaitDuration below 20 seconds?
In my understanding, the awaitDuration parameter lets me fetch messages more promptly. During periods of low business activity, a particular message may be highly important while the number of available messages stays below the maxMessageNum threshold (from the org.apache.rocketmq.client.apis.consumer.SimpleConsumer#receive(maxMessageNum, duration) call). A short awaitDuration ensures a swift return in that case.
Alternatively, I might need to fall back to setting maxMessageNum to 1, which could be less efficient.
This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.
This issue was closed because it has been inactive for 3 days since being marked as stale.