Pydantic model arguments with bytes cannot be sent to a worker
I have a Pydantic model which has a "bytes" field:
from pydantic import BaseModel

class RawData(BaseModel):
    info: str
    bytes: bytes
And some task definition, for example:
@worker.task
async def some_task(data: RawData):
    ...
But when the task is sent to a worker, the call raises UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in position 0: invalid utf-8. After digging into the source code, I found that the library uses model_dump(mode='json') when preparing the message:
def model_dump(instance: Model) -> Dict[str, Any]:
    return instance.model_dump(mode="json")
In JSON mode, Pydantic tries to decode the bytes as UTF-8, which fails for arbitrary binary data. Calling model_dump without mode="json" works, though.
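The difference between the two modes can be reproduced without a worker. The following is a minimal sketch, assuming Pydantic v2 with its default settings; the field name payload and the sample bytes are hypothetical stand-ins for the real model and data:

```python
from pydantic import BaseModel


class RawData(BaseModel):
    info: str
    payload: bytes  # hypothetical name, stands in for the "bytes" field


# 0x89 cannot start a UTF-8 sequence, like the byte named in the error above.
data = RawData(info="example", payload=b"\x89PNG")

# Python mode leaves the bytes object untouched.
python_dump = data.model_dump()

# JSON mode must produce a str and, by default, decodes the bytes as UTF-8.
try:
    data.model_dump(mode="json")
    json_mode_failed = False
except ValueError:  # UnicodeDecodeError is a subclass of ValueError
    json_mode_failed = True
```

With this input, python_dump still holds the raw bytes, while the JSON-mode call is expected to fail in the way the traceback describes.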
I can work around this by passing the fields as separate task parameters, but that's not how I want it to work. Is there any chance this can be fixed?
@Suoslex The problem you're having comes from Pydantic's behavior, not TaskIQ's. Use the HexBytes type defined below and everything should work for you.
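from typing import Annotated, Any
from pydantic import PlainSerializer, PlainValidator, WithJsonSchema

def hex_bytes_validator(o: Any) -> bytes:
    # Accept raw bytes as-is, or a hex string produced by the serializer.
    if isinstance(o, bytes):
        return o
    elif isinstance(o, bytearray):
        return bytes(o)
    elif isinstance(o, str):
        return bytes.fromhex(o)
    raise TypeError(type(o))

# bytes that serialize to a hex string and validate back from one
HexBytes = Annotated[
    bytes,
    PlainValidator(hex_bytes_validator),
    PlainSerializer(lambda b: b.hex()),
    WithJsonSchema({"type": "string"}),
]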
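As a usage illustration, here is a minimal sketch of a model that uses HexBytes and survives a JSON-mode round trip. It assumes Pydantic v2; the field name payload and the sample bytes are hypothetical, and HexBytes is repeated only so the snippet runs on its own:

```python
from typing import Annotated, Any

from pydantic import BaseModel, PlainSerializer, PlainValidator, WithJsonSchema


def hex_bytes_validator(o: Any) -> bytes:
    if isinstance(o, bytes):
        return o
    elif isinstance(o, bytearray):
        return bytes(o)
    elif isinstance(o, str):
        return bytes.fromhex(o)
    raise TypeError(type(o))


HexBytes = Annotated[
    bytes,
    PlainValidator(hex_bytes_validator),
    PlainSerializer(lambda b: b.hex()),
    WithJsonSchema({"type": "string"}),
]


class RawData(BaseModel):
    info: str
    payload: HexBytes  # hypothetical field name


original = RawData(info="png header", payload=b"\x89PNG")

# JSON mode now emits a hex string instead of attempting a UTF-8 decode.
dumped = original.model_dump(mode="json")

# Validating the dumped dict turns the hex string back into bytes.
restored = RawData.model_validate(dumped)
```

Here dumped["payload"] is the string "89504e47", and restored.payload equals the original bytes.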
It's a common Pydantic problem.
We cannot serialize without mode="json", because the non-JSON output would be much harder to serialize in later steps.
However, you can give Pydantic a serialization hint.
from pathlib import Path
from typing import Annotated
from pydantic import BaseModel, PlainSerializer, PlainValidator

class TestModel(BaseModel):
    test: Annotated[
        bytes,
        # serialize as a list of integers, one per byte
        PlainSerializer(lambda x: list(bytes(x))),
        # rebuild the bytes object from that list when validating
        PlainValidator(bytes),
    ]

t = TestModel(test=Path("~/Grafana.png").expanduser().read_bytes())
parsed = TestModel.model_validate_json(t.model_dump_json())
This annotation tells Pydantic to serialize bytes as an array of integers (each byte is an integer from 0 to 255) and to deserialize that array back into a bytes object when parsing.
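To see what actually goes over the wire with this annotation, here is a minimal sketch that swaps the file for a few in-memory bytes. It assumes Pydantic v2; the sample bytes are a hypothetical stand-in for the image contents:

```python
import json
from typing import Annotated

from pydantic import BaseModel, PlainSerializer, PlainValidator


class TestModel(BaseModel):
    test: Annotated[
        bytes,
        PlainSerializer(lambda x: list(bytes(x))),
        PlainValidator(bytes),
    ]


original = TestModel(test=b"\x89PNG")  # hypothetical stand-in for file contents

# The JSON document carries one integer per byte.
wire = original.model_dump_json()
wire_content = json.loads(wire)

# Parsing feeds the integer list to bytes(), which rebuilds the original value.
restored = TestModel.model_validate_json(wire)
```

In this sketch wire_content["test"] is [137, 80, 78, 71], and restored.test equals the original bytes.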