Producer is sending messages outside a transaction (exactly-once semantics)

Open irux opened this issue 4 years ago • 0 comments

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

import sys
from typing import Any

import faust
from faust.serializers.codecs import codecs
from faust.types import ProcessingGuarantee
from mode import CrashingSupervisor

# Comma-separated broker addresses, each prefixed with the kafka:// scheme.
brokers = "127.0.0.1:9094"
broker_list = ["kafka://" + host for host in brokers.split(",")]

settings = faust.Settings(
    id="test-faust-config",
    processing_guarantee=ProcessingGuarantee.EXACTLY_ONCE,
    agent_supervisor=CrashingSupervisor,
    broker=broker_list,
    stream_publish_on_commit=True,
    broker_commit_every=1
)

app = faust.App(id="test-faust-config")

app.conf = settings

topic_input = app.topic(
    "fake-messages",
    key_serializer=codecs["raw"],
    key_type=bytes,
    value_serializer=codecs["raw"],
)

topic_output = app.topic(
    "processed-messages",
    key_serializer=codecs["raw"],
    key_type=bytes,
    value_serializer=codecs["raw"]
)



@app.agent(topic_input)
async def processor(stream: faust.Stream[Any]):
    async for key, value in stream.items():
        await topic_output.send_soon(key=key, value=value)


sys.argv = ["", "worker", "-l", "info"]
app.main()

Expected behavior

Exactly-once semantics should work: offsets should be committed only when the transaction has succeeded.

Actual behavior

Faust initializes a default producer that doesn't correspond to any partition. This default producer then sends messages outside the transaction, which creates duplicates when the application crashes and recovers.

Possible problem in code

The problem occurs when the consumer tells the producer to send a message. Here the consumer sends a message without a transactional_id, which causes the producer to send it outside the transaction.
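
To illustrate why a missing transactional_id leads to duplicates, here is a minimal toy model in plain Python. It is not Faust's code and does not talk to Kafka: ToyLog, ToyProducer, ToyRouter and process_with_crash are hypothetical names made up for this sketch, and the assumption is simply that a send without a transactional_id falls back to a non-transactional default producer whose writes are visible immediately.

```python
from typing import Dict, List, Optional, Tuple

Message = Tuple[bytes, bytes]


class ToyLog:
    """Hypothetical output topic, as seen by a read_committed reader."""

    def __init__(self) -> None:
        self.visible: List[Message] = []


class ToyProducer:
    """Hypothetical producer: transactional only if it has a transactional_id."""

    def __init__(self, log: ToyLog, transactional_id: Optional[str] = None) -> None:
        self.log = log
        self.transactional_id = transactional_id
        self.pending: List[Message] = []

    def send(self, key: bytes, value: bytes) -> None:
        if self.transactional_id is None:
            # Non-transactional write: visible at once, cannot be rolled back.
            self.log.visible.append((key, value))
        else:
            # Transactional write: buffered until commit() or abort().
            self.pending.append((key, value))

    def commit(self) -> None:
        self.log.visible.extend(self.pending)
        self.pending.clear()

    def abort(self) -> None:
        self.pending.clear()


class ToyRouter:
    """Hypothetical router: picks a producer based on the transactional_id."""

    def __init__(self, log: ToyLog) -> None:
        self.log = log
        self.default = ToyProducer(log)
        self.by_id: Dict[str, ToyProducer] = {}

    def producer_for(self, transactional_id: Optional[str]) -> ToyProducer:
        if transactional_id is None:
            # Assumed fallback: no id means the default producer is used.
            return self.default
        if transactional_id not in self.by_id:
            self.by_id[transactional_id] = ToyProducer(self.log, transactional_id)
        return self.by_id[transactional_id]


def process_with_crash(transactional_id: Optional[str]) -> List[Message]:
    """Process one input message, crash before commit, then reprocess it."""
    log = ToyLog()
    router = ToyRouter(log)

    # First attempt: the output is sent, then the worker crashes before commit.
    producer = router.producer_for(transactional_id)
    producer.send(b"k", b"v")
    producer.abort()  # only undoes writes that were part of a transaction

    # Recovery: the input offset was never committed, so the message is replayed.
    producer = router.producer_for(transactional_id)
    producer.send(b"k", b"v")
    producer.commit()
    return log.visible


if __name__ == "__main__":
    print(len(process_with_crash(None)))                   # default producer
    print(len(process_with_crash("test-faust-config-0")))  # transactional producer
```

In this model the replayed message shows up twice when no transactional_id is passed, and once when the send goes through a transactional producer, which matches the duplicates observed after a crash and recovery.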

Versions

  • Python version 3.7
  • Faust version 0.6.5
  • Kafka version 2.6

irux, Aug 09 '21 18:08