
fix: nested loop join requires outer table to be a FusedStream

Open YjyJeff opened this issue 1 year ago • 2 comments

Which issue does this PR close?

Closes #12187

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Yes

Are there any user-facing changes?

No

YjyJeff avatar Aug 27 '24 08:08 YjyJeff

Is this bug visible/reproducible somehow? If so, could you add a regression test?

@crepererum This bug is triggered by our custom TableScan operator, which does not uphold the FusedStream contract: once its stream has finished, calling poll_next again panics.

The situation is easy to mimic. First, modify MemoryStream so that it no longer upholds the FusedStream contract, using the following code:

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Ready(if self.index < self.data.len() {
            self.index += 1;
            let batch = &self.data[self.index - 1];

            // return just the columns requested
            let batch = match self.projection.as_ref() {
                Some(columns) => batch.project(columns)?,
                None => batch.clone(),
            };

            Some(Ok(batch))
        } else if self.index == self.data.len() {
            self.index += 1;
            None
        } else {
            panic!("Exhausted stream is polled again")
        })
    }

Second, run the datafusion-cli binary with the following SQL statements:

CREATE TABLE wtf AS VALUES (4, 'arrow'), (99, 'datafusion');

select t0.column1, t1.column1, t0.column2, t1.column2 from wtf t0 left join wtf t1 on t0.column1 > t1.column1;

This hits the panic statement.

Currently, the ExecutionPlan trait only requires the execute method to return a type that implements the Stream trait. According to the documentation of poll_next, polling an exhausted stream again is allowed to panic, so the nested loop join must not rely on the outer table's stream being fused.
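
To make the contract concrete, here is a minimal std-only sketch of what "fusing" a stream means. Everything in it is an assumption for illustration: the `Stream` trait is a local stand-in that mirrors the shape of `futures::Stream`, and `Strict`, `Fused`, `NoopWake`, and `poll_n` are hypothetical names, not DataFusion types. With the real futures crate, `StreamExt::fuse` provides this kind of adapter. This is not necessarily how the PR fixes the join.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `futures::Stream` so the sketch needs only std (assumption).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical source that panics when polled after it has returned `None`,
/// like the modified MemoryStream above.
pub struct Strict {
    items: Vec<i32>,
    index: usize,
}

impl Stream for Strict {
    type Item = i32;
    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<i32>> {
        let i = self.index;
        self.index += 1;
        if i < self.items.len() {
            Poll::Ready(Some(self.items[i]))
        } else if i == self.items.len() {
            Poll::Ready(None)
        } else {
            panic!("Exhausted stream is polled again")
        }
    }
}

/// Hypothetical adapter: remembers that the inner stream ended and never polls it again.
pub struct Fused<S> {
    inner: S,
    done: bool,
}

impl<S: Stream + Unpin> Stream for Fused<S> {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        let this = &mut *self;
        let polled = Pin::new(&mut this.inner).poll_next(cx);
        if matches!(polled, Poll::Ready(None)) {
            this.done = true;
        }
        polled
    }
}

/// Waker that does nothing; enough for sources that are always ready.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `stream` exactly `n` times synchronously and collects the results.
pub fn poll_n<S: Stream + Unpin>(stream: &mut S, n: usize) -> Vec<Option<S::Item>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (0..n)
        .map(|_| match Pin::new(&mut *stream).poll_next(&mut cx) {
            Poll::Ready(item) => item,
            Poll::Pending => unreachable!("sources in this sketch are always ready"),
        })
        .collect()
}
```

Polling `Strict` directly one time too many panics, while the same source wrapped in `Fused` can be polled any number of times after the end. The cost is one extra flag per stream, which is the kind of additional state a consumer has to carry if it cannot assume its input is fused.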

YjyJeff avatar Aug 28 '24 02:08 YjyJeff

I think we have roughly the following paths forward here:

  • just fix this issue here (w/ or w/o regression test)
  • do NOT require FusedStream: we could invest in some tests that patch the physical plan to replace all streams with ones that panic if they are polled after the end (e.g. enforce NOT FusedStream)
  • update the entire stack to require FusedStream (requires additional flags/state in a few places, so potentially costly)
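
As a rough illustration of the second option, a test-only wrapper could turn any stream into one that panics when polled after the end. This is a std-only sketch under assumptions: the `Stream` trait is a local stand-in mirroring `futures::Stream`, and `VecStream`, `PanicAfterEnd`, `NoopWake`, and `drain` are hypothetical names, not existing DataFusion APIs.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `futures::Stream` so the sketch needs only std (assumption).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical always-ready source; keeps returning `None` once drained.
pub struct VecStream {
    items: std::vec::IntoIter<i32>,
}

impl Stream for VecStream {
    type Item = i32;
    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<i32>> {
        Poll::Ready(self.items.next())
    }
}

/// Hypothetical test wrapper: panics if the consumer polls after `None` was returned.
pub struct PanicAfterEnd<S> {
    inner: S,
    ended: bool,
}

impl<S: Stream + Unpin> Stream for PanicAfterEnd<S> {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        assert!(!self.ended, "stream polled after it returned None");
        let this = &mut *self;
        let polled = Pin::new(&mut this.inner).poll_next(cx);
        if matches!(polled, Poll::Ready(None)) {
            this.ended = true;
        }
        polled
    }
}

/// Waker that does nothing; enough for sources that are always ready.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Well-behaved consumer: stops polling at the first result that is not `Ready(Some(_))`.
pub fn drain<S: Stream + Unpin>(stream: &mut S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut *stream).poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

A consumer that stops at the first `None` passes through the wrapper untouched; one that polls again, as the nested loop join did with its outer table, trips the assertion. Patching every stream in a physical plan with such a wrapper would need plan-rewriting code that is not shown here.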

crepererum avatar Aug 28 '24 09:08 crepererum

I will wait a few more days to merge this PR to see if @YjyJeff can help add a regression test

alamb avatar Sep 10 '24 10:09 alamb

Merging this to get it into 42

It would be great to get a test for this, @YjyJeff; otherwise a regression is likely to creep in. Let's try to do that as a follow-on PR.

alamb avatar Sep 12 '24 16:09 alamb