Unfold removes committable from values in ValuesBatch
Hi, I’m not sure whether this is really a bug, but I stumbled over the Unfold strategy’s message generation. Consider the use case where our streaming process consists of the following steps:
0) consume -> 1) batch -> 2) process in batch -> 3) unbatch -> 4) process message-wise -> 5) commit
When the ValuesBatch is unbatched in step 3, the Unfold strategy creates a new Message instance with a new Value, which is then submitted to step 4. Unfortunately, only the last message of the batch gets a committable, even though every payload might already have one. From what I gathered from the BatchStep’s flow, I assumed it would simply fan out the messages and submit them unchanged, one after the other, to the following step.
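To make the difference concrete, here is a minimal, library-free model of the two behaviors. It does not use arroyo: `Item`, `merge_committables`, `unfold_last_only` and `unfold_preserving` are hypothetical names, and a committable is modeled as a plain `{partition_index: next_offset}` dict, mirroring the `offset + 1` values asserted in the reproduction.

```python
from dataclasses import dataclass, field
from typing import Dict, List


# Hypothetical stand-in for a message: a payload plus its committable,
# modeled as {partition_index: next_offset_to_commit}.
@dataclass
class Item:
    payload: bytes
    committable: Dict[int, int] = field(default_factory=dict)


def merge_committables(batch: List[Item]) -> Dict[int, int]:
    """Combine per-item committables; later items overwrite earlier ones per partition."""
    merged: Dict[int, int] = {}
    for item in batch:
        merged.update(item.committable)
    return merged


def unfold_last_only(batch: List[Item]) -> List[Item]:
    """Behavior described in this report: only the final item carries a committable
    (the merged committable of the whole batch); earlier items get an empty one."""
    out = [Item(item.payload) for item in batch[:-1]]
    if batch:
        out.append(Item(batch[-1].payload, merge_committables(batch)))
    return out


def unfold_preserving(batch: List[Item]) -> List[Item]:
    """Expected behavior: fan out each item unchanged, keeping its own committable."""
    return list(batch)


# Offsets 2 and 3 on partition 0, so the committables are 3 and 4.
batch = [Item(b"1", {0: 3}), Item(b"1", {0: 4})]
print([i.committable for i in unfold_last_only(batch)])   # [{}, {0: 4}]
print([i.committable for i in unfold_preserving(batch)])  # [{0: 3}, {0: 4}]
```

With the first variant, the first item reaches step 4 with no position information at all; the second variant keeps every message individually addressable and leaves commit throttling to a later step.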
I believe the current behavior, while useful for reducing the number of commits, does not belong in a generic strategy, or is at least a bit hidden. My reasoning for moving it out of the strategy, or at least adding a note to both classes, is that downstream steps like step 4 can no longer provide the partition and offset when raising an InvalidMessage exception. Furthermore, if an exception occurs on the batch’s last message, the commit information is lost altogether.
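The InvalidMessage problem can be sketched without arroyo as follows. `BadMessage` is a hypothetical stand-in for InvalidMessage that only carries a partition and an offset, and `locate` and `process` are hypothetical helpers; the sketch assumes a single-partition committable storing `offset + 1`, as in the reproduction.

```python
from typing import Dict, Optional, Tuple


class BadMessage(Exception):
    """Hypothetical stand-in for InvalidMessage: carries partition and offset."""

    def __init__(self, partition: int, offset: int) -> None:
        super().__init__(f"invalid message at partition={partition} offset={offset}")
        self.partition = partition
        self.offset = offset


def locate(committable: Dict[int, int]) -> Optional[Tuple[int, int]]:
    """Recover (partition, offset) from a single-partition committable.

    Assumes the committable holds the next offset to commit (offset + 1).
    Returns None when no position is known, e.g. for an empty committable.
    """
    if len(committable) != 1:
        return None
    partition, next_offset = next(iter(committable.items()))
    return partition, next_offset - 1


def process(payload: bytes, committable: Dict[int, int]) -> bytes:
    """Step 4: reject empty payloads, pointing at the offending message if possible."""
    if not payload:
        position = locate(committable)
        if position is None:
            # The message lost its committable in unbatching, so there is
            # no partition/offset to report.
            raise ValueError("invalid message, but its partition/offset is unknown")
        raise BadMessage(*position)
    return payload
```

A message that kept its committable (say `{0: 4}`) can be reported as partition 0, offset 3, while one that arrived with `{}` can only raise a generic error.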
Environment
arroyo 2.17.4
Steps to Reproduce
from datetime import datetime
from unittest.mock import MagicMock

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import BatchStep, UnbatchStep
from arroyo.types import BrokerValue, Message, Partition, Topic

topic = Topic("test")
partition = Partition(topic, 0)


def test_unfold():
    # The mock records every message that UnbatchStep forwards downstream.
    commit = MagicMock()
    unbatch = UnbatchStep(next_step=commit)
    batch = BatchStep(max_batch_size=2, max_batch_time=3, next_step=unbatch)

    # Submit messages with offsets 1 to 5 on partition 0.
    for i in range(1, 6):
        batch.submit(
            message=Message(
                BrokerValue(
                    KafkaPayload(None, b"1", []),
                    Partition(topic, 0),
                    i,
                    datetime.now(),
                )
            )
        )

    # last messages in batch - will receive committable
    assert commit.method_calls[1].args[0].committable == {
        Partition(topic=topic, index=0): 3
    }
    assert commit.method_calls[3].args[0].committable == {
        Partition(topic=topic, index=0): 5
    }

    # this will raise: the first message of the second batch has an empty committable
    assert commit.method_calls[2].args[0].committable == {
        Partition(topic=topic, index=0): 4
    }
Yeah, this seems like surprising behavior; feel free to change it. I'm not sure whether the issue is isolated to unbatch or whether batch already loses the commit data.
I took a shot, feel free to take it apart :)
This has a PR in review: https://github.com/getsentry/arroyo/pull/371
(posting to take it out of our support queue)
Fixed in 2.18.0. Thanks for your contribution and sorry for the delay!