[SUPPORT] Multi-writer insert overwrite to different partitions: conflict resolution strategy throws StringIndexOutOfBoundsException
Describe the problem you faced
I have 5 writer jobs doing insert overwrite into different partitions (no partition or bucket overlap). Eventually, one writer job failed with this exception:
    24/06/25 12:28:28 Driver ERROR SparkSQLDriver: Failed in [ insert overwrite table defaultdb.test_table partition(p_date, p_product) xxx ]
    java.lang.StringIndexOutOfBoundsException: begin 0, end 8, length 0
        at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
        at java.base/java.lang.String.substring(String.java:1874)
        at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdStrFromFileId(BucketIdentifier.java:86)
        at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:82)
        at org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy.lambda$hasConflict$1(BucketIndexConcurrentFileWritesConflictResolutionStrategy.java:51)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1621)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy.hasConflict(BucketIndexConcurrentFileWritesConflictResolutionStrategy.java:52)
        at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:88)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735)
        at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
        at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:85)
        at org.apache.hudi.client.BaseHoodieClient.resolveWriteConflict(BaseHoodieClient.java:202)
        at org.apache.hudi.client.BaseHoodieWriteClient.preCommit(BaseHoodieWriteClient.java:346)
        at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:232)
        at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
        at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1137)
        at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:439)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:133)
        at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:131)
        at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:71)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:69)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:80)
        at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:283)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
        at org.apache.spark.sql.SparkSession$$anonfun$8.apply(SparkSession.scala:652)
        at org.apache.spark.sql.SparkSession$$anonfun$8.apply(SparkSession.scala:651)
        at org.apache.spark.KwaiDriverMetricsCollector$.countTime(KwaiDriverMetricsCollector.scala:143)
        at org.apache.spark.KwaiDriverMetricsCollector$.countSqlExecuteTime(KwaiDriverMetricsCollector.scala:104)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:651)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:117)
        at org.apache.spark.sql.hive.SparkSQLCLIDriver2.processCmd(SparkSQLCLIDriver2.scala:501)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
        at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
        at org.apache.spark.sql.hive.SparkSQLCLIDriver2.processFile(SparkSQLCLIDriver2.scala:450)
        at org.apache.spark.sql.hive.SparkSQLCLIDriver2$.main(SparkSQLCLIDriver2.scala:293)
        at org.apache.spark.sql.hive.SparkSQLCLIDriver2.main(SparkSQLCLIDriver2.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:882)
Environment Description
- Hudi version: 0.14.0
- Table type: MOR
- Table index: simple bucket index
- OCC conflict resolution strategy, with the following job parameters (the same settings are sketched as writer properties below):
  - `set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL;`
  - `set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;`
  - `set hoodie.write.lock.client.num_retries=1500;`
  - `set hoodie.write.lock.filesystem.expire=10;`
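For reference, a minimal sketch of the same concurrency/lock settings expressed as writer properties (illustrative only; the class name is made up, the property keys are the ones listed above):

```java
import java.util.Properties;

// Illustrative only: the OCC settings from the job parameters above, as they would be
// passed programmatically as Hudi writer properties instead of Spark SQL `set` statements.
public class OccWriterPropsSketch {
  static Properties occProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL");
    props.setProperty("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider");
    props.setProperty("hoodie.write.lock.client.num_retries", "1500");
    props.setProperty("hoodie.write.lock.filesystem.expire", "10");
    return props;
  }
}
```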
I wrapped the hasConflict function in BucketIndexConcurrentFileWritesConflictResolutionStrategy.java with a try/catch myself and logged the conflicting operations. The log content is below.
24/06/25 12:28:28 Driver ERROR BucketIndexConcurrentFileWritesConflictResolutionStrategy: hasConflict got exception, between first operation = {actionType=**replacecommit**, instantTime=20240625120033401, actionState=**INFLIGHT**'}, second operation = {actionType=**replacecommit**, instantTime=20240625120040733, actionState=**INFLIGHT**'},java.lang.StringIndexOutOfBoundsException: begin 0, end 8, length 0
I found that the conflicting operations are two inflight replacecommit instants. I also checked the metadata content of these two inflight replacecommit instants: the fileId is an empty string, which is what makes substring throw the StringIndexOutOfBoundsException.
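To make the failure mode concrete, here is a minimal standalone sketch (not the actual Hudi source; the class name and sample file id are made up) of how parsing the bucket id from the first characters of a file id breaks on an empty string:

```java
public class EmptyFileIdDemo {

  // Bucket-index file ids are assumed to start with a zero-padded 8-character bucket id,
  // e.g. "00000005-<uuid>-0"; parsing simply takes the first 8 characters.
  static int bucketIdFromFileId(String fileId) {
    // substring(0, 8) on an empty string throws
    // StringIndexOutOfBoundsException: begin 0, end 8, length 0
    return Integer.parseInt(fileId.substring(0, 8));
  }

  public static void main(String[] args) {
    System.out.println(bucketIdFromFileId("00000005-61b0-4c2a-8f1e-000000000000-0")); // -> 5
    System.out.println(bucketIdFromFileId(""));  // fails exactly like the driver log above
  }
}
```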
Is this a known bug? I think it's best not to throw a raw StringIndexOutOfBoundsException here. Could we catch it and rethrow an easier-to-understand exception?
Did you try 0.14.1? It might already be fixed there.
I checked 0.14.1 and master; neither fixes this problem.
- The inflight instant content for upsert and insert overwrite is generated by BaseSparkCommitActionExecutor.buildProfile. The build profile only captures insert/update partition info, so no fileId is available at that point.
- getCandidateInstants in TransactionUtils.resolveWriteConflictIfAny also fetches pending clustering instants by REPLACE_COMMIT_ACTION. If I am wrong, please correct me.
Also, at my company we need to run multiple insert overwrite jobs, or a mix of upsert and insert overwrite jobs, against different partitions concurrently.
The getCandidateInstants function in TransactionUtils.resolveWriteConflictIfAny is meant to pick up pending clustering. It currently selects pending clustering instants by REPLACE_COMMIT_ACTION, which actually returns both clustering and insert overwrite instants.
So I think we can add a further filter on whether a clustering plan is present; that would keep only the clustering instants.
Can we resolve it this way?
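For illustration, a rough sketch of the filtering I have in mind (assuming Hudi 0.14.x API names such as `filterPendingReplaceTimeline` and `ClusteringUtils.getClusteringPlan`; the helper class itself is hypothetical, not the actual patch):

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;

// Hypothetical helper: keep only pending replacecommits that actually carry a
// clustering plan, so pending insert overwrite instants (whose inflight metadata
// has empty fileIds) are not treated as pending clustering during conflict resolution.
public class PendingClusteringFilterSketch {
  static List<HoodieInstant> pendingClusteringInstants(HoodieTableMetaClient metaClient) {
    return metaClient.getActiveTimeline()
        .filterPendingReplaceTimeline()        // pending replacecommits: clustering + insert overwrite
        .getInstantsAsStream()
        .filter(i -> ClusteringUtils.getClusteringPlan(metaClient, i).isPresent()) // real clustering only
        .collect(Collectors.toList());
  }
}
```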
Yeah, for your use case there is no need to resolve conflicts for insert_overwrite because it happens in different partitions, but from the Hudi side we still need to take it into consideration, because there is no hint telling Hudi that the ingestion and the insert_overwrite happen in different partitions (we can only find that out after resolving the conflict).
It looks like you have found the culprit of the StringIndexOutOfBoundsException. Would you mind filing a fix for it?
Yes, I am very glad to fix it.
Allow me to clarify further (forgive my repetition): I also think conflicts do need to be resolved for insert_overwrite. Once an insert_overwrite instant is completed, it will still be checked.
Can I fix it as in the following picture? Do you agree?
Based on the variable name `compactionAndClusteringPendingTimeline`, it looks like your fix is reasonable.
I have fixed it in https://github.com/apache/hudi/pull/11691 . Can you help me review it? Thanks a lot!
Closing as the fix is merged. Thanks everybody.