
Unable to reuse Redis connections

Open afrozl opened this issue 8 years ago • 16 comments

I think I've followed the instructions:

var Redis = require('ioredis');
var client = new Redis({ port:redisPort, host: redisHost,
      password: redisPass, keyPrefix: "Workers"
});
var subscriber = new Redis({ port:redisPort, host: redisHost,
      password: redisPass, keyPrefix: "Workers"
});


var redisOpts = {
    createClient: function(type){
      switch(type){
        case 'client':
          return client;
        case 'subscriber':
          return subscriber;
        default:
          return new Redis({ port:redisPort, host: redisHost,
                  password: redisPass, keyPrefix: "Worker"
          });
      }
    }
  }

var Queue = require('bull');
var testqueue = new Queue('testqueue', redisOpts);

However, Bull does not appear to connect to Redis, although the only errors I get are somewhat cryptic:

[0113-13:37:52.587] - error: (node:23918) Warning: Possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit
[0113-13:37:52.589] - error: (node:23918) Warning: Possible EventEmitter memory leak detected. 11 pmessage listeners added. Use emitter.setMaxListeners() to increase limit
[0113-13:37:52.591] - error: (node:23918) Warning: Possible EventEmitter memory leak detected. 11 message listeners added. Use emitter.setMaxListeners() to increase limit

Do I have something misconfigured?

afrozl avatar Jan 13 '18 18:01 afrozl

What do you mean by "it does not appear to connect"? Have you run tests? The errors you pasted are not errors, they are warnings, and they are a side effect of using the same Redis client for different queues, where several listeners are added to the same client. Use emitter.setMaxListeners() to increase the limit.

manast avatar Jan 13 '18 19:01 manast

I can remove all bull keys and restart with the above settings, and no queues are created. As soon as I put the previous settings back, i.e. with a Redis client per queue, everything works as expected.

afrozl avatar Jan 13 '18 20:01 afrozl

Can you try without using the ioredis keyPrefix option and instead setting the prefix as an option in bull:

var queue = new Queue('testqueue', {
    prefix: '{myprefix}'
  })

manast avatar Jan 13 '18 21:01 manast

I was able to get it to work by setting the prefix in redisOpts:

var redisOpts = {
    prefix: "Workers",
    createClient: function(type){
      switch(type){
        case 'client':
          return client;
        case 'subscriber':
          return subscriber;
        default:
          return new Redis({ port:redisPort, host: redisHost,
                  password: redisPass
          });
      }
    }
  }

That helped bring down the number of connections substantially since I have 60 queues and 3 instances.

A question though, I really like the subscribe approach since it removes the need for polling and improves the latency for job initiation. However, does there have to be a channel for each queue? Couldn't the same functionality be achieved with a single subscription per instance with multiplexed messages/channels?

Thanks for providing a great product.

afrozl avatar Jan 15 '18 01:01 afrozl

@afrozl I am not sure I fully understood what you meant by multiplexing. If you meant, for instance, having only one subscription and then distributing the messages to different listeners in JavaScript, then Redis should be more efficient at doing this job than anything we could implement ourselves.

manast avatar Jan 15 '18 08:01 manast

@manast I have a somewhat related issue, so I will post it here. I set up Bull to re-use connections and it works fine, except that when you close the queue it seems like it doesn't remove listeners from Redis, which makes such warnings appear. I peeked at the code and it looks like Bull doesn't do anything Redis-related if it's not its 'own' client. I assume the listeners are kept alive since we basically just abandon the Redis connections, and that looks like a real memory leak, doesn't it? Please correct me if I'm wrong.

Equals182 avatar Jan 16 '18 10:01 Equals182

Actually, the line you should look at is this one: https://github.com/OptimalBits/bull/blob/master/lib/queue.js#L238 The TODO on the line you mentioned is therefore deprecated. And you are right that we should save all the listeners we attach to the re-used Redis instance and remove them when closing the queue. This should probably be marked as a bug.

manast avatar Jan 16 '18 10:01 manast

No-no, I didn't mean the TODO line, and yeah, line 238 or 239, you've got me either way. I do think it's a bug, because if you close and create queues dynamically it will create more and more listeners on Redis (~4 per new Bull()), and I thought it was fine to create as many queues as you want, since 'queues are cheap' as the docs say.

Equals182 avatar Jan 16 '18 10:01 Equals182

@manast - yes, that's what I meant: a single subscription to a global channel and then a switch to the appropriate queue processor to handle the job. Kind of like how I imagine named processors work.

In any case, I might side step the issue and put something like redplex in front of my redis connection since it will ensure that I end up with one connection per subscription/queue no matter how many instances of the queue I bring up.

I really do want to be able to create as many queues as needed without having to worry about the number of connections, hence the gymnastics...

afrozl avatar Jan 16 '18 16:01 afrozl

@manast hello, I stumbled on this issue and it's not clear to me whether, as of now, it's OK to dynamically create and close queues which share connections, or whether I will incur memory leaks.

lambrojos avatar Aug 08 '18 19:08 lambrojos

You should close the Redis connections that you open by hand; the ones created by bull are closed when calling queue.close().

manast avatar Aug 09 '18 17:08 manast

Marked as an enhancement so that we can have better management of external ioredis instances.

manast avatar Aug 15 '18 20:08 manast

+1

mahnunchik avatar Nov 29 '18 21:11 mahnunchik

> You should close the redis connections that you open by hand

@manast At what point in the lifecycle should I close connections that I am opening by hand in createClient? When calling queue.close()?

johnjensenish avatar Dec 19 '18 18:12 johnjensenish

After calling queue.close() and waiting for its promise to resolve, it should be safe to close your own connections.

manast avatar Dec 19 '18 19:12 manast
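The shutdown order manast describes can be sketched as below. The fakeQueue and fakeClient objects are stand-ins for illustration; real code would use a bull Queue and the ioredis clients passed to createClient:

```javascript
// Close the queue first; bull only closes the connections it created
// itself, so hand-opened clients must be quit afterwards.
async function shutdown(queue, ownClients) {
  // 1. Wait for queue.close() to resolve.
  await queue.close();
  // 2. Only then close the connections you opened by hand.
  await Promise.all(ownClients.map(c => c.quit()));
}

// Stand-ins that record the order in which they are closed.
const order = [];
const fakeQueue = { close: async () => { order.push('queue closed'); } };
const fakeClient = { quit: async () => { order.push('client quit'); } };

const done = shutdown(fakeQueue, [fakeClient]);
```

Quitting the shared clients before the queue has finished closing risks errors from in-flight commands, hence the strict ordering.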

> @manast I have a kinda related issue, so I will post it here. I set up Bull to re-use connections and it works fine, except when you close the queue it seems like it doesn't remove listeners from Redis, which makes such warnings appear. I peeked in code and it looks like Bull doesn't do anything related to Redis if it's not it's 'own' client. I assume listeners are kept alive since we basically just abandon Redis connections and that looks like a real memory leak, isn't it? Please, correct me if I'm wrong.

Hello @Equals182, I actually have the same issue as you. I'm re-using connections, and when I close the queue it doesn't remove the connection from Redis. How did you remove it?

This is the minimum working code:

const Queue = require("bull");
const Redis = require("ioredis");
const redisInfo = {
  port: process.env.REDIS_PORT, 
  host: process.env.REDIS_HOST,
}
const client = new Redis(redisInfo);
const subscriber = new Redis(redisInfo);

const opts = {
  prefix: "Workers",
  createClient: function (type) {
    switch (type) {
      case 'client':
        return client;
      case 'subscriber':
        return subscriber;
      default:
        return new Redis(redisInfo);
    }
  }
}
const $Q = new Queue("my test", opts);


$Q.process(1, async job => {
  console.log("process", job.data);
  await wait();
  console.log("complete", job.data);
  if(job.data.i === 9){
    $Q.close();
  }
})
  .then(() => {
    console.log("Done processing jobs");
  });

 function start() {
  console.log("Adding jobs");
  for (let i = 0; i < 10; i++) {
    $Q.add({ foo: "bar", i });
  }
  console.log("done adding");
}

async function wait() {
  return new Promise(resolve => setTimeout(resolve, 1000));
}

start();

aldoalprak avatar Jul 14 '20 03:07 aldoalprak