taskiq-faststream icon indicating copy to clipboard operation
taskiq-faststream copied to clipboard

Validation error RabbitQueue

Open maxim-f1 opened this issue 1 year ago • 0 comments

faststream = {extras = ["rabbit"], version = "^0.5.12"} taskiq-faststream = {extras = ["rabbit"], version = "^0.1.8"}

from faststream.rabbit import RabbitQueue, RabbitExchange
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler
from taskiq_faststream.types import ScheduledTask

from src.api.amqp.main import BROKER


taskiq_broker = BrokerWrapper(BROKER)

scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[
        LabelScheduleSource(taskiq_broker)
    ],
)

taskiq_broker.task(
    queue=RabbitQueue(f'test', auto_delete=True),
    exchange=RabbitExchange('test', auto_delete=True),
    schedule=[
        ScheduledTask(cron='* * * * *')
    ],
)

[2024-06-22 17:58:20,587][INFO   ][run:run_scheduler:208] Startup completed.
[2024-06-22 17:58:20,588][INFO   ][run:delayed_send:130] Sending task taskiq_faststream.broker:lambda_09e8c474b85e4cf5a4e544b775f6544b.
[2024-06-22 17:59:00,011][ERROR  ][base_events:default_exception_handler:1821] Task exception was never retrieved
future: <Task finished name='Task-17' coro=<delayed_send() done, defined at /usr/local/lib/python3.12/site-packages/taskiq/cli/scheduler/run.py:106> exception=SendTaskError()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/taskiq/kicker.py", line 138, in kiq
    await self.broker.kick(self.broker.formatter.dumps(message))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/taskiq/formatters/proxy_formatter.py", line 27, in dumps
    message=self.broker.serializer.dumpb(model_dump(message)),
                                         ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/taskiq/compat.py", line 31, in model_dump
    return instance.model_dump(mode="json")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
    return self.__pydantic_serializer__.to_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'faststream.rabbit.schemas.queue.RabbitQueue'>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/taskiq/cli/scheduler/run.py", line 131, in delayed_send
    await scheduler.on_ready(source, task)
  File "/usr/local/lib/python3.12/site-packages/taskiq_faststream/scheduler.py", line 24, in on_ready
    await LabelRespectKicker(task.task_name, self.broker, task.labels).kiq(
  File "/usr/local/lib/python3.12/site-packages/taskiq/kicker.py", line 140, in kiq
    raise SendTaskError from exc
taskiq.exceptions.SendTaskError
[2024-06-22 17:59:00,015][INFO   ][run:delayed_send:130] Sending task taskiq_faststream.broker:lambda_09e8c474b85e4cf5a4e544b775f6544b.

maxim-f1 avatar Jun 22 '24 18:06 maxim-f1