Rx triggers max CPU and only a few addjobs
Version: 1.0.0-SNAPSHOT
API: reactive
Use case: trying to spawn 6M+ messages to Disque
Reproduction:
val disqueClient = DisqueClient.create(DisqueURI.create("disque://localhost:7711"))
var conn = disqueClient.connect().reactive()

Observable.just("channel")
    .flatMap { _ ->
        val r = (1..6000000).asIterable().map { it.toString() }
        Observable.from(r)
    }
    .compose(monitor("About to spool"))
    .flatMap { w -> conn.addjob("test", w, 1000, TimeUnit.MILLISECONDS) }
    .compose(monitor("Spooled"))
    .subscribe(
        { x -> },
        { err -> logger.catching(err) },
        { println("Completed") }
    )
where monitor is defined as:
fun <T> monitor(message: String) = Transformer<T, T> { observable ->
    observable.buffer(10, TimeUnit.SECONDS)
        .doOnNext { x -> logger.info("${message} : collected ${x.size}") }
        .flatMap { x -> Observable.from(x) }
}
What I am experiencing:
A few messages are sent to the Disque server, and after a while (at a random point) Spinach stops sending messages. I see 100% CPU on all cores of my machine. A jstack indicates that we are busy in kevent (yes, I am on a Mac).
Reading up a bit on Netty, I decided to recompile Lettuce and Spinach against Netty 4.1.28.Final and to add the native kqueue transport to Spinach in order to benefit from native polling. I read here and there that Netty has a bug with epoll. I haven't been able to confirm whether the problem also persists on OS X, but since I also need this code to run on Linux (and I experience the same issue on that platform), I figured it was worth a try. No luck.
Any idea of what is going on?
Thanks for the reproducer. Looking at your code, you're creating a very large number of inner subscriptions. Depending on CPU and memory limits, this can easily eat up a lot of resources, because the flatMap operators create a single merged stream that has to track every inner Observable. On my machine, I get the following trace:
java.lang.Thread.State: RUNNABLE
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:691)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:851)
at com.lambdaworks.redis.ReactiveCommandDispatcher$ObservableCommand.complete(ReactiveCommandDispatcher.java:127)
at com.lambdaworks.redis.protocol.CommandHandler.decode(CommandHandler.java:187)
at com.lambdaworks.redis.protocol.CommandHandler.channelRead(CommandHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1320)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:905)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:563)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:504)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:418)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:390)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
at java.lang.Thread.run(Thread.java:745)
After about 75k messages, the merge operator stays busy iterating over its inner subscribers and consumes a lot of memory, so your CPU is either busy spinning in that emit loop or busy with GC.
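For illustration, here is a minimal sketch of how the merge pressure could be bounded, assuming an RxJava 1.x version that ships the flatMap overload taking a maxConcurrent argument. The limit of 128 is an arbitrary example rather than a tuned value, and conn, monitor and logger are the ones from your reproduction:

// Emit the 6M payloads directly, then cap how many addjob commands are in
// flight at the same time via flatMap's maxConcurrent parameter.
Observable.from((1..6000000).map { it.toString() })
    .compose(monitor("About to spool"))
    // The merge behind this flatMap only ever tracks up to 128 inner
    // subscribers instead of millions.
    .flatMap({ w -> conn.addjob("test", w, 1000, TimeUnit.MILLISECONDS) }, 128)
    .compose(monitor("Spooled"))
    .subscribe(
        { },
        { err -> logger.catching(err) },
        { println("Completed") }
    )

With a bounded maxConcurrent, upstream demand is throttled by how fast Disque acknowledges the ADDJOB commands instead of eagerly subscribing to all inner Observables at once.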
Please note that Spinach isn't actively maintained anymore, at least until Disque gets reanimated through Redis Modules.