
How to parametrize stream/pipeline creation?

Open · anovv opened this issue 3 years ago · 1 comment

Hi, thanks for the great project!

What I'm trying to achieve is a custom builder interface (a wrapper around streamz) that provides predefined building blocks of preconfigured streams, which can then be combined. For example:


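from streamz import Stream

# _custom_acc, _custom_state, _custom_condition, _custom_map and events are defined elsewhere.
# With returns_state=True, _custom_acc must return a (new_state, value_to_emit) tuple.


def _custom_stream_builder_1(input_stream):
    return input_stream.accumulate(_custom_acc, returns_state=True, start=_custom_state)


def _custom_stream_builder_2(input_stream):
    return input_stream.filter(_custom_condition)  # predicate called with each event


def _custom_stream_builder_3(input_stream):
    return input_stream.map(_custom_map)  # function called with each event

stream = Stream()
stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream)))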
stream.sink(print)

for event in events:
    stream.emit(event)

However, the transformations inside those functions don't seem to take effect: every emitted event goes straight to the sink, unchanged. What am I doing wrong? Can you point me in the right direction?

Another question: what is the difference between


stream = Stream()
stream.sink(print)

for event in events:
    stream.emit(event)

and


stream = Stream()
stream = stream.sink(print)  # or any other method: map, filter, etc.

for event in events:
    stream.emit(event)

I suspect the answer lies in this difference, but I can't see where. Thanks!

anovv, Oct 04 '22 12:10

The problem is that you have reassigned stream, so when you call emit() you are emitting on the last node of your stream graph, not on the input node.
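To make that mechanism concrete, here is a toy model of a push-based graph in plain Python. The Node class and the names out, printed, source and tail are hypothetical: this is a simplified stand-in, not streamz's real Stream class. It assumes, as the reply describes, that emit() hands a value to a node's downstream nodes without running that node's own function. Case 1 reproduces the original symptom (emitting on the tail skips the upstream map); case 2 mirrors the second question, where rebinding the name to the sink node leaves emit() with nowhere to send the value.

```python
from typing import Any, Callable, List


class Node:
    """Hypothetical toy node that mimics a push-based stream graph (not streamz's API)."""

    def __init__(self, func: Callable[[Any], Any] = lambda x: x) -> None:
        self.func = func  # applied only when a value arrives from an upstream node
        self.downstreams: List["Node"] = []

    def emit(self, x: Any) -> None:
        # Push x to the downstream nodes; this node's own func is NOT applied.
        for node in self.downstreams:
            node.update(x)

    def update(self, x: Any) -> None:
        # Called by an upstream node: apply func, then push the result onward.
        self.emit(self.func(x))

    def map(self, func: Callable[[Any], Any]) -> "Node":
        child = Node(func)
        self.downstreams.append(child)
        return child

    # In this toy, a sink is just a terminal map whose result goes nowhere.
    sink = map


# Case 1: emitting on the tail bypasses the upstream map.
out: List[Any] = []
source = Node()
tail = source.map(lambda x: x * 10)
tail.sink(out.append)
tail.emit(1)    # map skipped: out == [1]
source.emit(2)  # map applied: out == [1, 20]

# Case 2: `stream = stream.sink(...)` rebinds the name to the sink node.
printed: List[Any] = []
stream = Node()
stream = stream.sink(printed.append)
stream.emit(3)  # the sink has no downstream nodes, so printed stays []
```

Real streamz nodes do more than this (asynchronous emission, metadata, and so on), so treat the toy only as an illustration of which node a value enters.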

If you keep a separate reference to the input node,

stream_in = Stream()
stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream_in)))
stream.sink(print)

for event in events:
    stream_in.emit(event)

you will get the behaviour you are after.
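One way to keep the building-block design from the question while avoiding the reassignment pitfall is to have a helper return both ends of the chain. The sketch below is self-contained and runs without streamz: it uses a hypothetical toy Node class (with map, filter, accumulate and sink) that mimics a push-based graph, and build_pipeline, times_ten, keep_multiples_of_20 and running_total are illustrative names, not part of streamz. Builders are applied left to right, which matches the question's nesting, where _custom_stream_builder_3 is applied first.

```python
from typing import Any, Callable, List, Tuple

_DROP = object()  # sentinel: a filter rejected the value, so do not pass it on


class Node:
    """Hypothetical toy node mimicking a push-based stream graph (not streamz's API)."""

    def __init__(self, func: Callable[[Any], Any] = lambda x: x) -> None:
        self.func = func
        self.downstreams: List["Node"] = []

    def emit(self, x: Any) -> None:
        # Push x to downstream nodes without applying this node's func.
        for node in self.downstreams:
            node.update(x)

    def update(self, x: Any) -> None:
        # Apply func to a value from upstream and forward it unless it was dropped.
        y = self.func(x)
        if y is not _DROP:
            self.emit(y)

    def map(self, func: Callable[[Any], Any]) -> "Node":
        child = Node(func)
        self.downstreams.append(child)
        return child

    def filter(self, pred: Callable[[Any], bool]) -> "Node":
        return self.map(lambda x: x if pred(x) else _DROP)

    def accumulate(self, func: Callable[[Any, Any], Any], start: Any) -> "Node":
        state = [start]  # mutable cell holding the running state

        def step(x: Any) -> Any:
            state[0] = func(state[0], x)
            return state[0]

        return self.map(step)

    sink = map  # terminal node; its result has nowhere to go


# Hypothetical building blocks, analogous to _custom_stream_builder_1/2/3.
def times_ten(s: Node) -> Node:
    return s.map(lambda x: x * 10)


def keep_multiples_of_20(s: Node) -> Node:
    return s.filter(lambda x: x % 20 == 0)


def running_total(s: Node) -> Node:
    return s.accumulate(lambda acc, x: acc + x, start=0)


def build_pipeline(*builders: Callable[[Node], Node]) -> Tuple[Node, Node]:
    """Apply builders left to right and return (input node, output node)."""
    source = Node()
    tail = source
    for build in builders:
        tail = build(tail)
    return source, tail


results: List[Any] = []
source, tail = build_pipeline(times_ten, keep_multiples_of_20, running_total)
tail.sink(results.append)
for event in [1, 2, 3, 4]:
    source.emit(event)  # always emit on the input node, never on `tail`

print(results)  # [20, 60]
```

The example builders only call map, filter and accumulate, which streamz's Stream also provides (its accumulate function likewise receives the state first), so swapping Node() for streamz's Stream() inside build_pipeline should carry the same pattern over; that is an assumption worth checking against the streamz documentation.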

martindurant, Oct 07 '22 01:10

Thanks @martindurant, that worked!

anovv, Jan 16 '23 06:01