Proposal: CancellationBackend
I'd like to propose a general API for task cancellation. I don't have much experience with distributed tasks, so my goal is to spark a conversation; if the concept is good enough, I would love to try implementing it.
By cancellation I mean either preventing a task from executing at all (revocation) or immediately stopping a task that is already executing (termination). If one wants to control cancellation behaviour from within the task, the proposed API can be extended to implement a CancellationToken.
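To illustrate the CancellationToken idea, here is a minimal sketch of cooperative cancellation. Only the name CancellationToken comes from this proposal; its methods (`cancel`, `cancelled`) and the `process_items` task are assumptions made for illustration, not an existing taskiq API.

```python
import asyncio
from typing import List


class CancellationToken:
    """Hypothetical token that a task polls to learn it has been cancelled."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def cancel(self) -> None:
        # Mark the token as cancelled; the task decides when to react.
        self._event.set()

    @property
    def cancelled(self) -> bool:
        return self._event.is_set()


async def process_items(items: List[int], token: CancellationToken) -> List[int]:
    """Example task that checks the token at a safe point between items."""
    done: List[int] = []
    for item in items:
        if token.cancelled:
            break  # stop cleanly instead of being interrupted mid-item
        done.append(item * 2)
        await asyncio.sleep(0)  # give control back to the event loop
    return done


async def demo() -> List[int]:
    token = CancellationToken()
    worker = asyncio.ensure_future(process_items([1, 2, 3, 4], token))
    await asyncio.sleep(0)  # let the worker process the first item
    token.cancel()
    return await worker


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a token, the task itself chooses where it is safe to stop, which is the main difference from termination forced from outside.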
Tasks are delivered via queues that don't support removing specific messages (iirc taskiq was initially made with RMQ in mind). If that were possible, we could simply remove the pending message and notify workers through a cancellation message queue. Since that's not the case, Receivers need to filter incoming messages. This filtering would be supported by a CancellationBackend which, similarly to ResultBackend, stores state about a task.
In its simplest form it can look like this:
from abc import ABC, abstractmethod
from typing import AsyncGenerator


class CancellationBackend(ABC):
    @abstractmethod
    async def cancel_task(self, task_id: str, terminate: bool = True) -> None:
        """Cancel a task."""

    @abstractmethod
    async def is_cancelled(self, task_id: str) -> bool:
        """Check if a task is cancelled."""

    @abstractmethod
    async def listen(self) -> AsyncGenerator[bytes, None]:
        """Listen to cancellation messages."""
Task execution can be skipped after receiving the message in the callback. To cancel a currently running task, the Receiver needs to listen for cancellation messages, similar to the prefetcher. As for how exactly running tasks should be cancelled, I don't have a solution yet.
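One option worth discussing, assuming each task runs as an asyncio task inside the worker, is to keep a map of running tasks and call `asyncio.Task.cancel()` when a cancellation message arrives. The sketch below is only an illustration: `InMemoryCancellationBackend` and `SketchReceiver` are hypothetical names, the backend works within a single process, and none of this reflects taskiq's actual Receiver.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, Callable, Dict, List, Set


class InMemoryCancellationBackend:
    """Hypothetical single-process backend, for illustration only."""

    def __init__(self) -> None:
        self._cancelled: Set[str] = set()
        self._queue: "asyncio.Queue[bytes]" = asyncio.Queue()

    async def cancel_task(self, task_id: str, terminate: bool = True) -> None:
        self._cancelled.add(task_id)
        if terminate:
            await self._queue.put(task_id.encode())

    async def is_cancelled(self, task_id: str) -> bool:
        return task_id in self._cancelled

    async def listen(self) -> AsyncGenerator[bytes, None]:
        while True:
            yield await self._queue.get()


class SketchReceiver:
    """Hypothetical receiver; not taskiq's actual Receiver."""

    def __init__(self, backend: InMemoryCancellationBackend) -> None:
        self.backend = backend
        self.running: Dict[str, asyncio.Task] = {}

    async def callback(
        self, task_id: str, func: Callable[[], Awaitable[object]]
    ) -> str:
        # Revocation: skip tasks that were cancelled before they started.
        if await self.backend.is_cancelled(task_id):
            return "skipped"
        task = asyncio.ensure_future(func())
        self.running[task_id] = task
        try:
            await task
            return "done"
        except asyncio.CancelledError:
            # Termination: the task was cancelled while it was running.
            return "terminated"
        finally:
            self.running.pop(task_id, None)

    async def watch_cancellations(self) -> None:
        # Runs next to the message loop, much like a prefetcher would.
        async for message in self.backend.listen():
            task = self.running.get(message.decode())
            if task is not None:
                task.cancel()


async def demo() -> List[str]:
    backend = InMemoryCancellationBackend()
    receiver = SketchReceiver(backend)
    watcher = asyncio.ensure_future(receiver.watch_cancellations())

    async def slow() -> None:
        await asyncio.sleep(10)

    await backend.cancel_task("a", terminate=False)
    revoked = await receiver.callback("a", slow)

    running = asyncio.ensure_future(receiver.callback("b", slow))
    await asyncio.sleep(0.01)  # give task "b" time to start
    await backend.cancel_task("b")
    terminated = await running

    watcher.cancel()
    return [revoked, terminated]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This only covers async tasks. Synchronous functions running in a thread or process pool cannot be interrupted with `Task.cancel()`, so they would need a different mechanism.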
A perfect tool for such a backend would be Redis, since it can both store data and notify workers via its pub/sub functionality. But what if someone doesn't want to add yet another service to their app and would rather use what they already have? We can split cancellation state holding and message sending into two classes, CancellationStateHolder and CancellationMessager. This way one can use, for example, an SQL database and RabbitMQ.
from typing import AsyncGenerator
from abc import ABC, abstractmethod


class CancellationStateHolder(ABC):
    @abstractmethod
    async def cancel_task(self, task_id: str):
        pass

    @abstractmethod
    async def is_cancelled(self, task_id: str):
        pass


class CancellationMessager(ABC):
    @abstractmethod
    async def send(self, task_id: str):
        pass

    @abstractmethod
    async def listen(self) -> AsyncGenerator[bytes, None]:
        pass


class ModularCancellationBackend(CancellationBackend):
    def __init__(self, holder: CancellationStateHolder, messager: CancellationMessager):
        self.holder = holder
        self.messager = messager

    async def cancel_task(self, task_id: str, terminate=True):
        """ Cancel a task """
        # Always record the cancelled state so receivers can skip the task.
        await self.holder.cancel_task(task_id)
        if terminate:
            # Additionally notify workers so a running task can be stopped.
            await self.messager.send(task_id)

    async def is_cancelled(self, task_id: str):
        """ Check if a task is cancelled """
        return await self.holder.is_cancelled(task_id)

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """ Listen to cancellation messages """
        # Re-yield the messages so this method is itself an async generator.
        async for message in self.messager.listen():
            yield message
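To make the split more concrete, here is a minimal sketch with in-memory stand-ins for the two parts. `InMemoryStateHolder`, `InMemoryMessager` and `ComposedBackend` are hypothetical names. `ComposedBackend` repeats the shape of ModularCancellationBackend without the ABCs, only so that the example runs on its own. A real setup would put an SQL table behind the holder and a broker such as RabbitMQ behind the messager.

```python
import asyncio
from typing import AsyncGenerator, Set, Tuple


class InMemoryStateHolder:
    """Stand-in for a CancellationStateHolder (e.g. backed by an SQL table)."""

    def __init__(self) -> None:
        self._cancelled: Set[str] = set()

    async def cancel_task(self, task_id: str) -> None:
        self._cancelled.add(task_id)

    async def is_cancelled(self, task_id: str) -> bool:
        return task_id in self._cancelled


class InMemoryMessager:
    """Stand-in for a CancellationMessager (e.g. backed by a message broker)."""

    def __init__(self) -> None:
        self._queue: "asyncio.Queue[bytes]" = asyncio.Queue()

    async def send(self, task_id: str) -> None:
        await self._queue.put(task_id.encode())

    async def listen(self) -> AsyncGenerator[bytes, None]:
        while True:
            yield await self._queue.get()


class ComposedBackend:
    """Same shape as ModularCancellationBackend, repeated to keep this runnable."""

    def __init__(self, holder: InMemoryStateHolder, messager: InMemoryMessager) -> None:
        self.holder = holder
        self.messager = messager

    async def cancel_task(self, task_id: str, terminate: bool = True) -> None:
        await self.holder.cancel_task(task_id)
        if terminate:
            await self.messager.send(task_id)

    async def is_cancelled(self, task_id: str) -> bool:
        return await self.holder.is_cancelled(task_id)

    async def listen(self) -> AsyncGenerator[bytes, None]:
        async for message in self.messager.listen():
            yield message


async def demo() -> Tuple[bool, bool, bool, bytes]:
    backend = ComposedBackend(InMemoryStateHolder(), InMemoryMessager())
    await backend.cancel_task("task-1")                   # state + message
    await backend.cancel_task("task-2", terminate=False)  # state only
    messages = backend.listen()
    first = await messages.__anext__()
    await messages.aclose()
    return (
        await backend.is_cancelled("task-1"),
        await backend.is_cancelled("task-2"),
        await backend.is_cancelled("task-3"),
        first,
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only `task-1` produces a message here, because `task-2` was cancelled with `terminate=False` and is therefore only recorded in the state holder.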
TaskMiddleware will get an on_cancel callback. AsyncTaskiqTask will have cancel and is_cancelled methods that call the corresponding methods of CancellationBackend. AsyncBroker will have a with_cancellation_backend method and a respective field.