Stream not destroyed when piped in Duplex.from() writable

Open matthieusieben opened this issue 1 year ago • 6 comments

Version

v22.9.0

Platform

Darwin MBPM.local 24.0.0 Darwin Kernel Version 24.0.0: Mon Aug 12 20:52:31 PDT 2024; root:xnu-11215.1.10~2/RELEASE_ARM64_T6030 arm64

Subsystem

stream

What steps will reproduce the bug?

const { Duplex, Readable, pipeline } = require('node:stream')

pipeline(
  Readable.from(
    (async function* () {
      try {
        console.log('Readable stream started')
        let i = 32
        while (i-- > 0) {
          yield Buffer.from('Hello, world!')
          console.log('Readable stream resumed')
        }
        console.log('Readable stream ended')
      } catch (err) {
        console.log('Readable stream errored', err)
      } finally {
        console.log('Readable stream destroyed')
      }
    })(),
    { objectMode: false, autoDestroy: true, highWaterMark: 1 }
  ),
  Duplex.from(async function (asyncGenerator) {
    asyncGenerator.return()
  }),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err)
    } else {
      console.log('Pipeline succeeded')
    }
  }
)

How often does it reproduce? Is there a required condition?

Always. Any function passed as Duplex.from(async function () {}) that does not fully consume its input prevents the input stream from being properly disposed. Each of the following variants triggers the problem:

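  // 1. Breaking out of the loop after the first chunk
  Duplex.from(async function (asyncGenerator) {
    for await (const chunk of asyncGenerator) {
      console.log('Writable stream received', chunk)
      break
    }
  })

  // 2. Explicitly returning from the generator without reading
  Duplex.from(async function (asyncGenerator) {
    asyncGenerator.return()
  })

  // 3. Never reading from the generator at all
  Duplex.from(async function (asyncGenerator) {
    // do nothing
  })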

What is the expected behavior? Why is that the expected behavior?

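The readable should be destroyed once the writable side stops consuming: the source generator's finally block should run (printing "Readable stream destroyed") and the pipeline callback should be invoked.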

What do you see instead?

The readable is not destroyed.
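
For comparison, here is a minimal sketch of the behavior I would expect, using a plain Writable that stops early instead of Duplex.from(). The names source, sink, events and done are only illustrative, and I am assuming the usual pipeline() semantics where a destination that closes early causes the source to be destroyed and the callback to receive an error.

```javascript
const { Readable, Writable, pipeline } = require('node:stream')

// Records what happened, in order, so it can be inspected afterwards.
const events = []

const source = Readable.from(
  (async function* () {
    try {
      let i = 32
      while (i-- > 0) {
        yield Buffer.from('Hello, world!')
      }
    } finally {
      // Runs when the readable is destroyed while the generator is suspended.
      events.push('source generator finalized')
    }
  })(),
  { objectMode: false, highWaterMark: 1 }
)

// A writable that gives up after the first chunk (illustrative stand-in for
// a consumer that does not fully read its input).
const sink = new Writable({
  write(chunk, encoding, callback) {
    this.destroy()
  }
})

// Wrap the pipeline callback in a promise so the outcome can be awaited.
const done = new Promise((resolve) => {
  pipeline(source, sink, (err) => {
    events.push('pipeline callback called')
    resolve({ err, sourceDestroyed: source.destroyed, events })
  })
})

done.then(({ err, sourceDestroyed, events }) => {
  console.log('error passed to callback:', Boolean(err))
  console.log('source destroyed:', sourceDestroyed)
  console.log(events)
})
```

With this sink, the callback fires and the source is torn down. The Duplex.from() variants in this report would ideally end the same way.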

Additional information

No response

matthieusieben avatar Sep 23 '24 09:09 matthieusieben

Related issue: #55010 CC @nodejs/streams

avivkeller avatar Sep 23 '24 11:09 avivkeller

I don't understand what the problem is.

ronag avatar Sep 23 '24 11:09 ronag

Calling asyncGenerator.return() should cause the readable to be destroyed and "Readable stream destroyed" to be printed. Neither happens, and the pipeline callback is never called either.

matthieusieben avatar Sep 23 '24 14:09 matthieusieben

Sounds like a bug.

mcollina avatar Sep 23 '24 16:09 mcollina

@matthieusieben would you like to send a PR to fix it?

mcollina avatar Sep 23 '24 16:09 mcollina

I gave it a try, but I'm struggling a bit.

matthieusieben avatar Sep 24 '24 09:09 matthieusieben

Is this also a problem if you do:

  Duplex.from(async function (asyncGenerator) {
    for await (const _ of asyncGenerator) { return }
  }),

instead of:

  Duplex.from(async function (asyncGenerator) {
    asyncGenerator.return()
  }),

ronag avatar Nov 23 '24 09:11 ronag