highland icon indicating copy to clipboard operation
highland copied to clipboard

how to wait for async operations inside iterator of: Stream.reduce

Open adrian-gierakowski opened this issue 5 years ago • 2 comments

When consuming a stream with .reduce, I'd like to be able to do some async operations inside the iterator/reducer function and have the stream wait for their completion before consuming next element.

Here's what I'm doing right now:

const H = require('highland')
const Promise = require('aigle')

const reducer = async (memoPromise, x) => {
  console.log('got x:', x)
  const memo = await memoPromise
  console.log('processing x:', x)
  return Promise.delay(10).then(() => memo + x)
}

H([1, 2, 3])
  .reduce(reducer, 0)
  .toPromise(Promise)
  .then(console.log)

However the stream is being consumed as fast as possible, without waiting for async operation inside the reducer to complete. The output of running the above is:

got x: 1
got x: 2
got x: 3
processing x: 1
processing x: 2
processing x: 3
6

while I'd like it to be:

got x: 1
processing x: 1
got x: 2
processing x: 2
got x: 3
processing x: 3
6

adrian-gierakowski avatar Aug 05 '20 16:08 adrian-gierakowski

You can use wrapAsync which converts a promise returning function (e.g. an async fn) into a stream which emits a single value, e.g.

const wrapAsync = (
  fn
) => {
  return (...args) => {
    let promise;
    try {
      promise = fn.apply(_, args);
      if (!_.isObject(promise) || !_.isFunction(promise.then)) {
        return _.fromError(
          new Error('Wrapped function did not return a promise')
        );
      }
      return _(promise);
    } catch (e) {
      return _.fromError(e);
    }
  };
};
_([1, 2, 3])
  .flatMap(wrapAsync(async (val) => {
     await sleep(4000);
     return val * 2;
  }))
  .toArray((result) => console.log('DONE', result));

richardscarrott avatar Sep 22 '20 09:09 richardscarrott

@richardscarrott thanks for the tip. I am already using a similar approach with flatMap, however I'd like to use the same technique with reduce so that I can pass some state along as I'm processing individual elements. I know I can achieve a similar result by using flatMap with some mutable state in the outer scope but I'd prefer to avoid it.

adrian-gierakowski avatar Nov 26 '20 09:11 adrian-gierakowski