promesa
promesa copied to clipboard
implement merge for channels
I've been using and enjoying the csp channels, however I ran a missing function while working with a csp/mult. In clojure.core.async there is a merge function to take n channels and merge the values as they become available onto the given channel, and I think there should be an analogue for promesa.exec.csp.
Here's a rough sketch of how I'm using it:
(def north-xf (map (fn [[x y]] [:north x y])))
(def south-xf (map (fn [[x y]] [:south x y])))
;; this is the source chan, a series of [x y] coordinates
(def coords-ch (csp/chan))
;; create a mult from the source chan
(def mx (csp/mult* coords-ch))
;; tap values from the mult into the respective work chans
(def north (let [ch (csp/chan :buf 1 :xf north-xf)]
(csp/tap! mx ch)))
(def south (let [ch (csp/chan :buf 1 :xf south-xf)]
(csp/tap! mx ch)))
;; load up the source chan with coordinates
(csp/onto-chan! coords-ch (partition-all 2 (range 10)))
;; this is what I want to do to consolidate all the work done:
(merge [north south])
I've come up with my own version of merge (taking inspiration from clojure.core.async) that seems to be working fine on the jvm, I haven't had to use it in CLJS yet.
(require '[promesa.exec.csp :as csp])
(defn merge
[chans & {:keys [buf xf exh exc] :as opts}]
(let [out (csp/chan opts)]
(csp/go-loop [cs (vec chans)]
(if (pos? (count cs))
(let [[v ch] (csp/alts! cs)]
(if (nil? v)
(recur (filterv #(not= ch %) cs))
(do (csp/put! out v)
(recur cs))))
(csp/close! out)))
out))
I'd be happy to put together a PR for it if this is the direction you want to go.