taskiq-aio-pika icon indicating copy to clipboard operation
taskiq-aio-pika copied to clipboard

Support AioPikaBroker's declare_queues_kwargs['arguments'] parameter

Open WiZeYAR opened this issue 11 months ago • 0 comments

Inside AioPikaBroker class, I had to implement the following logic:

class AioPikaBroker(AsyncBroker):
    ...
    async def declare_queues(
        self,
        channel: AbstractChannel,
    ) -> AbstractQueue:
        await channel.declare_queue(
            self._dead_letter_queue_name,
            **self._declare_queues_kwargs,
        )
        args: dict[str, Any] = {
            "x-dead-letter-exchange": "",
            "x-dead-letter-routing-key": self._dead_letter_queue_name,
            "x-expires": int(timedelta(weeks=2).total_seconds()) * 1000,
        }
    ...

I initially tried to supply "x-expires" like this:

AioPikaBroker(declare_queues_kwargs={"arguments": {"x-expires": 1000}})

But this wouldn't work, as there would be 2 kwargs

Proposition:

Merge args with declare_queues_kwargs["arguments"] if these additional arguments exist. I'm going to submit a related PR soon

WiZeYAR avatar May 20 '25 11:05 WiZeYAR