Utility buffer class for stream transformers
I was hoping to start a discussion about the class BufferTransformer that I've just mocked up. Is this an antipattern? Could this be a useful helper class in the spec?
BufferTransformer
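class Lock {
  // Resolves the promise handed out by the most recent close(); harmless if nobody is waiting
  open: () => void = () => {};
  // Returns a promise that settles on the next open()
  close(): Promise<void> {
    return new Promise<void>(resolve => { this.open = resolve; });
  }
}
class BufferTransformer<I, O> implements Transformer<Iterable<I>, O> {
  private buffer: I[] = [];
  private lock: Lock = new Lock();
  private closed = false;
  private terminator: I;
  constructor(terminator: I){
    this.terminator = terminator;
  }
  transform(chunk: Iterable<I>){
    for (const item of chunk) this.buffer.push(item);
    this.lock.open();
  }
  flush(){
    // Mark the end of input so pending and future has() calls stop waiting
    this.closed = true;
    this.lock.open();
  }
  // Waits until index i exists or the input has ended; a single wake-up may
  // not deliver enough items, so keep waiting in a loop
  private async has(i: number): Promise<boolean> {
    while(this.buffer.length <= i && !this.closed){
      await this.lock.close();
    }
    return this.buffer.length > i;
  }
  async read(): Promise<I> {
    return await this.has(0) ? this.buffer.shift() as I : this.terminator;
  }
  async peek(i: number): Promise<I> {
    return await this.has(i) ? this.buffer[i] : this.terminator;
  }
  reconsume(i: I){
    this.buffer.unshift(i);
  }
}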
An example of its usage is shown below. It takes a stream of characters, reads an unknown number of them, and enqueues only one Token. This seems to be the opposite of how I've seen transformers used, where the transformer is given a fixed chunk of data and then enqueues an unknown number of things.
TokenizerTransformer
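class TokenizerTransformer extends BufferTransformer<string, Token> {
  private running?: Promise<void>;
  constructor(){
    super("");
  }
  start(controller: TransformStreamDefaultController<Token>){
    // Don't await the loop: TransformStream waits for start() to settle before
    // it calls transform(), so awaiting here would wait forever for input
    this.running = this.pump(controller).catch(e => controller.error(e));
  }
  async flush(){
    super.flush();
    // Let the loop drain the buffer before the readable side is closed
    await this.running;
  }
  private async pump(controller: TransformStreamDefaultController<Token>){
    let token: Token | undefined;
    while((token = await this.consumeToken())){
      controller.enqueue(token);
    }
  }
  async consumeToken(): Promise<Token | undefined>{
    await this.consumeComments();
    const c = await this.read();
    if(c === "") return;
    if(isWhitespace(c)){
      while(isWhitespace(await this.peek(0))){
        await this.read();
      }
      return {type: TokenType.whitespace};
    }
    if(isDigit(c)){
      this.reconsume(c);
      return this.consumeNumeric();
    }
    ...
  }
  ...
}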
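TransformStream waits for the promise returned by start() to settle before it calls transform() for the first chunk, which is why a start() that awaits the whole read loop can never make progress. The sketch below records the call order to show this. startOrderDemo is a made-up name for illustration. A read is requested up front so that backpressure is not what holds transform() back.

```typescript
// Records the order in which TransformStream invokes the transformer callbacks.
async function startOrderDemo(): Promise<string[]> {
  const log: string[] = [];
  let finishStart!: () => void;
  const ts = new TransformStream<string, string>({
    start() {
      log.push("start");
      // start() returns a promise that stays pending until finishStart() is called
      return new Promise<void>(resolve => { finishStart = () => resolve(); });
    },
    transform(chunk, controller) {
      log.push(`transform:${chunk}`);
      controller.enqueue(chunk);
    },
  });
  const reader = ts.readable.getReader();
  const writer = ts.writable.getWriter();
  const pending = reader.read(); // ask for output before any input arrives
  const written = writer.write("a");
  // Let the event loop run for a while; transform() must not have been called yet
  await new Promise(resolve => setTimeout(resolve, 10));
  log.push("still waiting");
  finishStart();
  const result = await pending;
  await written;
  log.push(`read:${result.value}`);
  return log;
}
```

The log comes out as start, still waiting, transform:a, read:a. The chunk is only transformed after start() settles.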
Another thing I was curious about: if I want to run this process on a string, is there a utility function, or an easier way than the one below, to start it?
const stringToStream = (str: string) => new ReadableStream<string>({
  start(controller){
    str.split("").forEach(c =>
      controller.enqueue(c)
    );
    controller.close();
  }
});
stringToStream("foo").pipeThrough(new TransformStream(new TokenizerTransformer()))
P.S. Sorry, this is all untested code; treat it as pseudocode.
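For comparison with the usage described above, here is the conventional push-style shape. Each transform() call handles whatever chunk arrives, may enqueue any number of outputs, and keeps partial state in a field between calls. WordSplitter and splitWords are made-up examples and are not part of the original post.

```typescript
// Push-style transformer: splits text into whitespace-separated words,
// carrying a possibly incomplete word over to the next chunk.
class WordSplitter implements Transformer<string, string> {
  private partial = "";

  transform(chunk: string, controller: TransformStreamDefaultController<string>) {
    const parts = (this.partial + chunk).split(/\s+/);
    // The last piece may continue in the next chunk, so hold it back
    this.partial = parts.pop() ?? "";
    for (const word of parts) {
      if (word !== "") controller.enqueue(word);
    }
  }

  flush(controller: TransformStreamDefaultController<string>) {
    // Emit whatever word was still pending when the input ended
    if (this.partial !== "") controller.enqueue(this.partial);
  }
}

async function splitWords(chunks: string[]): Promise<string[]> {
  const source = new ReadableStream<string>({
    start(controller) {
      for (const c of chunks) controller.enqueue(c);
      controller.close();
    },
  });
  const reader = source.pipeThrough(new TransformStream(new WordSplitter())).getReader();
  const words: string[] = [];
  for (;;) {
    const result = await reader.read();
    if (result.done) break;
    words.push(result.value);
  }
  return words;
}
```

For example, splitWords(["hel", "lo wor", "ld  foo"]) resolves to ["hello", "world", "foo"]. Every transform() call returns as soon as it has processed its chunk, so TransformStream's own backpressure handling applies without extra plumbing.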
It's an interesting use case.
It looks like there's a lot of plumbing going on to turn the push-style transform() method into a pull-based read() method though (using an intermediate buffer). This is tricky to get right, especially if you need to consider backpressure. I'll have a think about it, see if there's a better way.
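One possible way to avoid the push-to-pull plumbing, sketched here as an assumption and not something proposed in this thread, is to drive everything from the output side. You wrap the input stream's reader so that it supports read() and peek(), and produce output from a ReadableStream's pull(). The input is then only read when the consumer asks for output, which gives a form of backpressure. PeekableReader, pullTransform and collapseSpaces are hypothetical names.

```typescript
// Wraps a reader of iterable chunks and exposes item-level read()/peek(),
// pulling more chunks from the source only when the buffer runs short.
class PeekableReader<T> {
  private buffer: T[] = [];
  private done = false;
  constructor(private reader: ReadableStreamDefaultReader<Iterable<T>>) {}

  // Ensures at least n items are buffered, unless the source ends first
  private async fill(n: number): Promise<boolean> {
    while (this.buffer.length < n && !this.done) {
      const result = await this.reader.read();
      if (result.done) this.done = true;
      else for (const item of result.value) this.buffer.push(item);
    }
    return this.buffer.length >= n;
  }

  async peek(i = 0): Promise<T | undefined> {
    return (await this.fill(i + 1)) ? this.buffer[i] : undefined;
  }

  async read(): Promise<T | undefined> {
    return (await this.fill(1)) ? this.buffer.shift() : undefined;
  }
}

// Runs one parse step per pull(); a step returning undefined ends the output.
function pullTransform<I, O>(
  source: ReadableStream<Iterable<I>>,
  step: (input: PeekableReader<I>) => Promise<O | undefined>,
): ReadableStream<O> {
  const reader = source.getReader();
  const input = new PeekableReader<I>(reader);
  return new ReadableStream<O>({
    async pull(controller) {
      const out = await step(input);
      if (out === undefined) controller.close();
      else controller.enqueue(out);
    },
    cancel(reason) {
      return reader.cancel(reason);
    },
  });
}

// Example step: collapse runs of spaces, even across chunk boundaries
async function collapseSpaces(chunks: string[]): Promise<string> {
  const source = new ReadableStream<Iterable<string>>({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    },
  });
  const output = pullTransform<string, string>(source, async input => {
    const c = await input.read();
    if (c === undefined) return undefined;
    if (c === " ") {
      // swallow the rest of the run, looking ahead one item at a time
      while ((await input.peek()) === " ") await input.read();
    }
    return c;
  });
  let text = "";
  const reader = output.getReader();
  for (;;) {
    const result = await reader.read();
    if (result.done) break;
    text += result.value;
  }
  return text;
}
```

The result is a plain ReadableStream, not a { writable, readable } pair. If you need something to pass to pipeThrough(), one option is to create an identity new TransformStream() and return its writable together with pullTransform(identity.readable, step).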
Could this be a useful helper class in the spec?
I think it's a bit too early to tell whether this will be useful enough to make it part of the platform. I suggest you start off by first writing it as a userland JavaScript library. 🙂
Another thing I was curious about: if I want to run this process on a string, is there a utility function, or an easier way than the one below, to start it?
We're working on making it easier to turn an array or any (async) iterable into a ReadableStream (see #1018). You'll then be able to do:
ReadableStream.from(str.split("")).pipeThrough(new TransformStream(new TokenizerTransformer()))
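In runtimes that don't provide ReadableStream.from() yet, a small userland helper along the same lines can be written with pull(). iterableToStream is a hypothetical name, not a platform API. Because items are produced in pull(), they are only taken from the iterator when the stream wants more data.

```typescript
// Sketch of a ReadableStream.from()-like helper for sync and async iterables.
function iterableToStream<T>(iterable: Iterable<T> | AsyncIterable<T>): ReadableStream<T> {
  // Prefer the async iterator if there is one; strings and arrays fall back to the sync one
  const asyncMethod = (iterable as AsyncIterable<T>)[Symbol.asyncIterator];
  const iterator: Iterator<T> | AsyncIterator<T> =
    typeof asyncMethod === "function"
      ? asyncMethod.call(iterable)
      : (iterable as Iterable<T>)[Symbol.iterator]();
  return new ReadableStream<T>({
    // Called whenever the stream's queue wants another chunk
    async pull(controller) {
      const result = await iterator.next();
      if (result.done) controller.close();
      else controller.enqueue(result.value);
    },
    // Give the iterator a chance to clean up if the consumer cancels
    async cancel(reason) {
      await iterator.return?.(reason);
    },
  });
}
```

Passing a string directly, as in iterableToStream("foo"), iterates by code point. str.split("") splits by UTF-16 code unit.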
It looks like there's a lot of plumbing going on to turn the push-style transform() method into a pull-based read() method though (using an intermediate buffer). This is tricky to get right, especially if you need to consider backpressure. I'll have a think about it, see if there's a better way.
I think this might be a slightly better solution, though I still have no idea how to handle backpressure.
PullTransformStream
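export class PullTransformStream {
  constructor(transform) {
    const buffer = new Buffer();
    this.readable = new ReadableStream({
      // pull() is only called when the consumer wants more output
      async pull(controller){
        const arr = await transform(buffer);
        if(arr === undefined) controller.close();
        else controller.enqueue(arr);
      },
      cancel(reason){
        // `this` here is the underlying source object, not the PullTransformStream,
        // so signal the writable side through the shared buffer instead
        buffer.fail(reason);
      }
    });
    this.writable = new WritableStream({
      write(chunk){
        if(buffer.failed) throw buffer.reason;
        chunk.forEach(c => buffer.push(c));
        buffer.resolve();
      },
      close(){
        buffer.close();
      },
      abort(reason){
        buffer.fail(reason);
      }
    });
  }
}
export class Buffer extends Array {
  // Make splice(), slice() etc. return plain arrays rather than new Buffers
  static get [Symbol.species]() { return Array; }
  constructor(){
    super(); // required before using `this` in a derived class
    this.closed = false;
    this.failed = false;
    this.reason = undefined;
    this.resolve = () => {};
    this.reject = () => {};
  }
  close(){
    this.closed = true;
    this.resolve();
  }
  fail(reason){
    this.failed = true;
    this.reason = reason;
    this.reject(reason);
  }
  // Resolves to true once at least `length` items are buffered,
  // or to false if the input closes first
  async has(length){
    while(this.length < length){
      if(this.failed) throw this.reason;
      if(this.closed) return false;
      await new Promise((resolve, reject) => {
        this.resolve = resolve;
        this.reject = reject;
      });
    }
    return true;
  }
}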
Example
export default class PreprocessTransformStream extends PullTransformStream {
  constructor(){
    super(async buffer => {
      if(!await buffer.has(1)) return;
      const c = buffer.shift();
      //replace form feed code point with a line feed
      if(c === "\u000C") return ["\u000A"];
      //replace any carriage return code points...
      if(c === "\u000D"){
        //...or pairs of carriage returns followed by a line feed...
        //(the CR has already been shifted off, so only check whether a LF follows)
        if(await buffer.has(1) && buffer[0] === "\u000A"){
          buffer.shift();
        }
        //...with a single line feed
        return ["\u000A"];
      }
      //replace any null or surrogate code points with a replacement character
      //(if the input is split into UTF-16 code units, e.g. with split(""),
      //this also replaces both halves of valid surrogate pairs)
      if(c === "\u0000" || (c >= "\uD800" && c <= "\uDFFF")){
        return ["\uFFFD"];
      }
      return [c];
    });
  }
}
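The comments in the example appear to follow the CSS Syntax "preprocessing the input stream" rules. A non-streaming reference version of the same steps can be useful for checking what a streaming implementation produces for a whole string. preprocess is a hypothetical helper written for this purpose. It keeps valid surrogate pairs and replaces only lone surrogates.

```typescript
// Whole-string reference for the preprocessing steps in the example above.
function preprocess(input: string): string {
  return input
    // CR LF pairs, lone CRs and form feeds each become a single LF
    .replace(/\r\n|\r|\f/g, "\n")
    // NULL becomes U+FFFD
    .replace(/\0/g, "\uFFFD")
    // a high surrogate followed by a low surrogate is a valid pair and is kept;
    // any other surrogate code unit is a lone surrogate and becomes U+FFFD
    .replace(/[\uD800-\uDBFF][\uDC00-\uDFFF]|[\uD800-\uDFFF]/g, m => (m.length === 2 ? m : "\uFFFD"));
}
```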
I think it's a bit too early to tell whether this will be useful enough to make it part of the platform. I suggest you start off by first writing it as a userland JavaScript library. 🙂
And sorry, I meant it more as an example. I see many people following my original train of thought, which goes something along the lines of:
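class Transformation {
  readable: ReadableStream<string>;
  writable: WritableStream<Token[]>;
  constructor(readable: ReadableStream<string>, writable: WritableStream<Token[]>){
    this.readable = readable;
    this.writable = writable;
  }
  // `await` is not allowed in a constructor, so the read loop lives in its own method
  async run(){
    const writer = this.writable.getWriter();
    let token: Token | undefined;
    while((token = await this.consumeToken())){
      await writer.write([token]);
    }
    await writer.close();
  }
  async consumeToken(): Promise<Token | undefined>{
    await this.consumeWhitespace();
    const [s1, s2] = this.readable.tee();
    const reader = s1.getReader();
    const {done, value} = await reader.read();
    reader.releaseLock();
    if(done) return;
    switch(value){
      ...
    }
  }
  // "Peeks" by teeing: read one item from s1; if it should not be consumed,
  // carry on with s2, which still holds that item
  async consumeWhitespace(): Promise<void>{
    const [s1, s2] = this.readable.tee();
    const reader = s1.getReader();
    const {done, value} = await reader.read();
    // release s1 so it can be tee()'d again on the next call
    reader.releaseLock();
    if(done || !isWhitespace(value)){
      this.readable = s2;
      s1.cancel();
    } else {
      this.readable = s1;
      s2.cancel(); // the whitespace was consumed, so drop the branch that kept it
      await this.consumeWhitespace();
    }
  }
  ...
}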
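The tee()-based "peek" relies on one property: each branch receives every chunk, so reading a value from one branch leaves it available on the other. The sketch below checks that property in isolation. teePeekDemo is a made-up name. A drawback of this pattern is that every peek adds another tee() to the chain, so each later chunk has to pass through all of them.

```typescript
// Reads a chunk from one tee() branch and shows it is still available on the other.
async function teePeekDemo(): Promise<[string | undefined, string | undefined]> {
  const source = new ReadableStream<string>({
    start(controller) {
      controller.enqueue("x");
      controller.close();
    },
  });
  const [peekBranch, rest] = source.tee();
  const peekReader = peekBranch.getReader();
  const peeked = (await peekReader.read()).value;
  // Tell tee() this branch is no longer needed. Not awaited: the returned
  // promise may not settle until the other branch is finished as well.
  void peekReader.cancel();
  const stillThere = (await rest.getReader().read()).value;
  return [peeked, stillThere];
}
```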