Stream.group_by doesn't support a custom schema
Checklist
- [X] I have included information about relevant versions
- [X] I have verified that the issue persists when using the `master` branch of Faust.
Steps to reproduce
I have an agent that uses a custom schema, and it groups messages by key with Stream.group_by before processing them.
The problem is the internal topic that Stream.group_by creates: messages read back from it are not deserialized with the agent's schema.
A possible solution would be to let Stream.group_by accept the schema as an argument.
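To make the failure mode concrete, here is a toy model in plain Python with no Faust involved. It is not Faust's implementation: `MyRecord`, `CustomSchema.encode`/`decode`, and `repartition` are hypothetical stand-ins that only reproduce the symptom from this report (a value forwarded through a topic without the schema comes back as `str`) and show why giving that topic the schema would fix it.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional, Union


@dataclass
class MyRecord:
    id: str
    count: int


class CustomSchema:
    """Toy stand-in for the agent's schema: JSON bytes <-> MyRecord."""

    def encode(self, value: MyRecord) -> bytes:
        return json.dumps(asdict(value)).encode()

    def decode(self, payload: bytes) -> MyRecord:
        return MyRecord(**json.loads(payload))


def repartition(
    value: MyRecord,
    schema: CustomSchema,
    internal_schema: Optional[CustomSchema] = None,
) -> Union[MyRecord, str]:
    """Forward a value through a modeled internal topic and read it back."""
    # Writing side (modeled): the value is serialized with the agent's schema.
    payload = schema.encode(value)
    if internal_schema is None:
        # Reading side without a schema: the payload comes back as plain text,
        # which is the `str` the issue reports seeing in event.value.
        return payload.decode()
    # Reading side with the same schema: the record is restored.
    return internal_schema.decode(payload)
```

Calling `repartition(record, schema)` returns the JSON text, while `repartition(record, schema, internal_schema=schema)` returns an equal `MyRecord`, which is what a schema argument on `group_by` would provide.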
Expected behavior
```python
async def get_key(value):
    return value.id

@app.agent(schema=CustomSchema())
async def counter(stream):
    """Count records."""
    async for event in stream.group_by(get_key, name='count').events():
        # the type of event.value should be MyRecord, instead of str
        yield event
```
Actual behavior
The internal topic created by Stream.group_by does not deserialize messages properly: event.value arrives as a str instead of a MyRecord.
Versions
- Python version: 3.10
- Faust version: 0.10.16
A somewhat ugly workaround is to create the internal topic manually and pass it to group_by through its topic parameter:
```python
schema = MySchema()

# create internal topic manually
internal_topic = app.topic('counter-internal-topic', schema=schema, internal=True)

@app.agent()
async def counter(stream):
    async for event in stream.group_by(get_key, name='count', topic=internal_topic).events():
        # ...
        yield event
```
@dabdada could you extend Stream.group_by to accept a schema as an argument?
@cmartmos I have not used the group_by functionality myself, but at a quick glance the function has a topic parameter. Can you create a topic and pass it in through that parameter, so it is used instead of the internal topic? That way you could specify the key/value serializers as required.
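Following up on that suggestion, the manual-topic workaround can be wrapped so that the internal topic and the group_by call that uses it stay together. This is a minimal sketch: `make_group_by` and its parameters are hypothetical names, not part of Faust. It only relies on the `app.topic(...)` and `stream.group_by(..., topic=...)` calls already used in the workaround, and forwards any extra options (a `schema=`, or `key_serializer=`/`value_serializer=` as suggested) straight to `app.topic`. The topic is created when `make_group_by` is called, so the assumption is that you call it at module level, as the workaround does, rather than inside the agent.

```python
from typing import Any, Callable


def make_group_by(app: Any, topic_name: str, **topic_options: Any) -> Callable[..., Any]:
    """Hypothetical helper: pair a preconfigured internal topic with group_by.

    ``topic_options`` is forwarded to ``app.topic``, e.g. ``schema=...`` or
    ``key_serializer=...`` / ``value_serializer=...``.
    """
    # Created once, at call time, like the manual workaround's module-level topic.
    internal_topic = app.topic(topic_name, internal=True, **topic_options)

    def group_by(stream: Any, key: Callable[[Any], Any], *, name: str) -> Any:
        # Repartition through our topic instead of the one group_by would create.
        return stream.group_by(key, name=name, topic=internal_topic)

    return group_by
```

For example, `group_by_count = make_group_by(app, 'counter-internal-topic', schema=MySchema())` at module level, then `async for event in group_by_count(stream, get_key, name='count').events():` inside the agent.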