Snail icon indicating copy to clipboard operation
Snail copied to clipboard

Ensure observable methods return replay as needed

Open wesbillman opened this issue 4 years ago • 4 comments

  • Remove unnecessary usage of expectations in non-async tests

wesbillman avatar Jul 16 '21 21:07 wesbillman

Feels like these should be made in Just (https://github.com/UrbanCompass/Snail/blob/main/Snail/Just.swift) instead of Observable. Seems like Observable should not behave like Replay.

luispadron avatar Jul 16 '21 22:07 luispadron

Feels like these should be made in Just (https://github.com/UrbanCompass/Snail/blob/main/Snail/Just.swift) instead of Observable. Seems like Observable should not behave like Replay.

@luispadron: I don't think that would be sufficient, because this issue affects all types that emit a value on subscription (Variable, Replay, etc), not only Just. Transformations like map currently trigger upstream sources immediately when chained, but since they don't in turn wait for their own subscribers, emitted values get lost during stream construction. E.g. this example

Variable(10)
    .asObservable()
    .map { 2 * $0 }
    .subscribe(onNext: { print("Doubled: \($0)") }) 

prints nothing, because by the time the mapped observable is subscribed to, the Variable has already emitted its initial value.

All that said, after thinking about this a little more, I wonder if using Replay for all transformations might be going too far in the other direction? E.g. consider this example:

let stream = Observable<String>()
let identityStream = stream.map { $0 }

stream.on(.next("Nothing to see here"))
stream.on(.done)

stream.subscribe(onNext: { print("Value: \($0)") }) // prints nothing, as expected, since the stream was already ended
identityStream.subscribe(onNext: { print("Identity: \($0)") }) // prints "Nothing to see here"

It seems pretty unintuitive to me that stream and identityStream would behave differently here. Do you agree?

I wonder if a better approach would be to have transformations like map delay subscribing to their upstream observable until they themselves receive a subscriber? A rough POC would be something like

extension Observable {
    class Mapped<U, O: ObservableType>: Observable<U> {
        let source: O
        let transform: (O.T) -> U

        init(_ source: O, transform: @escaping (O.T) -> U) {
            self.source = source
            self.transform = transform
        }

        override func subscribe(queue: DispatchQueue? = nil, onNext: ((U) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) -> Subscriber<U> {
            let subscriber = Subscriber(queue: queue, observable: self, handler: createHandler(onNext: onNext, onError: onError, onDone: onDone))

            source.subscribe(queue: queue, onNext: {
                let u = self.transform($0)
                self.notify(subscriber: subscriber, event: .next(u))
            }, onError: {
                self.notify(subscriber: subscriber, event: .error($0))
            }, onDone: {
                self.notify(subscriber: subscriber, event: .done)
            })

            return subscriber
        }
    }

    public func map<U>(_ transform: @escaping (T) -> U) -> Observable<U> {
        Mapped(self, transform: transform)
    }
}

This seems a little more intuitive to me, because it means that a complex stream with multiple transformations wouldn't be kicked off until subscribe is explicitly called at the end of the entire chain. That seems more in line with Combine and other Rx libraries I've seen, although I'm still not too familiar with Snail, so maybe there are other design considerations I'm overlooking?

chris-lapilla avatar Jul 19 '21 13:07 chris-lapilla

Yeah great thoughts and discussion @chris-lapilla, totally agree that observable shouldn't guarantee anything about a buffer like Replay does, hence my comment about moving this to Just.

I think the solution you proposed is the correct way to go about this, this is similar to how combine does this as well, with their many different Publishers. However it would require more work and be a fairly large change, but if we plan to keep supporting snail this should be prioritized, as well as the memory leak issues.

Fundamentally, snail just doesn't behave like any standard implementation of reactive streams, and it's a learning experience for the team that we should prefer standard solutions over these.

luispadron avatar Jul 19 '21 18:07 luispadron

We can also look at the implementations for RxSwift and Combine. Which is related to what @chris-lapilla proposed:


final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType

    typealias ResultType = Observer.Element 
    typealias Element = SourceType

    private let transform: Transform

    init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
        self.transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                let mappedElement = try self.transform(element)
                self.forwardOn(.next(mappedElement))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

luispadron avatar Jul 20 '21 02:07 luispadron