Soul Gateway repeat read post body direct memory OOM
Up here is my dependency data
I am using the Soul gateway,An out-of-heap memory overflow also occurred
Since I now need to read the Post Body data, here is my code
`public class CacheBodyGlobalFilter extends BaseWebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
if (super.checkRequestIsJsonType(exchange.getRequest().getMethod()
, exchange.getRequest().getHeaders().getContentType())) {
return chain.filter(exchange);
} else {
if (RouteListAcmConfig.getGlobalCacheSwitch()) {
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
String body = Strings.fromUTF8ByteArray(bytes);
exchange.getAttributes().put(ConstantUtil.CACHE_REQUEST_BODY_OBJECT_KEY, body);
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
return Flux.just(buffer);
}
};
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
}
return ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders()).bodyToMono(String.class)
.switchIfEmpty(Mono.defer(() -> Mono.just("")))
.flatMap(body -> {
DataBuffer cacheDataBuffer = exchange.getResponse().bufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
exchange.getAttributes().put(ConstantUtil.CACHE_REQUEST_BODY_OBJECT_KEY, body);
return chain.filter(exchange.mutate().request(new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return Flux.just(cacheDataBuffer);
}
}).build());
});
}
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 1;
}
}`
Am I using it in a bad way?
My out-of-heap message is as follows
2022-07-27 19:02:51,753 [NettyClientWorker-4-3] ERROR [open-gateway] [NettyClientWorker-4-3raceId] io.netty.util.ResourceLeakDetector : LEAK: ByteBuf. release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: #1: io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:285) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #2: io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:106) io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:336) io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:225) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #3: Hint: 'reactor.left.httpCodec' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #4: Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) Created at: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:780) io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:443) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:482) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748)
Originally posted by @a807966224 in https://github.com/apache/shenyu/issues/1069#issuecomment-1196628310
see https://github.com/apache/shenyu/pull/3438 , we had upgrade reactor-netty by upgrading spring-boot and spring-cloud.
Hi, soul gateway not have this code :
RouteListAcmConfig.getGlobalCacheSwitch()
This should be their own business code. I think just pay attention to :
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
String body = Strings.fromUTF8ByteArray(bytes);
exchange.getAttributes().put(ConstantUtil.CACHE_REQUEST_BODY_OBJECT_KEY, body);
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
return Flux.just(buffer);
}
};
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
Hi, soul gateway not have this code :
RouteListAcmConfig.getGlobalCacheSwitch()
This is my self-test code,Switch no concern; What is my pain point in repeatedly reading the Post body
This should be their own business code. I think just pay attention to :
return DataBufferUtils.join(exchange.getRequest().getBody()) .flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); String body = Strings.fromUTF8ByteArray(bytes); exchange.getAttributes().put(ConstantUtil.CACHE_REQUEST_BODY_OBJECT_KEY, body); ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator( exchange.getRequest()) { @Override public Flux<DataBuffer> getBody() { DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); return Flux.just(buffer); } }; return chain.filter(exchange.mutate().request(mutatedRequest).build()); });
yes, Now I have tested and found that if I implement the method of reading body repeatedly, there will be leakage information after the pressure test, so I need to help you check whether there is any problem with my way of using it
@a807966224 hi. After reading the body, put it directly into the buffer. If not released, it will cause OOM.
hi. i have the same problem, i consult 2.5.0-release sign plugins code to solve this problem. this is the link: https://github.com/apache/shenyu/blob/2.5.0-release/shenyu-plugin/shenyu-plugin-sign/src/main/java/org/apache/shenyu/plugin/sign/SignPlugin.java