refactor(client): Resend return type
Functional changes
TODO: update changelog: that resend subscriptions can be unsubscribed
TODO: remove Subscription#unsubscribe test cases from unsubscribe.test.ts (maybe replace with unit test which ensures that Subscription#unsubscribe is called)
TODO: how safe we should do: should we add try-finally to doUnsubscribe so that event is always emitted? OTOH, the return() runs our own code
StreamrClient#resend returns Subscription. Therefore a resend subscription has exactly same API as realtime subscription, and it is possible to call unsubscribe to it. The subscription is included in StreamrClient#getSubscriptions list until it has been iterated completely (or user manually unsubscribes).
There is a new internal unsubscribe event in Subscription. If needed, we could expose this event to be part of public API.
As resend is now subscription, it now emits resendComplete event when it has been completely iterated.
Added pOnce limit to Subscription#unsubscribe so that if Subscription#unsubscribe is called multiple times, only the first invocation emits the unsubscribe event.
Renamed message listener interface SubscriptionOnMessage/MessageStreamOnMessage to MessageListener
Refactored unsubscribe
Simplified the interaction between StreamrClient#unsubscribe and Subscription#unsubscribe.
Before this PR StreamrClient#unsubscribe and Subscription#unsubscribe were kind of separate actions. A StreamrClient#unsubscribe call invoked Subscription#unsubscribe via Subscriber.unsubscribe -> Subscriber.remove -> SubscriptionSession.remove chain (if the subscription was not in "done" state). And Subscription#unsubscribe removed the subscription from active subscriptions via sub.onBeforeFinally, which it triggeddc by calling this.return().
Now StreamrClient#unsubscribe calls Subscription#unsubscribe via Subscriber#unsubscribe or via Resends#unsubscribe. A subscription is removed from active subscriptions when unsubscribe event in received in SubscribeSession or in Resends.
This way we ensure that Subscriber#unsubscribe is always called by StreamrClient#unsubscribe (even if was in "done" state) and unsubscribe events are always emitted only once.
There is a small functionality change related to "auto-unsubscribe" feature. That feature works like this: when an iterator returns (e.g. because of break statement in a for await loop), onBeforeFinally is triggered. It then removes the subscription from active subscriptions.
- Earlier the
onBeforeFinallycalledthis.remove()and blocked before the removal was done - Now we call
sub.unsubscribe()as a background task - In practice the behavior is the same, just now as a background operation
Other refactoring
- Simplified
useLegacyOnMessageHandlercalls: now the calls are made directly fromStreamrClientmethods- the
onMessageparameter is now required inuseLegacyOnMessageHandler
- the
-
Subscriber#subscribemethod takesstreamPartIdparameter instead ofStreamDefinition - Removed
MessageStreamclass as all message streams are nowSubscriptioninstances- there was a method
collectContent: migrated calls to that method to usePipeline#collectinstead
- there was a method
- Removed
SubscribePipelineOptions#messageStream(each called passed a newStreamMessageas an argument) - Extracted some test cases from
StreamrClient.test.tsto newunsubscribe.test.tsand added new test cases - Renamed
SubscriptionPipeline.tstosubscriptionPipeline.tsas it is not a class - In
StreamrClient#destroywe callStreamrClient#unsubscribeinstead ofSubscriber#stop- that does the same thing as previously (removes all subscriptions), but is more straightforward and readable
Test coverage
-
StreamrClient.test.ts: removed"unsubscribe before subscribed"test case- If both subscribe and unsubscribe are done in parallel, it is maybe ok that the state of the subscription is not well-defined? It may be in subscribed state, if unsubscribe request was processed before the unsubscribe request. (It can be unsubscribed state if the requests were processed in opposite order.)
- Also removed
does not have subscription yetcheck from"unsubscribe after subscribed"test case for the same reason (that test is now inunsubscribe.test.ts)
-
Resends2.test.ts: removed"can return before start"test case- It tested how the subscription behaves when internal Subscription#return method is called. As it is an internal method, we don't need to test that aspect.
-
Resends2.test.ts: fixed"decodes resent messages correctly"test case- it compared signatures of content objects not
StreamMessageobjects
- it compared signatures of content objects not
- Divided
MessageStream.test.tsunit test toSubscription.test.tstest andPushPipeline.test.ts- removed some test cases which used
Readable(the pull functionality is already covered by other tests)
- removed some test cases which used
Future improvements
- As resends subscriptions can be manually unsubscribed, it would make sense that an ongoing HTTP request would be aborted if it still ongoing.
- We should maybe expose
Subscriptionas an interface. The interface would have the current capabilities: it is async iterable, it hasstreamPartIdproperty and hasunsubscribemethod.- in some tests we call Subcriptions's pipeline method, e.g.
sub.return(). Maybe those tests should be removed/rewritten so that they don't use these internal methods.
- in some tests we call Subcriptions's pipeline method, e.g.
- Is it currently possible to have two iterators for the same subscription? Should it be possible? (And in that case, breaking out from one iterator should not affect another iterator).
- Is it good that we have the "auto-unsubscribe" feature. See a comment about "auto-unsubscribe in
SubscriptionSession:172. -
SubscriptionSessionimplements pending subscription removal for some reason. Is that feature needed, or could we simplify it so that removing a subscription from subscription list would be a synchronous operation?
Checklist before requesting a review
- [ ] Is this a breaking change? If it is, be clear in summary.
- [ ] Read through code myself one more time.
- [ ] Make sure any and all
TODOcomments left behind are meant to be left in. - [ ] Has reasonable passing test coverage?
- [ ] Updated changelog if applicable.
- [ ] Updated documentation if applicable.