Incorrect LEFT JOIN evaluation result on OR conditions
Describe the bug
When the join condition is on columns combined with OR, the result returned by LEFT JOIN is the same as that of INNER JOIN, which is incorrect.
LEFT JOIN should not reduce the number of rows returned: it should return all of the records from the left table, plus the matched records from the right table. When there is no match, the right-side columns are NULL (displayed as empty).
INNER JOIN reduces the number of rows, i.e. it only returns the records that have matching values in both tables.
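To make this contract concrete, here is a minimal sketch using SQLite (via Python's sqlite3 module) as a stand-in engine, not DataFusion, joining on emp_id with the same sample data used below:

```python
import sqlite3

# Minimal sketch of the LEFT vs INNER JOIN contract, using SQLite as a
# stand-in engine (not DataFusion) with the sample data from this issue.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE employees(emp_id INT, name TEXT);
    CREATE TABLE department(emp_id INT, department TEXT);
    INSERT INTO employees VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol');
    INSERT INTO department VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales');
""")

left = conn.execute(
    "SELECT e.name, d.department FROM employees e "
    "LEFT JOIN department d ON e.emp_id = d.emp_id"
).fetchall()
inner = conn.execute(
    "SELECT e.name, d.department FROM employees e "
    "INNER JOIN department d ON e.emp_id = d.emp_id"
).fetchall()

# LEFT JOIN keeps every employee; Bob, who has no department, gets NULL.
print(left)   # [('Alice', 'HR'), ('Bob', None), ('Carol', 'Engineering')]
# INNER JOIN keeps only employees with a matching department row.
print(inner)  # [('Alice', 'HR'), ('Carol', 'Engineering')]
```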
To Reproduce
- Use DataFusion CLI:
# in dir datafusion/datafusion-cli
$ cargo build
$ ./target/debug/datafusion-cli
- Write sample data
CREATE OR REPLACE TABLE employees(emp_id INT, name VARCHAR) AS VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol');
CREATE OR REPLACE TABLE department(emp_id INT, department VARCHAR) AS VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales');
- This is how the employees table and the department table look:
> SELECT * FROM employees;
+--------+-------+
| emp_id | name  |
+--------+-------+
| 1      | Alice |
| 2      | Bob   |
| 3      | Carol |
+--------+-------+
3 row(s) fetched.
Elapsed 0.009 seconds.
> SELECT * FROM department;
+--------+-------------+
| emp_id | department  |
+--------+-------------+
| 1      | HR          |
| 3      | Engineering |
| 4      | Sales       |
+--------+-------------+
3 row(s) fetched.
Elapsed 0.006 seconds.
- Query
> SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' OR e.name = 'Bob');
+--------+-------+-------------+
| emp_id | name  | department  |
+--------+-------+-------------+
| 1      | Alice | HR          |
| 1      | Alice | Engineering |
| 1      | Alice | Sales       |
| 2      | Bob   | HR          |
| 2      | Bob   | Engineering |
| 2      | Bob   | Sales       |
+--------+-------+-------------+ <-- should have one more row for Carol
6 row(s) fetched.
Elapsed 0.013 seconds.
> SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (e.name = 'NotExist1' OR e.name = 'NotExist2');
+--------+------+------------+
| emp_id | name | department |
+--------+------+------------+
+--------+------+------------+ <-- should have three rows, 1 row for Alice, 1 row for Bob, and 1 row for Carol
0 row(s) fetched.
Elapsed 0.014 seconds.
> SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' OR e.name = 'NotExist');
+--------+-------+-------------+
| emp_id | name  | department  |
+--------+-------+-------------+
| 1      | Alice | HR          |
| 1      | Alice | Engineering |
| 1      | Alice | Sales       |
+--------+-------+-------------+ <-- should have two more rows, 1 row for Bob and 1 row for Carol
3 row(s) fetched.
Elapsed 0.014 seconds.
Expected behavior
Postgres shows the expected results.
- In psql cli, drop tables and create new tables with data
DROP TABLE IF EXISTS employees;
DROP TABLE IF EXISTS department;
-- create tables
CREATE TABLE employees(emp_id INT, name VARCHAR);
CREATE TABLE department(emp_id INT, dept_name VARCHAR);
-- write sample data
INSERT INTO employees (emp_id, name) VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol');
INSERT INTO department (emp_id, dept_name) VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales');
- This is how the two tables look:
chunchun=# SELECT * FROM employees;
 emp_id | name
--------+-------
      1 | Alice
      2 | Bob
      3 | Carol
(3 rows)
chunchun=# SELECT * FROM department;
 emp_id |  dept_name
--------+-------------
      1 | HR
      3 | Engineering
      4 | Sales
(3 rows)
- Query
SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
LEFT JOIN department AS d
ON (e.name = 'Alice' OR e.name = 'Bob');
 emp_id | name  |  dept_name
--------+-------+-------------
      1 | Alice | HR
      1 | Alice | Engineering
      1 | Alice | Sales
      2 | Bob   | HR
      2 | Bob   | Engineering
      2 | Bob   | Sales
      3 | Carol |
(7 rows)
SELECT e.emp_id, e.name, d.dept_name
FROM employees e
LEFT JOIN department d
ON (e.name = 'NotExist1' OR e.name = 'NotExist2');
 emp_id | name  | dept_name
--------+-------+-----------
      1 | Alice |
      2 | Bob   |
      3 | Carol |
(3 rows)
SELECT e.emp_id, e.name, d.dept_name
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' OR e.name = 'NotExist');
 emp_id | name  |  dept_name
--------+-------+-------------
      1 | Alice | HR
      1 | Alice | Engineering
      1 | Alice | Sales
      2 | Bob   |
      3 | Carol |
(5 rows)
Additional context
No response
SELECT e.emp_id, e.name, d.department FROM employees e LEFT JOIN department d ON (e.name = 'Alice' OR e.name = 'Bob');
Hmm, as the join filter only references employees, I think it is pushed down through the join. The results seem correct to me. I will verify it on Spark later.
The results seem correct to me
No. For a left row that matches neither condition, this query is essentially:
SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (FALSE);
which by the definition of LEFT JOIN must not reduce the number of rows from the left side.
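This equivalence can be checked on an engine that implements it correctly; here is a minimal sketch using SQLite via Python's sqlite3 module (a stand-in, not DataFusion), where a constant-false ON condition still yields every left row:

```python
import sqlite3

# Sketch: a LEFT JOIN whose ON condition is constant-false must still
# emit one row per left-table row, with NULLs for the right side.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE employees(emp_id INT, name TEXT);
    CREATE TABLE department(emp_id INT, department TEXT);
    INSERT INTO employees VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol');
    INSERT INTO department VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales');
""")

rows = conn.execute(
    "SELECT e.name, d.department FROM employees e "
    "LEFT JOIN department d ON 0"  # ON 0 acts as ON (FALSE) in SQLite
).fetchall()

print(rows)  # [('Alice', None), ('Bob', None), ('Carol', None)]
```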
Yea, just verified with Spark:
scala> df.show
+------+-----+-----------+
|emp_id| name| dept_name|
+------+-----+-----------+
| 1|Alice| Sales|
| 1|Alice| HR|
| 1|Alice|Engineering|
| 2| Bob| Sales|
| 2| Bob| HR|
| 2| Bob|Engineering|
| 3|Carol| null|
+------+-----+-----------+
This will cause correctness issue on Comet too. I will take a look.
Could this be related to a bug in the optimizer (pushing down filter to the left where it shouldn't)?
I tried adding a trivially false term to the OR clause that references the department table. This shouldn't change the result but would inhibit pushdown to the employees table. I got the expected results:
> SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' OR e.name = 'Bob' OR d.department = 'FOOBAR');
+--------+-------+-------------+
| emp_id | name  | department  |
+--------+-------+-------------+
| 1      | Alice | HR          |
| 2      | Bob   | HR          |
| 1      | Alice | Engineering |
| 2      | Bob   | Engineering |
| 1      | Alice | Sales       |
| 2      | Bob   | Sales       |
| 3      | Carol |             |
+--------+-------+-------------+
7 row(s) fetched.
Elapsed 0.014 seconds.
If we look at the following plan:
> EXPLAIN SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' OR e.name = 'Bob');
+---------------+------------------------------------------------------------------------------+
| plan_type     | plan                                                                         |
+---------------+------------------------------------------------------------------------------+
| logical_plan  | Left Join: Filter: e.name = Utf8("Alice") OR e.name = Utf8("Bob")            |
|               |   SubqueryAlias: e                                                           |
|               |     Filter: employees.name = Utf8("Alice") OR employees.name = Utf8("Bob")   |
|               |       TableScan: employees projection=[emp_id, name]                         |
|               |   SubqueryAlias: d                                                           |
|               |     TableScan: department projection=[department]                            |
| physical_plan | NestedLoopJoinExec: join_type=Left, filter=name@0 = Alice OR name@0 = Bob    |
|               |   CoalesceBatchesExec: target_batch_size=8192                                |
|               |     FilterExec: name@1 = Alice OR name@1 = Bob                               |
|               |       MemoryExec: partitions=1, partition_sizes=[1]                          |
|               |   MemoryExec: partitions=1, partition_sizes=[1]                              |
|               |                                                                              |
+---------------+------------------------------------------------------------------------------+
The Filter on employees (below SubqueryAlias: e in the logical plan) and the corresponding FilterExec seem wrong. They indeed disappear in the example I gave above:
> EXPLAIN SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' OR e.name = 'Bob' OR d.department = 'FOOBAR');
+---------------+--------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                   |
+---------------+--------------------------------------------------------------------------------------------------------+
| logical_plan  | Left Join: Filter: e.name = Utf8("Alice") OR e.name = Utf8("Bob") OR d.department = Utf8("FOOBAR")     |
|               |   SubqueryAlias: e                                                                                     |
|               |     TableScan: employees projection=[emp_id, name]                                                     |
|               |   SubqueryAlias: d                                                                                     |
|               |     TableScan: department projection=[department]                                                      |
| physical_plan | ProjectionExec: expr=[emp_id@1 as emp_id, name@2 as name, department@0 as department]                  |
|               |   NestedLoopJoinExec: join_type=Right, filter=name@0 = Alice OR name@0 = Bob OR department@1 = FOOBAR  |
|               |     MemoryExec: partitions=1, partition_sizes=[1]                                                      |
|               |     MemoryExec: partitions=1, partition_sizes=[1]                                                      |
|               |                                                                                                        |
+---------------+--------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.016 seconds.
These filters are also not present if we use an AND clause instead:
> EXPLAIN SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' AND e.name = 'Bob');
+---------------+-----------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                    |
+---------------+-----------------------------------------------------------------------------------------+
| logical_plan  | Left Join: Filter: e.name = Utf8("Alice") AND e.name = Utf8("Bob")                      |
|               |   SubqueryAlias: e                                                                      |
|               |     TableScan: employees projection=[emp_id, name]                                      |
|               |   SubqueryAlias: d                                                                      |
|               |     TableScan: department projection=[department]                                       |
| physical_plan | ProjectionExec: expr=[emp_id@1 as emp_id, name@2 as name, department@0 as department]   |
|               |   NestedLoopJoinExec: join_type=Right, filter=name@0 = Alice AND name@0 = Bob           |
|               |     MemoryExec: partitions=1, partition_sizes=[1]                                       |
|               |     MemoryExec: partitions=1, partition_sizes=[1]                                       |
|               |                                                                                         |
+---------------+-----------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.013 seconds.
The doc of push_down_filter says:
/// For a given JOIN type, determine whether each side of the join is preserved.
///
/// We say a join side is preserved if the join returns all or a subset of the rows from
/// the relevant side, such that each row of the output table directly maps to a row of
/// the preserved input table. If a table is not preserved, it can provide extra null rows.
/// That is, there may be rows in the output table that don't directly map to a row in the
/// input table.
///
/// For example:
/// - In an inner join, both sides are preserved, because each row of the output
/// maps directly to a row from each side.
/// - In a left join, the left side is preserved and the right is not, because
/// there may be rows in the output that don't directly map to a row in the
/// right input (due to nulls filling where there is no match on the right).
///
/// This is important because we can always push down post-join filters to a preserved
/// side of the join, assuming the filter only references columns from that side. For the
/// non-preserved side it can be more tricky.
So this definition is debatable.
We could distinguish between the "all" and "subset" cases.
For example, a left join returns all of the rows from the left side but only a subset of the rows from the right side, while an inner join returns a subset of the rows from both sides.
Only when the join returns a subset of a side can we safely push the filter down to that side.
I agree with @ozankabak in https://github.com/apache/datafusion/issues/10881#issuecomment-2169117921 that the problem is that the filter should not actually be pushed below the join
> EXPLAIN SELECT e.emp_id, e.name, d.department
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' OR e.name = 'Bob');
+---------------+------------------------------------------------------------------------------+
| plan_type     | plan                                                                         |
+---------------+------------------------------------------------------------------------------+
| logical_plan  | Left Join: Filter: e.name = Utf8("Alice") OR e.name = Utf8("Bob")            |
|               |   SubqueryAlias: e                                                           |
|               |     Filter: employees.name = Utf8("Alice") OR employees.name = Utf8("Bob")   |
|               |       TableScan: employees projection=[emp_id, name]                         |
|               |   SubqueryAlias: d                                                           |
|               |     TableScan: department projection=[department]                            |
| physical_plan | NestedLoopJoinExec: join_type=Left, filter=name@0 = Alice OR name@0 = Bob    |
|               |   CoalesceBatchesExec: target_batch_size=8192                                |
|               |     FilterExec: name@1 = Alice OR name@1 = Bob                               | <----- it is incorrect to push this filter below the join
|               |       MemoryExec: partitions=1, partition_sizes=[1]                          |
|               |   MemoryExec: partitions=1, partition_sizes=[1]                              |
|               |                                                                              |
+---------------+------------------------------------------------------------------------------+
I also agree with @doki23 in https://github.com/apache/datafusion/issues/10881#issuecomment-2192078394 that the problem is related to the pushdown optimization
Different semantics of predicates in ON and WHERE
I think the core issue is that for OUTER joins, the semantics of predicates in the ON clause are different from those of predicates in the WHERE clause.
Putting (e.name = 'Alice' OR e.name = 'Bob') in the ON clause means the predicate is evaluated during the join, and thus for OUTER joins it cannot remove rows from the preserved side, which is why the Carol row still appears:
postgres=# SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
LEFT JOIN department AS d ON (e.name = 'Alice' OR e.name = 'Bob');
 emp_id | name  |  dept_name
--------+-------+-------------
      1 | Alice | HR
      1 | Alice | Engineering
      1 | Alice | Sales
      2 | Bob   | HR
      2 | Bob   | Engineering
      2 | Bob   | Sales
      3 | Carol |             <-- this row doesn't match the ON clause predicate
(7 rows)
However, when you put the same predicate in the WHERE clause it does filter the row, as expected
postgres=# SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
LEFT JOIN department AS d ON (true)
where (e.name = 'Alice' OR e.name = 'Bob');
 emp_id | name  |  dept_name
--------+-------+-------------
      1 | Alice | HR
      1 | Alice | Engineering
      1 | Alice | Sales
      2 | Bob   | HR
      2 | Bob   | Engineering
      2 | Bob   | Sales        <-- no "Carol" rows
(6 rows)
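The two placements can be contrasted on any engine that gets this right; here is a minimal sketch using SQLite via Python's sqlite3 module (a stand-in, with the column names from the Postgres example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE employees(emp_id INT, name TEXT);
    CREATE TABLE department(emp_id INT, dept_name TEXT);
    INSERT INTO employees VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol');
    INSERT INTO department VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales');
""")

# Predicate in ON: evaluated during the join, so the preserved (left)
# side keeps its rows; Carol survives with a NULL dept_name (7 rows).
on_rows = conn.execute(
    "SELECT e.name, d.dept_name FROM employees e "
    "LEFT JOIN department d ON (e.name = 'Alice' OR e.name = 'Bob')"
).fetchall()

# Same predicate in WHERE: evaluated after the join, so Carol's row is
# filtered out (6 rows).
where_rows = conn.execute(
    "SELECT e.name, d.dept_name FROM employees e "
    "LEFT JOIN department d ON 1 "
    "WHERE e.name = 'Alice' OR e.name = 'Bob'"
).fetchall()

print(len(on_rows), len(where_rows))  # 7 6
```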
Proposed Fix
Thus I propose to fix this issue by changing predicate pushdown to NOT push predicates from the ON clause below OUTER joins. We can and should still push down predicates for INNER joins, as well as predicates from the WHERE clause (aka in Filters).
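The proposed rule can be sketched as a per-join-type decision of which sides an ON-clause predicate may be pushed to. The helper name below is hypothetical (not DataFusion's actual API, which is in Rust); Python is used here only to make the rule explicit:

```python
from enum import Enum

class JoinType(Enum):
    INNER = "inner"
    LEFT = "left"
    RIGHT = "right"
    FULL = "full"

def can_push_on_predicate(join_type: JoinType) -> tuple[bool, bool]:
    """Return (push_to_left, push_to_right) for a predicate that appears
    in the join's ON clause and references only that one side.

    Hypothetical sketch: an ON-clause predicate may only be pushed below
    the join on a side whose rows the join is allowed to eliminate. An
    outer join returns ALL rows of its preserved side(s), so pushing the
    predicate there -- as happened with the employees filter in this
    issue -- wrongly drops rows like Carol's.
    """
    if join_type is JoinType.INNER:
        # Inner join keeps only matching rows: both sides may be filtered.
        return (True, True)
    if join_type is JoinType.LEFT:
        # Left side is preserved: filtering it early changes the result.
        return (False, True)
    if join_type is JoinType.RIGHT:
        return (True, False)
    # FULL outer join preserves both sides: no pushdown at all.
    return (False, False)

# The buggy plan pushed the employees predicate to the left side of a
# LEFT JOIN; under this rule that pushdown is rejected:
print(can_push_on_predicate(JoinType.LEFT))  # (False, True)
```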
Agreed. This is Spark query plan:
scala> df.explain
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, ((name#13 = Alice) OR (name#13 = Bob))
:- FileScan parquet spark_catalog.default.employees[emp_id#12,name#13] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/spark-warehouse/employees], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<emp_id:int,name:string>
+- BroadcastExchange IdentityBroadcastMode, [plan_id=42]
+- FileScan parquet spark_catalog.default.department[dept_name#15] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/spark-warehouse/department], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<dept_name:string>
The filter predicate is not pushed down.