faust icon indicating copy to clipboard operation
faust copied to clipboard

Stream.group_by don't support schema

Open cmartmos opened this issue 2 years ago • 3 comments

Checklist

  • [X] I have included information about relevant versions
  • [X] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I have an agent that uses a custom schema. Also, the agent groups messages by key before processing them.

The issue is due to the internal topic created by the method stream.group_by. This internal topic doesn't deserialize the message properly.

The solution could be to pass this schema through an argument to the method Stream.group_by.

Expected behavior

async def get_key(value):
        return value.id

@app.agent(schema=CustomSchema())
async def counter(stream):
    """ Count records. """
    async for event in stream.group_by(get_key, name='count').events():
        
        ## the type of event.value should be MyRecord, instead of str
        
        yield event

Actual behavior

This internal topic created by Stream.group_by doesn't deserialize the message properly.

Versions

  • Python version: 3.10
  • Faust version: 0.10.16

cmartmos avatar Oct 06 '23 08:10 cmartmos

An ugly patch to the issue was the following

schema = MySchema()

# create internal topic manually 
internal_topic = app.topic('counter-internal-topic', schema=schema, internal=True)


@app.agent()
async def counter(stream):
    async for event in stream.group_by(get_key, name='count', topic=internal_topic).events():
        # ...
        yield event

cmartmos avatar Oct 06 '23 08:10 cmartmos

@dabdada could you extend stream.group_by to accept schemas as an argument?

cmartmos avatar Oct 27 '23 10:10 cmartmos

@cmartmos I have not used the group_by functionality my own. While having a quick glance at the function there is a topic parameter. Can you create a topic and pass it into the function as a param to use instead of internal topic? That way you could specify key-/valueserializer as required?

dada-engineer avatar Oct 28 '23 12:10 dada-engineer