EnforceDistribution fails, seems to turn all the types of the schema to UInt64

Open fabianmurariu opened this issue 1 year ago • 6 comments

Describe the bug

This happens in version 37; it works in 36. `EnforceDistribution` fails with "PhysicalOptimizer rule 'EnforceDistribution' failed, due to generate a different schema". The full error, including the original and new schemas, is shown below.

To Reproduce

WITH e1 AS (SELECT * FROM _default), e2 AS (SELECT * FROM _default), a AS (SELECT * FROM nodes), b AS (SELECT * FROM nodes), c AS (SELECT * FROM nodes) SELECT a.name, b.name, c.name FROM e1 JOIN a ON e1.src = a.id JOIN b ON e1.dst = b.id JOIN e2 ON b.id = e2.src JOIN c ON e2.dst = c.id WHERE e1.id <> e2.id

Error

```
called `Result::unwrap()` on an `Err` value: Context("EnforceDistribution", Internal("PhysicalOptimizer rule 'EnforceDistribution' failed, due to generate a different schema, original schema: Schema { fields: [Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, new schema: Schema { fields: [Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }"))
```

### Expected behavior

Expected optimizer rules to be skipped or not to run

### Additional context

```
Projection: a.name, b.name, c.name
  Inner Join: e2.dst = c.id
    Projection: a.name, b.name, e2.dst
      Inner Join: b.id = e2.src Filter: e2.id != e1.id
        Projection: e1.id, a.name, b.id, b.name
          Inner Join: e1.dst = b.id
            Projection: e1.id, e1.dst, a.name
              Inner Join: e1.src = a.id
                SubqueryAlias: e1
                  TableScan: _default projection=[id, src, dst]
                SubqueryAlias: a
                  TableScan: nodes projection=[id, name]
            SubqueryAlias: b
              TableScan: nodes projection=[id, name]
        SubqueryAlias: e2
          TableScan: _default projection=[id, src, dst]
    SubqueryAlias: c
      TableScan: nodes projection=[id, name]
```

fabianmurariu avatar May 08 '24 11:05 fabianmurariu

Thanks for the report @fabianmurariu

Is there any way we can get a self-contained reproducer? I ran the query in the description and it doesn't seem to have all the tables:

> WITH e1 AS (SELECT * FROM _default), e2 AS (SELECT * FROM _default), a AS (SELECT * FROM nodes), b AS (SELECT * FROM nodes), c AS (SELECT * FROM nodes) SELECT a.name, b.name, c.name FROM e1 JOIN a ON e1.src = a.id JOIN b ON e1.dst = b.id JOIN e2 ON b.id = e2.src JOIN c ON e2.dst = c.id WHERE e1.id <> e2.id;
Error during planning: table 'datafusion.public._default' not found

alamb avatar May 08 '24 17:05 alamb

I'll try next week to open source the code where this is happening

fabianmurariu avatar May 09 '24 08:05 fabianmurariu

Thanks @fabianmurariu

cc @mustafasrepo in case you have any thoughts

alamb avatar May 09 '24 10:05 alamb

The physical plans before and after the `EnforceDistribution` rule might help in locating the problem. You can use the `get_plan_string` helper to print this information. As far as I can tell, putting prints at the start and at the end of https://github.com/apache/datafusion/blob/2a15614c6aeb0b7c22d0a7aac895d17f5aeaef99/datafusion/core/src/physical_optimizer/enforce_distribution.rs#L189 will produce the necessary logs.

mustafasrepo avatar May 09 '24 12:05 mustafasrepo

> Thanks @fabianmurariu
>
> cc @mustafasrepo in case you have any thoughts

I tried to reproduce the problem by defining only the fields the query strictly needs, using the statements below:

statement ok
CREATE TABLE IF NOT EXISTS _default (name VARCHAR, src BIGINT, dst BIGINT, id BIGINT) AS VALUES('mustafa', 1, 2, 0),('test', 2, 3, 1);

statement ok
CREATE TABLE IF NOT EXISTS nodes (name VARCHAR,id BIGINT) AS VALUES('TR', 1),('GR', 2);

statement ok
set datafusion.execution.target_partitions = 8;

query TTT
WITH e1 AS (SELECT * FROM _default),
  e2 AS (SELECT * FROM _default),
  a AS (SELECT * FROM nodes),
  b AS (SELECT * FROM nodes),
  c AS (SELECT * FROM nodes)
  SELECT a.name, b.name, c.name
FROM e1 JOIN a ON e1.src = a.id JOIN b ON e1.dst = b.id JOIN e2 ON b.id = e2.src JOIN c ON e2.dst = c.id WHERE e1.id <> e2.id;
----

However, this test passes, so unfortunately I cannot debug further. Once I see the plans or a full reproducer, I will take another look.

mustafasrepo avatar May 09 '24 12:05 mustafasrepo

Strange, I'm encountering this with custom TableProviders, I'll be able to share more next week tho

fabianmurariu avatar May 09 '24 12:05 fabianmurariu

@mustafasrepo @alamb sorry for the delay. Here is a repo attempting to replicate the issue. Unfortunately, I'm getting a completely different error on a simple join:

SELECT nodes.name FROM edges JOIN nodes ON edges.src = nodes.id

https://github.com/fabianmurariu/df_sql_bug

thread 'main' panicked at src/main.rs:276:35:
collect results: Plan("The left or right side of the join does not have all columns on \"on\": \nMissing on the left: {Column { name: \"src\", index: 0 }}\nMissing on the right: {}")
stack backtrace:
   0: rust_begin_unwind
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/panicking.rs:652:5
   1: core::panicking::panic_fmt
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/panicking.rs:72:14
   2: core::result::unwrap_failed
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/result.rs:1654:5
   3: core::result::Result<T,E>::expect
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/result.rs:1034:23
   4: df_sql_bug::main::{{closure}}
             at ./src/main.rs:276:15
   5: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/park.rs:281:63
   6: tokio::runtime::coop::with_budget
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/coop.rs:107:5
   7: tokio::runtime::coop::budget
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/coop.rs:73:5
   8: tokio::runtime::park::CachedParkThread::block_on
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/park.rs:281:31
   9: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/context/blocking.rs:66:9
  10: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
  11: tokio::runtime::context::runtime::enter_runtime
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/context/runtime.rs:65:16
  12: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
  13: tokio::runtime::runtime::Runtime::block_on
             at /Users/murariuf/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/runtime/runtime.rs:349:45
  14: df_sql_bug::main
             at ./src/main.rs:307:5
  15: core::ops::function::FnOnce::call_once
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

fabianmurariu avatar Jul 14 '24 12:07 fabianmurariu

Thanks @fabianmurariu for the reproducer. I have debugged your code; the problem seems to be these two lines (line1, line2), where the original schema of the table is passed to the Executor.

However, if some of the columns are pruned during the `scan` call (as specified by the `projection: Option<&Vec<usize>>` argument), the original schema is no longer valid. The schema should contain only the fields that correspond to the projection. You can see how schema projection is done in `MemoryExec`. I think making the corresponding change would solve the problem.

mustafasrepo avatar Jul 16 '24 06:07 mustafasrepo

Perhaps this function would help you: https://docs.rs/datafusion/latest/datafusion/common/fn.project_schema.html (it is what the line pointed to by @mustafasrepo above uses)
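
As a rough illustration of what projecting a schema means for a `scan` call, here is a std-only sketch. It is not the DataFusion API: `Field`, `field`, and `project_fields` are hypothetical stand-ins for Arrow's schema types and for `project_schema`, and the `edges` column layout is assumed purely for the example.

```rust
// Simplified stand-in for an Arrow field; NOT the real Arrow/DataFusion type.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
}

// Hypothetical helper to build a field for the example.
fn field(name: &str, data_type: &str) -> Field {
    Field {
        name: name.to_string(),
        data_type: data_type.to_string(),
    }
}

/// Returns the schema a scan should report: the full schema when no
/// projection is given, otherwise only the selected columns, in the
/// order the projection lists them.
fn project_fields(
    schema: &[Field],
    projection: Option<&Vec<usize>>,
) -> Result<Vec<Field>, String> {
    match projection {
        None => Ok(schema.to_vec()),
        Some(indices) => indices
            .iter()
            .map(|&i| {
                schema.get(i).cloned().ok_or_else(|| {
                    format!(
                        "projection index {i} out of bounds for {} fields",
                        schema.len()
                    )
                })
            })
            .collect(),
    }
}

fn main() {
    // Assumed table layout, for illustration only.
    let edges = vec![
        field("id", "UInt64"),
        field("src", "UInt64"),
        field("dst", "UInt64"),
    ];

    // The planner asks the scan for the `src` column only.
    let projection = vec![1];
    let projected = project_fields(&edges, Some(&projection)).unwrap();

    // A join that expects `src` at index 0 finds it only in the projected schema.
    println!("projected[0]   = {}", projected[0].name);
    println!("unprojected[0] = {}", edges[0].name);
}
```

Under these assumptions, a plan node that reports the unprojected schema exposes `id` at index 0 where the join expects `src`, which is the kind of mismatch the "Missing on the left" error above describes.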

Thanks again for the report @fabianmurariu and for the debugging @mustafasrepo -- I am going to close this issue for now. Please reopen it if that isn't right.

alamb avatar Jul 16 '24 21:07 alamb