
[SUPPORT] Multi-writer insert overwrite on different partitions: conflict resolution strategy throws StringIndexOutOfBoundsException

Open · dongtingting opened this issue 1 year ago · 1 comment

Describe the problem you faced

I have 5 writer jobs doing insert overwrite on different partitions (with no partition or bucket overlap). Eventually, one writer job failed with this exception:

24/06/25 12:28:28 Driver ERROR SparkSQLDriver: Failed in [ insert overwrite table defaultdb.test_table partition(p_date, p_product) xxx ]
java.lang.StringIndexOutOfBoundsException: begin 0, end 8, length 0
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
    at java.base/java.lang.String.substring(String.java:1874)
    at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdStrFromFileId(BucketIdentifier.java:86)
    at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:82)
    at org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy.lambda$hasConflict$1(BucketIndexConcurrentFileWritesConflictResolutionStrategy.java:51)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1621)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy.hasConflict(BucketIndexConcurrentFileWritesConflictResolutionStrategy.java:52)
    at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:88)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:85)
    at org.apache.hudi.client.BaseHoodieClient.resolveWriteConflict(BaseHoodieClient.java:202)
    at org.apache.hudi.client.BaseHoodieWriteClient.preCommit(BaseHoodieWriteClient.java:346)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:232)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1137)
    at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:439)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:133)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:131)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:71)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:69)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:80)
    at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:283)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
    at org.apache.spark.sql.SparkSession$$anonfun$8.apply(SparkSession.scala:652)
    at org.apache.spark.sql.SparkSession$$anonfun$8.apply(SparkSession.scala:651)
    at org.apache.spark.KwaiDriverMetricsCollector$.countTime(KwaiDriverMetricsCollector.scala:143)
    at org.apache.spark.KwaiDriverMetricsCollector$.countSqlExecuteTime(KwaiDriverMetricsCollector.scala:104)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:651)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:117)
    at org.apache.spark.sql.hive.SparkSQLCLIDriver2.processCmd(SparkSQLCLIDriver2.scala:501)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
    at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
    at org.apache.spark.sql.hive.SparkSQLCLIDriver2.processFile(SparkSQLCLIDriver2.scala:450)
    at org.apache.spark.sql.hive.SparkSQLCLIDriver2$.main(SparkSQLCLIDriver2.scala:293)
    at org.apache.spark.sql.hive.SparkSQLCLIDriver2.main(SparkSQLCLIDriver2.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:882)

Environment Description

  • Hudi version: 0.14.0
  • Table type: MOR
  • Table index: simple bucket index
  • Concurrency control / conflict resolution strategy: OCC; job parameters (see the sketch below): set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL; set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; set hoodie.write.lock.client.num_retries=1500; set hoodie.write.lock.filesystem.expire=10;
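To make the setup concrete, here is a minimal sketch of how one of these writer jobs could be configured. This is an assumed illustration, not the reporter's exact job code; the option values are the ones listed above and the source query is elided, matching the "xxx" in the failed statement.

```java
import org.apache.spark.sql.SparkSession;

public class InsertOverwriteWriterJob {
  public static void main(String[] args) {
    // Each of the 5 concurrent writers enables OCC with the filesystem-based lock provider.
    SparkSession spark = SparkSession.builder()
        .appName("hudi-insert-overwrite-writer")
        .enableHiveSupport()
        .getOrCreate();

    spark.sql("set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL");
    spark.sql("set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider");
    spark.sql("set hoodie.write.lock.client.num_retries=1500");
    spark.sql("set hoodie.write.lock.filesystem.expire=10");

    // Each job overwrites a disjoint set of (p_date, p_product) partitions.
    String sourceQuery = "..."; // job-specific source query, elided in the issue ("xxx")
    spark.sql("insert overwrite table defaultdb.test_table partition(p_date, p_product) " + sourceQuery);
  }
}
```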

I wrapped the hasConflict function in BucketIndexConcurrentFileWritesConflictResolutionStrategy.java in a try/catch myself and printed a log of the conflicting operations. The log content is below.

24/06/25 12:28:28 Driver ERROR BucketIndexConcurrentFileWritesConflictResolutionStrategy: hasConflict got exception, between first operation = {actionType=**replacecommit**, instantTime=20240625120033401, actionState=**INFLIGHT**'}, second operation = {actionType=**replacecommit**, instantTime=20240625120040733, actionState=**INFLIGHT**'},java.lang.StringIndexOutOfBoundsException: begin 0, end 8, length 0

I found that the conflicting operations are two inflight replacecommit instants. I also checked the metadata content of these two inflight replacecommit instants: the fileId is an empty string, which is what makes substring throw StringIndexOutOfBoundsException. (Screenshot of the replacecommit inflight metadata attached.)
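For context, the failure boils down to the bucket id being parsed out of a fixed-width prefix of the fileId, so an empty fileId fails immediately. Below is a simplified paraphrase of the relevant org.apache.hudi.index.bucket.BucketIdentifier logic, not the verbatim 0.14.0 source:

```java
// Simplified paraphrase of BucketIdentifier: the bucket id is encoded in the first
// 8 characters of the file id. With an empty fileId (as in the inflight replacecommit
// metadata above), substring(0, 8) throws "begin 0, end 8, length 0".
public class BucketIdentifierSketch {

  public static int bucketIdFromFileId(String fileId) {
    return Integer.parseInt(bucketIdStrFromFileId(fileId));
  }

  public static String bucketIdStrFromFileId(String fileId) {
    return fileId.substring(0, 8); // StringIndexOutOfBoundsException when fileId is ""
  }
}
```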

Is this a known bug? I think it is best not to throw a raw StringIndexOutOfBoundsException. Could we catch the exception and rethrow an easier-to-understand one?
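One possible shape of that suggestion, as a minimal sketch only: the helper name checkedBucketIdFromFileId is invented for illustration, and org.apache.hudi.exception.HoodieException is used as the generic Hudi exception type.

```java
import org.apache.hudi.exception.HoodieException;

public class SafeBucketId {

  // Sketch only: validate the fileId before the fixed-width substring so a malformed or
  // empty fileId surfaces as a descriptive HoodieException rather than a raw
  // StringIndexOutOfBoundsException thrown from deep inside conflict resolution.
  public static int checkedBucketIdFromFileId(String fileId) {
    if (fileId == null || fileId.length() < 8) {
      throw new HoodieException("Cannot extract bucket id from fileId '" + fileId
          + "': expected at least 8 leading characters. Is this an inflight replacecommit "
          + "whose metadata carries no file ids?");
    }
    return Integer.parseInt(fileId.substring(0, 8));
  }
}
```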

dongtingting · Jun 26 '24 03:06

Did you try 0.14.1? It might already be fixed there.

danny0405 · Jun 27 '24 08:06

> Did you try 0.14.1? It might already be fixed there.

I don't think either 0.14.1 or master fixes this problem:

  • The inflight instant content for upsert and insert_overwrite is generated by BaseSparkCommitActionExecutor.buildProfile; the build profile can only capture insert/update partition info and cannot provide fileIds at that point.
  • TransactionUtils.resolveWriteConflictIfAny / getCandidateInstants also picks up pending clustering instants by REPLACE_COMMIT_ACTION. If I am wrong, please correct me.

Also, in my company we need to run multiple insert_overwrite jobs, or a mix of update and insert_overwrite jobs, against different partitions concurrently.

The getCandidateInstants step in TransactionUtils.resolveWriteConflictIfAny is meant to pick up pending clustering instants. It currently fetches them by REPLACE_COMMIT_ACTION, which actually returns both clustering and insert_overwrite instants.
So I think we can add a further filter on whether a clustering plan is present, so that only clustering instants are returned.
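A minimal sketch of that filtering idea (this is not the merged patch; it assumes ClusteringUtils.getClusteringPlan(metaClient, instant) from org.apache.hudi.common.util can be used to probe a pending replacecommit for a clustering plan):

```java
import java.util.stream.Stream;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;

public class PendingClusteringFilterSketch {

  // Sketch only: among pending replacecommit instants, keep only the ones that actually
  // carry a clustering plan. Pending insert_overwrite replacecommits (no clustering plan,
  // and with empty fileIds in their inflight metadata) are dropped from the candidate set
  // used by resolveWriteConflictIfAny.
  public static Stream<HoodieInstant> keepOnlyPendingClustering(
      HoodieTableMetaClient metaClient, Stream<HoodieInstant> pendingReplaceInstants) {
    return pendingReplaceInstants.filter(
        instant -> ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent());
  }
}
```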

Can we resolve it this way?

dongtingting · Jul 22 '24 04:07

> The getCandidateInstants step in TransactionUtils.resolveWriteConflictIfAny is meant to pick up pending clustering instants. It currently fetches them by REPLACE_COMMIT_ACTION, which actually returns both clustering and insert_overwrite instants. So I think we can add a further filter on whether a clustering plan is present, so that only clustering instants are returned.

Yeah, for your use case there is no need to resolve conflicts for insert_overwrite because it touches different partitions, but from the Hudi side we still need to take it into consideration, because there is no hint to tell Hudi that the ingestion and the insert_overwrite happen in different partitions (we can only find that out after conflict resolution).

It looks like you have found the culprit of the StringIndexOutOfBoundsException; would you mind filing a fix for it?

danny0405 · Jul 22 '24 08:07

> The getCandidateInstants step in TransactionUtils.resolveWriteConflictIfAny is meant to pick up pending clustering instants. It currently fetches them by REPLACE_COMMIT_ACTION, which actually returns both clustering and insert_overwrite instants. So I think we can add a further filter on whether a clustering plan is present, so that only clustering instants are returned.

> Yeah, for your use case there is no need to resolve conflicts for insert_overwrite because it touches different partitions, but from the Hudi side we still need to take it into consideration, because there is no hint to tell Hudi that the ingestion and the insert_overwrite happen in different partitions (we can only find that out after conflict resolution).

> It looks like you have found the culprit of the StringIndexOutOfBoundsException; would you mind filing a fix for it?

Yes, I am very glad to fix it.

Allow me to clarify further (forgive my repetition): I also think conflicts for insert_overwrite do need to be resolved. Once an insert_overwrite instant is completed, it will still be checked.

Can I fix it as shown in the following picture? Do you agree? (Screenshot of the proposed change attached.)

dongtingting · Jul 22 '24 13:07

Based on the variable name compactionAndClusteringPendingTimeline, it looks like your fix is reasonable.

danny0405 · Jul 23 '24 01:07

> Based on the variable name compactionAndClusteringPendingTimeline, it looks like your fix is reasonable.

I have fixed it in https://github.com/apache/hudi/pull/11691. Could you help me review it? Thanks a lot!

dongtingting · Jul 26 '24 13:07

Closing as the fix is merged. Thanks everybody.

ad1happy2go · Oct 17 '24 15:10