Rx triggers Max CPU and few addjobs

Open kiddouk opened this issue 7 years ago • 1 comments

Version: 1.0.0-SNAPSHOT

API: reactive

Use case: trying to spawn 6M+ messages to Disque

Reproduction:

        val disqueClient = DisqueClient.create(DisqueURI.create("disque://localhost:7711"))
        var conn = disqueClient.connect().reactive()

        Observable.just("channel")
            .flatMap { _ ->
                val r = (1..6000000).asIterable().map { it.toString() }
                Observable.from(r)
            }
            .compose(monitor("About to spool"))
            .flatMap { w -> conn.addjob("test", w, 1000, TimeUnit.MILLISECONDS) }
            .compose(monitor("Spooled"))
            .subscribe(
                { x -> },
                { err -> logger.catching(err) },
                { println("Completed") }
            )

with

    fun <T> monitor(message: String) = Transformer<T, T> { observable ->
        observable.buffer(10, TimeUnit.SECONDS)
            .doOnNext { x -> logger.info("${message} : collected ${x.size}") }
            .flatMap { x -> Observable.from(x) }
    }

What I am experiencing:

A few messages are sent to the Disque server and, after a random amount of time, Spinach stops sending messages. I have 100% CPU on all cores of my machine. A jstack dump indicates that we are busy in kevent (yes, I am on a Mac).

After reading up a bit on Netty, I recompiled Lettuce and Spinach against Netty 4.1.28.Final and added the native kqueue transport to Spinach in order to get native polling. I read here and there that Netty has a bug with epoll. I haven't been able to tell whether that problem also affects OS X, but since I also need this code to run on Linux (where I experience the same issue), I thought it was worth a try. No luck.

Any idea of what is going on?

kiddouk · Aug 09 '18 22:08

Thanks for the reproducer. Looking at your code, you're creating a lot of inner subscriptions. Depending on CPU and memory limits, this can easily eat up a lot of resources, because the flatMap operators merge all inner Observables into a single stream. On my machine, I get the following trace:

  java.lang.Thread.State: RUNNABLE
	  at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:691)
	  at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562)
	  at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:851)
	  at com.lambdaworks.redis.ReactiveCommandDispatcher$ObservableCommand.complete(ReactiveCommandDispatcher.java:127)
	  at com.lambdaworks.redis.protocol.CommandHandler.decode(CommandHandler.java:187)
	  at com.lambdaworks.redis.protocol.CommandHandler.channelRead(CommandHandler.java:153)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
	  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
	  at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
	  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
	  at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
	  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
	  at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
	  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
	  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1320)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
	  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:905)
	  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
	  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:563)
	  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:504)
	  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:418)
	  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:390)
	  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
	  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
	  at java.lang.Thread.run(Thread.java:745)

After about 75k messages, the merge operator is kept busy iterating over its inner subscribers, which also consume a lot of memory, so your CPU is either busy spinning or busy with GC.
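One way to keep the number of inner subscriptions small is the RxJava 1.x `flatMap(func, maxConcurrent)` overload, which caps how many inner Observables the merge operator subscribes to at once. The following is only a minimal sketch, not a tested fix for this issue: `spoolAll`, `maxInFlight`, the value 64, and the fake `addJob` lambda are hypothetical stand-ins, with the lambda taking the place of `conn.addjob("test", w, 1000, TimeUnit.MILLISECONDS)`. It assumes RxJava 1.x is on the classpath, which the stack trace above suggests.

```kotlin
import rx.Observable
import rx.functions.Func1

// Hypothetical helper, not part of Spinach or RxJava: emits `count` payloads and
// sends each one through `addJob`, with at most `maxInFlight` inner Observables
// subscribed at any time.
fun spoolAll(
    count: Int,
    maxInFlight: Int,
    addJob: (String) -> Observable<String>
): List<String> =
    Observable.range(1, count)            // emits lazily instead of building a 6M-element list
        .map { it.toString() }
        .flatMap(
            Func1<String, Observable<String>> { payload -> addJob(payload) },
            maxInFlight                   // upper bound on concurrent inner subscriptions
        )
        .toList()
        .toBlocking()
        .single()

fun main() {
    // Stand-in for conn.addjob(...): returns a fake job id right away.
    val ids = spoolAll(1000, 64) { payload -> Observable.just("id-$payload") }
    println("Spooled ${ids.size} jobs")
}
```

With a real connection, the lambda would return the Observable from `conn.addjob(...)`, and a suitable `maxInFlight` would have to be found by measuring.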

Please note that Spinach isn't really maintained anymore until Disque gets reanimated through Redis Modules.

mp911de · Aug 13 '18 14:08