Adds ConcatMap operator
Resolves #41.
@freak4pc mentioned in #41, that we could try using .flatMap(maxPublishers: 1, transform:) to achieve the functionality of ConcatMap. I added some tests to verify if this is true, but unfortunately it's not that easy. ;)
Unexpected behaviour:
-
.flatMap(maxPublishers: 1, transform:)does not buffer mapped publishers, discarding all publishers if there is an active one, leading to missing values.
Codecov Report
Merging #68 (4caf28f) into main (730d272) will increase coverage by
0.02%. The diff coverage is97.63%.
:exclamation: Current head 4caf28f differs from pull request most recent head 80ba6dd. Consider uploading reports for the commit 80ba6dd to get more accurate results
@@ Coverage Diff @@
## main #68 +/- ##
==========================================
+ Coverage 97.12% 97.15% +0.02%
==========================================
Files 62 64 +2
Lines 3336 3546 +210
==========================================
+ Hits 3240 3445 +205
- Misses 96 101 +5
| Impacted Files | Coverage Δ | |
|---|---|---|
| Sources/Operators/ConcatMap.swift | 94.73% <94.73%> (ø) |
|
| Sources/Common/DemandBuffer.swift | 100.00% <100.00%> (ø) |
|
| Tests/ConcatMapTests.swift | 100.00% <100.00%> (ø) |
Continue to review full report at Codecov.
Legend - Click here to learn more
Δ = absolute <relative> (impact),ø = not affected,? = missing dataPowered by Codecov. Last update 730d272...80ba6dd. Read the comment docs.
A gave it a go and implemented Publishers.ConcatMap. As this is my first custom combine operator / Publisher, I'm happy about any kind of feedback. 😸
Hey @ohitsdaniel - are you interested in finishing this PR? Thanks :)
Yes, just didn't find the time yet as I have been focusing on other projects. I'll see if I can squeeze it in next week. :)
Quick update from my side: I scheduled some time this Friday to work on this. :)
@freak4pc
Re-implemented it, pretty closely following the RxSwift implementation while not over-generifying the implementation. Main idea is that we have an inner and outer sink.
The OuterSink observes the stream of upstream values and has .unlimited demand. Whenever it receives a value, it transforms it into a new publisher (NewPublisher) and hands it to the inner sink.
The InnerSink observes a stream of NewPublisher values and manages the demand. Whenever it receives a completion, it subscribes to the next publisher, if there is one. Whenever the outer sink hands in a NewPublisher, the inner sink either directly subscribes to the new publisher, if it currently has no active subscription, or queues it, so that it is subscribed to whenever the currently active subscription ends (i.e. the inner sink receives a successful completion).
Had to add a remainingDemand field to DemandBuffer, so that the inner sink can request more values from the next subscription if the downstream demand hasn't been fulfilled.
@freak4pc Any plans to move on with this?
Hey, are there any plans to merge this?
bump