taskiq-faststream
taskiq-faststream copied to clipboard
Validation error with RabbitQueue: PydanticSerializationError when a scheduled task is sent
faststream = {extras = ["rabbit"], version = "^0.5.12"}
taskiq-faststream = {extras = ["rabbit"], version = "^0.1.8"}
from faststream.rabbit import RabbitQueue, RabbitExchange
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler
from taskiq_faststream.types import ScheduledTask
from src.api.amqp.main import BROKER
# Reproduction script: wraps the application's broker (BROKER, imported from
# src.api.amqp.main) in taskiq-faststream's BrokerWrapper, builds a scheduler
# on top of it, and registers one task scheduled by a cron expression.
taskiq_broker = BrokerWrapper(BROKER)
# The scheduler uses the wrapped broker and a single schedule source.
# NOTE(review): presumably LabelScheduleSource discovers schedules from the
# labels of tasks registered on the broker -- confirm against taskiq docs.
scheduler = StreamScheduler(
broker=taskiq_broker,
sources=[
LabelScheduleSource(taskiq_broker)
],
)
# Register the scheduled task. No message argument is passed; only publish
# parameters (queue, exchange) and the schedule.
taskiq_broker.task(
# NOTE(review): the traceback in this report ends in
# "Unable to serialize unknown type: <class '...RabbitQueue'>", raised from
# model_dump(mode="json") when the scheduler sends the task. This RabbitQueue
# instance appears to be the object being serialized; whether a plain queue
# name would avoid the error is not shown here -- TODO confirm.
# The f prefix has no placeholders, so f'test' evaluates to the plain 'test'.
queue=RabbitQueue(f'test', auto_delete=True),
# NOTE(review): a RabbitExchange object is passed the same way; the log only
# names RabbitQueue, so it is unknown whether this would fail too.
exchange=RabbitExchange('test', auto_delete=True),
schedule=[
# Cron expression with all five fields wildcarded: fires every minute.
ScheduledTask(cron='* * * * *')
],
)
[2024-06-22 17:58:20,587][INFO ][run:run_scheduler:208] Startup completed.
[2024-06-22 17:58:20,588][INFO ][run:delayed_send:130] Sending task taskiq_faststream.broker:lambda_09e8c474b85e4cf5a4e544b775f6544b.
[2024-06-22 17:59:00,011][ERROR ][base_events:default_exception_handler:1821] Task exception was never retrieved
future: <Task finished name='Task-17' coro=<delayed_send() done, defined at /usr/local/lib/python3.12/site-packages/taskiq/cli/scheduler/run.py:106> exception=SendTaskError()>
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/taskiq/kicker.py", line 138, in kiq
await self.broker.kick(self.broker.formatter.dumps(message))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/taskiq/formatters/proxy_formatter.py", line 27, in dumps
message=self.broker.serializer.dumpb(model_dump(message)),
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/taskiq/compat.py", line 31, in model_dump
return instance.model_dump(mode="json")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
return self.__pydantic_serializer__.to_python(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'faststream.rabbit.schemas.queue.RabbitQueue'>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/taskiq/cli/scheduler/run.py", line 131, in delayed_send
await scheduler.on_ready(source, task)
File "/usr/local/lib/python3.12/site-packages/taskiq_faststream/scheduler.py", line 24, in on_ready
await LabelRespectKicker(task.task_name, self.broker, task.labels).kiq(
File "/usr/local/lib/python3.12/site-packages/taskiq/kicker.py", line 140, in kiq
raise SendTaskError from exc
taskiq.exceptions.SendTaskError
[2024-06-22 17:59:00,015][INFO ][run:delayed_send:130] Sending task taskiq_faststream.broker:lambda_09e8c474b85e4cf5a4e544b775f6544b.