
Unfold removes committable from values in ValueBatch

Open mj0nez opened this issue 1 year ago • 3 comments

Hi, I’m not sure if this is really a bug, but I stumbled over the Unfold strategy’s message generation. Consider the use case where our streaming process consists of the following steps:

0) consume -> 1) batch -> 2) process in batch -> 3) unbatch -> 4) process message-wise -> 5) commit

When the ValuesBatch is unbatched in step 3, the Unfold strategy creates a new Message instance with a new Value for each payload and submits it to step 4. Unfortunately, only the last message of the batch gets a committable, although every payload might already have one. From what I gathered from the BatchStep’s flow, I assumed unbatching would simply fan out the messages and submit them unchanged, one after the other, to the following step.
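The difference between the observed and the expected fan-out can be modelled in plain Python. This is a simplified sketch, not arroyo's code: `Msg`, `unfold_last_only` and `unfold_preserving` are hypothetical names, partitions are plain strings, and a committable is assumed to map a partition to the next offset to commit (offset + 1), with all messages of a batch on a single partition.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class Msg:
    """Hypothetical stand-in for a message: a payload plus its committable."""

    payload: bytes
    committable: Dict[str, int] = field(default_factory=dict)


def unfold_last_only(batch: List[Msg]) -> List[Msg]:
    """Model of the reported behavior: only the last message keeps a committable."""
    out = [Msg(m.payload) for m in batch[:-1]]
    if batch:
        last = batch[-1]
        out.append(Msg(last.payload, dict(last.committable)))
    return out


def unfold_preserving(batch: List[Msg]) -> List[Msg]:
    """Model of the expected behavior: every message keeps its own committable."""
    return [Msg(m.payload, dict(m.committable)) for m in batch]


# Two messages at offsets 1 and 2 of partition "test-0".
batch = [Msg(b"a", {"test-0": 2}), Msg(b"b", {"test-0": 3})]
observed = unfold_last_only(batch)
expected = unfold_preserving(batch)
```

In this model `observed[0].committable` is empty, while `expected[0].committable` still identifies the message's position in the partition.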

I believe the current behavior, while useful for reducing the number of commits, does not belong in a generic strategy, or is at least a bit hidden. My reasoning for moving this out of the strategy, or for adding a note to both classes, is that downstream steps like number 4 can no longer provide the Partition and offset when raising an InvalidMessage exception. Furthermore, if an exception occurs in the batch’s last message, the commit information is lost altogether.
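To illustrate why a downstream step depends on the committable, here is a minimal sketch in plain Python. It does not use arroyo: `InvalidMessageError`, `locate` and `process` are hypothetical names, partitions are plain strings, and the committable is assumed to map a single partition to offset + 1.

```python
from typing import Dict, Optional, Tuple


class InvalidMessageError(Exception):
    """Hypothetical stand-in for an exception that reports partition and offset."""

    def __init__(self, partition: str, offset: int) -> None:
        super().__init__(f"invalid message at {partition}:{offset}")
        self.partition = partition
        self.offset = offset


def locate(committable: Dict[str, int]) -> Optional[Tuple[str, int]]:
    """Recover (partition, offset) from a single-partition committable, if present."""
    if len(committable) != 1:
        return None
    ((partition, next_offset),) = committable.items()
    # Assumption: the committable stores the next offset to commit.
    return partition, next_offset - 1


def process(payload: bytes, committable: Dict[str, int]) -> int:
    """Parse a numeric payload; report the message position when parsing fails."""
    if payload.isdigit():
        return int(payload)
    location = locate(committable)
    if location is None:
        # The committable was stripped upstream, so the position is unknown.
        raise ValueError("invalid payload, but no partition/offset to report")
    raise InvalidMessageError(*location)
```

With a committable present, `process(b"x", {"test-0": 4})` can raise an error that names partition `test-0` and offset 3. With an empty committable, the step can only raise a generic error.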

Environment

arroyo 2.17.4

Steps to Reproduce

from datetime import datetime
from unittest.mock import MagicMock

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import BatchStep, UnbatchStep
from arroyo.types import BrokerValue, Message, Partition, Topic

topic = Topic("test")
partition = Partition(topic, 0)


def test_unfold():
    commit = MagicMock()

    unbatch = UnbatchStep(next_step=commit)

    batch = BatchStep(max_batch_size=2, max_batch_time=3, next_step=unbatch)

    for i in range(1, 6):
        batch.submit(
            message=Message(
                BrokerValue(
                    KafkaPayload(None, b"1", []),
                    Partition(topic, 0),
                    i,
                    datetime.now(),
                )
            )
        )

    # the last message of each batch receives a committable
    assert commit.method_calls[1].args[0].committable == {
        Partition(topic=topic, index=0): 3
    }
    assert commit.method_calls[3].args[0].committable == {
        Partition(topic=topic, index=0): 5
    }

    # this assertion fails: a message that is not the last one
    # in its batch arrives without a committable
    assert commit.method_calls[2].args[0].committable == {
        Partition(topic=topic, index=0): 4
    }

mj0nez, May 19 '24 12:05

Yeah this seems like surprising behavior, feel free to change it. I am not sure if the issue is isolated to unbatch or if batch already loses the commit data.

untitaker, May 20 '24 11:05

I took a shot, feel free to take it apart :)

mj0nez, May 21 '24 21:05

This has a PR in review: https://github.com/getsentry/arroyo/pull/371

(posting to take it out of our support queue)

onewland, Jun 10 '24 21:06

Fixed in 2.18.0. Thanks for your contribution and sorry for the delay!

untitaker, Sep 26 '24 15:09