[SPARK-39710][SQL] Support push local topK through outer join
What changes were proposed in this pull request?
- Pull the pattern matched by TakeOrderedAndProjectExec out into a reusable ExtractTopK pattern.
- Add a new rule PushLocalTopKThroughOuterJoin, which matches the ExtractTopK pattern.
- Add a new config spark.sql.optimizer.pushDownLocalTopKLimitThreshold that caps how large a limit may be pushed down as a local TopK. This guards against a regression when the limit is larger than the actual row count, because the pushed-down sort and limit then remove no rows and only add cost.
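The pattern extraction and the threshold gate described above can be sketched with a toy plan model. This is a minimal sketch, not the PR's code: Plan, Relation, GlobalLimit, LocalLimit, Sort, Project, TopK, ExtractTopK and eligibleForPush are all hypothetical stand-ins for Catalyst operators. It only mirrors the two Limit(Sort(...)) and Limit(Project(Sort(...))) shapes that are planned as TakeOrderedAndProjectExec, and it assumes that "limit <= threshold" is the eligibility test.

```scala
// Hypothetical toy plan nodes standing in for Catalyst's logical operators.
sealed trait Plan
case class Relation(name: String) extends Plan
case class GlobalLimit(n: Int, child: Plan) extends Plan
case class LocalLimit(n: Int, child: Plan) extends Plan
case class Sort(keys: Seq[String], global: Boolean, child: Plan) extends Plan
case class Project(cols: Seq[String], child: Plan) extends Plan

// A TopK: limit, sort keys, optional projection above the sort, and the sorted child.
case class TopK(limit: Int, keys: Seq[String], project: Option[Seq[String]], child: Plan)

// Hypothetical extractor matching Limit(Sort(child)) and Limit(Project(Sort(child))).
object ExtractTopK {
  def unapply(plan: Plan): Option[TopK] = plan match {
    case GlobalLimit(n, LocalLimit(m, Sort(keys, true, child))) if n == m =>
      Some(TopK(n, keys, None, child))
    case GlobalLimit(n, LocalLimit(m, Project(cols, Sort(keys, true, child)))) if n == m =>
      Some(TopK(n, keys, Some(cols), child))
    case _ => None
  }
}

// Stand-in for spark.sql.optimizer.pushDownLocalTopKLimitThreshold:
// only TopKs whose limit is at most the threshold are considered (assumed "<=").
def eligibleForPush(plan: Plan, threshold: Int): Boolean = plan match {
  case ExtractTopK(topK) => topK.limit <= threshold
  case _                 => false
}
```

Using an extractor object keeps the shape-matching in one place, so both the physical planner and a new optimizer rule could share it.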
Why are the changes needed?
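This supports pushing the local limit and local sort of a TopK down through an outer join. It is safe because an outer join emits at least one row for every row of its preserved side, so the top k output rows can only come from the top k rows of that side:
- for a left outer join, the TopK ordering references only columns from the left side, and the TopK limit is smaller than the left side's row count
- for a right outer join, the TopK ordering references only columns from the right side, and the TopK limit is smaller than the right side's row count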
If multiple joins satisfy the rule, the local TopK should only be pushed to the bottom-most join.
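The conditions above can be sketched as a rewrite over a toy plan model. This is a minimal sketch under stated assumptions, not the rule's implementation: every class and function here (Plan, Join, LeftOuter, pushDown, pushLocalTopK, and so on) is hypothetical, "smaller than the side's row count" is modeled with a toy maxRows bound, and the threshold comparison is assumed to be "<=". The recursion descends into the preserved side so that only the bottom-most qualifying join receives the local TopK, and the maxRows check stops the rule from firing twice.

```scala
// Toy plan model for illustration only (hypothetical; not Catalyst classes).
sealed trait Plan {
  def output: Set[String]   // column names produced by this node
  def maxRows: Option[Long] // known upper bound on rows, if any
}
case class Relation(name: String, output: Set[String], rows: Long) extends Plan {
  def maxRows: Option[Long] = Some(rows)
}
case class LocalLimit(n: Int, child: Plan) extends Plan {
  def output: Set[String] = child.output
  // Toy simplification: treat the local limit as bounding the whole side.
  def maxRows: Option[Long] = Some(child.maxRows.fold(n.toLong)(r => math.min(r, n.toLong)))
}
case class GlobalLimit(n: Int, child: Plan) extends Plan {
  def output: Set[String] = child.output
  def maxRows: Option[Long] = Some(child.maxRows.fold(n.toLong)(r => math.min(r, n.toLong)))
}
case class Sort(keys: Seq[String], global: Boolean, child: Plan) extends Plan {
  def output: Set[String] = child.output
  def maxRows: Option[Long] = child.maxRows
}
sealed trait JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object Inner extends JoinType
case class Join(left: Plan, right: Plan, joinType: JoinType) extends Plan {
  def output: Set[String] = left.output ++ right.output
  def maxRows: Option[Long] = None // unknown in this toy model
}

// Push a local TopK into the preserved side of an outer join, recursing so that
// only the bottom-most qualifying join is rewritten.
def pushDown(plan: Plan, n: Int, keys: Seq[String]): Option[Plan] = plan match {
  case j @ Join(l, _, LeftOuter) if keys.toSet.subsetOf(l.output) && l.maxRows.forall(_ > n) =>
    val newLeft = pushDown(l, n, keys).getOrElse(LocalLimit(n, Sort(keys, global = false, l)))
    Some(j.copy(left = newLeft))
  case j @ Join(_, r, RightOuter) if keys.toSet.subsetOf(r.output) && r.maxRows.forall(_ > n) =>
    val newRight = pushDown(r, n, keys).getOrElse(LocalLimit(n, Sort(keys, global = false, r)))
    Some(j.copy(right = newRight))
  case _ => None
}

// Rule entry point: match Limit(Sort(child)) and respect the (assumed) threshold.
def pushLocalTopK(plan: Plan, threshold: Int): Plan = plan match {
  case GlobalLimit(n, LocalLimit(m, Sort(keys, true, child))) if n == m && n <= threshold =>
    pushDown(child, n, keys)
      .map(newChild => GlobalLimit(n, LocalLimit(n, Sort(keys, global = true, newChild))))
      .getOrElse(plan)
  case _ => plan
}
```

For example, a TopK of 100 ordered by a left-side column over Join(x, y, LeftOuter) becomes the same TopK over Join(LocalLimit(100, Sort(local, x)), y, LeftOuter). Ordering by a right-side column, an inner join, or a limit above the threshold leaves the plan unchanged.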
An example for a simple left outer join:
Limit global + local
  Sort global
    Join Left_Outer
      x
      y
=>
Limit global + local
  Sort global
    Join Left_Outer
      Limit local
        Sort local
          x
      y
Sorting and limiting each partition of x locally should be cheaper than shuffling and joining every row of x.
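The rewrite above can be checked on plain Scala collections. This is a toy simulation, not Spark: LeftRow, RightRow, leftOuterJoin and topKKeys are hypothetical, the left table is split into two in-memory "partitions", and the join is a nested loop. It compares the global TopK computed over the full join with the one computed after each left partition keeps only its local top k. Only the ordering-key sequence is compared, because rows tied on cx may legitimately come back in any order.

```scala
// Toy rows: left side (c1 = join key, cx = ordering column), right side (c2 = join key).
case class LeftRow(c1: Int, cx: Int)
case class RightRow(c2: Int)

// Nested-loop left outer join: every left row yields at least one output row.
def leftOuterJoin(left: Seq[LeftRow], right: Seq[RightRow]): Seq[(LeftRow, Option[RightRow])] =
  left.flatMap { l =>
    val matches = right.filter(_.c2 == l.c1)
    if (matches.isEmpty) Seq((l, Option.empty[RightRow]))
    else matches.map(r => (l, Option(r)))
  }

// Ordering keys of the global TopK by cx (ties make row identity ambiguous).
def topKKeys(rows: Seq[(LeftRow, Option[RightRow])], k: Int): Seq[Int] =
  rows.map(_._1.cx).sorted.take(k)

val k = 10
// Two left "partitions" of 500 rows each; cx is a permutation of 0..999,
// and join keys 5 and 6 have no match on the right.
val leftPartitions: Seq[Seq[LeftRow]] =
  Seq(0, 1).map(p => (p * 500 until (p + 1) * 500).map(id => LeftRow(id % 7, (id * 37) % 1000)))
val right: Seq[RightRow] = (0 until 30).map(id => RightRow(id % 5))

// Without pushdown: join every left row, then take the global TopK.
val baseline = topKKeys(leftOuterJoin(leftPartitions.flatten, right), k)

// With pushdown: each left partition first keeps only its local top k by cx.
val pushedLeft = leftPartitions.flatMap(part => part.sortBy(_.cx).take(k))
val pushed = topKKeys(leftOuterJoin(pushedLeft, right), k)

println(s"left rows joined: ${leftPartitions.flatten.size} -> ${pushedLeft.size}")
// prints: left rows joined: 1000 -> 20
```

The results match because each partition's local top k contains every row of that partition that can reach the global top k, and the outer join never drops a left row.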
Does this PR introduce any user-facing change?
No, this only improves performance.
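import org.apache.spark.sql.functions.col

val row = 10 * 1000
// Left side: join key c1 and ordering column cx
val df1 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c1", "id % 7 as cx")
// Right side: join key c2
val df2 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c2")
// shj case; the bhj and smj cases presumably differ only in the join hint
df1.join(df2.hint("shuffle_hash"), col("c1") === col("c2"), "left_outer")
  .orderBy(col("cx"))
  .limit(100)
  .write.format("noop").mode("overwrite").save() // like the benchmark's noop() helper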
OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
Push local topK through outer join:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
Push local topK through outer join for bhj - off             3694           3717          22         0.0      369361.5       1.0X
Push local topK through outer join for shj - off             3761           3767          11         0.0      376057.4       1.0X
Push local topK through outer join for smj - off             3469           3478          10         0.0      346945.6       1.1X
Push local topK through outer join for bhj - on                97            109          10         0.1        9745.2      37.9X
Push local topK through outer join for shj - on               162            176          17         0.1       16153.5      22.9X
Push local topK through outer join for smj - on               160            163           3         0.1       15974.6      23.1X
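As a reading aid for the table, the columns can be related using the row count from the benchmark code (row = 10 * 1000 = 10,000). Per Row(ns) is the best time divided by the row count (the table rounds Best Time to whole milliseconds), and Relative is the first case's (bhj - off) per-row time divided by each case's. For example:

$$
\text{Per Row}_{\text{bhj - off}} = \frac{3694\ \text{ms}}{10^{4}\ \text{rows}} \approx 369.4\ \mu\text{s} \approx 369361.5\ \text{ns}
\qquad
\text{Relative}_{\text{bhj - on}} = \frac{369361.5}{9745.2} \approx 37.9
$$

The same ratio gives 22.9X for shj - on (369361.5 / 16153.5) and 23.1X for smj - on (369361.5 / 15974.6).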
How was this patch tested?
Added tests.
cc @wangyum @viirya @cloud-fan, what do you think of this idea?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!