EnforceDistribution fails, seems to turn all the types of the schema to UInt64
### Describe the bug
This happens in DataFusion 37; it works in 36.
EnforceDistribution fails with "PhysicalOptimizer rule 'EnforceDistribution' failed, due to generate a different schema, original schema:
### To Reproduce
```sql
WITH e1 AS (SELECT * FROM _default), e2 AS (SELECT * FROM _default), a AS (SELECT * FROM nodes), b AS (SELECT * FROM nodes), c AS (SELECT * FROM nodes) SELECT a.name, b.name, c.name FROM e1 JOIN a ON e1.src = a.id JOIN b ON e1.dst = b.id JOIN e2 ON b.id = e2.src JOIN c ON e2.dst = c.id WHERE e1.id <> e2.id
```
Error:
```
called `Result::unwrap()` on an `Err` value: Context("EnforceDistribution", Internal("PhysicalOptimizer rule 'EnforceDistribution' failed, due to generate a different schema, original schema: Schema { fields: [Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, new schema: Schema { fields: [Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }"))
```
### Expected behavior
Expected the optimizer rule either to succeed or to be skipped, rather than failing the query.
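For context, the error above comes from a consistency check that runs after each physical optimizer rule: the rule must not change the plan's output schema. A minimal sketch of that invariant, using plain `(name, data_type)` tuples as a hypothetical stand-in for Arrow's `Schema` (the real check compares `SchemaRef`s):

```rust
// Hypothetical stand-in for Arrow's Schema: a list of (field name, data type).
type SimpleSchema = Vec<(String, String)>;

// After a physical optimizer rule rewrites a plan, the plan's output schema
// must be identical to the schema before the rewrite; otherwise the optimizer
// fails with an error like the one in this report.
fn check_schema_unchanged(original: &SimpleSchema, new: &SimpleSchema) -> Result<(), String> {
    if original != new {
        return Err(format!(
            "rule generated a different schema, original: {:?}, new: {:?}",
            original, new
        ));
    }
    Ok(())
}

fn main() {
    let original: SimpleSchema = vec![("name".to_string(), "LargeUtf8".to_string())];
    let rewritten: SimpleSchema = vec![("name".to_string(), "UInt64".to_string())];
    // Same field name but a different type, as in this issue: the check fails.
    assert!(check_schema_unchanged(&original, &rewritten).is_err());
    assert!(check_schema_unchanged(&original, &original).is_ok());
}
```

So the `LargeUtf8` vs `UInt64` mismatch in the error means some plan node began reporting the wrong columns during optimization, not that data was actually converted.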
### Additional context
```
Projection: a.name, b.name, c.name
  Inner Join: e2.dst = c.id
    Projection: a.name, b.name, e2.dst
      Inner Join: b.id = e2.src Filter: e2.id != e1.id
        Projection: e1.id, a.name, b.id, b.name
          Inner Join: e1.dst = b.id
            Projection: e1.id, e1.dst, a.name
              Inner Join: e1.src = a.id
                SubqueryAlias: e1
                  TableScan: _default projection=[id, src, dst]
                SubqueryAlias: a
                  TableScan: nodes projection=[id, name]
            SubqueryAlias: b
              TableScan: nodes projection=[id, name]
        SubqueryAlias: e2
          TableScan: _default projection=[id, src, dst]
    SubqueryAlias: c
      TableScan: nodes projection=[id, name]
```
Thanks for the report @fabianmurariu
Is there any way we can get a self-contained reproducer? I ran the query in the description, but it references tables my environment doesn't have:
> WITH e1 AS (SELECT * FROM _default), e2 AS (SELECT * FROM _default), a AS (SELECT * FROM nodes), b AS (SELECT * FROM nodes), c AS (SELECT * FROM nodes) SELECT a.name, b.name, c.name FROM e1 JOIN a ON e1.src = a.id JOIN b ON e1.dst = b.id JOIN e2 ON b.id = e2.src JOIN c ON e2.dst = c.id WHERE e1.id <> e2.id;
Error during planning: table 'datafusion.public._default' not found
I'll try next week to open source the code where this is happening
Thanks @fabianmurariu
cc @mustafasrepo in case you have any thoughts
The physical plans before and after the EnforceDistribution rule might help in locating the problem. You can use the `get_plan_string` helper to print this information. Putting prints at the start and at the end of https://github.com/apache/datafusion/blob/2a15614c6aeb0b7c22d0a7aac895d17f5aeaef99/datafusion/core/src/physical_optimizer/enforce_distribution.rs#L189 will produce the necessary logs as far as I can tell.
I have tried to reproduce the problem, defining only the fields that are absolutely necessary for the query, with the test below:
```
statement ok
CREATE TABLE IF NOT EXISTS _default (name VARCHAR, src BIGINT, dst BIGINT, id BIGINT) AS VALUES('mustafa', 1, 2, 0),('test', 2, 3, 1);

statement ok
CREATE TABLE IF NOT EXISTS nodes (name VARCHAR,id BIGINT) AS VALUES('TR', 1),('GR', 2);

statement ok
set datafusion.execution.target_partitions = 8;

query TTT
WITH e1 AS (SELECT * FROM _default),
e2 AS (SELECT * FROM _default),
a AS (SELECT * FROM nodes),
b AS (SELECT * FROM nodes),
c AS (SELECT * FROM nodes)
SELECT a.name, b.name, c.name
FROM e1 JOIN a ON e1.src = a.id JOIN b ON e1.dst = b.id JOIN e2 ON b.id = e2.src JOIN c ON e2.dst = c.id WHERE e1.id <> e2.id;
----
```
However, this test passes. Unfortunately I cannot debug further with what we have; after seeing the plans, or a full reproducer, I will take another look.
Strange, I'm encountering this with custom `TableProvider`s. I'll be able to share more next week, though.
@mustafasrepo @alamb sorry for the delay. Here is a repo attempting to replicate the issue; unfortunately I'm getting a completely different error on a simple join:
```sql
SELECT nodes.name FROM edges JOIN nodes ON edges.src = nodes.id
```
https://github.com/fabianmurariu/df_sql_bug
```
thread 'main' panicked at src/main.rs:276:35:
collect results: Plan("The left or right side of the join does not have all columns on \"on\": \nMissing on the left: {Column { name: \"src\", index: 0 }}\nMissing on the right: {}")
stack backtrace:
   0: rust_begin_unwind
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/panicking.rs:652:5
   1: core::panicking::panic_fmt
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/panicking.rs:72:14
   2: core::result::unwrap_failed
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/result.rs:1654:5
   3: core::result::Result<T,E>::expect
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/result.rs:1034:23
   4: df_sql_bug::main::{{closure}}
             at ./src/main.rs:276:15
   5: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/park.rs:281:63
   6: tokio::runtime::coop::with_budget
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/coop.rs:107:5
   7: tokio::runtime::coop::budget
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/coop.rs:73:5
   8: tokio::runtime::park::CachedParkThread::block_on
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/park.rs:281:31
   9: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/context/blocking.rs:66:9
  10: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
  11: tokio::runtime::context::runtime::enter_runtime
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/context/runtime.rs:65:16
  12: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
  13: tokio::runtime::runtime::Runtime::block_on
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/runtime.rs:349:45
  14: df_sql_bug::main
             at ./src/main.rs:307:5
  15: core::ops::function::FnOnce::call_once
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
```
Thanks @fabianmurariu for the reproducer. I have debugged your code; the problem seems to be these two lines (line1, line2), where the original schema of the table is passed to the Executor. However, if during the `scan` call some of the columns are pruned (specified by the `projection: Option<&Vec<usize>>` argument), the original schema is no longer valid; we should use the corresponding projected fields in the schema. You can see how schema projection is done in `MemoryExec`. I think making the corresponding change would solve the problem.
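To illustrate the projection logic described above, here is a minimal sketch using a simplified `Field` struct as a hypothetical stand-in for Arrow's types (a real fix would use Arrow's `SchemaRef` and DataFusion's `project_schema` helper instead):

```rust
// Hypothetical simplified stand-in for arrow's Field type.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
}

// When scan() receives projection = Some(indices), the schema the resulting
// ExecutionPlan reports must contain only those columns, in that order.
// Passing the table's full schema through unchanged is what causes the
// schema mismatch the optimizer complains about.
fn project_schema(full: &[Field], projection: Option<&Vec<usize>>) -> Vec<Field> {
    match projection {
        // No projection requested: expose the full table schema.
        None => full.to_vec(),
        // Projection present: keep only the requested columns, in order.
        Some(indices) => indices.iter().map(|&i| full[i].clone()).collect(),
    }
}

fn main() {
    let full = vec![
        Field { name: "id".to_string(), data_type: "Int64".to_string() },
        Field { name: "src".to_string(), data_type: "Int64".to_string() },
        Field { name: "name".to_string(), data_type: "LargeUtf8".to_string() },
    ];
    // The optimizer asks only for `name` (index 2).
    let projected = project_schema(&full, Some(&vec![2]));
    assert_eq!(
        projected,
        vec![Field { name: "name".to_string(), data_type: "LargeUtf8".to_string() }]
    );
}
```

The key point is that the projected schema, not the stored table schema, is what the rest of the plan (joins, projections, the EnforceDistribution check) sees.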
Perhaps this function would help you: https://docs.rs/datafusion/latest/datafusion/common/fn.project_schema.html (it is what the line pointed to by @mustafasrepo above uses)
Thanks again for the report @fabianmurariu and for the debugging @mustafasrepo. I am going to close this issue for now; please reopen it if that isn't right.