[ISSUE#4552] Add asySendBlockMode to implement BackPressure

shengminw opened this issue 3 years ago • 7 comments

ISSUE#4552

What is the purpose of the change

When upstream message-sending traffic is too heavy, the async-send thread pool executor rejects the task outright, and the send fails with an MQClientException.
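To make this failure mode concrete, here is a minimal, self-contained sketch of a bounded pool whose default `AbortPolicy` rejects work once its single thread and single queue slot are taken. The class names and `SendRejectedException` are hypothetical stand-ins, not RocketMQ's producer code; the point is only that the caller gets an exception immediately instead of waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectOnOverloadDemo {

    // Hypothetical stand-in for the client exception a rejected send is reported with.
    static class SendRejectedException extends Exception {
        SendRejectedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Submits the send task; a saturated pool fails fast instead of making the caller wait.
    static void sendAsync(ExecutorService executor, Runnable sendTask) throws SendRejectedException {
        try {
            executor.submit(sendTask);
        } catch (RejectedExecutionException e) {
            throw new SendRejectedException("executor rejected", e);
        }
    }

    // Submits `tasks` blocking tasks to a pool with 1 thread and 1 queue slot; returns how many were rejected.
    public static int countRejections(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    sendAsync(executor, () -> {
                        try {
                            gate.await(); // keep the worker busy until all submissions are done
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (SendRejectedException e) {
                    rejected++;
                }
            }
        } finally {
            gate.countDown();
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejections(5)); // rejected = 3
    }
}
```

With five submissions while the first task is still running, one task runs, one waits in the queue, and the remaining three are rejected.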

In the I/O layer (Netty remoting), backpressure is already implemented with semaphoreAsync:

```java
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    // Back pressure: wait up to timeoutMillis for a permit; the permit count caps in-flight async requests.
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        // Wraps the permit so it is released at most once, whichever path finishes the request.
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        // Waiting for the permit already used up the whole timeout budget.
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        // Register the pending request with the remaining budget; its future owns the permit from here on.
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    // The write failed: fail the pending request.
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            // The write could not even be issued: give the permit back.
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        // No permit within the timeout: fail fast if no wait was allowed, otherwise report the timeout.
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
```
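Stripped of Netty and RocketMQ types, the pattern above reduces to a few lines. The sketch below is a simplified, standalone model with hypothetical class names: a fair `Semaphore` caps in-flight async requests, `tryAcquire` with a timeout makes the caller wait when the cap is reached, and a release-once wrapper plays the role of `SemaphoreReleaseOnlyOnce` so a permit cannot be returned twice.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class SemaphoreBackPressure {

    private final Semaphore semaphoreAsync;

    public SemaphoreBackPressure(int permits) {
        // Fair ordering: waiting callers obtain permits in arrival order.
        this.semaphoreAsync = new Semaphore(permits, true);
    }

    /** Releases its permit at most once, like the SemaphoreReleaseOnlyOnce idea. */
    public static final class ReleaseOnce {
        private final Semaphore semaphore;
        private final AtomicBoolean released = new AtomicBoolean(false);

        ReleaseOnce(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        public void release() {
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    /** Blocks the caller for up to timeoutMillis waiting for an in-flight slot. */
    public ReleaseOnce acquire(long timeoutMillis) throws InterruptedException, TimeoutException {
        if (!semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("tryAcquire semaphore timeout, " + timeoutMillis
                    + "ms, waiting threads: " + semaphoreAsync.getQueueLength()
                    + ", available permits: " + semaphoreAsync.availablePermits());
        }
        return new ReleaseOnce(semaphoreAsync);
    }

    public int availablePermits() {
        return semaphoreAsync.availablePermits();
    }

    public static void main(String[] args) throws Exception {
        SemaphoreBackPressure bp = new SemaphoreBackPressure(2);
        ReleaseOnce a = bp.acquire(100);
        ReleaseOnce b = bp.acquire(100);
        try {
            bp.acquire(50); // no permit left: the caller waits about 50 ms, then gives up
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
        a.release();
        a.release(); // second release is a no-op
        System.out.println("available = " + bp.availablePermits()); // available = 1
        b.release();
    }
}
```

The caller is slowed down for as long as the timeout allows and only then fails, which is the blocking behaviour the proposal wants to reach from the producer side.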

I propose implementing a backpressure mechanism that relies on the existing backpressure in NettyRemoting. With it, the SDK blocks the send operation within the timeout allowed by the client, instead of rejecting the task.

Brief changelog

  1. Add a backpressure mechanism by running the send runnable directly in the calling thread, relying on the backpressure in NettyRemoting.
  2. Add an asySendBlockMode parameter to DefaultMQProducer so that users can decide whether to enable this mode.
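The two changelog items can be modelled as a single dispatch decision. This is an illustrative sketch only, not the PR's code: `AsyncSendDispatcher` and `ranOnCallerThread` are hypothetical names, and the real change in DefaultMQProducerImpl may be structured differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncSendDispatcher {

    private final ExecutorService asyncSenderExecutor;
    // Hypothetical flag modelled on the proposed asySendBlockMode option.
    private final boolean asySendBlockMode;

    public AsyncSendDispatcher(ExecutorService asyncSenderExecutor, boolean asySendBlockMode) {
        this.asyncSenderExecutor = asyncSenderExecutor;
        this.asySendBlockMode = asySendBlockMode;
    }

    public void dispatch(Runnable sendTask) {
        if (asySendBlockMode) {
            // Block mode: run the send in the caller's thread. If the remoting layer has no
            // free permit, the caller waits (up to the send timeout) instead of being rejected.
            sendTask.run();
        } else {
            // Default mode: hand off to the pool; a saturated pool rejects the task immediately.
            asyncSenderExecutor.submit(sendTask);
        }
    }

    // Reports whether a dispatched task ran on the calling thread.
    public static boolean ranOnCallerThread(boolean blockMode) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            AsyncSendDispatcher dispatcher = new AsyncSendDispatcher(pool, blockMode);
            AtomicReference<Thread> ranOn = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);
            dispatcher.dispatch(() -> {
                ranOn.set(Thread.currentThread());
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("send task did not run");
            }
            return ranOn.get() == Thread.currentThread();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("block mode runs on caller: " + ranOnCallerThread(true));  // true
        System.out.println("pool mode runs on caller: " + ranOnCallerThread(false)); // false
    }
}
```

Because the caller performs the send itself in block mode, an exhausted semaphoreAsync slows the producer down rather than failing the send.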

Note: experimental tests show that the time cost of asynchronous sending lies mainly in the I/O operation. So whether or not DefaultMQProducerImpl starts the runnable through a thread pool makes little difference in performance.

shengminw, Jul 04 '22 03:07

Codecov Report

Merging #4553 (5deade9) into develop (493e52b) will increase coverage by 0.01%. The diff coverage is 74.07%.

```diff
@@              Coverage Diff              @@
##             develop    #4553      +/-   ##
=============================================
+ Coverage      48.17%   48.19%   +0.01%     
- Complexity      5129     5132       +3     
=============================================
  Files            649      649              
  Lines          43045    43050       +5     
  Branches        5630     5632       +2     
=============================================
+ Hits           20737    20746       +9     
+ Misses         19801    19790      -11     
- Partials        2507     2514       +7     
```

| Impacted Files | Coverage Δ |
|---|---|
| ...mq/client/impl/producer/DefaultMQProducerImpl.java | 45.28% <72.00%> (+0.85%) ↑ |
| ...he/rocketmq/client/producer/DefaultMQProducer.java | 58.87% <100.00%> (+0.78%) ↑ |
| ...org/apache/rocketmq/common/stats/StatsItemSet.java | 47.76% <0.00%> (-4.48%) ↓ |
| ...ava/org/apache/rocketmq/test/util/VerifyUtils.java | 46.26% <0.00%> (-2.99%) ↓ |
| ...ava/org/apache/rocketmq/filter/util/BitsArray.java | 59.82% <0.00%> (-2.57%) ↓ |
| ...a/org/apache/rocketmq/store/StoreStatsService.java | 36.69% <0.00%> (-1.69%) ↓ |
| ...mq/client/impl/consumer/RebalanceLitePullImpl.java | 72.05% <0.00%> (-1.48%) ↓ |
| ...e/rocketmq/client/impl/consumer/RebalanceImpl.java | 43.75% <0.00%> (-0.79%) ↓ |
| ...ent/impl/consumer/DefaultLitePullConsumerImpl.java | 70.84% <0.00%> (-0.51%) ↓ |
| ...he/rocketmq/client/impl/consumer/ProcessQueue.java | 61.92% <0.00%> (-0.46%) ↓ |

... and 22 more


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Powered by Codecov. Last update 493e52b...5deade9.

codecov-commenter, Jul 04 '22 06:07

Coverage Status

Coverage increased (+0.06%) to 52.05% when pulling 69be9f7230d3d1d0e543f9b58d69f4d75ebeb3af on shengminw:backpressure into 06d954f86fad4ad95927ed5435c53171a4ab2bfb on apache:develop.

coveralls, Jul 04 '22 06:07

Coverage Status

Coverage decreased (-0.006%) to 51.986% when pulling 5deade9c71c38cb3e2a3e0cdc8e444a8c3cc5373 on shengminw:backpressure into 06d954f86fad4ad95927ed5435c53171a4ab2bfb on apache:develop.

coveralls, Jul 04 '22 06:07

IMO, modifying deprecated code is not a good idea. Maybe RIP-37 can solve your problem.

ni-ze, Jul 05 '22 02:07

@duhenglucky It's a good idea. I am still considering building a flow-control mechanism based on message memory size in the future.
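A flow-control mechanism based on message memory size could generalize the same semaphore idea by counting bytes instead of requests. A minimal sketch under that assumption; `MemoryBudgetFlowControl` and its methods are hypothetical and not an existing RocketMQ API:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class MemoryBudgetFlowControl {

    private final int maxBytes;
    // One permit per byte of message body that may be in flight at once.
    private final Semaphore bytesBudget;

    public MemoryBudgetFlowControl(int maxBytes) {
        this.maxBytes = maxBytes;
        this.bytesBudget = new Semaphore(maxBytes, true);
    }

    // Cap the cost so a single oversized message cannot wait for a budget it can never get.
    private int cost(byte[] body) {
        return Math.min(body.length, maxBytes);
    }

    /** Reserves the body's size, waiting up to timeoutMillis; returns false on timeout. */
    public boolean tryReserve(byte[] body, long timeoutMillis) throws InterruptedException {
        return bytesBudget.tryAcquire(cost(body), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns the reserved bytes once the send completes or fails. */
    public void release(byte[] body) {
        bytesBudget.release(cost(body));
    }

    public int availableBytes() {
        return bytesBudget.availablePermits();
    }
}
```

With a fair semaphore, a large reservation waiting at the head of the queue also holds back smaller ones behind it; that keeps ordering predictable at the cost of some head-of-line blocking.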

shengminw, Jul 05 '22 13:07

The new API uses a new protocol model. It is a complement, not a replacement.

BTW, the new API will need a long time to mature.

In the meantime, we could also make some necessary improvements to the traditional API without changing the method signatures.

dongeforever, Jul 06 '22 02:07

Agree with it.

cserwen, Jul 06 '22 03:07

This PR is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR.

github-actions[bot], Sep 15 '23 00:09

This PR was closed because it has been inactive for 3 days since being marked as stale.

github-actions[bot], Sep 18 '23 00:09