Gateway memory overflow Memory leakage?
Version use: spring-cloud-dependencies Hoxton.SR3 spring-boot-dependencies 2.2.0.RELEASE spring-cloud-starter-gateway 2.2.0.RELEASE error logging:
ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: Created at: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:363) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:777) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748)
ERROR reactor.netty.tcp.TcpServer - [id: 0x9d6a6ae2, L:/127.0.0.1:8098 - R:/127.0.0.1:53310] onUncaughtException(SimpleConnection{channel=[id: 0x9d6a6ae2, L:/127.0.0.1:8098 - R:/127.0.0.1:53310]}) io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1023410183, max: 1029177344) at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:742) at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:697) at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758) at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734) at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245) at io.netty.buffer.PoolArena.allocate(PoolArena.java:215) at io.netty.buffer.PoolArena.allocate(PoolArena.java:147) at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356) at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) at io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) at io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75) at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:777) at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)
The gateway code:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest requestqu = exchange.getRequest();
String realPath = requestqu.getPath().toString();
String addressIp = requestqu.getRemoteAddress().getAddress().getHostAddress();
String params = requestqu.getQueryParams().toSingleValueMap().toString();
String uri = requestqu.getURI().getPath();
g domainjwt = requestqu.getHeaders().getFirst("domainjwt");
String tenantid = null;
if (StringUtils.isNotBlank(domainjwt)){
tenantid = redisService.get(domainjwt);
}
if (StringUtils.isBlank(tenantid)){
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.OK);
response.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
byte[] datas = JSONObject.toJSONBytes(ApiResult.error(" 机构不存在,请联系管理员查看权限设置"));
DataBuffer buffer = response.bufferFactory().wrap(datas);
return response.writeWith(Mono.just(buffer));
}
/**免认证路径**/
String path = redisService.get("stuNoAuth:path");
logger.info("获取当前的免认证路径:{}", path);
if (!StringUtils.isEmpty(path)) {
List<String> list = Arrays.asList(path.split(","));
if (list.contains(realPath)) {
ServerHttpRequest request = exchange.getRequest().mutate()
.header("tenantid", tenantid) //机构Id
.build();
return chain.filter(exchange.mutate().request(request).build());
}
}
String type = requestqu.getHeaders().getFirst("type");
String token = requestqu.getHeaders().getFirst("token");
if (StringUtils.isNotBlank(token)) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.OK);
response.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
String access_token = "";
if(StringUtils.isNotBlank(type)){
if ("2".equals(type)) {
access_token = redisService.get("mtoken:" + token);
}else if ("3".equals(type)) {
access_token = redisService.get("gzhtoken:" + token);
}else {
access_token = redisService.get("pctoken:" + token);
}
}else {
access_token = redisService.get("pctoken:" + token);
}
if(StringUtils.isBlank(access_token)){
byte[] datas = JSONObject.toJSONBytes(ApiResult.error("token 已经过期无效"));
DataBuffer buffer = response.bufferFactory().wrap(datas);
return response.writeWith(Mono.just(buffer));
}
boolean isTrui = JwtUtil.isVerify(access_token);
if (!isTrui) {
byte[] datas = JSONObject.toJSONBytes(ApiResult.error("token 解析失败,已经过期无效"));
DataBuffer buffer = response.bufferFactory().wrap(datas);
return response.writeWith(Mono.just(buffer));
}
username = (String) JwtUtil.parseJWT(access_token).get("username");
if (StringUtils.isEmpty(username)) {
byte[] datas = JSONObject.toJSONBytes(ApiResult.error("token 已经过期无效"));
DataBuffer buffer = response.bufferFactory().wrap(datas);
return response.writeWith(Mono.just(buffer));
}
Map<String, String> map = redisService.hmGetAll("loginStu:" + token);
if (null == map) {
byte[] datas = JSONObject.toJSONBytes(ApiResult.error("token 已经过期无效"));
DataBuffer buffer = response.bufferFactory().wrap(datas);
return response.writeWith(Mono.just(buffer));
}
try {
username = URLEncoder.encode(username, "utf8");
name = URLEncoder.encode(map.get("nickname"), "utf8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
if (exchange.getRequest().getHeaders().getContentType() == null) {
ServerHttpRequest request = exchange.getRequest().mutate() .build();
return chain.filter(exchange.mutate().request(request).build());
}else {
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
/**构建rerquest**/
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
}
}else {
ServerHttpRequest request = exchange.getRequest().mutate()
.build();
return chain.filter(exchange.mutate().request(request).build());
}
}
问题解决了吗
Sorry for the late reply. I don't have the bandwidth to debug your custom filter.