stream.events() vs stream.noack().noack_take(1,1)
Checklist
- [ ] I have included information about relevant versions
- [ ] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
The problem with events() is that, on restart, the offset of the event that caused the failure is skipped, so that event is never processed. For example, consider the following:
Payloads sent to the source topic:
{"customer_id": 1, "details": "something_1"}
{"customer_id": 2, "details": "something_2"}
{"customer_id": 3, "details": "something_3"}
{"customer_id": 4, "details": "something_4"}
{"customer_id": 5, "details": "something_5"}
{"customer_id": 6, "details": "something_6"}
{"customer_id": 7, "details": "something_7"}
{"customer_id": 8, "details": "something_8"}
{"customer_id": 9, "details": "something_9"}
If these are sent to the following agent:
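@app.agent(topic_source, sink=[topic_sink])
async def process(stream):
    # noack(): offsets are only committed when we ack explicitly
    async for record in stream.noack().events():
        if record.value.get('customer_id') == 5:
            # Simulate a processing failure on this payload
            raise Exception('customer_id == 5')
        yield record
        await stream.ack(record)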
The agent stops processing at customer_id 5. If you then comment out the if/raise block and restart the app, it should resume processing, and the sink topic should end up with 9 distinct payloads. Instead, the payload with customer_id == 5 is skipped on restart.
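The expected behaviour can be phrased in terms of Kafka offset semantics: a consumer group's committed offset is the offset of the next message to read, so a message that was never acked should be read again after a restart. The sketch below is a simplified, single-partition model. next_committed_offset is a hypothetical helper, and the assumption that Faust only advances the commit across a contiguous run of acked offsets is ours, not something confirmed in this issue.

```python
from typing import Iterable


def next_committed_offset(committed: int, acked: Iterable[int]) -> int:
    """Simplified model of how far one partition's committed offset can move.

    `committed` is the next offset the group would read (Kafka's convention).
    ASSUMPTION: the commit only advances over a contiguous run of acked
    offsets, so a single unacked message pins the commit at that message.
    """
    acked_set = set(acked)
    offset = committed
    while offset in acked_set:
        offset += 1
    return offset


if __name__ == "__main__":
    # Offsets 0..8 hold customer_id 1..9, so customer_id 5 is offset 4.
    print(next_committed_offset(0, [0, 1, 2, 3]))              # 4
    print(next_committed_offset(0, [0, 1, 2, 3, 5, 6, 7, 8]))  # 4
```

Under this model, both the crash case and the "later messages acked" case leave the commit at offset 4, so a restarted consumer should see customer_id 5 again.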
A workaround is to use noack_take(1, 1), which yields lists of events (here, one event per list). The agent becomes:
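@app.agent(topic_source, sink=[topic_sink])
async def process(stream):
    # noack_take(1, 1) yields a list holding (at most) one event
    async for record in stream.noack().noack_take(1, 1):
        new_record = record[0]
        if new_record.value.get('customer_id') == 5:
            # Simulate a processing failure on this payload
            raise Exception('customer_id == 5')
        yield new_record
        await stream.ack(record[0])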
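Because noack_take(1, 1) yields lists rather than single events, the workaround has to index record[0]. The following is a simplified, hypothetical model of that batching: take_sized is not a Faust function, and it ignores the within timeout, which should not matter when max_ is 1 because every batch is full as soon as one event arrives.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, TypeVar

T = TypeVar("T")


async def take_sized(source: AsyncIterable[T], max_: int) -> AsyncIterator[List[T]]:
    """Hypothetical size-only model of take/noack_take (ignores `within`)."""
    buffer: List[T] = []
    async for item in source:
        buffer.append(item)
        if len(buffer) >= max_:
            # Buffer is full: hand the whole list to the consumer.
            yield buffer
            buffer = []
    if buffer:
        # Flush a final partial batch when the source ends.
        yield buffer


async def fake_events(n: int) -> AsyncIterator[int]:
    """Stand-in for a stream of events: yields 1..n."""
    for i in range(1, n + 1):
        yield i


async def collect(max_: int) -> List[List[int]]:
    return [batch async for batch in take_sized(fake_events(3), max_)]


if __name__ == "__main__":
    print(asyncio.run(collect(1)))  # [[1], [2], [3]]
    print(asyncio.run(collect(2)))  # [[1, 2], [3]]
```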
@zerafachris can you provide some extra detail on the actual vs. expected behavior? I wasn't able to replicate a difference between these approaches.
@ekerstens Here is agent_1:
@app.agent(topics.source_topic, sink=[topics.predict_sink])
async def predict(stream: StreamT):
    async for record in stream.noack().events():
        if record.value.get('customer_id') == 5:
            raise Exception('customer_id == 5')
        yield record.value
        await stream.ack(record)
and this is agent_2:
@app.agent(topics.source_topic, sink=[topics.predict_sink])
async def predict(stream: StreamT):
    async for record in stream.noack().events():
        yield record.value
        await stream.ack(record)
Send the following payloads:
{"customer_id": 1, "details": "something_1"}
{"customer_id": 2, "details": "something_2"}
{"customer_id": 3, "details": "something_3"}
{"customer_id": 4, "details": "something_4"}
{"customer_id": 5, "details": "something_5"}
{"customer_id": 6, "details": "something_6"}
{"customer_id": 7, "details": "something_7"}
{"customer_id": 8, "details": "something_8"}
{"customer_id": 9, "details": "something_9"}
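To reproduce the test data, the nine payloads can be generated rather than typed by hand. A minimal sketch: make_payloads is a hypothetical helper, and producing the lines to the source topic (for example by piping them into Kafka's console producer) is not shown.

```python
import json


def make_payloads(n: int = 9) -> list[str]:
    """Build the n test payloads as JSON strings, one per message."""
    return [
        json.dumps({"customer_id": i, "details": f"something_{i}"})
        for i in range(1, n + 1)
    ]


if __name__ == "__main__":
    # One JSON document per line.
    print("\n".join(make_payloads()))
```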
Actual Behaviour
Send the above payloads to agent_1. This processes the following 8 messages only:
{"customer_id": 1, "details": "something_1"}
{"customer_id": 3, "details": "something_3"}
{"customer_id": 4, "details": "something_4"}
{"customer_id": 8, "details": "something_8"}
{"customer_id": 2, "details": "something_2"}
{"customer_id": 6, "details": "something_6"}
{"customer_id": 7, "details": "something_7"}
{"customer_id": 9, "details": "something_9"}
Then change the app to use agent_2 and restart it. It should process the unacknowledged payload {"customer_id": 5, "details": "something_5"}, but the payload is skipped instead.
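Comparing what was sent with what reached the sink can be done mechanically by diffing customer_id values. A sketch: missing_customer_ids is a hypothetical helper, not part of Faust, and reading the sink topic back (e.g. with a separate consumer) is not shown. The received order below is copied from the agent_1 output above; it differs from the send order, presumably because the topic has several partitions.

```python
def missing_customer_ids(expected: list[dict], observed: list[dict]) -> set[int]:
    """Return the customer_ids that were sent but never reached the sink."""
    sent_ids = {p["customer_id"] for p in expected}
    seen_ids = {p["customer_id"] for p in observed}
    return sent_ids - seen_ids


if __name__ == "__main__":
    sent = [{"customer_id": i, "details": f"something_{i}"} for i in range(1, 10)]
    # Sink contents reported for agent_1 (order differs from send order).
    received_ids = [1, 3, 4, 8, 2, 6, 7, 9]
    received = [{"customer_id": i, "details": f"something_{i}"} for i in received_ids]
    print(missing_customer_ids(sent, received))  # {5}
```

After switching to agent_2 and restarting, the same check on the full sink contents should return an empty set if the skipped payload was redelivered.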
Expected Behaviour
Send the above payloads to agent_1. This processes the following 8 messages only:
{"customer_id": 1, "details": "something_1"}
{"customer_id": 3, "details": "something_3"}
{"customer_id": 4, "details": "something_4"}
{"customer_id": 8, "details": "something_8"}
{"customer_id": 2, "details": "something_2"}
{"customer_id": 6, "details": "something_6"}
{"customer_id": 7, "details": "something_7"}
{"customer_id": 9, "details": "something_9"}
Change the app to use agent_2 and restart it. On restart, it should process the unacknowledged payload:
{"customer_id": 5, "details": "something_5"}
Versions
- Python version 3.9.9
- Faust version 0.6.11