
async mapping?

Open derrickpelletier opened this issue 10 years ago • 9 comments

Having trouble determining how best to read from a file and async map, with a consumer that pauses/resumes the stream.

My first attempt was something like the following:

var mapper = function (raw) {
   //...parsing was happening here
  return result;
};

var consumer = function (err, x, push, next) {
//... if not err or nil, write batch to db
// there is some detection here to pause and resume the stream to only have a few concurrent writes to the db at a time. 
};

var stream = _(fs.createReadStream(filepath)).split().map(mapper).batch(3000).consume(consumer);

This was working fine, and it was throttling via pause/resume with no problems. However, due to some changes I'm now required to have the mapper parse each row asynchronously, and I can't seem to figure out what I need to do to make that work with Highland.

I tried changing mapper to

var mapper = _.wrapCallback(function (raw, cb) {
  //... async stuff,
  cb(null, result);
});

but after the first 500 items, I end up with the script spitting out a bunch of (node) warning: Recursive process.nextTick detected. This will break in the next version of node. Please use setImmediate for recursive deferral. until the max call stack is exceeded.

derrickpelletier avatar May 30 '15 23:05 derrickpelletier

The typical way to do async things with highland is via the flatMap operator.

stream.flatMap(function (raw) {
    return _(function (push, next) {
        // do work with raw here.
        // call push with the result when done.
        // Call push(null, _.nil); after that.
    });
});

Alternatively, if you have a node-style async function that does the work,

stream.flatMap(_.wrapCallback(asyncMapper));

This is the analogue of returning a promise for async operations from a then callback, if you're familiar with promises.
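For concreteness, a minimal sketch of what such an asyncMapper might look like (the name and body are assumptions for illustration, not from the thread):

var asyncMapper = function (raw, callback) {
    // Hypothetical async work; setImmediate stands in for a real
    // async operation like parsing or an HTTP call.
    setImmediate(function () {
        callback(null, raw.toString().trim());
    });
};

// wrapCallback turns the node-style function into one that returns a
// Highland stream of its single result; flatMap merges those streams
// back into the main pipeline one at a time.
stream.flatMap(_.wrapCallback(asyncMapper));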

As a side note, if you want to throttle db writes, you can easily do it with parallel.

stream.errors(function (err, push) {
    // handle errors somehow.
    // Optional. Only if you care about handling errors from the mapper.
}).map(function (batch) {
    return _(function (push, next) {
        // write to db.
        // Optionally call push(err) for write errors.
        // call push(null, _.nil) when done.
    });
}).parallel(2) // two concurrent writes at once.
.errors(function (err, push) {
    // handle db errors.
    // Only needed if your writes can produce errors or
    // you didn't handle mapper errors above.
})
.resume(); // Start the whole stream. You can delay this until you're ready.

vqvu avatar May 31 '15 01:05 vqvu

Actually, you're better off calling done here instead of resume, since resume will swallow errors if you forget to install an error handler.

In general, you're better off calling one of the consumption transforms than calling resume directly, as they'll make sure that unhandled errors don't get swallowed.
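A minimal sketch of the safer shape (reusing the pipeline above):

stream
    .errors(function (err, push) {
        // handle the errors you care about here.
    })
    .done(function () {
        // Called once the stream ends. Unhandled stream errors are
        // thrown instead of being silently swallowed.
        console.log('All done.');
    });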

vqvu avatar May 31 '15 01:05 vqvu

Thanks for the response, @vqvu.

I do have an error handler in place, I just didn't include one in that pseudo-example above. I'll give your flatMap solution a try and see if that solves my problem.

derrickpelletier avatar May 31 '15 03:05 derrickpelletier

Well, it seems I'm still getting that problem with the (node) warning: Recursive process.nextTick detected.

The problem seems tied to the https://github.com/wdavidw/node-csv-parse module that I'm using inside the flatMap.

stream.flatMap(function (raw) {
    return _(function (push, next) {
        csv(raw, function (err, output) {
            // work with the output, do some more work to it to produce parsed
            var parsed = output;
            push(err, parsed);
            push(null, _.nil);
        });
    });
});

I was avoiding using node-csv-parse as a stream because I couldn't seem to get the backpressure from the db writes to hold back the file reading (some of these files are quite large). So now my only working solution is this:

_h(fs.createReadStream(self.filepath).pipe(require('csv-parse')())).map(syncMapper).filter(...).batch(3000).consume();

Do you have any suggestions for how I could determine whether the file read and csv parse are being affected by the backpressure at all?

derrickpelletier avatar May 31 '15 04:05 derrickpelletier

The recursive nextTick warnings seem like a csv-parse bug, unless the intent was that you shouldn't be able to call the parser synchronously from within its own callback. Here's a test case.

var csv = require('csv-parse');

var i = 0;
function cb(err, x) {
    if (i++ < 500) {
        console.log(x);
        csv('1, 2, 3, 4', cb);
    }
}

csv('1, 2, 3, 4', cb);

What's happening here is that Highland does things synchronously as much as possible. The call to push(null, _.nil) pushes data down to batch, which then asks for more data (because there's now space in its buffer). Usually, more data is already available because of your use of split (the file read stream tends to emit more than one line at a time). That data then gets immediately pushed down through the csv parser, causing a recursive call into the parser and hence the recursive process.nextTick.
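If you can't wait for an upstream fix, one possible user-side workaround (a sketch, not something from this thread) is to defer the pushes so each csv() call fully unwinds before the downstream pull triggers the next one:

stream.flatMap(function (raw) {
    return _(function (push, next) {
        csv(raw, function (err, output) {
            // Defer so this callback returns before the next
            // record is pulled and csv() is called again.
            setImmediate(function () {
                push(err, output);
                push(null, _.nil);
            });
        });
    });
});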

You should be able to just pipe through the csv-parse stream directly (without split, since csv-parse understands Buffer) using through:

_(fs.createReadStream(self.filepath))
    .through(csv())
    .batch(5000)
    ...

There's no really good way to check that backpressure is being applied, but you can override write to see when the stream is being written to. This could serve as a proxy for how often the file is read from.

var s = _(fs.createReadStream(self.filepath));

// Use for debug/testing only!
var oldWrite = s.write;
s.write = function (x) {
    console.log('Written to.');
    return oldWrite.call(this, x);
};

s.through(csv())
    .batch(3000)
    .doto(function () {
        console.log('batch');
    })
    .resume();

You should see interleavings of batch and Written to..

Edit: You can combine the method above with large pauses in the pipeline to see if data is being written to regardless.

// Decorate write here.

s.through(csv())
    .batch(3000)
    .flatMap(function (x) {
        return _(function (push, next) {
            // Simulate large DB write delay.
            console.log('Pausing.');
            setTimeout(function () {
                push(null, x);
                push(null, _.nil);
            }, 2000);
        });
    })
    .resume();

No backpressure means you will still see lots of Written to.. Otherwise, you'll see a few every 2 seconds.

vqvu avatar May 31 '15 06:05 vqvu

Ok, I tried it out with a large file. Here's what I got (with node 0.10).

var decorate = _.curry(function decorate(str, s) {
    var oldWrite = s.write;
    s.write = function (x) {
        console.log('Written from', str);
        return oldWrite.call(this, x);
    };
    return s;
});

var s = _(fs.createReadStream('large.csv'))
    .through(decorate('file'))
    .through(csv())
    .through(decorate('csv'))
    .batch(3000)
    .flatMap(function (x) {
        var self = this;
        return _(function (push, next) {
            console.log('Pausing.');
            setTimeout(function () {
                push(null, x);
                push(null, _.nil);
            }, 500);
        });
    })
    .resume();

Result

Written from file
Written from csv
...
Written from csv x3000
...
Pausing.
Written from file
...
Written from file x a bunch
...
Written from csv x3000
Pausing.
*paused for 2 seconds*
Written from csv x3000
...

It looks like there's backpressure between csv -> batch but not between fs -> csv.

So I went digging in the node source. There is backpressure, but 0.10 has really terrible defaults for objectMode streams. The highWaterMark is always 16k even in object mode. This means the csv-parse stream will read until it has 16k Buffers! Each of those objects is probably 64KB, and so you end up with effectively no backpressure at all.

Switching to 0.12 makes things better, since the default highWaterMark is 16. Ideally, csv-parse would set writableObjectMode = true, but they'd be giving up 0.10 compatibility in the process, so that's really up to them.
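If you want to verify the default on your node version, here's a quick check (a debugging-only sketch; it inspects the internal _writableState):

var Transform = require('stream').Transform;

// On 0.10 this prints 16384 even in object mode; on 0.12+ it prints 16.
var t = new Transform({objectMode: true});
console.log(t._writableState.highWaterMark);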

A workaround is to call csv({highWaterMark: 2}) (or some other small number), since csv-parse passes the option object directly to the stream constructor. It's a workaround though, because highWaterMark isn't an official parser option.
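In the pipeline above, that would look something like this (a sketch; it relies on csv-parse forwarding the option to the underlying Transform constructor):

_(fs.createReadStream(self.filepath))
    .through(csv({highWaterMark: 2})) // unofficial: passed straight through
    .batch(3000)
    ...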

The other workaround is to use Highland's backpressure mechanism instead of regular stream's. You'll need to get csv-parse to fix their recursive call bug first.

var s = _(fs.createReadStream('large.csv'))
    .through(decorate('file'))
    .split()
    .flatMap(_.wrapCallback(csv))
    .batch(3000)
    .flatMap(function (x) {
        var self = this;
        return _(function (push, next) {
            console.log('Pausing.');
            setTimeout(function () {
                push(null, x);
                push(null, _.nil);
            }, 500);
        });
    })
    .resume();

The problem is that you'll be creating a new parser for every line. This may or may not be prohibitively expensive to do.

vqvu avatar May 31 '15 08:05 vqvu

@vqvu Hopefully this isn't off-topic. You mention that the typical way of doing async stuff is flatMap to a function that returns a generator stream. In your opinion, how does that compare with using consume with a consumer function? Is the former preferred because it reads better? Or are there also reasons why consume is a less-desirable choice?

juanpaco avatar Sep 25 '15 16:09 juanpaco

If you are using flatMap purely to do async computations, that is, if the generator stream that you return will always emit one or zero values, then flatMap is preferred only because it reads better than consume. Consumer functions require quite a bit of boilerplate.
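To illustrate the boilerplate difference, here's the same async map written both ways (doWork is a hypothetical node-style function):

// With flatMap:
stream.flatMap(_.wrapCallback(doWork));

// With consume, the err/nil handling is explicit:
stream.consume(function (err, x, push, next) {
    if (err) {
        push(err);
        next();
    } else if (x === _.nil) {
        // End of stream; pass nil along and don't call next.
        push(null, _.nil);
    } else {
        doWork(x, function (err, result) {
            push(err, result);
            next();
        });
    }
});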

However, if you're extending the stream (i.e., your generator stream emits multiple values), then flatMap also comes with built-in backpressure handling, which you wouldn't be able to implement easily with consume.

That said, flatMap is implemented with consume and forces you to create lots of short-lived streams, so in theory consume has lower overhead (both computation and memory). Depending on your application, this may or may not be important.

Also, from a readability standpoint, it's possible to factor out complicated logic into a function that can be used with through, so your choices are not necessarily restricted to readable flatMap vs boilerplate-y consume.
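For example, something like this (a sketch with hypothetical names):

// through passes the whole stream to the function, which returns a
// new stream, so a multi-step stage can live behind one name.
function asyncMapStage(source) {
    return source.flatMap(_.wrapCallback(doWork));
}

stream.through(asyncMapStage).batch(3000);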

vqvu avatar Sep 26 '15 04:09 vqvu

Thank you.

juanpaco avatar Sep 26 '15 11:09 juanpaco