
Any way to track a celery group?

Open petoor opened this issue 5 years ago • 5 comments

Hello.

Is there any way to track the progress of a celery group? That is, spawning a lot of asynchronous tasks and keeping track of how many subtasks are completed as they run. The celery group has a completed_count() method which does exactly that, but my understanding of celery-progress is not good enough to know whether this can be incorporated: https://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.GroupResult

If our group code looks something like this: group(do_something_to_file.si(file) for file in file_list), then I'm not sure where to put the observer, since every subtask has a unique task id. We could also assign an id to the group itself, but then again I'm not sure where to put the observer.

Best regards. Peter

petoor avatar Oct 12 '20 12:10 petoor

I've never attempted this. A couple possible options that may work (just some quick thoughts):

  1. Show a separate progress per task using the normal observer
  2. Create some other observer object that can monitor all the child tasks and report progress on everything. Not sure whether you want to just track the number that have completed or the progress of each individual task in the group, but the latter would require passing information from the spawned tasks back up to the main thread somehow (sounds a bit tricky).
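The second option boils down to an aggregation step. A minimal pure-Python sketch of that step, assuming each child task reports a current/total pair roughly the way celery-progress records progress (the dict shape here is an assumption, not the library's API):

```python
def aggregate_progress(children):
    """Combine per-child progress dicts of the assumed shape
    {'current': int, 'total': int} into one overall figure,
    suitable for driving a single combined progress bar."""
    current = sum(c['current'] for c in children)
    total = sum(c['total'] for c in children)
    percent = 100.0 * current / total if total else 0.0
    return {'current': current, 'total': total, 'percent': percent}
```

The hard part, as noted above, is not this arithmetic but getting each child's progress into one place where a view can read it.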

czue avatar Oct 13 '20 07:10 czue

Hi @czue, I really appreciate your commitment :). I'm working with @petoor, which is why I'm also commenting on this issue. One of the issues with the current workflow for this project (if I understand correctly) is that the view is expected to return the task id, so if multiple tasks are spawned from a group, there's no way to return the ids of all the tasks. Is that right?

I'm working on #50, which I guess would solve this by including the id of each subtask within a group in a table that links it to a particular user. Then this can be queried at any time.

I currently have a very simple implementation of #50 that I'm testing at the moment. I'll be able to share it shortly in case it might be useful for this project.

jjrugui avatar Oct 13 '20 08:10 jjrugui

The package currently does have support for tracking tasks in a group, although it does not have persistence as you've noted. An example view to show off group support would look as follows:

from celery import group
from django.shortcuts import render

from .tasks import task  # the @shared_task defined below

def task_view(request):
    result = group(task.s(i) for i in [1000 for _ in range(5)])()
    task_ids = [
        task_id
        for parents in result.children
        for task_id in parents.as_list()[::-1]
    ]
    return render(request, 'http.html', context={'task_ids': task_ids})
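The flattening and reversal that the comprehension performs can be demonstrated on plain lists standing in for result.children (the ids and nesting below are made up; each inner list mimics what parents.as_list() returns, i.e. a task id followed by its ancestors' ids):

```python
# Stand-in for result.children: one inner list per group member,
# shaped like parents.as_list() output (task id, then ancestor ids).
children = [['child_a', 'parent_1'], ['child_b', 'parent_2']]

# Flatten and reverse each inner list so ancestors come first.
task_ids = [task_id for parents in children for task_id in parents[::-1]]
```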

The task_ids context variable collects all of the tasks currently in the group. The list comprehension simply acts like a star expression, pulling each task id out of the list that parents.as_list() generates, and reverses it so any children of a task are displayed in the right order (or else you'd have the first tasks at the bottom of the list!). An accompanying task would be:

from random import random

from celery import shared_task
from celery_progress.backend import ProgressRecorder

@shared_task(bind=True)
def task(self, number):
    progress_recorder = ProgressRecorder(self)
    for i in range(number):
        progress_recorder.set_progress(i + 1, number)
    return int(random() * 1000)

For the html, the significant part of the page would have:

{% for task_id in task_ids %}
<div class='progress-wrapper-{{ forloop.counter0 }}'>
    <div id='progress-bar-{{ forloop.counter0 }}' class='progress-bar-{{ forloop.counter0 }}' style="background-color: #68a9ef; width: 0%;">&nbsp;</div>
    <div id="progress-bar-message-{{ forloop.counter0 }}">Waiting for progress to start...</div>
    <div id="progress-result-{{ forloop.counter0 }}"></div>
</div>
{% endfor %}
<script>
    document.addEventListener("DOMContentLoaded", function () {
        const task_urls = [
            {% for task_id in task_ids %}"{% url 'celery_progress:task_status' task_id %}",
        {% endfor %}];
        for (var i = 0; i < task_urls.length; i++) {
            CeleryProgressBar.initProgressBar(task_urls[i], {
                progressBarId: "progress-bar-" + i,
                progressBarMessageId: "progress-bar-message-" + i,
                resultElementId: "progress-result-" + i
            });
        }
    });
</script>

The for loop will ensure that each task id is assigned its own bar and result element, and when the page finishes loading, the JavaScript will start a bar instance for each to begin polling data. (The original comment attached two screenshots: one of the bars mid-progress, and one of the final state after all tasks finished.)

If you have a callback that is supposed to activate after the task is finished, it would be fairly easy to add an if statement on the last task in the JavaScript for loop and modify the onSuccess to suit your needs. A native solution for this would be an interesting task to behold, but any suggestions are welcome!

EJH2 avatar Oct 14 '20 03:10 EJH2

Alternatively, if a single progress bar encompassing all tasks is what you're after, we currently don't support that. As @czue noted, it would require passing information from all tasks into a centralized location. From there, this "super observer" would have to figure out how "done" every task is, and work out how to display those results. From what I can see of GroupResult's source, a GroupResult is only returned once the group is started, and it is coincidentally also the object that holds the information needed to get the task ids. Alternatively, for a rather crazy idea, it would be interesting if each child task could be spawned with a "sub-observer" that shouts out its status, with a "super observer" capturing this and doing generally the same as above. If either of these could be accomplished, it could be pretty useful.
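The counting half of such a super observer is straightforward; what follows is a pure-Python sketch over plain state strings (as AsyncResult.state would report them), so the aggregation stands alone. How the states would actually be collected and where the combined figure would be published are the open questions, and everything here is an assumption about the wiring:

```python
# States in which a task is finished (failure included, unlike
# GroupResult.completed_count(), which counts only successes).
DONE_STATES = {'SUCCESS', 'FAILURE'}

def group_progress(child_states):
    """Reduce a group's child task states to a (done, total) pair
    that a single combined progress bar could display."""
    done = sum(1 for state in child_states if state in DONE_STATES)
    return done, len(child_states)
```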

EJH2 avatar Oct 14 '20 04:10 EJH2

Hi, could the main ProgressRecorder be serialized somehow to the subtasks?

edit: when I try to run set_progress on the deserialized recorder I get:

[2021-03-08 18:59:39,920: ERROR/ForkPoolWorker-1] Task cluster123.tasks.task_progress_update[5e1e9d5a-54c6-4a89-b062-fcbbcbc283dd] raised unexpected: AttributeError("'dict' object has no attribute 'update_state'")
Traceback (most recent call last):
  File "/home/arubico/PycharmProjects/djangoProject/venv/lib/python3.9/site-packages/celery/app/trace.py", line 405, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/arubico/PycharmProjects/djangoProject/venv/lib/python3.9/site-packages/celery/app/trace.py", line 697, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/arubico/PycharmProjects/djangoProject/cluster123/tasks.py", line 22, in task_progress_update
    prog_update.set_progress(i, t, m)
  File "/home/arubico/PycharmProjects/djangoProject/venv/lib/python3.9/site-packages/celery_progress/backend.py", line 50, in set_progress
    self.task.update_state(
AttributeError: 'dict' object has no attribute 'update_state'

which is logical: deserialization doesn't run the constructor, so this is a bad idea to begin with.

safhac avatar Mar 08 '21 13:03 safhac

Was this ever solved? I ran into the same problem as @safhac, trying to pass the ProgressRecorder from a main_task down to the sub_tasks it spawns, but this doesn't seem doable. The only solution I've come up with is for the main task to save the sub_task ids that are generated when spawning them, and then to build a system that iterates over the list of ids and calculates a progress.

WassawRoki avatar Feb 28 '24 07:02 WassawRoki

Apologies for the radio silence. We can likely add serialization to the ProgressRecorder class, if that's all it takes? If someone who has run into this can provide a dummy example of code that uses groups and/or spawns subtasks, I'd be happy to try to get it working and then submit an updated version with support for this. Alternatively, I'd happily accept contributions on this.

czue avatar Feb 28 '24 11:02 czue

import time

from celery import shared_task
from celery_progress.backend import ProgressRecorder

@shared_task(bind=True)
def main_task(self, seconds):
    progress_recorder = ProgressRecorder(self)
    result = 0
    for i in range(seconds):
        time.sleep(1)
        # Desired usage: spawn a subtask that shares this recorder.
        # This is the part that doesn't work today, since the
        # recorder isn't serializable.
        sub_task.delay(seconds, progress_recorder)
        result += i
        progress_recorder.set_progress(i + 1, seconds)
    return result

@shared_task(bind=True)
def sub_task(self, seconds, progress_recorder):
    time.sleep(1)
    # get_status() is hypothetical: read the parent's current progress
    task_status = progress_recorder.get_status()
    progress_recorder.set_progress(task_status + 1, seconds)

This is a quick rough idea. The reason we need to be able to create subtasks that share a progress_recorder is that if a group of tasks is to be executed async, each iteration of the main task must be created as its own task. As of right now I have gone with an implementation using django_celery_results.models.GroupResult. However, this only provides a count of completed vs. total tasks in a group, and I have not been able to use it with your library. This also means I am not able to track the progress of the sub_tasks, as I only track completion.

WassawRoki avatar Mar 01 '24 11:03 WassawRoki

thanks - I'll take a look soon

czue avatar Mar 01 '24 11:03 czue

I was able to get something working. See https://github.com/czue/celery-progress/pull/115/. Would love any feedback/testing as I didn't explore the edge cases.

Note, I explicitly used GroupResult instead of working with this example. @WassawRoki does this work for your use case?

czue avatar Mar 04 '24 09:03 czue

Interesting, I will look into this implementation asap :D

WassawRoki avatar Mar 04 '24 09:03 WassawRoki