Producer is sending messages outside a transaction (exactly-once semantics)
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
from typing import Any
import sys
import faust
from faust.serializers.codecs import codecs
from faust.types import ProcessingGuarantee
from mode import CrashingSupervisor
import os
brokers = "127.0.0.1:9094"
broker_list = list(map(lambda x: "kafka://" + str(x), brokers.split(",")))
settings = faust.Settings(
    id="test-faust-config",
    processing_guarantee=ProcessingGuarantee.EXACTLY_ONCE,
    agent_supervisor=CrashingSupervisor,
    broker=broker_list,
    stream_publish_on_commit=True,
    broker_commit_every=1
)
app = faust.App(id="test-faust-config")
app.conf = settings
topic_input = app.topic(
    "fake-messages",
    key_serializer=codecs["raw"],
    key_type=bytes,
    value_serializer=codecs["raw"],
)
topic_output = app.topic(
    "processed-messages",
    key_serializer=codecs["raw"],
    key_type=bytes,
    value_serializer=codecs["raw"]
)
@app.agent(topic_input)
async def processor(stream: faust.Stream[Any]):
    async for key, value in stream.items():
        await topic_output.send_soon(key=key, value=value)
sys.argv = ["", "worker", "-l", "info"]
app.main()
Expected behavior
Exactly-once semantics should hold: Faust should commit only when the transaction has succeeded.
Actual behavior
Faust initializes a default producer that does not correspond to any partition. This default producer then sends messages outside the transaction, which creates duplicates when the application crashes and recovers.
Possible problem in code
The problem appears to be at the point where the consumer tells the producer to send a message. Here the consumer sends a message without a transactional_id, which causes the producer to send it outside the transaction.
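To make the suspected failure mode concrete, here is a minimal toy model. It is not Faust's or aiokafka's actual code: the classes ToyBroker and ToyProducer, the function run_with_crash, and the transactional id "test-faust-config-0" are all hypothetical names invented for illustration. It only assumes that a send without a transactional_id becomes visible immediately, while a send with one becomes visible only on commit.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Record = Tuple[bytes, bytes]


class ToyBroker:
    """Stand-in for the output topic: holds the records a reader would see."""

    def __init__(self) -> None:
        self.visible: List[Record] = []


class ToyProducer:
    """Toy model only: buffers per transactional_id, sends directly without one."""

    def __init__(self, broker: ToyBroker) -> None:
        self.broker = broker
        self.pending: Dict[str, List[Record]] = defaultdict(list)

    def send(self, record: Record, transactional_id: Optional[str] = None) -> None:
        if transactional_id is None:
            # Default-producer path: the record is visible right away,
            # whether or not the input offset is ever committed.
            self.broker.visible.append(record)
        else:
            self.pending[transactional_id].append(record)

    def commit(self, transactional_id: str) -> None:
        self.broker.visible.extend(self.pending.pop(transactional_id, []))

    def abort(self, transactional_id: str) -> None:
        self.pending.pop(transactional_id, None)


def run_with_crash(transactional_id: Optional[str]) -> List[Record]:
    """Process one message, crash before the commit, then reprocess it."""
    broker = ToyBroker()
    producer = ToyProducer(broker)
    message = (b"k1", b"v1")

    # First attempt: output is produced, then the worker crashes before commit.
    producer.send(message, transactional_id)
    if transactional_id is not None:
        producer.abort(transactional_id)  # the open transaction is discarded

    # Recovery: the input offset was never committed, so the message is replayed.
    producer.send(message, transactional_id)
    if transactional_id is not None:
        producer.commit(transactional_id)
    return broker.visible


outside = run_with_crash(transactional_id=None)
inside = run_with_crash(transactional_id="test-faust-config-0")
print(len(outside), len(inside))  # prints: 2 1
```

In this sketch the non-transactional path leaves two copies of the same record in the output after recovery, while the transactional path leaves one, which matches the duplicates observed in processed-messages.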
Versions
- Python version 3.7
- Faust version 0.6.5
- Kafka version 2.6