
MERGE INTO requires sorting in already sorted iceberg tables

Open korbel-jacek opened this issue 1 year ago • 3 comments

Apache Iceberg version

1.4.2

Query engine

Spark

Please describe the bug 🐞

Hi, I am trying to MERGE a small Iceberg table into a large Iceberg table, but the performance is poor. Both tables are bucketed on the join columns and locally sorted by them so that Storage Partitioned Join (SPJ) can avoid shuffling. I also use merge-on-read to make MERGE INTO faster, but a lot of time is still spent sorting both tables during the merge: there is a sort step before the sort-merge join. Is it possible to prevent this sorting step? Since the data is already sorted, I assume no additional sort should be needed.

== Physical Plan ==
WriteDelta (14)
+- * Sort (13)
   +- Exchange (12)
      +- MergeRows (11)
         +- * Project (10)
            +- * SortMergeJoin RightOuter (9)
               :- * Sort (5)
               :  +- * Filter (4)
               :     +- * Project (3)
               :        +- * ColumnarToRow (2)
               :           +- BatchScan spark_catalog.default.customer3 (1)
               +- * Sort (8)
                  +- * ColumnarToRow (7)
                     +- BatchScan spark_catalog.default.customer4 (6)


Example code to reproduce:

spark-shell \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.sources.v2.bucketing.enabled=true \
  --conf spark.sql.sources.v2.bucketing.push.part.values.enabled=true \
  --conf spark.sql.requireAllClusterKeysForCoPartition=false \
  --conf spark.sql.iceberg.planning.preserve-data-grouping=true \
  --conf spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true \
  --conf spark.sql.sources.v2.bucketing.pushPartValues.enabled=true \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.useOldFetchProtocol=true \
  --conf spark.sql.shuffle.partitions=10 \
  --conf spark.sql.adaptive.enabled=false \
  --conf spark.sql.join.preferSortMergeJoin=false \
  --conf spark.sql.bucketing.coalesceBucketsInJoin.enabled=true \
  --conf spark.sql.catalog.spark_catalog.warehouse=spark-warehouse/iceberg

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SparkSession, DataFrame}
// Create sample data for customers
val customerData = Seq(
  (1, "John Doe", "USA",1,1),
  (2, "Jane Smith", "Canada",1,1),
  (3, "Alice Johnson", "UK",1,1),
  (4, "Bob Brown", "USA",1,1),
  (5, "Charlie Davis", "Canada",1,1)
)

// Create DataFrame for customers
val customerDF = spark.createDataFrame(customerData).toDF("customer_id", "name", "country", "order_id","amount")

// Partition both DataFrames by customer_id
val partitionedCustomerDF = customerDF.repartition(col("customer_id"))


partitionedCustomerDF.writeTo("default.customer3").tableProperty("write.distribution-mode","range").partitionedBy(bucket(10,col("customer_id")),bucket(10,col("order_id"))).using("iceberg").createOrReplace()
partitionedCustomerDF.writeTo("default.customer4").tableProperty("write.distribution-mode","range").partitionedBy(bucket(10,col("customer_id")),bucket(10,col("order_id"))).using("iceberg").createOrReplace()
spark.sql("TRUNCATE table default.customer4")
spark.sql("TRUNCATE table default.customer3")

spark.sql("ALTER TABLE default.customer4 WRITE LOCALLY ORDERED BY customer_id, order_id").show
spark.sql("ALTER TABLE default.customer3 WRITE LOCALLY ORDERED BY customer_id, order_id").show

spark.sql("ALTER TABLE default.customer3 SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')");
spark.sql("ALTER TABLE default.customer4 SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')");

partitionedCustomerDF.writeTo("default.customer4").append()
partitionedCustomerDF.writeTo("default.customer3").append()

spark.sql("""
MERGE INTO default.customer3 AS target
USING default.customer4 AS source
ON target.customer_id = source.customer_id AND target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""").show()

Willingness to contribute

  • [ ] I can contribute a fix for this bug independently
  • [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • [ ] I cannot contribute a fix for this bug at this time

korbel-jacek avatar Aug 07 '24 06:08 korbel-jacek

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Feb 04 '25 00:02 github-actions[bot]

I have this same issue with Iceberg v1.6.1 and cannot get my MERGE INTO to use Storage Partition Join to prevent two massive sorting stages before the joins.

chadwilcomb avatar Feb 11 '25 18:02 chadwilcomb

@chadwilcomb @korbel-jacek I had the same problem with the join, but fixed it with some properties (table + session) and a custom rule. It is a workaround, not a final solution. To do this properly, the Spark source code would need a patch to support hints in the MERGE clause (like Oracle's merge /*+ append */ into t2), and the RewriteMergeIntoTable class in Iceberg would probably need a patch as well.

Software stack - Spark 3.5.1, Iceberg 1.6.1, JDK 1.8.

  1. Add these Spark session properties:
sparkConf.set("spark.sql.join.preferSortMergeJoin", "false")
sparkConf.set("spark.sql.iceberg.distribution-mode", "none")
  2. Create two tables (source and target) with the same structure and TBLPROPERTIES (SPJ works only with DataSourceV2):
CREATE TABLE ...
...
CLUSTERED BY (id) INTO 12 BUCKETS
TBLPROPERTIES (
  "format-version" = "2",
  "write.spark.fanout.enabled" = "true",
  "write.distribution-mode" = "none"
)
  3. Add a custom rule (proof of concept). The rule still needs more logic so it does not hint every relation inside USING (SELECT * FROM tbl JOIN tbl2 ... JOIN tbl3 ...); a sketch of wiring it in through SparkSessionExtensions follows after this list:
    import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
    import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, SHUFFLE_HASH}
    import org.apache.spark.sql.catalyst.rules.Rule

    // Optimizer rule: when the plan comes from a MERGE (it contains a MergeRows
    // node), attach a SHUFFLE_HASH hint to both sides of every equi-join that
    // does not already carry a strategy hint, so the planner prefers a shuffled
    // hash join over a sort-merge join.
    object MergeIntoShuffleHashJoinSelection extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = {
        val isMerge = plan.find(p => p.getClass == classOf[org.apache.spark.sql.catalyst.plans.logical.MergeRows]).isDefined
        if (isMerge) {
          plan.transformDown {
            case j @ ExtractEquiJoinKeys(_, _, _, _, _, _, _, hint) =>
              var newHint = hint
              if (!hint.leftHint.exists(_.strategy.isDefined)) {
                newHint = newHint.copy(leftHint =
                  Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(SHUFFLE_HASH))))
              }
              if (!hint.rightHint.exists(_.strategy.isDefined)) {
                newHint = newHint.copy(rightHint =
                  Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(SHUFFLE_HASH))))
              }
              if (newHint.ne(hint)) {
                // Only rebuild the Join node if a hint was actually added.
                j.copy(hint = newHint)
              } else {
                j
              }
          }
        } else {
          plan
        }
      }
    }

    // Register the rule for the current session.
    spark.experimental.extraOptimizations = Seq(MergeIntoShuffleHashJoinSelection)
  4. PROFIT: a local test showed a 10-15% performance gain.
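
As mentioned in step 3, here is a minimal sketch of how the same rule could be wired in through SparkSessionExtensions instead of the session-scoped experimental hook. This is an untested sketch: it assumes MergeIntoShuffleHashJoinSelection is compiled onto the classpath and that the MERGE rewrite has already produced the MergeRows node by the time user-provided optimizer rules run. MergeHintExtensions is an illustrative class name, not something shipped by Iceberg or Spark.

import org.apache.spark.sql.SparkSessionExtensions

// Illustrative extensions class (hypothetical). Register it together with the
// Iceberg extensions, e.g.
//   --conf spark.sql.extensions=<fully.qualified.MergeHintExtensions>,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
class MergeHintExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // User-provided optimizer rules run late in optimization, so the MERGE has
    // already been rewritten and the MergeRows node the rule looks for exists.
    extensions.injectOptimizerRule(_ => MergeIntoShuffleHashJoinSelection)
  }
}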

Update 2025-06-16: Spark already has a patch for this in a PR, but it has not been merged into master yet - https://github.com/apache/spark/pull/50524

UT36104 avatar Jun 14 '25 21:06 UT36104

Hi @UT36104 — could you please explain why MERGE scans the target table twice and how to avoid it? My understanding is the planner creates two branches: one for WHEN MATCHED (UPDATE) and another for WHEN NOT MATCHED (INSERT), which results in two joins/scans of the target. In our case the table is very large, and scanning it twice leads to unacceptable processing time.

milleniax avatar Oct 14 '25 16:10 milleniax

@milleniax your understanding is right. You can avoid the two full scans only by changing the query or the structure of the source table.
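
To illustrate the "change the query" option: if the target is partitioned on a column that the source can bound (a date, for example), folding that bound into the ON clause as a literal predicate can let the target scans prune files instead of reading the whole table. A minimal sketch with hypothetical tables db.target_tbl and db.source_tbl partitioned and joined on event_date (not the bucketed tables from the original report); the extra bound is only safe because event_date is also part of the join key:

// Hypothetical tables: db.target_tbl is partitioned by days(event_date) and the
// join key is (id, event_date). Collect the source's date range first so the
// MERGE can prune target partitions at planning time.
val bounds = spark.sql(
  "SELECT CAST(MIN(event_date) AS STRING) AS lo, CAST(MAX(event_date) AS STRING) AS hi FROM db.source_tbl"
).first()
val (lo, hi) = (bounds.getString(0), bounds.getString(1))

spark.sql(s"""
MERGE INTO db.target_tbl AS t
USING db.source_tbl AS s
ON t.id = s.id
 AND t.event_date = s.event_date
 AND t.event_date BETWEEN DATE '$lo' AND DATE '$hi'
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")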

UT36104 avatar Oct 27 '25 10:10 UT36104