
stream.events() vs stream.noack().noack_take(1,1)

Open ekerstens opened this issue 3 years ago • 2 comments

Checklist

  • [ ] I have included information about relevant versions
  • [ ] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

The issue with events() is that, upon restart, the offset of the event that caused the failure is skipped and that event is never processed. For example, consider the following:


Source topic payloads:

{"customer_id": 1, "details": "something_1"}
{"customer_id": 2, "details": "something_2"}
{"customer_id": 3, "details": "something_3"}
{"customer_id": 4, "details": "something_4"}
{"customer_id": 5, "details": "something_5"}
{"customer_id": 6, "details": "something_6"}
{"customer_id": 7, "details": "something_7"}
{"customer_id": 8, "details": "something_8"}
{"customer_id": 9, "details": "something_9"}

If these are sent to the following agent:

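@app.agent(topic_source, sink=[topic_sink])
async def process(stream):
    # events() yields Event objects, so each one can be acked manually
    async for record in stream.noack().events():
        if record.value.get('customer_id') == 5:
            raise Exception('customer_id == 5')
        yield record.value  # forwarded to topic_sink
        await stream.ack(record)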

The agent stops processing at customer_id 5. If you then comment out the if/raise block and restart the app, it should resume processing, and the sink topic should end up with a total of 9 distinct payloads. Instead, the payload with customer_id == 5 is skipped on restart.
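To make the expected behavior concrete, here is a minimal pure-Python model of a single partition with manual acks. It is a sketch under stated assumptions, not Faust's implementation: PartitionModel, run, agent_1 and agent_2 are hypothetical names, and the model assumes the committed offset is the first offset that has not been acked (in Kafka, the committed offset is the next offset to read). Under that assumption, the crash at customer_id 5 leaves its offset uncommitted, so a restart resumes there and the sink ends up with all 9 payloads. It models one partition only, so it does not reproduce the interleaved order reported later in this thread.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Set

Payload = Dict[str, object]


@dataclass
class PartitionModel:
    """Hypothetical single-partition model of manual acks (not Faust's code)."""

    messages: List[Payload]
    acked: Set[int] = field(default_factory=set)

    def committed_offset(self) -> int:
        # Assumption: only a contiguous run of acked offsets is committed,
        # and the committed offset is the next offset to read.
        offset = 0
        while offset in self.acked:
            offset += 1
        return offset


def run(model: PartitionModel, handler: Callable[[Payload], None],
        sink: List[Payload]) -> None:
    """Resume from the committed offset; stop at the first exception."""
    for offset in range(model.committed_offset(), len(model.messages)):
        value = model.messages[offset]
        try:
            handler(value)
        except Exception:
            return  # the worker crashes; this offset is never acked
        sink.append(value)       # like `yield record.value` into the sink
        model.acked.add(offset)  # like `await stream.ack(record)`


def agent_1(value: Payload) -> None:
    if value["customer_id"] == 5:
        raise Exception("customer_id == 5")


def agent_2(value: Payload) -> None:
    pass  # same agent with the if/raise block commented out


payloads = [{"customer_id": i, "details": f"something_{i}"} for i in range(1, 10)]
model = PartitionModel(payloads)
sink: List[Payload] = []

run(model, agent_1, sink)        # sinks customers 1-4, then crashes on 5
print(model.committed_offset())  # 4, the offset holding customer_id 5
run(model, agent_2, sink)        # restart: resumes at customer_id 5
print([v["customer_id"] for v in sink])  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The bug described in this issue corresponds to the committed offset jumping past offset 4 even though it was never acked.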

A workaround for this is to use noack_take(1, 1). The agent becomes:

@app.agent(topic_source, sink=[topic_sink])
async def process(stream):
    # noack_take(1, 1) yields lists of up to 1 event, flushed within 1 second
    async for records in stream.noack().noack_take(1, 1):
        new_record = records[0]
        if new_record.value.get('customer_id') == 5:
            raise Exception('customer_id == 5')
        yield new_record.value
        await stream.ack(new_record)
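The workaround indexes records[0] because take-style operators yield lists: Faust's take(max_, within) buffers up to max_ values and yields them as a list, flushing early after within seconds, and noack_take appears to have the same shape without acking. The following is a plain-asyncio stand-in to illustrate that shape only; take_batches and _DONE are hypothetical names, and a queue replaces the Kafka stream.

```python
import asyncio
from typing import AsyncIterator, List

_DONE = object()  # hypothetical end-of-stream marker for this sketch


async def take_batches(queue: "asyncio.Queue", max_: int,
                       within: float) -> AsyncIterator[List]:
    """Yield lists of up to `max_` items, flushing a partial list when no
    new item arrives within `within` seconds (a stand-in for take)."""
    buffer: List = []
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=within)
        except asyncio.TimeoutError:
            if buffer:  # flush whatever arrived before the timeout
                yield buffer
                buffer = []
            continue
        if item is _DONE:
            break
        buffer.append(item)
        if len(buffer) >= max_:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


async def main() -> List[List[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    for customer_id in range(1, 4):
        queue.put_nowait(customer_id)
    queue.put_nowait(_DONE)
    # With max_=1 every batch holds exactly one item, hence `records[0]`.
    return [batch async for batch in take_batches(queue, 1, 1.0)]


batches = asyncio.run(main())
print(batches)  # [[1], [2], [3]]
```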


ekerstens, Jun 06 '22 18:06

@zerafachris, can you provide more detail on the actual vs. expected behavior? I wasn't able to reproduce a difference between these approaches.

ekerstens, Jun 06 '22 18:06

@ekerstens This is agent_1:

@app.agent(topics.source_topic, sink=[topics.predict_sink])
async def predict(stream: StreamT):
    async for record in stream.noack().events():
        if record.value.get('customer_id') == 5:
            raise Exception('customer_id == 5')
        yield record.value
        await stream.ack(record)

and this is agent_2:

@app.agent(topics.source_topic, sink=[topics.predict_sink])
async def predict(stream: StreamT):
    async for record in stream.noack().events():
        yield record.value
        await stream.ack(record)

Send the following payloads:

{"customer_id": 1, "details": "something_1"}
{"customer_id": 2, "details": "something_2"}
{"customer_id": 3, "details": "something_3"}
{"customer_id": 4, "details": "something_4"}
{"customer_id": 5, "details": "something_5"}
{"customer_id": 6, "details": "something_6"}
{"customer_id": 7, "details": "something_7"}
{"customer_id": 8, "details": "something_8"}
{"customer_id": 9, "details": "something_9"}

Actual Behaviour

Send the above payloads to agent_1. This processes the following 8 messages only:

{"customer_id": 1, "details": "something_1"}
{"customer_id": 3, "details": "something_3"}
{"customer_id": 4, "details": "something_4"}
{"customer_id": 8, "details": "something_8"}
{"customer_id": 2, "details": "something_2"}
{"customer_id": 6, "details": "something_6"}
{"customer_id": 7, "details": "something_7"}
{"customer_id": 9, "details": "something_9"}

Change the app to use agent_2 and restart it. Instead of being processed, the unacknowledged payload {"customer_id": 5, "details": "something_5"} is skipped.
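A quick way to confirm which payload never reached the sink is to diff the sent customer IDs against the ones reported above. The interleaved order of the reported IDs presumably reflects a source topic with several partitions, since Kafka only orders messages within a partition; the set difference does not depend on that order. The variable names below are illustrative only.

```python
# Customer IDs sent to source_topic.
sent = list(range(1, 10))
# Customer IDs agent_1 wrote to predict_sink, in the order reported above.
processed = [1, 3, 4, 8, 2, 6, 7, 9]

missing = sorted(set(sent) - set(processed))
print(missing)  # [5]
```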

Expected Behaviour

Send the above payloads to agent_1. This processes the following 8 messages only:

{"customer_id": 1, "details": "something_1"}
{"customer_id": 3, "details": "something_3"}
{"customer_id": 4, "details": "something_4"}
{"customer_id": 8, "details": "something_8"}
{"customer_id": 2, "details": "something_2"}
{"customer_id": 6, "details": "something_6"}
{"customer_id": 7, "details": "something_7"}
{"customer_id": 9, "details": "something_9"}

Change the app to use agent_2 and restart it. It should then process the unacknowledged payload {"customer_id": 5, "details": "something_5"}.

Versions

  • Python version 3.9.9
  • Faust version 0.6.11

zerafachris, Jun 08 '22 15:06