Error when creating a topic using aiokafka 0.11.0

Open hgalytoby opened this issue 1 year ago • 4 comments

poetry add faust-streaming

import faust


class Greeting(faust.Record):
    from_name: str
    to_name: str


app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)


@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')


@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )


if __name__ == '__main__':
    app.main()

Error:

AttributeError: 'CreateTopicsRequest_v1' object has no attribute 'build_request_header'

No problems with aiokafka 0.10.0

hgalytoby • Jul 02 '24 09:07

Do you need the Faust application to create the topic? Can you try to add the topic manually with the Kafka topic scripts and see if that helps?

fonty422 • Jul 03 '24 21:07

Normally I don't need Faust to create topics explicitly; it just creates the leader topic for me.

I know there are other ways to create topics. I simply ran into this problem and wanted to make it known!

hgalytoby • Jul 04 '24 01:07

I have an issue with my applications where using Faust to create the topics often results in a topic not being created (something about the topic not being usable while it is being created), followed by the error message ignoring missing topic: . I think this happens because a large number of applications start at the same time and the CPU is heavily overloaded.

To get around this, I manually create the topics beforehand using kafka-topics.sh (or .bat if using Windows), which is part of your Kafka download and found in the bin directory.

For me, whenever I start my application I first run: kafka-topics.sh --create --if-not-exists --topic <topic name> --bootstrap-server=<address> --partitions=<partitions>, replacing topic name, address, and partitions with the appropriate values.

The --if-not-exists argument will only create the topic if it doesn't already exist.
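
If you would rather run that step from Python right before starting the app, a minimal sketch could look like the following. It only wraps the command shown above; the helper names are made up for illustration, and it assumes kafka-topics.sh is on your PATH (otherwise pass the full path to the script in Kafka's bin directory).

```python
import subprocess


def build_create_topic_command(topic, bootstrap_server, partitions,
                               script="kafka-topics.sh"):
    """Build the argument list for the kafka-topics.sh call described above.

    `script` is an assumption: pass the full path (or the .bat variant on
    Windows) if the script is not on your PATH.
    """
    return [
        script,
        "--create",
        "--if-not-exists",  # do nothing if the topic already exists
        "--topic", topic,
        f"--bootstrap-server={bootstrap_server}",
        f"--partitions={partitions}",
    ]


def ensure_topic(topic, bootstrap_server, partitions, script="kafka-topics.sh"):
    """Run the command; raises CalledProcessError on a non-zero exit code."""
    command = build_create_topic_command(topic, bootstrap_server, partitions, script)
    subprocess.run(command, check=True)
```

For example, calling ensure_topic('hello-topic', 'localhost:9092', 1) before app.main() would create the topic from the original example, assuming the broker listens on localhost:9092 and one partition is what you want.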

fonty422 • Jul 04 '24 05:07

I'm seeing the same issue with the combination of faust 0.11.0 and aiokafka 0.11.0. Forcing aiokafka==0.10.0 fixes the issue.
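
Until a fix lands, a small startup guard can make the failure more obvious than the AttributeError. This is only a sketch: the function names are hypothetical, and it flags just aiokafka 0.11.0, the one version this thread reports as failing (0.10.0 is reported as working; other versions are not covered here).

```python
from importlib.metadata import PackageNotFoundError, version


def parse_release(version_string):
    """Turn '0.11.0' into (0, 11, 0); stops at the first non-numeric part."""
    parts = []
    for piece in version_string.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def is_reported_broken(aiokafka_version):
    """True only for the version this thread reports as failing (0.11.0)."""
    return parse_release(aiokafka_version) == (0, 11, 0)


def check_installed_aiokafka():
    """Return the installed aiokafka version, or None if it is not installed.

    Raises RuntimeError when the installed version is the one reported broken.
    """
    try:
        installed = version("aiokafka")
    except PackageNotFoundError:
        return None
    if is_reported_broken(installed):
        raise RuntimeError(
            f"aiokafka {installed} is reported to fail when Faust creates "
            "topics; try pinning aiokafka==0.10.0"
        )
    return installed
```

Calling check_installed_aiokafka() before app.main() would then stop the app early with a readable message instead of failing during topic creation.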

jjwwiodyne • Jul 05 '24 22:07

I've added a PR for this: https://github.com/faust-streaming/faust/pull/631. Can someone have a look?

somnam • Jul 09 '24 16:07