[SPARK-39710][SQL] Support push local topK through outer join

Open ulysses-you opened this issue 3 years ago • 1 comments

What changes were proposed in this pull request?

  • Extract the pattern matched for TakeOrderedAndProjectExec into ExtractTopK
  • Add a new rule PushLocalTopKThroughOuterJoin which matches the ExtractTopK pattern
  • Add a new config spark.sql.optimizer.pushDownLocalTopKLimitThreshold that caps how large a limit may be for the local topK to be pushed down. This guards against a regression when the limit is larger than the actual row count.

Why are the changes needed?

Supports pushing the local limit and local sort of a TopK down through an outer join:

  • for a left outer join, when the ordering of the TopK references only the left side and the limit of the TopK is smaller than the row count of the left side
  • for a right outer join, when the ordering of the TopK references only the right side and the limit of the TopK is smaller than the row count of the right side

If multiple joins satisfy the rule, the local topK is pushed only to the bottom join.
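The conditions above can be sketched on a toy plan model. This is only an illustration under stated assumptions, not the code of the rule in this PR: `Plan`, `Relation`, `Join`, `LocalTopK`, `eligible`, `rewrite` and the `threshold` parameter (standing in for the new config) are hypothetical names, and treating an unknown row count as eligible is an assumption.

```scala
// Hypothetical toy model; these are NOT Spark's Catalyst classes.
sealed trait JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object Inner extends JoinType

sealed trait Plan {
  def output: Set[String]    // column names produced by this node
  def maxRows: Option[Long]  // upper bound on the row count, if known
}
case class Relation(output: Set[String], maxRows: Option[Long]) extends Plan
case class Join(left: Plan, right: Plan, joinType: JoinType) extends Plan {
  def output: Set[String] = left.output ++ right.output
  def maxRows: Option[Long] = None // unknown in this toy model
}
case class LocalTopK(limit: Int, order: Seq[String], child: Plan) extends Plan {
  def output: Set[String] = child.output
  def maxRows: Option[Long] =
    Some(child.maxRows.map(m => math.min(m, limit.toLong)).getOrElse(limit.toLong))
}

object PushLocalTopKSketch {
  // A side qualifies when the ordering references only that side, the limit
  // is within the threshold, and the limit is smaller than the side's row
  // bound (assumption: an unknown bound counts as "large enough").
  private def eligible(side: Plan, order: Seq[String], limit: Int, threshold: Int): Boolean =
    order.nonEmpty &&
      order.forall(side.output.contains) &&
      limit <= threshold &&
      side.maxRows.forall(limit < _)

  // Returns the plan with a LocalTopK placed under the bottom-most eligible
  // outer join, or the plan unchanged when nothing qualifies.
  def rewrite(limit: Int, order: Seq[String], plan: Plan, threshold: Int): Plan = {
    def descend(side: Plan): Plan = {
      val rewritten = rewrite(limit, order, side, threshold)
      if (rewritten == side) LocalTopK(limit, order, side) else rewritten
    }
    plan match {
      case j @ Join(l, _, LeftOuter) if eligible(l, order, limit, threshold) =>
        j.copy(left = descend(l))
      case j @ Join(_, r, RightOuter) if eligible(r, order, limit, threshold) =>
        j.copy(right = descend(r))
      case other => other
    }
  }
}
```

With two nested left outer joins whose ordering column comes from the innermost left relation, `rewrite` leaves both joins in place and wraps only that innermost relation in `LocalTopK`, which mirrors the "bottom join" behaviour described above.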

An example for simple left outer join:

Limit global + local
  Sort global
    Join Left_Outer
       x
       y

=>

Limit global + local
  Sort global
    Join Left_Outer
       Limit local
         Sort local
           x
       y

Compared with shuffling and joining the full input, a local sort should be faster.
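Why the rewrite keeps the result intact can be checked on plain Scala collections. The sketch below is a single-partition model with hypothetical helper names (`leftOuterJoin`, `topK`), not Spark code. It relies on the fact that every left row yields at least one output row in a left outer join, so the first k joined rows in left-side order can only come from the first k left rows.

```scala
object LocalTopKDemo {
  // Left outer join on equal keys; unmatched left rows keep a None right side.
  def leftOuterJoin(left: Seq[(Int, Int)], right: Seq[Int]): Seq[((Int, Int), Option[Int])] =
    left.flatMap { l =>
      val matches = right.filter(_ == l._1)
      if (matches.isEmpty) Seq((l, Option.empty[Int])) else matches.map(r => (l, Option(r)))
    }

  // TopK: sort by the given key, then keep the first k rows.
  def topK[A](rows: Seq[A], k: Int)(key: A => Int): Seq[A] = rows.sortBy(key).take(k)

  val k = 50
  val left: Seq[(Int, Int)] = (0 until 200).map(id => (id % 3, id % 7)) // (c1, cx)
  val right: Seq[Int] = Seq(0, 0, 1) // c2; c1 = 2 has no match, c1 = 0 fans out

  // Plan before the rewrite: join everything, then sort by cx and limit.
  val baseline: Seq[Int] =
    topK(leftOuterJoin(left, right), k)(_._1._2).map(_._1._2)

  // Plan after the rewrite: truncate the left side first, then join,
  // and still apply the original TopK on top.
  val pushed: Seq[Int] =
    topK(leftOuterJoin(topK(left, k)(_._2), right), k)(_._1._2).map(_._1._2)
}
```

Only the sort-key column is compared here, because an engine is free to break ties between rows with equal `cx` differently. In Spark the pushed-down TopK would run per partition, and the global sort and limit above the join would still produce the final result.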

Does this PR introduce any user-facing change?

No, it only improves performance.

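import org.apache.spark.sql.functions.col

val row = 10 * 1000
// Two partitions per side; c1 and c2 are the join keys, cx is the sort column.
val df1 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c1", "id % 7 as cx")
val df2 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c2")

// noop() is assumed to be the benchmark helper that writes to the noop data source.
df1.join(df2.hint("shuffle_hash"), col("c1") === col("c2"), "left_outer")
  .orderBy(col("cx"))
  .limit(100)
  .noop()

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure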
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
Push local topK through outer join:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
Push local topK through outer join for bhj - off           3694           3717          22          0.0      369361.5       1.0X
Push local topK through outer join for shj - off           3761           3767          11          0.0      376057.4       1.0X
Push local topK through outer join for smj - off           3469           3478          10          0.0      346945.6       1.1X
Push local topK through outer join for bhj - on              97            109          10          0.1        9745.2      37.9X
Push local topK through outer join for shj - on             162            176          17          0.1       16153.5      22.9X
Push local topK through outer join for smj - on             160            163           3          0.1       15974.6      23.1X

How was this patch tested?

Added tests.

ulysses-you • Jul 08 '22 08:07

cc @wangyum @viirya @cloud-fan what do you think of this idea?

ulysses-you • Jul 11 '22 10:07

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] • Nov 18 '22 00:11