
Gateway runs out of memory: memory leak?

Open 1198862746 opened this issue 3 years ago • 2 comments

Versions in use: spring-cloud-dependencies Hoxton.SR3, spring-boot-dependencies 2.2.0.RELEASE, spring-cloud-starter-gateway 2.2.0.RELEASE. Error log:

    ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
    Recent access records:
    Created at:
        io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:363)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
        io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
        io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
        io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
        io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:777)
        io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
        io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.lang.Thread.run(Thread.java:748)

    ERROR reactor.netty.tcp.TcpServer - [id: 0x9d6a6ae2, L:/127.0.0.1:8098 - R:/127.0.0.1:53310] onUncaughtException(SimpleConnection{channel=[id: 0x9d6a6ae2, L:/127.0.0.1:8098 - R:/127.0.0.1:53310]})
    io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1023410183, max: 1029177344)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:742)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:697)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
        at io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
        at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
        at io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:777)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

1198862746 avatar Mar 11 '22 06:03 1198862746

The gateway code:

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest requestqu = exchange.getRequest();
        String realPath = requestqu.getPath().toString();
        String addressIp = requestqu.getRemoteAddress().getAddress().getHostAddress();
        String params = requestqu.getQueryParams().toSingleValueMap().toString();
        String uri = requestqu.getURI().getPath();

        String domainjwt = requestqu.getHeaders().getFirst("domainjwt");
        String tenantid   = null;

        if (StringUtils.isNotBlank(domainjwt)){
            tenantid   = redisService.get(domainjwt);

          
        }

        if (StringUtils.isBlank(tenantid)){
            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.OK);
            response.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
            byte[] datas = JSONObject.toJSONBytes(ApiResult.error("Organization does not exist; please contact the administrator to check the permission settings"));
            DataBuffer buffer = response.bufferFactory().wrap(datas);
            return response.writeWith(Mono.just(buffer));

        }
        /** Paths exempt from authentication **/
        String path = redisService.get("stuNoAuth:path");
        logger.info("Current auth-exempt paths: {}", path);
        if (!StringUtils.isEmpty(path)) {
            List<String> list = Arrays.asList(path.split(","));
            if (list.contains(realPath)) {
                ServerHttpRequest request = exchange.getRequest().mutate()
                        .header("tenantid", tenantid) // organization ID
                        .build();
                return chain.filter(exchange.mutate().request(request).build());

            }
        }
        String type = requestqu.getHeaders().getFirst("type");
        String token = requestqu.getHeaders().getFirst("token");
      
 
        if (StringUtils.isNotBlank(token)) {

            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.OK);
            response.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
            String access_token = "";

            if(StringUtils.isNotBlank(type)){

                if ("2".equals(type)) {
                    access_token = redisService.get("mtoken:" + token);

                }else if ("3".equals(type)) {
                    access_token = redisService.get("gzhtoken:" + token);
                }else  {
                    access_token = redisService.get("pctoken:" + token);
                }
            }else {
                access_token = redisService.get("pctoken:" + token);

            }




            if(StringUtils.isBlank(access_token)){
                byte[] datas = JSONObject.toJSONBytes(ApiResult.error("token has expired or is invalid"));
                DataBuffer buffer = response.bufferFactory().wrap(datas);
                return response.writeWith(Mono.just(buffer));

            }
           
            boolean isTrui = JwtUtil.isVerify(access_token);
            if (!isTrui) {
                byte[] datas = JSONObject.toJSONBytes(ApiResult.error("token parsing failed; it has expired or is invalid"));
                DataBuffer buffer = response.bufferFactory().wrap(datas);
                return response.writeWith(Mono.just(buffer));
            }
            username = (String) JwtUtil.parseJWT(access_token).get("username");

            if (StringUtils.isEmpty(username)) {
                byte[] datas = JSONObject.toJSONBytes(ApiResult.error("token has expired or is invalid"));
                DataBuffer buffer = response.bufferFactory().wrap(datas);
                return response.writeWith(Mono.just(buffer));
            }
            Map<String, String> map = redisService.hmGetAll("loginStu:" + token);
            if (null == map) {
                byte[] datas = JSONObject.toJSONBytes(ApiResult.error("token has expired or is invalid"));
                DataBuffer buffer = response.bufferFactory().wrap(datas);
                return response.writeWith(Mono.just(buffer));
            }

            try {
                username = URLEncoder.encode(username, "utf8");
                name = URLEncoder.encode(map.get("nickname"), "utf8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }

            if (exchange.getRequest().getHeaders().getContentType() == null) {
                ServerHttpRequest request = exchange.getRequest().mutate() .build();
                return chain.filter(exchange.mutate().request(request).build());
            }else {

                return DataBufferUtils.join(exchange.getRequest().getBody())
                        .flatMap(dataBuffer -> {
                            DataBufferUtils.retain(dataBuffer);
                            Flux<DataBuffer> cachedFlux = Flux
                                    .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                            /** Build the request **/
                            ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                                    exchange.getRequest()) {
                                @Override
                                public Flux<DataBuffer> getBody() {
                                    return cachedFlux;
                                }
                            };
                
                                
                            return chain.filter(exchange.mutate().request(mutatedRequest).build());
                        });


            }
        }else {
            ServerHttpRequest request = exchange.getRequest().mutate()
                    .build();
            return chain.filter(exchange.mutate().request(request).build());
        }


    }

1198862746 avatar Mar 11 '22 06:03 1198862746

Has this problem been solved?

zhaodp avatar Sep 13 '22 13:09 zhaodp

Sorry for the late reply. I don't have the bandwidth to debug your custom filter.

spencergibb avatar Mar 13 '24 22:03 spencergibb
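
A note for readers arriving at this thread: the filter above calls `DataBufferUtils.retain(dataBuffer)` on the joined request body and never calls a matching `DataBufferUtils.release`, which is a plausible but unconfirmed source of the `LEAK` report. The sketch below is a toy model of reference counting in plain Java, not Netty or Spring code. `CountedBuffer` and `LeakDemo` are hypothetical names, and the model assumes the downstream consumer releases the buffer exactly once.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for a reference-counted buffer. This is NOT Netty's ByteBuf;
// it only mimics the counting rule: start at 1, free when the count hits 0.
final class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    // Adds one owner. Simplified: the check and increment are not atomic.
    CountedBuffer retain() {
        if (refCnt.get() <= 0) {
            throw new IllegalStateException("buffer already freed");
        }
        refCnt.incrementAndGet();
        return this;
    }

    // Drops one owner; returns true when the buffer has just been freed.
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released more often than retained");
        }
        return remaining == 0;
    }

    int refCnt() {
        return refCnt.get();
    }
}

final class LeakDemo {
    // Mirrors the shape of the filter: one extra retain, one downstream release.
    static int unbalanced() {
        CountedBuffer buf = new CountedBuffer(); // count = 1
        buf.retain();                            // count = 2 (the extra retain)
        buf.release();                           // count = 1 (assumed downstream release)
        return buf.refCnt();                     // never reaches 0, so memory is not returned
    }

    // Same flow, but the extra retain is paired with its own release.
    static int balanced() {
        CountedBuffer buf = new CountedBuffer(); // count = 1
        buf.retain();                            // count = 2
        buf.release();                           // count = 1 (assumed downstream release)
        buf.release();                           // count = 0 (matching release)
        return buf.refCnt();
    }

    public static void main(String[] args) {
        System.out.println("unbalanced refCnt = " + unbalanced());
        System.out.println("balanced refCnt = " + balanced());
    }
}
```

If that assumption holds for the real filter, dropping the extra retain, or pairing it with a release once the filter chain completes, would be the first things to try. Neither has been verified against this issue.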