Is it possible to kill a running job ("via" database)?
I've searched the docs and only found references to SIGINT, SIGKILL and SIGTERM related to the worker process directly, but I have a use case where the code that schedules jobs (running in another machine) needs to kill a running job.
there's nothing in place currently for doing that. I guess that it should be possible, we'd need a few things:
- all workers would listen to events in the LISTEN/NOTIFY channel (either the existing one or a new one)
- if it's the existing one, we'd need to add some processing logic so that depending on the message we do specific actions
- upon receiving a cancellation order, workers would need to determine if they need to process it, so they need to keep track of what tasks are currently being processed
- if a worker is processing the task to be cancelled, it needs to cancel the coroutine. Note that it will only work for async tasks (until we get things moving with #753)
- we'll probably need a new state for tasks, "cancelled". Or maybe we just set them as errors
Just tried to do something similar as well. I have a job that fetches a file. I want retries for network errors, but if the file is unparseable I want to give up and cancel the job from within the task rather than keep retrying since it will never succeed.
@app.periodic(cron="* * * * *")
@app.task(queue="foo", retry=5, pass_context=True) # type: ignore
async def foo(context: JobContext, timestamp: int) -> None:
try:
foo()
except UnretryableError:
await context.app.job_manager.finish_job(
context.job, status=jobs.Status.FAILED, delete_job=False
)
Error: One of the specified coroutines ended with an exception
Database error.
Job was not found or not in "doing" or "todo" status (job id: 14)
I saw there was a cancel defined from the cli that uses the same function call, but I receive an error while inside the context of the task decorator.
https://github.com/procrastinate-org/procrastinate/blob/28d0252b5a02dbbf52132833930c34641d1ea71e/procrastinate/shell.py#L131-L145
Just tried to do something similar as well. I have a job that fetches a file. I want retries for network errors, but if the file is unparseable I want to give up and cancel the job from within the task rather than keep retrying since it will never succeed.
@app.periodic(cron="* * * * *") @app.task(queue="foo", retry=5, pass_context=True) # type: ignore async def foo(context: JobContext, timestamp: int) -> None: try: foo() except UnretryableError: await context.app.job_manager.finish_job( context.job, status=jobs.Status.FAILED, delete_job=False )Error: One of the specified coroutines ended with an exception Database error. Job was not found or not in "doing" or "todo" status (job id: 14)I saw there was a cancel defined from the cli that uses the same function call, but I receive an error while inside the context of the task decorator.
https://github.com/procrastinate-org/procrastinate/blob/28d0252b5a02dbbf52132833930c34641d1ea71e/procrastinate/shell.py#L131-L145
That would be bug https://github.com/procrastinate-org/procrastinate/issues/511.