How to parametrize stream/pipeline creation?
Hi, thanks for the great project!
I'm trying to build a custom builder interface (a wrapper around streamz) that provides predefined, preconfigured stream building blocks that can be composed together. For example:
from streamz import Stream

def _custom_stream_builder_1(input_stream):
    return input_stream.accumulate(_custom_acc, returns_state=True, start=_custom_state)

def _custom_stream_builder_2(input_stream):
    return input_stream.filter(lambda event: _custom_condition(event))

def _custom_stream_builder_3(input_stream):
    return input_stream.map(lambda event: _custom_map(event))

stream = Stream()
stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream)))
stream.sink(print)

for event in events:
    stream.emit(event)
However, it looks like the code inside those functions does not alter the initial stream object, and all emitted events go straight to the sink. What am I doing wrong? Can you point me in the right direction?
Another question: what is the difference between
stream = Stream()
stream.sink(print)
for event in events:
stream.emit(event)
and
stream = Stream()
stream = stream.sink(print) # any other function map/filter/etc. here
for event in events:
stream.emit(event)
I feel like the answer is somewhere in this example, but I don't see where. Thanks!
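The problem is that you have reassigned stream, so when you call emit() you are emitting on the last node of your stream graph, not the input. Methods such as map, filter, accumulate and sink do not modify the node they are called on; each returns a new node attached downstream of it, and emit() only pushes data to the nodes below the node it is called on. The same applies to your second snippet: after stream = stream.sink(print), stream is the sink node itself, so emitting on it never reaches print.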
If you instead write
stream_in = Stream()
stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream_in)))
stream.sink(print)
for event in events:
stream_in.emit(event)
you will get the behaviour you are after.
Thanks @martindurant, that worked!