MERGE INTO requires sorting in already sorted iceberg tables
Apache Iceberg version
1.4.2
Query engine
Spark
Please describe the bug 🐞
Hi, I am trying to MERGE a small Iceberg table into a large Iceberg table, but the performance is poor. Both tables are split into buckets and sorted locally by the join columns so that Storage Partitioned Join can avoid shuffling. Additionally, I use merge-on-read to make MERGE INTO operations faster, but the merge still spends a lot of time sorting these tables: there is a Sort step before the SortMergeJoin in the plan. Is it possible to skip this sorting step somehow, since I assume the data is already sorted and no additional sort is needed?
```
== Physical Plan ==
WriteDelta (14)
+- * Sort (13)
   +- Exchange (12)
      +- MergeRows (11)
         +- * Project (10)
            +- * SortMergeJoin RightOuter (9)
               :- * Sort (5)
               :  +- * Filter (4)
               :     +- * Project (3)
               :        +- * ColumnarToRow (2)
               :           +- BatchScan spark_catalog.default.customer3 (1)
               +- * Sort (8)
                  +- * ColumnarToRow (7)
                     +- BatchScan spark_catalog.default.customer4 (6)
```
Example code to reproduce:
```shell
spark-shell \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.sources.v2.bucketing.enabled=true \
  --conf spark.sql.sources.v2.bucketing.push.part.values.enabled=true \
  --conf spark.sql.requireAllClusterKeysForCoPartition=false \
  --conf spark.sql.iceberg.planning.preserve-data-grouping=true \
  --conf spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true \
  --conf spark.sql.sources.v2.bucketing.pushPartValues.enabled=true \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.useOldFetchProtocol=true \
  --conf spark.sql.shuffle.partitions=10 \
  --conf spark.sql.adaptive.enabled=false \
  --conf spark.sql.join.preferSortMergeJoin=false \
  --conf spark.sql.bucketing.coalesceBucketsInJoin.enabled=true \
  --conf spark.sql.catalog.spark_catalog.warehouse=spark-warehouse/iceberg
```
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SparkSession, DataFrame}

// Create sample data for customers
val customerData = Seq(
  (1, "John Doe", "USA", 1, 1),
  (2, "Jane Smith", "Canada", 1, 1),
  (3, "Alice Johnson", "UK", 1, 1),
  (4, "Bob Brown", "USA", 1, 1),
  (5, "Charlie Davis", "Canada", 1, 1)
)

// Create DataFrame for customers
val customerDF = spark.createDataFrame(customerData)
  .toDF("customer_id", "name", "country", "order_id", "amount")

// Partition both DataFrames by customer_id
val partitionedCustomerDF = customerDF.repartition(col("customer_id"))

partitionedCustomerDF.writeTo("default.customer3")
  .tableProperty("write.distribution-mode", "range")
  .partitionedBy(bucket(10, col("customer_id")), bucket(10, col("order_id")))
  .using("iceberg")
  .createOrReplace()
partitionedCustomerDF.writeTo("default.customer4")
  .tableProperty("write.distribution-mode", "range")
  .partitionedBy(bucket(10, col("customer_id")), bucket(10, col("order_id")))
  .using("iceberg")
  .createOrReplace()

spark.sql("TRUNCATE table default.customer4")
spark.sql("TRUNCATE table default.customer3")

spark.sql("ALTER TABLE default.customer4 WRITE LOCALLY ORDERED BY customer_id, order_id").show
spark.sql("ALTER TABLE default.customer3 WRITE LOCALLY ORDERED BY customer_id, order_id").show
spark.sql("ALTER TABLE default.customer3 SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')")
spark.sql("ALTER TABLE default.customer4 SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')")

partitionedCustomerDF.writeTo("default.customer4").append()
partitionedCustomerDF.writeTo("default.customer3").append()

spark.sql("""
  MERGE INTO default.customer3 AS target
  USING default.customer4 AS source
  ON target.customer_id = source.customer_id AND target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""").show()
```
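To check whether a configuration change actually removes the extra Sort/Exchange nodes, the plan can be inspected without executing the merge. This is a sketch, assuming `EXPLAIN` on MERGE statements is supported by your Spark/Iceberg combination:

```scala
// Print the physical plan of the MERGE instead of executing it.
spark.sql("""
  EXPLAIN FORMATTED
  MERGE INTO default.customer3 AS target
  USING default.customer4 AS source
  ON target.customer_id = source.customer_id AND target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""").show(truncate = false)
```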
Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
I have this same issue with Iceberg v1.6.1 and cannot get my MERGE INTO to use Storage Partition Join to prevent two massive sorting stages before the joins.
@chadwilcomb @korbel-jacek I had the same problem with the join, but worked around it with some table and session properties plus a custom rule. It's a workaround, not a final solution.
Supporting hints in the MERGE clause itself (like Oracle's `merge /*+ append */ into t2`) would require patching Spark's source, and probably Iceberg's `RewriteMergeIntoTable` class as well.
Software stack: Spark 3.5.1, Iceberg 1.6.1, JDK 1.8.
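For context, Spark already honors join strategy hints in plain SELECT queries, but the MERGE grammar has no slot for them, which is why the workaround below goes through an optimizer rule instead. A sketch of the SELECT-side syntax (table and column names are illustrative):

```sql
-- A strategy hint works in a SELECT:
SELECT /*+ SHUFFLE_HASH(s) */ *
FROM target t JOIN source s ON t.id = s.id;

-- There is no equivalent place to put a hint in MERGE INTO today.
```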
- Add these Spark session properties:

```scala
sparkConf.set("spark.sql.join.preferSortMergeJoin", "false")
sparkConf.set("spark.sql.iceberg.distribution-mode", "none")
```
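The same properties can also be set when constructing the session. A minimal sketch, assuming a standard `SparkSession` builder (the app name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Prefer shuffle-hash over sort-merge joins and let Iceberg skip write-side distribution.
val spark = SparkSession.builder()
  .appName("merge-spj")
  .config("spark.sql.join.preferSortMergeJoin", "false")
  .config("spark.sql.iceberg.distribution-mode", "none")
  .getOrCreate()
```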
- Create two tables (source and target) with the same structure and TBLPROPERTIES (SPJ only works with DataSourceV2):

```sql
CREATE TABLE ...
...
CLUSTERED BY (id) INTO 12 BUCKETS
TBLPROPERTIES (
  "format-version" = "2",
  "write.spark.fanout.enabled" = "true",
  "write.distribution-mode" = "none"
)
```
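Note that for Iceberg tables, the bucketing that SPJ relies on is normally declared with the `bucket` partition transform rather than Hive-style `CLUSTERED BY`. A hedged sketch (table and column names are illustrative):

```sql
CREATE TABLE db.target (
  id BIGINT,
  payload STRING
)
USING iceberg
PARTITIONED BY (bucket(12, id))
TBLPROPERTIES (
  'format-version' = '2',
  'write.distribution-mode' = 'none'
);
```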
- Add a custom rule (proof of concept). The rule would need more logic to avoid hinting every relation inside
`USING (SELECT * FROM tbl JOIN tbl2 ... JOIN tbl3 ...)`:
```scala
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Join, LogicalPlan, MergeRows, SHUFFLE_HASH}
import org.apache.spark.sql.catalyst.rules.Rule

object MergeIntoShuffleHashJoinSelection extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    // Only rewrite plans that contain a MergeRows node, i.e. plans produced by MERGE INTO.
    val isMerge = plan.find(_.isInstanceOf[MergeRows]).isDefined
    if (isMerge) {
      plan.transformDown {
        // Bind the node as Join (so copy(hint = ...) is available) and only touch equi-joins.
        case j: Join if ExtractEquiJoinKeys.unapply(j).isDefined =>
          val hint = j.hint
          var newHint = hint
          // Force SHUFFLE_HASH on each side that has no explicit strategy hint yet.
          if (!hint.leftHint.exists(_.strategy.isDefined)) {
            newHint = newHint.copy(leftHint =
              Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(SHUFFLE_HASH))))
          }
          if (!hint.rightHint.exists(_.strategy.isDefined)) {
            newHint = newHint.copy(rightHint =
              Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(SHUFFLE_HASH))))
          }
          if (newHint.ne(hint)) j.copy(hint = newHint) else j
      }
    } else {
      plan
    }
  }
}

spark.experimental.extraOptimizations = Seq(MergeIntoShuffleHashJoinSelection)
```
- Result: local tests show a 10-15% performance gain.
Update 2025-06-16: Spark already has a patch in a PR, but it is not yet merged to master: https://github.com/apache/spark/pull/50524
Hi @UT36104 — could you please explain why MERGE scans the target table twice and how to avoid it? My understanding is the planner creates two branches: one for WHEN MATCHED (UPDATE) and another for WHEN NOT MATCHED (INSERT), which results in two joins/scans of the target. In our case the table is very large, and scanning it twice leads to unacceptable processing time.
@milleniax your understanding is right. You can avoid the two full scans only by changing the query or the source table structure.
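One common query restructuring (a sketch not taken from this thread; table names, column names, and the bounds are illustrative) is to add a pruning predicate on the target's partition column to the ON clause, computed from the source beforehand, so each target scan reads only the affected partitions instead of the whole table:

```sql
MERGE INTO db.target t
USING db.source s
ON t.customer_id = s.customer_id
   AND t.order_id = s.order_id
   -- illustrative pruning predicate; derive the bounds from the source first
   AND t.customer_id BETWEEN 100 AND 200
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```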