fix: nested loop join requires outer table to be a FusedStream
Which issue does this PR close?
Closes #12187
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?
No
Is this bug visible/reproducible somehow? If so, could you add a regression test?
@crepererum This bug is triggered by our customized TableScan operator, which does not uphold the FusedStream contract. In our implementation, calling poll_next again after the stream has finished causes a panic.
This is easy to reproduce. First, modify MemoryStream so that it does not uphold the FusedStream contract, using the following code:
```rust
fn poll_next(
    mut self: std::pin::Pin<&mut Self>,
    _: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
    Poll::Ready(if self.index < self.data.len() {
        self.index += 1;
        let batch = &self.data[self.index - 1];
        // return just the columns requested
        let batch = match self.projection.as_ref() {
            Some(columns) => batch.project(columns)?,
            None => batch.clone(),
        };
        Some(Ok(batch))
    } else if self.index == self.data.len() {
        // first poll past the last batch: signal end-of-stream exactly once
        self.index += 1;
        None
    } else {
        // any later poll deliberately violates the FusedStream contract
        panic!("Exhausted stream is polled again")
    })
}
```
Second, run the datafusion-cli binary with the following SQL statements:
```sql
CREATE TABLE wtf AS VALUES (4, 'arrow'), (99, 'datafusion');
select t0.column1, t1.column1, t0.column2, t1.column2 from wtf t0 left join wtf t1 on t0.column1 > t1.column1;
```
The query then hits the panic! in the modified poll_next.
Currently, the ExecutionPlan trait only requires the execute method to return a type that implements the Stream trait. According to the documentation of Stream::poll_next, once a stream has finished (returned Ready(None)), calling poll_next again may panic, block forever, or cause other kinds of problems.
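Rust's standard library has the same after-the-end rule for synchronous iterators: Iterator::next makes no promise about calls made after it has returned None, the FusedIterator marker trait identifies types that keep returning None, and Iterator::fuse adds that guarantee to any iterator. The sketch below uses this std analogue instead of futures::Stream so that it stays dependency-free; NonFused is a hypothetical type that mirrors the modified MemoryStream above, not DataFusion code.

```rust
use std::iter::FusedIterator;

/// Hypothetical analogue of the modified MemoryStream: yields each item,
/// returns None once, and panics if next() is called again afterwards.
struct NonFused {
    data: Vec<i64>,
    index: usize,
}

impl Iterator for NonFused {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        if self.index < self.data.len() {
            self.index += 1;
            Some(self.data[self.index - 1])
        } else if self.index == self.data.len() {
            // first call past the end: report exhaustion once
            self.index += 1;
            None
        } else {
            panic!("Exhausted stream is polled again")
        }
    }
}

/// Compiles only for types that promise fused behaviour.
fn assert_fused<I: FusedIterator>(_: &I) {}

fn main() {
    // fuse() remembers the first None and never calls NonFused::next again.
    let mut fused = NonFused { data: vec![4, 99], index: 0 }.fuse();
    assert_fused(&fused);
    assert_eq!(fused.next(), Some(4));
    assert_eq!(fused.next(), Some(99));
    assert_eq!(fused.next(), None);
    assert_eq!(fused.next(), None); // safe: the inner iterator is not called
    println!("fused iterator stayed at None");
}
```

The futures crate offers the same shape for async code (a Fuse adapter and a FusedStream trait), which is what requiring FusedStream across the stack would build on.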
I think we have roughly the following paths forward here:
- just fix this issue here (with or without a regression test)
- do NOT require FusedStream: we could invest in some tests that patch the physical plan to replace all streams with ones that panic if they are polled after the end (e.g. enforce NOT FusedStream)
- update the entire stack to require FusedStream (requires additional flags/state in a few places, so potentially costly)
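The second option could look roughly like a test-only wrapper that makes any input panic when it is polled after its end, so an operator that silently relies on fused inputs fails loudly. This sketch again uses std's Iterator as a stand-in for Stream. PanicOnRepoll and OuterInput are hypothetical names, not DataFusion APIs. OuterInput shows the bookkeeping that the first option (and, across the stack, the third) implies: one flag recording that the input is exhausted.

```rust
/// Hypothetical test wrapper: forwards items from `inner`, but panics if
/// next() is called again after the first None (enforces "NOT fused").
struct PanicOnRepoll<I> {
    inner: I,
    finished: bool,
}

impl<I: Iterator> Iterator for PanicOnRepoll<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        assert!(!self.finished, "input polled after it returned None");
        let item = self.inner.next();
        if item.is_none() {
            self.finished = true;
        }
        item
    }
}

/// Hypothetical operator-side fix: remember that the outer input is
/// exhausted instead of assuming it keeps returning None.
struct OuterInput<I> {
    input: I,
    exhausted: bool,
}

impl<I: Iterator> OuterInput<I> {
    fn next_row(&mut self) -> Option<I::Item> {
        if self.exhausted {
            return None; // never poll a finished input again
        }
        let row = self.input.next();
        self.exhausted = row.is_none();
        row
    }
}

fn main() {
    let wrapped = PanicOnRepoll { inner: vec![4, 99].into_iter(), finished: false };
    let mut outer = OuterInput { input: wrapped, exhausted: false };
    let mut rows = Vec::new();
    while let Some(r) = outer.next_row() {
        rows.push(r);
    }
    // A join state machine may ask again; the flag keeps this from panicking.
    assert_eq!(outer.next_row(), None);
    assert_eq!(rows, vec![4, 99]);
    println!("rows = {:?}", rows);
}
```

Wrapping every leaf input this way in tests would flag the nested loop join behaviour reported here without depending on a custom TableScan.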
I will wait a few more days to merge this PR to see if @YjyJeff can help add a regression test
Merging this to get it into 42
It would be great to get a test for this, @YjyJeff; otherwise a regression is likely to creep in. Let's try to do that as a follow-on PR.