How to wait for async operations inside the iterator of Stream.reduce
When consuming a stream with .reduce, I'd like to be able to do some async operations inside the iterator/reducer function and have the stream wait for their completion before consuming the next element.
Here's what I'm doing right now:
const H = require('highland')
const Promise = require('aigle')

const reducer = async (memoPromise, x) => {
  console.log('got x:', x)
  const memo = await memoPromise
  console.log('processing x:', x)
  return Promise.delay(10).then(() => memo + x)
}

H([1, 2, 3])
  .reduce(reducer, 0)
  .toPromise(Promise)
  .then(console.log)
However, the stream is consumed as fast as possible, without waiting for the async operation inside the reducer to complete. The output of running the above is:
got x: 1
got x: 2
got x: 3
processing x: 1
processing x: 2
processing x: 3
6
while I'd like it to be:
got x: 1
processing x: 1
got x: 2
processing x: 2
got x: 3
processing x: 3
6
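For reference, the ordering I'm after is what a plain sequential async reduce produces. A minimal sketch without Highland (the helper names here are my own, not part of any library): each reducer call is awaited before the next element is taken.

```javascript
// delay: resolves after `ms` milliseconds.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Sequential async reduce over a plain array: the next element is not
// consumed until the reducer's promise for the current one has settled.
const reduceAsync = async (items, reducer, init) => {
  let memo = init;
  for (const x of items) {
    console.log('got x:', x);
    memo = await reducer(memo, x);
  }
  return memo;
};

const reducer = async (memo, x) => {
  console.log('processing x:', x);
  await delay(10);
  return memo + x;
};

reduceAsync([1, 2, 3], reducer, 0).then(console.log);
```

Running this prints the got/processing lines interleaved per element, then the sum.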
You can use wrapAsync, which converts a promise-returning function (e.g. an async function) into a function that returns a stream emitting a single value, e.g.
// `_` is the Highland module.
const _ = require('highland');

const wrapAsync = (fn) => {
  return (...args) => {
    let promise;
    try {
      promise = fn.apply(_, args);
      if (!_.isObject(promise) || !_.isFunction(promise.then)) {
        return _.fromError(
          new Error('Wrapped function did not return a promise')
        );
      }
      return _(promise);
    } catch (e) {
      return _.fromError(e);
    }
  };
};
// sleep: resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

_([1, 2, 3])
  .flatMap(wrapAsync(async (val) => {
    await sleep(4000);
    return val * 2;
  }))
  .toArray((result) => console.log('DONE', result));
@richardscarrott thanks for the tip. I am already using a similar approach with flatMap; however, I'd like to use the same technique with reduce so that I can pass some state along as I process individual elements. I know I can achieve a similar result by using flatMap with some mutable state in the outer scope, but I'd prefer to avoid that.
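One way to see what such an operator would have to do: keep the accumulator inside the operator itself and pull the next element only after the reducer's promise settles. Below is a minimal pull-based sketch of that mechanism with plain promises (all names are hypothetical, not Highland API); in Highland itself, the equivalent primitive would be a custom .consume handler that calls next() only after the promise resolves.

```javascript
// pullReduce: reduces a pull-based source with an async reducer. The source
// is only asked for its next element after the previous reducer call settled,
// and the accumulator stays inside the helper rather than the calling scope.
const pullReduce = (pull, reducer, init) =>
  new Promise((resolve, reject) => {
    const step = (memo) =>
      pull().then(({ done, value }) => {
        if (done) {
          resolve(memo);
          return;
        }
        console.log('got x:', value);
        // Pull the next element only after this reducer call settles.
        reducer(memo, value).then(step, reject);
      }, reject);
    step(init);
  });

// fromArray: a pull-based source over an array, standing in for a stream.
const fromArray = (items) => {
  let i = 0;
  return () =>
    Promise.resolve(
      i < items.length ? { done: false, value: items[i++] } : { done: true }
    );
};

const slowAdd = async (memo, x) => {
  console.log('processing x:', x);
  await new Promise((resolve) => setTimeout(resolve, 10));
  return memo + x;
};

pullReduce(fromArray([1, 2, 3]), slowAdd, 0).then(console.log);
```

This produces the interleaved got/processing ordering from the question, and the state threads through the recursion instead of living in the outer scope.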