promesa icon indicating copy to clipboard operation
promesa copied to clipboard

implement merge for channels

Open dpetranek opened this issue 1 year ago • 0 comments

I've been using and enjoying the csp channels, however I ran a missing function while working with a csp/mult. In clojure.core.async there is a merge function to take n channels and merge the values as they become available onto the given channel, and I think there should be an analogue for promesa.exec.csp.

Here's a rough sketch of how I'm using it:

(def north-xf (map (fn [[x y]] [:north x y])))
(def south-xf (map (fn [[x y]] [:south x y])))

;; this is the source chan, a series of [x y] coordinates
(def coords-ch (csp/chan))
;; create a mult from the source chan
(def mx (csp/mult* coords-ch))
;; tap values from the mult into the respective work chans
(def north (let [ch (csp/chan :buf 1 :xf north-xf)]
               (csp/tap! mx ch)))
(def south (let [ch (csp/chan :buf 1 :xf south-xf)]
               (csp/tap! mx ch)))

;; load up the source chan with coordinates
(csp/onto-chan! coords-ch (partition-all 2 (range 10)))

;; this is what I want to do to consolidate all the work done:
(merge [north south])

I've come up with my own version of merge (taking inspiration from clojure.core.async) that seems to be working fine on the jvm, I haven't had to use it in CLJS yet.

(require '[promesa.exec.csp :as csp])

(defn merge
  [chans & {:keys [buf xf exh exc] :as opts}]
  (let [out (csp/chan opts)]
    (csp/go-loop [cs (vec chans)]
      (if (pos? (count cs))
        (let [[v ch] (csp/alts! cs)]
          (if (nil? v)
            (recur (filterv #(not= ch %) cs))
            (do (csp/put! out v)
                (recur cs))))
        (csp/close! out)))
    out))

I'd be happy to put together a PR for it if this is the direction you want to go.

dpetranek avatar Aug 03 '24 05:08 dpetranek