
Using task_done() in multiple threads

Open kodonnell opened this issue 7 years ago • 5 comments

I'd like to use Queue to store items to be processed by threads. However, if one of the items fails to be processed (and task_done is hence not called), it's still possible for the item to be removed from the queue persistently, whereas one would expect it to remain, as is the usual queue behaviour.

Example:

import threading
import time

from persistqueue import Queue

q = Queue("testq")


def worker1():
    print("getting from worker1")
    x = q.get()
    print("got", x, "from worker1")
    # processing goes here ... takes some time
    time.sleep(2)
    try:
        # simulate a failure before task_done() is reached
        assert False, "something went wrong"
        q.task_done()
    except Exception:
        print("something went wrong with worker1 in processing", x, "so not calling task_done")


def worker2():
    time.sleep(1)
    print("getting from worker2")
    x = q.get()
    print("got", x, "from worker2")
    # processing would happen here - but finishes quicker than worker1's
    print("finished processing", x, "from worker2 so calling task_done")
    q.task_done()
    print("called task_done from worker2")


if __name__ == "__main__":

    q.put("a")
    q.put("b")

    t1 = threading.Thread(target=worker1)
    t1.start()
    t2 = threading.Thread(target=worker2)
    t2.start()
    t1.join()
    t2.join()
    print("reloading q")
    del q
    q = Queue("testq")
    print("qsize", q.qsize())

Output:

getting from worker1
got a from worker1
getting from worker2
got b from worker2
finished processing b from worker2 so calling task_done
called task_done from worker2
something went wrong with worker1 in processing a so not calling task_done
reloading q
qsize 0

As you can see, 'a' was permanently removed, even though task_done wasn't called for it. In other words, I'd expect qsize 1 as the output. Is there a way to achieve this, i.e. have task_done complete only a specific task, rather than all outstanding tasks across threads?

Bonus question: how do I also add 'a' back onto the in-memory queue (ignoring persistence)? I.e. the equivalent of SQLiteAckQueue.nack? The only way I see how would be reloading the queue from disk (in which case the get wouldn't have persisted) but this seems messy.

(Also, yes, I know of the SQLiteAckQueue which seems well-suited, but I'd prefer to use plain files if possible.)

kodonnell avatar Jan 28 '19 09:01 kodonnell

Related: #34 #55

kodonnell avatar Jan 28 '19 09:01 kodonnell

@kodonnell this is a known limitation of the file queue. In my opinion, you should re-enqueue the failed items so they can be processed later. Would that fit your case?

peter-wangxu avatar Jan 28 '19 10:01 peter-wangxu

this is known limitation for file queue

Sorry, I wasn't aware. Can this be documented? It's described as "thread-safe" and this doesn't really fit that bill. Also - doesn't the same apply to the sqlite queue? (I assume that's what the SQLiteAckQueue is for.)

you should reenque the failed items so that it can be processed later, can this fit your case

Ah - so you mean every time I .get() I immediately follow it with .task_done() - and then if a failure happens, I requeue the item? This should work, though the FIFO order wouldn't be preserved - which isn't too much of a drama for us, actually.

kodonnell avatar Jan 28 '19 19:01 kodonnell
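For reference, the re-enqueue workaround discussed above can be sketched as follows. This uses the stdlib `queue.Queue` as a stand-in for `persistqueue.Queue` (both offer `put`/`get`/`task_done`), and a hypothetical `process()` function that fails on one item:

```python
import queue  # stdlib stand-in for persistqueue.Queue (same put/get/task_done calls)


def process(item):
    # hypothetical work function: fails on "b" to simulate a bad item
    if item == "b":
        raise RuntimeError("processing failed")


q = queue.Queue()
for item in ("a", "b", "c"):
    q.put(item)

for _ in range(q.qsize()):
    x = q.get()
    q.task_done()      # acknowledge immediately, so the dequeue is final
    try:
        process(x)
    except Exception:
        q.put(x)       # re-enqueue the failure -- note it goes to the back,
                       # so strict FIFO order is lost

print(q.qsize())       # 1: only "b" remains, awaiting a retry
```

Calling `task_done()` right after `get()` makes the consumption explicit and puts recovery in your hands via the re-put; the trade-off, as noted above, is that a retried item loses its original position.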

@kodonnell the sqlite ack queue should fit your case well since you have a strict FIFO requirement; I strongly suggest you try it.

The file queue's data is written sequentially, and it's hard to implement an ACK for only part of its content. If you have any ideas, feel free to share them here.

Thanks Peter

peter-wangxu avatar Jan 30 '19 06:01 peter-wangxu
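The ack semantics being recommended here can be modeled with a small in-memory class. This is an illustration of the concept only, not persist-queue's implementation: with `SQLiteAckQueue`, `get()` marks the row as unacknowledged on disk, `ack(item)` removes it for good, and `nack(item)` makes it available again.

```python
import queue


class MiniAckQueue:
    """Toy in-memory model of ack/nack semantics (illustration only)."""

    def __init__(self):
        self._ready = queue.Queue()   # items available to consumers
        self._unacked = []            # items taken but not yet acknowledged

    def put(self, item):
        self._ready.put(item)

    def get(self):
        item = self._ready.get()
        self._unacked.append(item)
        return item

    def ack(self, item):
        # processing succeeded: forget the item for good
        self._unacked.remove(item)

    def nack(self, item):
        # processing failed: return the item to the queue
        # (here it goes to the back; a real ack queue can restore
        # the item in its original position)
        self._unacked.remove(item)
        self._ready.put(item)

    def qsize(self):
        return self._ready.qsize()


q = MiniAckQueue()
q.put("a")
x = q.get()        # "a" is now in flight; qsize() == 0
q.nack(x)          # processing failed: "a" becomes available again
print(q.qsize())   # 1
```

The key difference from the file queue is that the acknowledgement is per-item, so one worker's failure cannot be masked by another worker's `task_done()`.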

Late here, but you can use multiple similar queues with dedicated roles (i.e. not just one, but two additional compensating queues: one queue for successes and two for handling failure scenarios) and move items between them accordingly. That should preserve FIFO order at the cost of some additional load.

avineshwar avatar Sep 06 '22 22:09 avineshwar
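The multi-queue idea above can be sketched with two queues whose roles are swapped between passes (a third "dead letter" queue could additionally hold items that keep failing). Again this uses stdlib `queue.Queue` as a stand-in; with persist-queue these would be separate on-disk queues. The `process()` function is hypothetical and fails on "b" only on its first attempt:

```python
import queue  # stdlib stand-ins; with persist-queue these would be separate on-disk Queues


attempted = set()


def process(item):
    # hypothetical work function: "b" fails on its first attempt only
    if item == "b" and "b" not in attempted:
        attempted.add("b")
        raise RuntimeError("transient failure")


active, retry = queue.Queue(), queue.Queue()
for item in ("a", "b", "c"):
    active.put(item)

done = []
for _ in range(2):                 # two passes: main run, then one retry sweep
    while not active.empty():
        x = active.get()
        try:
            process(x)
            done.append(x)
        except Exception:
            retry.put(x)           # failed items keep their relative order
    active, retry = retry, active  # swap roles and drain the retry queue

print(done)  # ['a', 'c', 'b']
```

Relative order among retried items is preserved, though a retried item still completes after the first pass finishes, so this is FIFO per queue rather than a single global ordering.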