
ref: Update accumulator sig to return Result<TResult> instead of TResult

Open john-z-yang opened this issue 1 year ago • 3 comments

Overview

Change the signature of the accumulator in the reduce step so that it can generate backpressure.

Details

In the Rust Snuba metrics processor, we use an unbounded queue (defined here): we push data on one end, and an HTTP client on the other end writes it directly to the network buffer. We occasionally see very high memory usage in the Rust processor, and we suspect that the strategy processes messages faster than the HTTP client can write them out, so the queue grows and takes up a lot of memory.

We want to bound this buffer so that it emits backpressure, but the step that fills it is a reduce step (here).
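To make the idea concrete, here is a minimal sketch of a fallible accumulator feeding a bounded queue. It is not arroyo's actual API: `Batch`, `Rejected`, `Reason`, and `accumulate` are hypothetical names, and `std::sync::mpsc::sync_channel` stands in for whatever bounded buffer the processor would use. The accumulator hands back both the state and the payload on failure, so the caller can retry later.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical reason an accumulator could not accept a value.
#[derive(Debug, PartialEq)]
pub enum Reason {
    /// The bounded queue is full; retry the same payload later.
    Backpressure,
    /// The consumer side of the queue has gone away.
    Closed,
}

/// Hypothetical accumulator state: the producer side of a bounded queue.
#[derive(Debug)]
pub struct Batch {
    sender: SyncSender<Vec<u8>>,
    pushed: usize,
}

/// Returned on failure so that neither the state nor the payload is lost.
#[derive(Debug)]
pub struct Rejected {
    pub state: Batch,
    pub payload: Vec<u8>,
    pub reason: Reason,
}

/// Fallible accumulator: returns a `Result` instead of the bare state.
pub fn accumulate(mut state: Batch, payload: Vec<u8>) -> Result<Batch, Rejected> {
    // `try_send` never blocks; on failure it hands the payload back.
    match state.sender.try_send(payload) {
        Ok(()) => {
            state.pushed += 1;
            Ok(state)
        }
        Err(TrySendError::Full(payload)) => Err(Rejected {
            state,
            payload,
            reason: Reason::Backpressure,
        }),
        Err(TrySendError::Disconnected(payload)) => Err(Rejected {
            state,
            payload,
            reason: Reason::Closed,
        }),
    }
}

fn main() {
    // Capacity of 2 is arbitrary; `_receiver` stands in for the HTTP client.
    let (sender, _receiver) = sync_channel::<Vec<u8>>(2);
    let mut state = Batch { sender, pushed: 0 };
    for i in 0..3u8 {
        state = match accumulate(state, vec![i]) {
            Ok(next) => next,
            Err(rejected) => {
                println!("rejected {:?}: {:?}", rejected.payload, rejected.reason);
                rejected.state
            }
        };
    }
    println!("accepted {} payloads", state.pushed);
}
```

With the infallible signature, the only options when the queue is full are to block or to keep growing the buffer; a `Result` gives the reduce step a third option, which is to reject the message and let the caller try again.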

john-z-yang, Apr 30 '24 21:04

can you provide some context about the use-case for this? afaik it's generally used for batching which isn't a backpressure scenario. i believe the python version of this strategy also does not support backpressure -- is that supposed to be aligned as well?

lynnagara, Apr 30 '24 22:04

> can you provide some context about the use-case for this? afaik it's generally used for batching which isn't a backpressure scenario. i believe the python version of this strategy also does not support backpressure -- is that supposed to be aligned as well?

In the Rust Snuba metrics processor, we use an unbounded queue: we push data on one end, and an HTTP client on the other end writes it directly to the network buffer. We occasionally see very high memory usage in the Rust processor, and we suspect that the strategy processes messages faster than the HTTP client can write them out, so the queue grows and takes up a lot of memory.

We want to bound this buffer so that it emits backpressure, but the step that fills it is a reduce step.

cc @untitaker who has more context on this

john-z-yang, May 01 '24 00:05

> afaik it's generally used for batching which isn't a backpressure scenario. i believe the python version of this strategy also does not support backpressure -- is that supposed to be aligned as well?

due to streaming batching being implemented in rust-snuba, we can run into situations where accumulating a value cannot make progress temporarily. i assume the way the python implementation handles this is to simply block the entire main thread which isn't that good either.
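as a rough illustration of the non-blocking alternative (a sketch only, not rust-snuba or arroyo code; `run_without_blocking` and the sleeping consumer thread are made up for this example), the caller can keep a rejected value and retry it instead of parking the main thread in a blocking `send`:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;
use std::time::Duration;

/// Hypothetical driver: push `values` into a bounded queue without ever
/// blocking on a full queue. Returns what the consumer received and how
/// many times a value had to be retried.
fn run_without_blocking(values: Vec<u32>, capacity: usize) -> (Vec<u32>, usize) {
    let (sender, receiver) = sync_channel::<u32>(capacity);

    // Slow consumer standing in for the HTTP client.
    let consumer = thread::spawn(move || {
        let mut seen = Vec::new();
        for value in receiver {
            thread::sleep(Duration::from_millis(1));
            seen.push(value);
        }
        seen
    });

    let mut retries = 0;
    for value in values {
        let mut pending = value;
        loop {
            match sender.try_send(pending) {
                Ok(()) => break,
                Err(TrySendError::Full(rejected)) => {
                    // Backpressure: keep the value and retry. A real main
                    // loop could do other work here before coming back.
                    pending = rejected;
                    retries += 1;
                    thread::yield_now();
                }
                Err(TrySendError::Disconnected(_)) => panic!("consumer went away"),
            }
        }
    }

    drop(sender); // closes the channel so the consumer loop ends
    (consumer.join().unwrap(), retries)
}

fn main() {
    let (received, retries) = run_without_blocking((0..10).collect(), 2);
    println!("received {} values after {} retries", received.len(), retries);
}
```

the retry count depends on scheduling, so only the delivered values and their order are deterministic here.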

untitaker, May 02 '24 18:05

Since this PR is green, let's merge it. It's a good change regardless of whether we need it.

untitaker, May 17 '24 13:05