Observing data duplication with Single Writer
Tips before filing an issue
- Have you gone through our FAQs? Yes
- Join the mailing list to engage in conversations and get faster support at [email protected]. Yes
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
We are using Hudi on AWS EMR to store data, upserting with the default single writer. If a duplicate EMR cluster runs with the same name and the same configuration, pointing at the same Hudi table, we see duplicate data in Hudi, which we don't want.
To Reproduce
Steps to reproduce the behavior:
1. Bring up an EMR cluster with a Spark streaming job and store the data in Hudi with a single writer.
2. Bring up another, duplicate EMR cluster with the same name.
3. Both EMR clusters now process the same data and try to store it in the same Hudi table.
4. When we query Hudi, we see duplicate data.
Expected behavior
We don't want to see duplicate data in Hudi, since it is a single-writer setup.
Environment Description
- Hudi version : 0.11.1
- Spark version : 3.2.1
- Hive version :
- Hadoop version :
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : no
Additional context
Stacktrace
We are not seeing any errors, but the data is duplicated.
Unless you configure a lock provider, Hudi can't guarantee this. I would suggest adding locking for both writers.
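For reference, here is a minimal sketch (not from this thread) of what "adding locking for both writers" means in Hudi terms: enable optimistic concurrency control, switch the failed-writes cleaner policy to LAZY, and point both jobs at the same external lock provider. The lock table name below is a placeholder, and the DynamoDB provider is only one option (Zookeeper and Hive metastore lock providers also exist).

// Hedged sketch: shared locking options both EMR jobs would pass on every Hudi write.
val lockingOpts = Map(
  "hoodie.write.concurrency.mode"       -> "optimistic_concurrency_control",
  "hoodie.cleaner.policy.failed.writes" -> "LAZY",
  "hoodie.write.lock.provider"          -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
  "hoodie.write.lock.dynamodb.table"    -> "hudi_locks",   // placeholder lock table name
  "hoodie.write.lock.dynamodb.region"   -> "us-east-1"
)
// Each writer then does: df.write.format("hudi").options(tableOpts ++ lockingOpts).mode("append").save(basePath)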
Oh, I thought both jobs are running concurrently? Is that not the case? Can you throw some light on the exact steps? Is it:
step 1: start job1 in EMR cluster1, which consumes from source X and writes to Hudi table Y.
step 2: stop job1; it is essentially a batch job.
step 3: start job2 in EMR cluster2, which again consumes from source X and writes to Hudi table Y.
Now if you query Hudi, you see duplicate data?
Is my understanding right?
Also, can you share the write configs you used?
@koochiswathiTR : can you check my above response and update please.
@nsivabalan
Hi, both the EMRs are running concurrently. This is the first time we are setting up Hudi with multi-writer. Below are my Hudi config properties:
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> "optimistic_concurrency_control",
HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key() -> "LAZY",
HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key() -> "3000",
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key() -> "1",
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key() -> "hoodi_lock",
DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key() -> "lock",
DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key() -> "us-east-1",
HoodieAWSConfig.AWS_ACCESS_KEY.key() -> "XXX",
HoodieAWSConfig.AWS_SECRET_KEY.key() -> "XXX",
HoodieAWSConfig.AWS_SESSION_TOKEN.key() -> "XXXX",
DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key() -> RegionUtils.getRegion("us-east-1").getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX) // "dynamodb.us-east-1.amazonaws.com"
I have created the DynamoDB table that will be used for locking, with partition key "lock". Below are my questions:
1. Is it mandatory to set AWS_ACCESS_KEY and AWS_SECRET_KEY? I don't want to set these keys.
2. Do we need to create the DynamoDB table ourselves, or will Hudi create it automatically? We create AWS resources with CloudFormation, and the DynamoDB table is created with partition key lock (String).
3. I am getting the exception "Unable to acquire lock, lock object null".
Can you please guide me here?
We have solved the above issue by letting the DynamoDB table be created automatically; the table is created with partition key (key). We were also able to set it up without AWS keys.
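For anyone hitting the same questions, here is a hedged sketch (assumed values, not the exact setup from this thread) of the DynamoDB lock configs when relying on the EMR instance profile rather than static keys. When the lock table is missing, the DynamoDBBasedLockProvider creates it with the configured billing mode, and its hash key attribute is "key":

val dynamoLockOpts = Map(
  "hoodie.write.lock.provider"               -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
  "hoodie.write.lock.dynamodb.table"         -> "hudi_locks",     // auto-created when missing
  "hoodie.write.lock.dynamodb.partition_key" -> "my_hudi_table",  // one lock item per Hudi table
  "hoodie.write.lock.dynamodb.region"        -> "us-east-1",
  "hoodie.write.lock.dynamodb.billing_mode"  -> "PAY_PER_REQUEST" // used only at table creation
  // no HoodieAWSConfig.AWS_ACCESS_KEY / AWS_SECRET_KEY: the default AWS credential chain is used
)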
Now it is failing with java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes.
Details :
With the multi-writer setup, we are running two EMR clusters that upsert into the same Hudi table after acquiring the lock through one DynamoDB table.
But the application is failing with
java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes.
Below are the Hudi Config params
DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "Id",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "coll",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "operationTime",
//TODO: Sync with glue. Dong has working.
DataSourceWriteOptions.HIVE_SYNC_ENABLED.key() -> "true",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key() -> "coll",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
DataSourceWriteOptions.HIVE_SYNC_MODE.key() -> "hms",
DataSourceWriteOptions.HIVE_USE_JDBC.key() -> "false",
HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key() -> CompactionTriggerStrategy.TIME_ELAPSED.name,
HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key() -> String.valueOf(60 * 60),
HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key() -> "2184",
HoodieCompactionConfig.ASYNC_CLEAN.key() -> "false",
HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key() -> "2185",
HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key() -> "3494",
HoodieMetricsConfig.TURN_METRICS_ON.key() -> "true",
HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() -> MetricsReporterType.DATADOG.name(),
HoodieMetricsDatadogConfig.API_SITE_VALUE.key() -> "US",
HoodieMetricsDatadogConfig.METRIC_PREFIX_VALUE.key() -> "tacticalnovusingest.qa.hudi",
HoodieMetricsDatadogConfig.API_KEY_SUPPLIER.key() -> "XXXX",
HoodieMetadataConfig.ENABLE.key() -> "true",
HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.key() -> "2184",
HoodieMetadataConfig.MIN_COMMITS_TO_KEEP.key() -> "2185",
HoodieMetadataConfig.MAX_COMMITS_TO_KEEP.key() -> "3494",
HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false",
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key()->"optimistic_concurrency_control",
HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key()->"LAZY",
HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key()->"3000",
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key()->"5",
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()->"org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key()->"XXXX",
DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key()->"lock",
DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key()->"us-east-1",
DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key()-> RegionUtils.getRegion("us-east-1").getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX) //"dynamodb.us-east-1.amazonaws.com"
Please help on this. Thanks
@nsivabalan @xushiyan
@nsivabalan Pls respond on this.
@zhedoubushishi @nsivabalan @xushiyan
It would be great to hear from you soon.
Complete Stacktrace :
"java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes",
"at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
\nat org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85)
\nat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)\nat java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
\nat java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)\nat java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
\nat org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79)
\nat org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:475)
\nat org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:233)
\nat org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
\nat org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:678)
\nat org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:313)
nat org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:165)
\nat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
nat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
\nat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
\nat org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
\nat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
\nat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
\nat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
\nat org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
nat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
\nat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
\nat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
\nat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
\nat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
\nat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
\nat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
\nat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
\nat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
\nat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
\nat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
\nat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
\nat org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
\nat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
\nat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
\nat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
nat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
\nat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
\nat org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
\nat org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
nat org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)\n
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)\n
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136)\n
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)\n
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)\n
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)\n
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)\n
Here is what is happening: if two concurrent writers write to non-overlapping data files, Hudi will let both writes succeed. But if both are modifying the same data file, Hudi will let one succeed and fail the other, and hence you are seeing the conflict resolution failure.
You can read about multi-writer guarantees here: https://hudi.apache.org/docs/concurrency_control#multi-writer-guarantees
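To make the overlap concrete, here is a purely illustrative sketch (assuming a SparkSession `spark`, plus a `basePath` and a `hudiWriteOpts` map holding the write options shown in this thread). Because both EMR jobs consume the same source, they upsert the same "Id" keys into the same "coll" partitions, so both commits touch the same file groups and the conflict check fails one of them:

import spark.implicits._

// The same batch is produced by both clusters since they read identical input.
val batch = Seq(("doc-1", "coll-a", 1668000000L)).toDF("Id", "coll", "operationTime")

// Writer 1 and writer 2 both run this concurrently; only one commit passes the OCC
// conflict check, the other fails with ConcurrentModificationException.
batch.write.format("hudi")
  .options(hudiWriteOpts)
  .mode("append")
  .save(basePath)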
With the locking mechanism, we expect only one writer to write at a time by acquiring the lock, while the other writer waits for it. There should not be any concurrent writers, so we don't expect concurrency errors in the multi-writer setup. Please let us know otherwise; it is not working as we expected.
I have gone through this link https://hudi.apache.org/docs/concurrency_control#multi-writer-guarantees
hoodie.write.lock.client.wait_time_ms
hoodie.write.lock.client.num_retries
With these params set, the writer should wait for the lock. Please guide us on how to overcome this issue. @zhedoubushishi @nsivabalan @xushiyan
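For context, a minimal sketch of how these two settings would be passed (the values below are assumptions): they only control how long a writer keeps retrying to acquire the lock, not whether an overlapping commit passes the conflict check.

val lockRetryOpts = Map(
  "hoodie.write.lock.client.wait_time_ms" -> "10000", // wait 10s between acquire attempts
  "hoodie.write.lock.client.num_retries"  -> "50"     // give up after 50 attempts
)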
@zhedoubushishi @nsivabalan @xushiyan
Any update on this?
@zhedoubushishi @nsivabalan @xushiyan
Any update on this?
Nope, that's not how it works as of today. The 2nd writer doesn't wait for the 1st writer to complete; that's not OCC at all in my understanding. What you are suggesting is to take a global lock for each write, complete the write, release the lock, and then start with the next write. In my opinion, that is just a sequential batch of writes.
In the general sense, multi-writer means two writers can write to Hudi concurrently. If they don't overlap with respect to the data they update, both should succeed; if not, one of them will fail.
Let me know if you need more clarification.
I have put up a patch to auto-retry Spark datasource writes in case of conflicts: https://github.com/apache/hudi/pull/6854. Hope that helps your case.
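For reference, a hedged sketch of an application-level retry in the writer job (not the linked patch itself; `df`, `hudiWriteOpts`, and `basePath` are assumed from the job above, the exception type matches the stack trace in this thread, and the attempt count is arbitrary):

def upsertWithRetry(maxAttempts: Int = 3): Unit = {
  var attempt = 0
  var succeeded = false
  while (!succeeded) {
    try {
      df.write.format("hudi").options(hudiWriteOpts).mode("append").save(basePath)
      succeeded = true
    } catch {
      case _: java.util.ConcurrentModificationException if attempt < maxAttempts - 1 =>
        attempt += 1 // the other writer won the overlapping commit; replay this batch on top of it
    }
  }
}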