Streams: finished Change in behavoir between 22.2.0 and 22.9.0. Throws Exception (Unhandled Rejection)
Version
22.9.0 (Docker Latest)
Platform
Linux (Docker Latest)
root@0cd652c8fcb1:/# uname -a
Linux 0cd652c8fcb1 5.15.0-84-generic #93-Ubuntu SMP Tue Sep 5 17:16:10 UTC 2023 x86_64 GNU/Linux
Subsystem
STREAMS/PROMISES FINISHED
What steps will reproduce the bug?
Run the following code on 22.2 and 22.9 and note changed behavior - Unhandled Rejection
const { PassThrough} = require('stream');
const { pipeline, finished, } = require('stream/promises');
const fs = require('fs');
class MyTransform extends PassThrough {
constructor(is) {
super()
this.is = is
this.counter = 0
}
async _transform(data,enc,callback) {
this.counter++
this.push(data)
if (this.counter > 100) {
this.is.close()
}
callback()
}
}
async function runPipeline() {
const is = fs.createReadStream('input.txt')
is.on('error',(err) => {
console.log(is.constructor.name,err)
})
const t = new MyTransform(is)
t.on('error',(err) => {
console.log(t.constructor.name,err)
})
const os = fs.createWriteStream('output.txt')
os.on('error',(err) => {
console.log(os.constructor.name,err)
})
const streams = [is,t,os]
const activeStreams = streams.map((s) => {
return finished(s)
})
console.log(activeStreams)
try {
await pipeline(...streams);
console.log(t.counter)
} catch (err) {
console.log(1)
console.log(activeStreams)
await Promise.allSettled(activeStreams)
console.log(2)
console.log(activeStreams)
console.error('Pipeline error:', err);
}
}
process.on('unhandledRejection', (e,p) => {
console.log("Unhandled",e,p)
})
runPipeline().then(() => {console.log('success')}).catch((e) => {console.log(e)})
How often does it reproduce? Is there a required condition?
Always
What is the expected behavior? Why is that the expected behavior?
The 22.2.0 Behavoir appears to be correct to me
C:\Development\YADAMU\src\scratch\streams>docker cp input.txt NODE-22-2:/tmp
C:\Development\YADAMU\src\scratch\streams>docker cp test1.js NODE-22-2:/tmp
C:\Development\YADAMU\src\scratch\streams>docker exec -it NODE-22-2 bash
root@c873fb41f508:/# cd tmp
root@c873fb41f508:/tmp# node -v
v22.2.0
root@c873fb41f508:/tmp# node test1.js
[ Promise { <pending> }, Promise { <pending> }, Promise { <pending> } ]
MyTransform Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:532:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
1
[
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:532:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
},
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:532:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
},
Promise { <pending> }
]
WriteStream Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:532:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
2
[
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:532:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
},
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:532:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
},
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:532:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
}
]
Pipeline error: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at MyTransform.<anonymous> (node:internal/streams/pipeline:417:14)
at MyTransform.emit (node:events:532:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at emitErrorCloseNT (node:internal/streams/destroy:130:3)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
success
root@c873fb41f508:/tmp#
root@c873fb41f508:/tmp#
root@c873fb41f508:/tmp#
exit
What do you see instead?
root@0cd652c8fcb1:/tmp# node test1.js
[ Promise { <pending> }, Promise { <pending> }, Promise { <pending> } ]
MyTransform Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
Unhandled Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
} Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
}
Unhandled Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
} Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
}
WriteStream Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
1
[
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
},
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
},
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
}
]
2
[
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
},
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
},
Promise {
<rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
}
]
Pipeline error: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
at ReadStream.emit (node:events:531:35)
at emitCloseNT (node:internal/streams/destroy:148:10)
at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
success
(node:13) PromiseRejectionHandledWarning: Promise rejection was handled asynchronously (rejection id: 1)
(Use `node --trace-warnings ...` to show where the warning was created)
(node:13) PromiseRejectionHandledWarning: Promise rejection was handled asynchronously (rejection id: 2)
root@0cd652c8fcb1:/tmp#
exit
Additional information
No response
Sorry - Missed the commands and version from the 22.9 t/c
C:\Development\YADAMU\src\scratch\streams>docker cp input.txt NODE-LATEST:/tmp
C:\Development\YADAMU\src\scratch\streams>docker cp test1.js NODE-LATEST:/tmp
C:\Development\YADAMU\src\scratch\streams>docker exec -it NODE-LATEST bash
root@0cd652c8fcb1:/# node -v
v22.9.0
root@0cd652c8fcb1:/# cd tmp
root@0cd652c8fcb1:/tmp# node test1.js
``
I have tried wrapping the return finished and Promise.allSettled in try/catch but still get the unhandled rejection
const { PassThrough} = require('stream');
const { pipeline, finished, } = require('stream/promises');
const fs = require('fs');
class MyTransform extends PassThrough {
constructor(is) {
super()
this.is = is
this.counter = 0
}
async _transform(data,enc,callback) {
this.counter++
this.push(data)
if (this.counter > 100) {
this.is.close()
}
callback()
}
}
async function runPipeline() {
const is = fs.createReadStream('input.txt')
is.on('error',(err) => {
console.log(is.constructor.name,err)
})
const t = new MyTransform(is)
t.on('error',(err) => {
console.log(t.constructor.name,err)
})
const os = fs.createWriteStream('output.txt')
os.on('error',(err) => {
console.log(os.constructor.name,err)
})
const streams = [is,t,os]
const activeStreams = streams.map((s) => {
try {
return finished(s)
} catch (e) {
console.log('WTF-FINISHED',e)
}
})
console.log(activeStreams)
try {
await pipeline(...streams);
console.log(t.counter)
} catch (err) {
console.log(1)
console.log(activeStreams)
try {
await Promise.allSettled(activeStreams)
} catch (e) {
console.log('WTF-ALL-S',e)
}
console.log(2)
console.log(activeStreams)
console.error('Pipeline error:', err);
}
}
process.on('unhandledRejection', (e,p) => {
console.log("Unhandled",e,p)
})
runPipeline().then(() => {console.log('success')}).catch((e) => {console.log(e)})
The only workaround i have it to flag the error as handled in the stream on error and ignore it in the unhandled rejection code. It appears to be that finished must be throwing the exception, but I do not seem to be able to catch it, and I do not think it should be throwing anyway.
I am having a very similar problem here. Fortunately it's a new feature and I caught it in unit tests. Any updates on this from maintainers would be appreciated.