Add Stream node constructor for sub-classing #442
This PR provides a constructor on the Stream which can be overridden for sub-classing the nodes.
This is to create or override methods without altering the original Stream class.
It gives a single point of entry to extend any node while chaining.
The behaviour remains the same unless Stream._new_node is overridden.
This is a usage example to subclass and inherit a Stream :
import streamz as sz
class MyStream(sz.Stream):
def _new_node(self, cls, args, kwargs):
if not issubclass(cls, MyStream):
cls = type(cls.__name__, (cls, MyStream), dict(cls.__dict__))
return cls(*args, **kwargs)
@MyStream.register_api()
class foo(sz.sinks.sink):
pass
stream = MyStream()
stream.map(lambda x: x + 1).foo(print)
stream.emit(100)
Please ping me if you manage to get tests to pass. In the meantime, please note the warning suggesting to add pytest-asyncio to the CI env.
All the tests were successful on my fork: https://github.com/florentbr/streamz/actions Not sure what you expect me to do about the warning since I don't see how it relates to this PR.
what you expect me to do about the warning
This might be a convenient time to add the dep, as a service to the repo
I don't understand what "add pytest-asyncio to the CI env." and "add the dep, as a service to the repo" means and implies. Could you please be more specific?
Never mind, I'll deal with it when I can
It's not really a new feature. This change just exposes the constructor for the nodes which was otherwise inaccessible. It's not meant for a general usage, but for developers who wish to extend/tweak this library. I've already documented the API with a usage example. So I don't know what kind of prose you are expecting since it relates to basic OOP which is well documented on the web.
It's not really a new feature.
You are demonstrating usage that was previously not possible - adding methods to a subclass but not to the classes of this library. I think it's worth demonstrating that this is possible!
I have not yet managed to figure out why the kafka tests are sometimes failing here.