
Pydantic model arguments with bytes cannot be sent to a worker

Status: Open · Suoslex opened this issue 8 months ago · 2 comments

I have a Pydantic model which has a "bytes" field:

from pydantic import BaseModel


class RawData(BaseModel):
    info: str
    bytes: bytes

And a task that takes it as an argument, for example:

@worker.task  # `worker` is the app's taskiq broker
async def some_task(data: RawData):
    ...

But when the task is sent to a worker, it raises UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in position 0: invalid utf-8. After digging into the source code, I found that the library calls model_dump(mode='json') when preparing the message:

def model_dump(instance: Model) -> Dict[str, Any]:
    return instance.model_dump(mode="json")

In JSON mode, Pydantic tries to decode the bytes as UTF-8 text, which fails for binary data that isn't valid UTF-8. Calling model_dump without mode="json" works, though.
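A stdlib-only sketch of the underlying failure: a PNG file starts with the byte 0x89, which is a UTF-8 continuation byte and can never start a valid UTF-8 sequence, so turning such data into UTF-8 text is impossible. The 8-byte PNG signature and the try_utf8 helper are illustrative assumptions, and Pydantic's own error wording ("invalid utf-8") differs from the stdlib's.

```python
from typing import Optional

# The standard 8-byte PNG file signature, used here as sample binary data.
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"


def try_utf8(data: bytes) -> Optional[str]:
    """Hypothetical helper: return data decoded as UTF-8, or None if it isn't valid UTF-8."""
    try:
        return data.decode("utf-8")
    except UnicodeDecodeError as exc:
        print(f"UnicodeDecodeError: {exc}")
        return None


print(try_utf8(b"plain ascii"))  # valid UTF-8, decodes fine
print(try_utf8(PNG_SIGNATURE))   # fails on byte 0x89 at position 0
```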

I can work around this by passing the fields as separate task parameters, but that's not how I want it to work. Any chance this can be fixed?

Suoslex · Apr 27 '25

@Suoslex The problem you're having comes from Pydantic's behavior, not TaskIQ's. Use the HexBytes type defined below, and everything should work for you.

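from typing import Annotated, Any

from pydantic import PlainSerializer, PlainValidator, WithJsonSchema


def hex_bytes_validator(o: Any) -> bytes:
    # Accept raw bytes/bytearray as-is; decode hex strings (e.g. parsed from JSON).
    if isinstance(o, bytes):
        return o
    elif isinstance(o, bytearray):
        return bytes(o)
    elif isinstance(o, str):
        return bytes.fromhex(o)
    raise TypeError(type(o))


# bytes that dump to a hex string, so model_dump(mode="json") succeeds.
HexBytes = Annotated[
    bytes,
    PlainValidator(hex_bytes_validator),
    PlainSerializer(lambda b: b.hex()),
    WithJsonSchema({"type": "string"}),
]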
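To see what HexBytes does on each side of the queue without Pydantic in the picture, here is a stdlib-only sketch that reuses the same validator and applies the same b.hex() serialization by hand. The helpers dump_arg and load_arg are hypothetical stand-ins; with the real annotation, Pydantic calls the serializer and validator for you.

```python
import json
from typing import Any


def hex_bytes_validator(o: Any) -> bytes:
    # Same logic as the validator above.
    if isinstance(o, bytes):
        return o
    elif isinstance(o, bytearray):
        return bytes(o)
    elif isinstance(o, str):
        return bytes.fromhex(o)
    raise TypeError(type(o))


def dump_arg(b: bytes) -> str:
    """Hypothetical sender side: JSON message holding what b.hex() produces."""
    return json.dumps({"bytes": b.hex()})


def load_arg(message: str) -> bytes:
    """Hypothetical worker side: parse JSON, then validate the hex string back to bytes."""
    return hex_bytes_validator(json.loads(message)["bytes"])


png_signature = b"\x89PNG\r\n\x1a\n"
message = dump_arg(png_signature)
print(message)  # {"bytes": "89504e470d0a1a0a"}
assert load_arg(message) == png_signature
```

Hex is easy to read and debug, but it doubles the payload size (two characters per byte).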

C3EQUALZz · May 18 '25

This is a common Pydantic problem.

We can't serialize without mode="json", because the dumped arguments would then still contain non-JSON types (such as bytes), which makes serializing the message in later steps really hard.
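A rough stdlib illustration of that point: mode="json" guarantees the dump contains only JSON-native values (str, int, float, bool, None, list, dict), so whatever runs afterwards doesn't need to know about Python-only types. The sketch assumes the later step is a plain json.dumps call; encode_message is a hypothetical stand-in, and the broker's actual serializer may differ.

```python
import json
from typing import Any, Dict, Optional

RAW = b"\x89PNG"  # sample non-UTF-8 payload

# Like model_dump() in Python mode: the bytes object is left as-is.
python_mode: Dict[str, Any] = {"info": "logo", "bytes": RAW}
# Like model_dump(mode="json") with a bytes-to-hex serializer: only JSON-native values.
json_mode: Dict[str, Any] = {"info": "logo", "bytes": RAW.hex()}


def encode_message(payload: Dict[str, Any]) -> Optional[str]:
    """Hypothetical later step: turn the dumped arguments into a JSON message."""
    try:
        return json.dumps(payload)
    except TypeError as exc:  # the json module refuses bytes objects
        print(f"TypeError: {exc}")
        return None


print(encode_message(python_mode))  # fails: bytes is not JSON serializable
print(encode_message(json_mode))    # {"info": "logo", "bytes": "89504e47"}
```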

However, you can give Pydantic a serialization hint:

from pathlib import Path
from typing import Annotated

from pydantic import BaseModel, PlainSerializer, PlainValidator


class TestModel(BaseModel):
    test: Annotated[
        bytes,
        # Dump the bytes as a list of ints (0-255), which is valid JSON.
        PlainSerializer(lambda x: list(bytes(x))),
        # Rebuild bytes from that list (bytes input passes through unchanged).
        PlainValidator(bytes),
    ]


t = TestModel(test=Path("~/Grafana.png").expanduser().read_bytes())
parsed = TestModel.model_validate_json(t.model_dump_json())
assert parsed == t  # the PNG bytes survive the JSON round trip

This annotation tells Pydantic to serialize the bytes as a list of integers (a bytes object is a sequence of integers from 0 to 255) and to turn that list back into a bytes object when parsing.
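The list-of-integers form is valid JSON but not compact. A quick stdlib comparison of the encoded size of 256 bytes (every byte value once) under the two encodings from this thread, with base64 added purely for reference (nobody in the thread proposed it):

```python
import base64
import json

data = bytes(range(256))  # every possible byte value once

encodings = {
    # s3rius's suggestion: one JSON integer per byte.
    "list of ints": json.dumps(list(data)),
    # C3EQUALZz's HexBytes: two hex characters per byte.
    "hex string": json.dumps(data.hex()),
    # For reference only: base64 uses 4 characters per 3 bytes.
    "base64 string": json.dumps(base64.b64encode(data).decode("ascii")),
}

for name, encoded in encodings.items():
    print(f"{name:>14}: {len(encoded)} characters")

# Every form decodes back to the original bytes.
assert bytes(json.loads(encodings["list of ints"])) == data
assert bytes.fromhex(json.loads(encodings["hex string"])) == data
assert base64.b64decode(json.loads(encodings["base64 string"])) == data
```

This prints 1170, 514 and 346 characters respectively, so the list form is roughly 4.5 times the raw size here versus about 2 times for hex. One caveat with PlainValidator(bytes): bytes(3) returns three zero bytes instead of failing, so a stray integer would be silently accepted.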

s3rius · Sep 09 '25