Ensure observable methods return replay as needed
- Remove unnecessary usage of expectations in non-async tests
Feels like these should be made in Just (https://github.com/UrbanCompass/Snail/blob/main/Snail/Just.swift) instead of Observable. Seems like Observable should not behave like Replay.
Feels like these should be made in
Just(https://github.com/UrbanCompass/Snail/blob/main/Snail/Just.swift) instead ofObservable. Seems likeObservableshould not behave likeReplay.
@luispadron: I don't think that would be sufficient, because this issue affects all types that emit a value on subscription (Variable, Replay, etc), not only Just. Transformations like map currently trigger upstream sources immediately when chained, but since they don't in turn wait for their own subscribers, emitted values get lost during stream construction. E.g. this example
Variable(10)
.asObservable()
.map { 2 * $0 }
.subscribe(onNext: { print("Doubled: \($0)") })
prints nothing, because by the time the mapped observable is subscribed to, the Variable has already emitted its initial value.
All that said, after thinking about this a little more, I wonder if using Replay for all transformations might be going too far in the other direction? E.g. consider this example:
let stream = Observable<String>()
let identityStream = stream.map { $0 }
stream.on(.next("Nothing to see here"))
stream.on(.done)
stream.subscribe(onNext: { print("Value: \($0)") }) // prints nothing, as expected, since the stream was already ended
identityStream.subscribe(onNext: { print("Identity: \($0)") }) // prints "Nothing to see here"
It seems pretty unintuitive to me that stream and identityStream would behave differently here. Do you agree?
I wonder if a better approach would be to have transformations like map delay subscribing to their upstream observable until they themselves receive a subscriber? A rough POC would be something like
extension Observable {
class Mapped<U, O: ObservableType>: Observable<U> {
let source: O
let transform: (O.T) -> U
init(_ source: O, transform: @escaping (O.T) -> U) {
self.source = source
self.transform = transform
}
override func subscribe(queue: DispatchQueue? = nil, onNext: ((U) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) -> Subscriber<U> {
let subscriber = Subscriber(queue: queue, observable: self, handler: createHandler(onNext: onNext, onError: onError, onDone: onDone))
source.subscribe(queue: queue, onNext: {
let u = self.transform($0)
self.notify(subscriber: subscriber, event: .next(u))
}, onError: {
self.notify(subscriber: subscriber, event: .error($0))
}, onDone: {
self.notify(subscriber: subscriber, event: .done)
})
return subscriber
}
}
public func map<U>(_ transform: @escaping (T) -> U) -> Observable<U> {
Mapped(self, transform: transform)
}
}
This seems a little more intuitive to me, because it means that a complex stream with multiple transformations wouldn't be kicked off until subscribe is explicitly called at the end of the entire chain. That seems more in line with Combine and other Rx libraries I've seen, although I'm still not too familiar with Snail, so maybe there are other design considerations I'm overlooking?
Yeah great thoughts and discussion @chris-lapilla, totally agree that observable shouldn't guarantee anything about a buffer like Replay does, hence my comment about moving this to Just.
I think the solution you proposed is the correct way to go about this, this is similar to how combine does this as well, with their many different Publishers. However it would require more work and be a fairly large change, but if we plan to keep supporting snail this should be prioritized, as well as the memory leak issues.
Fundamentally, snail just doesn't behave like any standard implementation of reactive streams, and it's a learning experience for the team that we should prefer standard solutions over these.
We can also look at the implementations for RxSwift and Combine. Which is related to what @chris-lapilla proposed:
final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Transform = (SourceType) throws -> ResultType
typealias ResultType = Observer.Element
typealias Element = SourceType
private let transform: Transform
init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
self.transform = transform
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
let mappedElement = try self.transform(element)
self.forwardOn(.next(mappedElement))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
}