shenyu icon indicating copy to clipboard operation
shenyu copied to clipboard

Soul Gateway repeat read post body direct memory OOM

Open a807966224 opened this issue 3 years ago • 7 comments

dep.txt

Up here is my dependency data

I am using the Soul gateway,An out-of-heap memory overflow also occurred

Since I now need to read the Post Body data, here is my code

`public class CacheBodyGlobalFilter extends BaseWebFilter {

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    if (super.checkRequestIsJsonType(exchange.getRequest().getMethod()
            , exchange.getRequest().getHeaders().getContentType())) {
        return chain.filter(exchange);
    } else {
        if (RouteListAcmConfig.getGlobalCacheSwitch()) {
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(bytes);
                        DataBufferUtils.release(dataBuffer);
                        String body = Strings.fromUTF8ByteArray(bytes);
                        exchange.getAttributes().put(ConstantUtil.CACHE_REQUEST_BODY_OBJECT_KEY, body);
                        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                                exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                                return Flux.just(buffer);
                            }
                        };
                        return chain.filter(exchange.mutate().request(mutatedRequest).build());
                    });
        }
        return ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders()).bodyToMono(String.class)
                .switchIfEmpty(Mono.defer(() -> Mono.just("")))
                .flatMap(body -> {
                    DataBuffer cacheDataBuffer = exchange.getResponse().bufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
                    exchange.getAttributes().put(ConstantUtil.CACHE_REQUEST_BODY_OBJECT_KEY, body);
                    return chain.filter(exchange.mutate().request(new ServerHttpRequestDecorator(
                            exchange.getRequest()) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return Flux.just(cacheDataBuffer);
                        }
                    }).build());
                });
    }
}

@Override
public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE + 1;
}

}`

Am I using it in a bad way?

My out-of-heap message is as follows

2022-07-27 19:02:51,753 [NettyClientWorker-4-3] ERROR [open-gateway] [NettyClientWorker-4-3raceId] io.netty.util.ResourceLeakDetector : LEAK: ByteBuf. release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: #1: io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:285) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #2: io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:106) io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:336) io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:225) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #3: Hint: 'reactor.left.httpCodec' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #4: Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) Created at: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:780) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748)

Originally posted by @a807966224 in https://github.com/apache/shenyu/issues/1069#issuecomment-1196628310

a807966224 avatar Jul 27 '22 11:07 a807966224

see https://github.com/apache/shenyu/pull/3438 , we had upgrade reactor-netty by upgrading spring-boot and spring-cloud.

loongs-zhang avatar Jul 27 '22 12:07 loongs-zhang

Hi, soul gateway not have this code :

RouteListAcmConfig.getGlobalCacheSwitch()

yu199195 avatar Jul 28 '22 02:07 yu199195

This should be their own business code. I think just pay attention to :

            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(bytes);
                        DataBufferUtils.release(dataBuffer);
                        String body = Strings.fromUTF8ByteArray(bytes);
                        exchange.getAttributes().put(ConstantUtil.CACHE_REQUEST_BODY_OBJECT_KEY, body);
                        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                                exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                                return Flux.just(buffer);
                            }
                        };
                        return chain.filter(exchange.mutate().request(mutatedRequest).build());
                    });

loongs-zhang avatar Jul 28 '22 03:07 loongs-zhang

Hi, soul gateway not have this code :

RouteListAcmConfig.getGlobalCacheSwitch()

This is my self-test code,Switch no concern; What is my pain point in repeatedly reading the Post body

a807966224 avatar Jul 28 '22 03:07 a807966224

This should be their own business code. I think just pay attention to :

            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(bytes);
                        DataBufferUtils.release(dataBuffer);
                        String body = Strings.fromUTF8ByteArray(bytes);
                        exchange.getAttributes().put(ConstantUtil.CACHE_REQUEST_BODY_OBJECT_KEY, body);
                        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                                exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                                return Flux.just(buffer);
                            }
                        };
                        return chain.filter(exchange.mutate().request(mutatedRequest).build());
                    });

yes, Now I have tested and found that if I implement the method of reading body repeatedly, there will be leakage information after the pressure test, so I need to help you check whether there is any problem with my way of using it

a807966224 avatar Jul 28 '22 03:07 a807966224

@a807966224 hi. After reading the body, put it directly into the buffer. If not released, it will cause OOM.

impactCn avatar Aug 15 '22 07:08 impactCn

hi. i have the same problem, i consult 2.5.0-release sign plugins code to solve this problem. this is the link: https://github.com/apache/shenyu/blob/2.5.0-release/shenyu-plugin/shenyu-plugin-sign/src/main/java/org/apache/shenyu/plugin/sign/SignPlugin.java

accsam avatar Aug 17 '22 06:08 accsam