hash_join panics when join keys have different data types
Describe the bug
There are several places where hash_join can panic in DataFusion 9.0.0 when running SQL queries:
thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value', /home/andy/.cargo/registry/src/github.com-1ecc6299db9ec823/datafusion-9.0.0/src/physical_plan/hash_join.rs:973:17
thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value', /home/andy/.cargo/registry/src/github.com-1ecc6299db9ec823/datafusion-9.0.0/src/physical_plan/hash_join.rs:979:17
thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value', /home/andy/.cargo/registry/src/github.com-1ecc6299db9ec823/datafusion-9.0.0/src/physical_plan/hash_join.rs:1045:17
To Reproduce
I am experimenting with SQL query fuzzing. Here is one query that failed:
SELECT _c438, _c439, _c440, _c441, _c442, _c443
FROM (
(
(SELECT test1.c0 AS _c438, test1.c1 AS _c439, test1.c2 AS _c440, test1.c3 AS _c441, test1.c4 AS _c442, test1.c5 AS _c443
FROM (test1))
FULL JOIN
(SELECT test1.c0 AS _c444, test1.c1 AS _c445, test1.c2 AS _c446, test1.c3 AS _c447, test1.c4 AS _c448, test1.c5 AS _c449
FROM (test1))
ON _c438 = _c446)
RIGHT JOIN
(SELECT test0.c0 AS _c450, test0.c1 AS _c451, test0.c2 AS _c452, test0.c3 AS _c453, test0.c4 AS _c454, test0.c5 AS _c455
FROM (test0))
ON _c441 = _c451);
ArrowError(ExternalError("Arrow error: External error: Execution error: Join Error: task 5672 panicked"))
The data files are here: https://github.com/andygrove/sqlfuzz/tree/main/testdata
Expected behavior
The query should not panic.
Additional context
I think the issue is that we don't have type coercion for join keys, so joining a numeric column to a string column can cause this. In fn equal_rows we assume that the left and right columns have the same data type.
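One possible direction for the missing coercion is to choose a common type for each pair of equi-join keys during planning and cast both sides to it before the hash join runs, rejecting pairs that have no sensible common type. The sketch below is a minimal, std-only illustration of that idea under stated assumptions: `KeyType`, `int_rank`, and `coerce_join_key_types` are hypothetical names (not DataFusion APIs), and the rules cover only identical types and signed-integer widening.

```rust
/// Hypothetical, simplified stand-in for Arrow's `DataType`; only a few variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeyType {
    Int8,
    Int16,
    Int32,
    Int64,
    Utf8,
}

/// Width rank of a signed integer type, or None for non-integer types.
fn int_rank(t: KeyType) -> Option<u8> {
    match t {
        KeyType::Int8 => Some(1),
        KeyType::Int16 => Some(2),
        KeyType::Int32 => Some(3),
        KeyType::Int64 => Some(4),
        KeyType::Utf8 => None,
    }
}

/// Picks a common type for a pair of equi-join keys (hypothetical rules):
/// identical types pass through, signed integers widen to the larger width,
/// and anything else (e.g. a number joined to a string) returns None so the
/// planner could report an error instead of panicking at execution time.
fn coerce_join_key_types(left: KeyType, right: KeyType) -> Option<KeyType> {
    if left == right {
        return Some(left);
    }
    match (int_rank(left), int_rank(right)) {
        (Some(l), Some(r)) => Some(if l >= r { left } else { right }),
        _ => None,
    }
}

fn main() {
    // An Int32 key joined to an Int8 key: both sides would be cast to Int32.
    println!("{:?}", coerce_join_key_types(KeyType::Int32, KeyType::Int8)); // prints Some(Int32)
    // A number joined to a string: no coercion under these rules.
    println!("{:?}", coerce_join_key_types(KeyType::Int64, KeyType::Utf8)); // prints None
}
```

Doing this at planning time means the hash join operator itself can keep its same-type assumption, because it only ever sees keys that were already cast to matching types.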
@andygrove I'd like to investigate this as part of #2910
I reverse-engineered the data from sqlfuzz; here is a quick test that reproduces the behavior:
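use std::sync::Arc;

use arrow::array::{Int32Array, Int8Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// `crate::` paths: this test lives inside the datafusion crate.
use crate::datasource::MemTable;
use crate::error::Result;
use crate::prelude::SessionContext;

#[tokio::test]
async fn hj_non_compat() -> Result<()> {
    // Two columns with different integer types, used as join keys below.
    let schema = Arc::new(Schema::new(vec![
        Field::new("column_1", DataType::Int32, false),
        Field::new("column_2", DataType::Int8, false),
    ]));
    let data = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![Some(1)])),
            Arc::new(Int8Array::from(vec![Some(1)])),
        ],
    )?;
    let table = MemTable::try_new(schema, vec![vec![data]])?;
    let ctx = SessionContext::new();
    ctx.register_table("test", Arc::new(table))?;

    // Self-join on an Int32 key (a.column_1) against an Int8 key (b.column_2).
    let sql = r#"
        SELECT *
        FROM test a RIGHT JOIN test b ON a.column_1 = b.column_2"#;
    let df = ctx.sql(sql).await.unwrap();
    // Executing the plan reaches hash_join, which panics in 9.0.0.
    df.show_limit(10).await.unwrap();
    Ok(())
}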
Right, the problem is a downcast to an incompatible array type.
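The failure mode can be reproduced without Arrow. In this std-only sketch, plain `Vec`s stand in for Arrow arrays and the function names are hypothetical: a type-erased column is downcast with `downcast_ref`, which returns `None` when the concrete type differs, so calling `unwrap()` on it yields the same panic message as in the report. The checked variant turns the mismatch into an error, which is one possible way to surface it instead of crashing the task.

```rust
use std::any::Any;
use std::panic;

/// Mirrors the assumption described for `equal_rows`: both sides are
/// downcast to the same concrete type. If `right` is actually a Vec<i8>,
/// `downcast_ref::<Vec<i32>>()` returns None and `unwrap()` panics.
fn first_values_equal_unchecked(left: &dyn Any, right: &dyn Any) -> bool {
    let l = left.downcast_ref::<Vec<i32>>().unwrap();
    let r = right.downcast_ref::<Vec<i32>>().unwrap();
    l.first() == r.first()
}

/// Checked variant: a failed downcast becomes an error instead of a panic.
fn first_values_equal_checked(left: &dyn Any, right: &dyn Any) -> Result<bool, String> {
    match (left.downcast_ref::<Vec<i32>>(), right.downcast_ref::<Vec<i32>>()) {
        (Some(l), Some(r)) => Ok(l.first() == r.first()),
        _ => Err("join key columns have different data types".to_string()),
    }
}

fn main() {
    let left: Vec<i32> = vec![1]; // plays the role of column_1 (Int32)
    let right: Vec<i8> = vec![1]; // plays the role of column_2 (Int8)

    // Matching types work with either version.
    assert!(first_values_equal_unchecked(&left, &left));

    // Mismatched types: the checked version reports an error...
    assert!(first_values_equal_checked(&left, &right).is_err());

    // ...while the unchecked version panics with
    // "called `Option::unwrap()` on a `None` value".
    let result = panic::catch_unwind(|| first_values_equal_unchecked(&left, &right));
    assert!(result.is_err());
}
```

Returning an error only avoids the crash; the query would still fail unless the keys are coerced to a common type before the join.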