
Utility buffer class for stream transformers

Open morganbarrett opened this issue 5 years ago • 3 comments

I was hoping to start a discussion about the class BufferTransformer that I've just mocked up. Is this an antipattern? Could this be a useful helper class in the spec?

BufferTransformer

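// A simple gate: close() returns a promise that settles on the next open().
class Lock {
  // No-op until someone is actually waiting.
  open: () => void = () => {};

  close(): Promise<void> {
    return new Promise<void>(resolve => {
      this.open = resolve;
    });
  }
}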

class BufferTransformer<I, O> implements Transformer<I[], O[]> {
  private buffer: I[] = [];
  private lock: Lock = new Lock();
  private done = false; // set once the writable side has closed
  private terminator: I;

  constructor(terminator: I){
    this.terminator = terminator;
  }

  transform(chunk: I[]){
    this.buffer = this.buffer.concat(chunk);
    this.lock.open();
  }

  flush(){
    // No more input will arrive: wake any waiter and stop future waits.
    this.done = true;
    this.lock.open();
  }

  // Resolves to true once buffer[i] exists, or to false if input ends first.
  private async has(i: number): Promise<boolean> {
    while(this.buffer.length <= i && !this.done){
      await this.lock.close();
    }
    return this.buffer.length > i;
  }

  async read(): Promise<I> {
    return await this.has(0) ? this.buffer.shift()! : this.terminator;
  }

  async peek(i: number): Promise<I> {
    return await this.has(i) ? this.buffer[i] : this.terminator;
  }

  reconsume(i: I){
    this.buffer.unshift(i);
  }
}

An example of its usage is shown below. It takes a stream of characters, reads an unknown number of them, and enqueues only one Token. This seems to be the opposite of how I've seen transformers used, where a fixed buffer of data is given to the transformer and an unknown number of things are then enqueued.

TokenizerTransformer

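class TokenizerTransformer extends BufferTransformer<string, Token> {
  private finished?: Promise<void>;

  constructor(){
    super("");
  }

  start(controller: TransformStreamDefaultController<Token[]>){
    // Do not return this promise: the stream would wait for it to settle
    // before calling transform(), so the loop would never receive input.
    this.finished = this.run(controller);
  }

  async flush(){
    super.flush();
    // Let the loop drain the buffer before the readable side closes.
    await this.finished;
  }

  private async run(controller: TransformStreamDefaultController<Token[]>){
    let token: Token | undefined;
    while(token = await this.consumeToken()){
      // Output chunks are O[] per the Transformer<I[], O[]> declaration.
      controller.enqueue([token]);
    }
  }
  
  async consumeToken(): Promise<Token | undefined>{
    await this.consumeComments();

    let c = await this.read();
    if(c === "") return;

    if(isWhitespace(c)){
      while(isWhitespace(await this.peek(0))){
        await this.read();
      }
      return {type: TokenType.whitespace};
    }
    
    if(isDigit(c)){
      this.reconsume(c);
      return this.consumeNumeric();
    }

    ...
  }

  ...
}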

And another thing I was curious about is if I wanted to run this process on a string, is there no utility function or easier way than below to start the process?

const stringToStream = (str: string) => new ReadableStream<string>({
  start(controller){
    // Enqueue one UTF-16 code unit per chunk, then signal the end of input.
    for(const c of str.split("")){
      controller.enqueue(c);
    }
    controller.close();
  }
});

stringToStream("foo").pipeThrough(new TransformStream(new TokenizerTransformer()))

morganbarrett, Dec 18 '20 00:12

P.S. Sorry, this is all untested code; treat it as pseudocode.

morganbarrett, Dec 18 '20 00:12

It's an interesting use case.

It looks like there's a lot of plumbing going on to turn the push-style transform() method into a pull-based read() method though (using an intermediate buffer). This is tricky to get right, especially if you need to consider backpressure. I'll have a think about it, see if there's a better way.
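To make the backpressure concern concrete, here is a minimal sketch of an intermediate buffer that pushes back on the producer. It relies on the fact that a WritableStream waits for the promise returned by its sink's write() before calling write() again. `BoundedBuffer`, `limit`, and `sinkFor` are hypothetical names, and the sketch assumes a single producer and a single consumer.

```typescript
// Hypothetical helper: a buffer whose write() stays pending while it is over `limit`.
class BoundedBuffer<T> {
  private items: T[] = [];
  private closed = false;
  private limit: number;
  private wakeReader: () => void = () => {};
  private wakeWriter: () => void = () => {};

  constructor(limit: number) {
    this.limit = limit;
  }

  // Producer side: resolves only once the buffer is back at or below the limit.
  async write(chunk: T[]): Promise<void> {
    this.items.push(...chunk);
    this.wakeReader();
    while (this.items.length > this.limit) {
      await new Promise<void>(resolve => { this.wakeWriter = resolve; });
    }
  }

  // Signals that no more input will arrive.
  close(): void {
    this.closed = true;
    this.wakeReader();
  }

  // Consumer side: resolves with the next item, or undefined once closed and empty.
  async take(): Promise<T | undefined> {
    while (this.items.length === 0 && !this.closed) {
      await new Promise<void>(resolve => { this.wakeReader = resolve; });
    }
    const item = this.items.shift();
    this.wakeWriter();
    return item;
  }
}

// Returning the promise from write() is what propagates backpressure upstream.
function sinkFor<T>(buffer: BoundedBuffer<T>): WritableStream<T[]> {
  return new WritableStream<T[]>({
    write: chunk => buffer.write(chunk),
    close: () => buffer.close(),
  });
}
```

With this shape, a slow consumer calling take() keeps the pending write() unresolved, so the stream stops handing over new chunks until the buffer drains.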

> Could this be a useful helper class in the spec?

I think it's a bit too early to tell whether this will be useful enough to make it part of the platform. I suggest you start off by first writing it as a userland JavaScript library. 🙂

> And another thing I was curious about is if I wanted to run this process on a string, is there no utility function or easier way than below to start the process?

We're working on making it easier to turn an array or any (async) iterable into a ReadableStream; see #1018. You'll then be able to do:

ReadableStream.from(str.split("")).pipeThrough(new TransformStream(new TokenizerTransformer()))
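Where `ReadableStream.from` is not available, a small userland helper can fill the gap. The following is only a sketch, assuming synchronous iterables; `SourceController`, `iterableSource`, and `streamFromIterable` are hypothetical names, not part of any spec. It produces one item per pull() instead of enqueueing everything in start(), so nothing is generated ahead of demand.

```typescript
// Minimal shape of the controller methods this sketch needs.
interface SourceController<T> {
  enqueue(chunk: T): void;
  close(): void;
}

// Hypothetical helper: builds an underlying source that yields one item per pull().
function iterableSource<T>(iterable: Iterable<T>) {
  const iterator = iterable[Symbol.iterator]();
  return {
    pull(controller: SourceController<T>): void {
      const result = iterator.next();
      if (result.done) controller.close();
      else controller.enqueue(result.value);
    },
    cancel(): void {
      // Give the iterator a chance to clean up if the consumer stops early.
      iterator.return?.();
    },
  };
}

// Hypothetical wrapper around the ReadableStream constructor.
function streamFromIterable<T>(iterable: Iterable<T>): ReadableStream<T> {
  return new ReadableStream<T>(iterableSource(iterable));
}
```

Note that passing a string directly, as in `streamFromIterable("foo")`, iterates by code point, whereas `str.split("")` splits into UTF-16 code units.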

MattiasBuelens, Dec 18 '20 11:12

> It looks like there's a lot of plumbing going on to turn the push-style transform() method into a pull-based read() method though (using an intermediate buffer). This is tricky to get right, especially if you need to consider backpressure. I'll have a think about it, see if there's a better way.

I think this might be a slightly better solution, though I still have no idea how to handle backpressure.

PullTransformStream

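export class PullTransformStream {
  constructor(transform) {
    let buffer = new Buffer();
    let writableController;

    this.readable = new ReadableStream({
      async pull(controller){
        let arr = await transform(buffer);
        if(arr === undefined) controller.close();
        else controller.enqueue(arr);
      },
      cancel(reason){
        // `this` is the underlying source here, not the PullTransformStream,
        // so error the writable side through its captured controller.
        writableController.error(reason);
      }
    });

    this.writable = new WritableStream({
      start(controller){
        writableController = controller;
      },
      write(chunk){
        chunk.forEach(c => buffer.push(c));
        buffer.resolve();
      },
      close(){
        buffer.closed = true;
        buffer.resolve();
      },
      abort(reason){
        buffer.reject(reason);
      }
    });
  }
}
export class Buffer extends Array {
  constructor(){
    super(); // required before touching `this` in a derived class
    this.closed = false; // set once the writable side has closed
    this.resolve = () => {};
    this.reject = () => {};
  }

  // Resolves to true once this[index] exists, or to false if input ends first.
  async has(index){
    while(this.length <= index && !this.closed){
      await new Promise((resolve, reject) => {
        this.resolve = resolve;
        this.reject = reject;
      });
    }

    return this.length > index;
  }
}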

Example

export default class PreprocessTransformStream extends PullTransformStream {
  constructor(){
    super(async buffer => {
      //has(i) is true once index i exists, so has(0) means one code point is available
      if(!await buffer.has(0)) return;

      let c = buffer.shift();

      //replace form feed code point with a line feed
      if(c === "\u000C") return ["\u000A"];

      //replace any carriage return code points...
      if(c === "\u000D"){
        //...or pairs of a carriage return followed by a line feed...
        if(await buffer.has(0) && buffer[0] === "\u000A"){
          buffer.shift();
        }
        //...with a single line feed
        return ["\u000A"];
      }
      
      //replace any null or surrogate code points with a replacement character
      if(c === "\u0000" || (c >= "\uD800" && c <= "\uDFFF")){
        return ["\uFFFD"];
      }
      
      return [c];
    });
  }
}
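For comparison, the newline part of this preprocessing can also be written in the usual push style by carrying one bit of state between chunks, which leaves backpressure to TransformStream itself. This is a sketch under assumptions, not a drop-in replacement: `NewlineNormalizer` and `normalizeNewlines` are hypothetical names, chunks are assumed to be strings of any length, and surrogate replacement is left out.

```typescript
// Hypothetical helper: remembers whether the previous code point was a CR,
// so that a CR LF pair split across two chunks still yields a single LF.
class NewlineNormalizer {
  private lastWasCR = false;

  push(chunk: string): string {
    let out = "";
    for (const c of chunk) {
      if (c === "\u000A" && this.lastWasCR) {
        // The LF of a CR LF pair: the CR already produced the line feed.
        this.lastWasCR = false;
        continue;
      }
      this.lastWasCR = c === "\u000D";
      if (c === "\u000D" || c === "\u000C") out += "\u000A";
      else if (c === "\u0000") out += "\uFFFD";
      else out += c;
    }
    return out;
  }
}

// Wraps the normalizer in a regular push-style TransformStream.
function normalizeNewlines(): TransformStream<string, string> {
  const normalizer = new NewlineNormalizer();
  return new TransformStream<string, string>({
    transform(chunk, controller) {
      const out = normalizer.push(chunk);
      if (out !== "") controller.enqueue(out);
    },
  });
}
```

Because the line feed is emitted as soon as the carriage return is seen, no lookahead and no flush() step are needed; the only state is the `lastWasCR` flag.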

> I think it's a bit too early to tell whether this will be useful enough to make it part of the platform. I suggest you start off by first writing it as a userland JavaScript library. 🙂

And sorry, I meant it more as an example. I see many people following my original train of thought, which is something along the lines of:

class Transformation {
  readable: ReadableStream;
  done: Promise<void>;

  constructor(readable: ReadableStream, writable: WritableStream){
    this.readable = readable;
    //a constructor cannot await, so the loop runs in an async method
    this.done = this.run(writable);
  }

  async run(writable: WritableStream){
    let writer = writable.getWriter();
    let token: Token | undefined;
    while(token = await this.consumeToken()){
      await writer.write([token]);
    }
  }

  async consumeToken(): Promise<Token | undefined> {
    await this.consumeWhitespace();

    let [s1, s2] = this.readable.tee();
    let reader = s1.getReader();
    let {done, value} = await reader.read();

    if(done) return;

    switch(value){
      ...
    }
  }

  async consumeWhitespace(){
    let [s1, s2] = this.readable.tee();
    let reader = s1.getReader();
    let {done, value} = await reader.read();
    //unlock s1 so that it can be teed again on the next call
    reader.releaseLock();
    if(done || !isWhitespace(value)){
      this.readable = s2;
    } else {
      this.readable = s1;
      await this.consumeWhitespace();
    }
  }

  ...
}
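The tee()-based lookahead above can be replaced by a small wrapper that keeps already-read items in a lookahead array. The following is a minimal sketch; `ChunkReader`, `PeekableReader`, and `skipWhitespace` are hypothetical names, and the wrapper accepts anything with a `read()` method shaped like the one on the object returned by `stream.getReader()`.

```typescript
// Minimal shape of a reader, matching what stream.getReader() returns.
interface ChunkReader<T> {
  read(): Promise<{ done: boolean; value?: T }>;
}

// Hypothetical helper: adds peek() on top of a reader without tee().
class PeekableReader<T> {
  private lookahead: T[] = [];
  private ended = false;
  private source: ChunkReader<T>;

  constructor(source: ChunkReader<T>) {
    this.source = source;
  }

  // Returns the item `offset` positions ahead without consuming it,
  // or undefined if the input ends before that position.
  async peek(offset = 0): Promise<T | undefined> {
    while (this.lookahead.length <= offset && !this.ended) {
      const { done, value } = await this.source.read();
      if (done) this.ended = true;
      else this.lookahead.push(value as T);
    }
    return this.lookahead[offset];
  }

  // Consumes and returns the next item, or undefined at the end of input.
  async read(): Promise<T | undefined> {
    await this.peek(0);
    return this.lookahead.shift();
  }
}

// Example use: skip a run of whitespace, leaving the next item unread.
async function skipWhitespace(reader: PeekableReader<string>): Promise<void> {
  for (;;) {
    const c = await reader.peek();
    if (c === undefined || !/\s/.test(c)) return;
    await reader.read();
  }
}
```

A reader would be wrapped as `new PeekableReader(readable.getReader())`. Since the wrapper only calls the underlying read() when asked for an item it does not yet hold, it never reads further ahead than the largest peek offset requested.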

morganbarrett, Dec 20 '20 02:12