[ISSUE#4552] Add asySendBlockMode to implement BackPressure
## What is the purpose of the change
When the upstream message traffic is too heavy, the async-send thread pool executor rejects the task outright and throws MQClientException.
In the Netty remoting I/O layer, backpressure is already implemented via semaphoreAsync:
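The rejection behavior described above can be reproduced with a minimal sketch. The class name and executor parameters here are illustrative, not RocketMQ's actual configuration: a bounded pool with AbortPolicy throws RejectedExecutionException once it is saturated, which the producer would surface to the caller as an exception.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectingExecutorDemo {
    /**
     * Submits {@code tasks} long-running tasks to a deliberately tiny pool
     * (1 worker, queue capacity 2) and counts how many are rejected.
     */
    public static int submitAll(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(2),            // small bounded queue
            new ThreadPoolExecutor.AbortPolicy());  // reject when saturated
        CountDownLatch gate = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> {
                    try { gate.await(); } catch (InterruptedException ignored) { }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // in the producer this becomes an exception thrown to the sender
            }
        }
        gate.countDown();
        executor.shutdown();
        return rejected;
    }
}
```

With 5 submissions, 1 task occupies the worker and 2 fill the queue, so the remaining 2 are rejected; this fail-fast behavior is exactly what the blocking mode below avoids.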
```java
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
```
I propose to implement the backpressure mechanism by relying on this existing backpressure in NettyRemoting. In this mode, within the timeout allowed by the client, the SDK blocks the sending operation instead of rejecting the task.
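The core of the mechanism above can be distilled into a small sketch. This is a simplified model, not the actual NettyRemoting code: a semaphore caps the number of in-flight async requests, and `tryAcquire` with the caller's timeout makes the sender wait (backpressure) rather than fail immediately, giving up only when the timeout elapses.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreBackpressureDemo {
    // Caps concurrent in-flight sends, like semaphoreAsync in NettyRemoting.
    private final Semaphore inflight;

    public SemaphoreBackpressureDemo(int permits) {
        this.inflight = new Semaphore(permits);
    }

    /**
     * Tries to acquire a send slot, blocking the caller for up to
     * timeoutMillis. Returns false only if no permit freed up in time,
     * which corresponds to the RemotingTimeoutException branch above.
     */
    public boolean trySend(long timeoutMillis) throws InterruptedException {
        return inflight.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when the response (or a failure) arrives, freeing a slot. */
    public void onSendComplete() {
        inflight.release();
    }
}
```

While a permit is held, a second sender blocks for the full timeout instead of being rejected up front; releasing the permit lets the next blocked sender proceed.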
## Brief changelog
- Add a backpressure mechanism by simply running the `Runnable` in the calling thread, relying on the backpressure in NettyRemoting.
- Add an `asySendBlockMode` parameter in `DefaultMQProducer` to let users decide whether to enable this mode.
Notice: experimental tests show that the time cost of asynchronous sending is dominated by the I/O operation. So whether the runnable is started in DefaultMQProducerImpl through the thread pool or in the calling thread makes little difference in performance.
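The dispatch decision described in the changelog can be sketched as follows. The class and method names are illustrative, not the actual DefaultMQProducerImpl code: when block mode is on, the send task runs in the calling thread, so the caller is naturally throttled by NettyRemoting's semaphore; when off, the task is handed to the async-send executor, which may reject it when saturated.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockModeDispatchDemo {
    private final ExecutorService asyncSenderExecutor = Executors.newSingleThreadExecutor();
    private final boolean asySendBlockMode;

    public BlockModeDispatchDemo(boolean asySendBlockMode) {
        this.asySendBlockMode = asySendBlockMode;
    }

    /** Returns where the task ran, for illustration. */
    public String dispatch(Runnable sendTask) {
        if (asySendBlockMode) {
            // Run in the caller's thread: the caller blocks on the
            // remoting semaphore instead of the task being rejected.
            sendTask.run();
            return "caller-thread";
        }
        // Hand off to the executor: fast, but can reject under heavy load.
        asyncSenderExecutor.execute(sendTask);
        return "executor";
    }

    public void shutdown() {
        asyncSenderExecutor.shutdown();
    }
}
```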
## Codecov Report
Merging #4553 (5deade9) into develop (493e52b) will increase coverage by 0.01%. The diff coverage is 74.07%.
```
@@             Coverage Diff              @@
##             develop    #4553     +/-  ##
=============================================
+ Coverage      48.17%   48.19%   +0.01%
- Complexity      5129     5132       +3
=============================================
  Files            649      649
  Lines          43045    43050       +5
  Branches        5630     5632       +2
=============================================
+ Hits           20737    20746       +9
+ Misses         19801    19790      -11
- Partials        2507     2514       +7
```
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...mq/client/impl/producer/DefaultMQProducerImpl.java | 45.28% <72.00%> (+0.85%) | :arrow_up: |
| ...he/rocketmq/client/producer/DefaultMQProducer.java | 58.87% <100.00%> (+0.78%) | :arrow_up: |
| ...org/apache/rocketmq/common/stats/StatsItemSet.java | 47.76% <0.00%> (-4.48%) | :arrow_down: |
| ...ava/org/apache/rocketmq/test/util/VerifyUtils.java | 46.26% <0.00%> (-2.99%) | :arrow_down: |
| ...ava/org/apache/rocketmq/filter/util/BitsArray.java | 59.82% <0.00%> (-2.57%) | :arrow_down: |
| ...a/org/apache/rocketmq/store/StoreStatsService.java | 36.69% <0.00%> (-1.69%) | :arrow_down: |
| ...mq/client/impl/consumer/RebalanceLitePullImpl.java | 72.05% <0.00%> (-1.48%) | :arrow_down: |
| ...e/rocketmq/client/impl/consumer/RebalanceImpl.java | 43.75% <0.00%> (-0.79%) | :arrow_down: |
| ...ent/impl/consumer/DefaultLitePullConsumerImpl.java | 70.84% <0.00%> (-0.51%) | :arrow_down: |
| ...he/rocketmq/client/impl/consumer/ProcessQueue.java | 61.92% <0.00%> (-0.46%) | :arrow_down: |
| ... and 22 more | | |
Continue to review full report at Codecov.
Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Powered by Codecov. Last update 493e52b...5deade9.
Coverage increased (+0.06%) to 52.05% when pulling 69be9f7230d3d1d0e543f9b58d69f4d75ebeb3af on shengminw:backpressure into 06d954f86fad4ad95927ed5435c53171a4ab2bfb on apache:develop.
Coverage decreased (-0.006%) to 51.986% when pulling 5deade9c71c38cb3e2a3e0cdc8e444a8c3cc5373 on shengminw:backpressure into 06d954f86fad4ad95927ed5435c53171a4ab2bfb on apache:develop.
IMO, modifying deprecated code is not a good idea. Maybe RIP-37 can solve your problem.
@duhenglucky It's a good idea. I'm still considering building a flow control mechanism based on the size of the message memory in the future.
The new API uses a new protocol model; it is a complement, not a replacement.
BTW, the new API will need a long time to mature.
Currently, we could also apply some necessary polish to the traditional API without changing the method signatures.
Agreed.
This PR is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR.
This PR was closed because it has been inactive for 3 days since being marked as stale.