
Is it possible to kill a running job ("via" database)?

Open turicas opened this issue 3 years ago • 3 comments

I've searched the docs and only found references to SIGINT, SIGKILL and SIGTERM, which apply to the worker process directly. My use case is different: the code that schedules jobs (running on another machine) needs to kill a running job.

turicas, Apr 27 '23 14:04

There's nothing in place currently for doing that. I guess it should be possible, but we'd need a few things:

  • all workers would listen to events in the LISTEN/NOTIFY channel (either the existing one or a new one)
    • if it's the existing one, we'd need to add some processing logic so that depending on the message we do specific actions
  • upon receiving a cancellation order, workers would need to determine if they need to process it, so they need to keep track of what tasks are currently being processed
  • if a worker is processing the task to be cancelled, it needs to cancel the coroutine. Note that it will only work for async tasks (until we get things moving with #753)
  • we'll probably need a new state for tasks, "cancelled". Or maybe we just set them as errors
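
The steps in this list can be sketched with plain asyncio. This is only an illustration of the idea, not procrastinate code: the `Worker` class, the `on_notification` method, the payload shape (`{"type": "cancel", "job_id": ...}`) and the "cancelled" state name are all assumptions made for the example, and the LISTEN/NOTIFY channel is simulated by calling the handler directly.

```python
import asyncio


class Worker:
    """Toy worker (hypothetical) that tracks running jobs so they can be cancelled by id."""

    def __init__(self) -> None:
        self.running = {}  # job id -> asyncio.Task currently executing that job
        self.states = {}   # job id -> "doing" / "succeeded" / "cancelled"

    async def run_job(self, job_id: int, coro) -> None:
        task = asyncio.ensure_future(coro)
        self.running[job_id] = task
        self.states[job_id] = "doing"
        try:
            await task
        except asyncio.CancelledError:
            if not task.cancelled():
                raise  # run_job itself was cancelled, not the job
            self.states[job_id] = "cancelled"
        else:
            self.states[job_id] = "succeeded"
        finally:
            self.running.pop(job_id, None)

    def on_notification(self, payload: dict) -> bool:
        """Handle one message from the (simulated) notification channel."""
        if payload.get("type") != "cancel":
            return False  # some other kind of message
        task = self.running.get(payload["job_id"])
        if task is None:
            return False  # not running on this worker, nothing to do
        task.cancel()  # raises CancelledError inside the job's coroutine
        return True


async def demo() -> dict:
    worker = Worker()
    slow = asyncio.create_task(worker.run_job(1, asyncio.sleep(60)))
    fast = asyncio.create_task(worker.run_job(2, asyncio.sleep(0)))
    await asyncio.sleep(0.01)  # let both jobs start
    worker.on_notification({"type": "cancel", "job_id": 1})
    await asyncio.gather(slow, fast)
    return worker.states


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

As noted in the list, this approach only reaches async tasks: `Task.cancel()` has no effect on code running synchronously in a thread.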

ewjoachim, Apr 30 '23 11:04

Just tried to do something similar as well. I have a job that fetches a file. I want retries for network errors, but if the file is unparseable I want to give up and cancel the job from within the task rather than keep retrying since it will never succeed.

@app.periodic(cron="* * * * *")
@app.task(queue="foo", retry=5, pass_context=True)  # type: ignore
async def foo(context: JobContext, timestamp: int) -> None:
    try:
        # Hypothetical helper standing in for the fetch-and-parse logic.
        fetch_and_parse_file()
    except UnretryableError:
        # Try to mark the currently running job as failed so it is not retried.
        await context.app.job_manager.finish_job(
            context.job, status=jobs.Status.FAILED, delete_job=False
        )

Error: One of the specified coroutines ended with an exception

    Database error.

Job was not found or not in "doing" or "todo" status (job id: 14)

I saw that the shell defines a cancel command that uses the same function call, but I get the error above when I call it from inside the task itself.

https://github.com/procrastinate-org/procrastinate/blob/28d0252b5a02dbbf52132833930c34641d1ea71e/procrastinate/shell.py#L131-L145
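
The intent described in this comment (retry on network errors, give up at once on an unparseable file) can be illustrated independently of procrastinate. The sketch below is plain asyncio, not the library's API: `run_with_retries`, `flaky` and `broken` are hypothetical names, and `ConnectionError` stands in for whatever network exception the real task raises. procrastinate's `RetryStrategy` documents a `retry_exceptions` argument that may cover the same need, so check the docs for your version.

```python
import asyncio


class UnretryableError(Exception):
    """Raised when retrying cannot help, e.g. the file cannot be parsed."""


async def run_with_retries(func, max_attempts: int = 5):
    """Call ``func`` until it succeeds, retrying only on network-style errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except UnretryableError:
            raise  # give up immediately, no further attempts
        except ConnectionError:
            if attempt == max_attempts:
                raise  # retries exhausted
            await asyncio.sleep(0)  # placeholder for a real backoff delay


calls = {"flaky": 0, "broken": 0}


async def flaky() -> str:
    # Fails twice with a network error, then succeeds.
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("temporary network failure")
    return "ok"


async def broken() -> str:
    # Always fails in a way that retrying cannot fix.
    calls["broken"] += 1
    raise UnretryableError("file is unparseable")
```

Here `flaky` is attempted three times before returning, while `broken` is attempted exactly once and its exception propagates to the caller.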

caire-bear, May 09 '23 04:05

That would be bug https://github.com/procrastinate-org/procrastinate/issues/511.

wokis, Dec 14 '23 14:12