
Duplicate data in Hudi MOR table

koochiswathiTR opened this issue (10 comments)

We see duplicate data in our Hudi dataset.


Describe the problem you faced


We run a Spark Streaming application that reads a Kinesis stream, processes the data, and stores it in Hudi. We started seeing duplicates in our Hudi dataset. Below are our Hudi configs (the entries of the hudiOptions map used in the write below):

DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "guid",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "collectionName",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "operationTime",
HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key() -> CompactionTriggerStrategy.TIME_ELAPSED.name,
HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key() -> String.valueOf(60 * 60),
HoodieCompactionConfig.CLEANER_POLICY.key() -> HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key() -> "624", 
HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key() -> "625",  
HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key() -> "648", 
HoodieCompactionConfig.ASYNC_CLEAN.key() -> "false", 
HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
HoodieMetricsConfig.TURN_METRICS_ON.key() -> "true",
HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() -> MetricsReporterType.DATADOG.name(),
HoodieMetricsDatadogConfig.API_SITE_VALUE.key() -> "US",
HoodieMetricsDatadogConfig.METRIC_PREFIX_VALUE.key() -> "tacticalnovusingest.hudi",
HoodieMetadataConfig.ENABLE.key() -> "false",
HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false",

We only use upsert in our code; we never use insert:

    dataframe.write.format("org.apache.hudi")
      .option("hoodie.insert.shuffle.parallelism", hudiParallelism)
      .option("hoodie.upsert.shuffle.parallelism", hudiParallelism)
      .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(HoodieMetricsDatadogConfig.METRIC_TAG_VALUES.key(), s"env:$environment")
      .options(hudiOptions).mode(SaveMode.Append)
      .save(s3Location)

Please help us with this. Below are the two situations where we see duplicates (a query sketch for spotting both cases follows this list):

  1. Duplicates with the same Hudi commit time
  2. Duplicates with different commit times
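For reference, one way to tell the two cases apart is to group on the record key and count distinct _hoodie_commit_time values. This is only a sketch: the table path is a placeholder, and the guid record key is taken from the configs above.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().appName("find-dupes").getOrCreate()
    val basePath = "s3://<bucket>/<table>"  // placeholder for the actual table location

    // Snapshot-read the table and group by the record key.
    val snapshot = spark.read.format("hudi").load(basePath)
    val dupes = snapshot
      .groupBy("guid")
      .agg(count(lit(1)).as("copies"), countDistinct("_hoodie_commit_time").as("commits"))
      .filter(col("copies") > 1)

    dupes.filter(col("commits") === 1).show()  // case 1: copies share the same commit time
    dupes.filter(col("commits") > 1).show()    // case 2: copies span different commit times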

I have attached the JSON files for reference. We tried to delete the duplicate data using the Hudi commit seq number and our primary key, but it deletes both copies of the key. Delete of a duplicate with Hudi DELETE:

    dataframe.write.format("org.apache.hudi")
      .option("hoodie.insert.shuffle.parallelism", hudiParallelism)
      .option("hoodie.upsert.shuffle.parallelism", hudiParallelism)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)

We also tried to deduplicate with the Hudi CLI command:

repair deduplicate --duplicatedPartitionPath s3:/// --repairedOutputPath s3:/// --sparkMemory 2G --sparkMaster yarn

We are getting a java.io.FileNotFoundException.
Please help.

Expected behavior


Environment Description

  • Hudi version: 0.11.1

  • Spark version: 3.2

  • Hive version: NA

  • Hadoop version: NA

  • Storage (HDFS/S3/GCS..): S3

  • Running on Docker? (yes/no): no

koochiswathiTR commented on Mar 14 '23

@nsivabalan, can you please check this?

koochiswathiTR commented on Mar 15 '23

We have found some issues with Spark cache invalidation when tasks are retried and have made fixes on that end: https://github.com/apache/hudi/pull/4753, https://github.com/apache/hudi/pull/4856

Can you try with 0.12.0 or 0.13.0 and let us know if you still see issues?

nsivabalan commented on Mar 17 '23

By the way, repair deduplicate does not work for MOR tables, so unfortunately you will have to write some code in the application layer to fix the duplicates. Sorry about that.

nsivabalan commented on Mar 17 '23

Here is roughly what you can do:

  1. Query the table to find all duplicates.
  2. Store the dupes in some staging location (for example with df.write.parquet).
  3. Issue deletes against Hudi for these records.
  4. From the same staged batch, de-duplicate to pick one version of each record and ingest it back into Hudi using upsert.

If anything crashes in between, you always have the staging data. The staging step exists because, once the records are deleted from the Hudi table, a snapshot query will no longer return them, so a crash at that point would otherwise lose track of those records. A rough sketch of this flow is below.
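To make the steps concrete, here is a minimal sketch of that flow, not a drop-in implementation: basePath, stagingPath, and the table name are placeholders, and hudiOptions here is only a small subset of the real ingest options (the field names guid, collectionName, and operationTime come from the configs above).

    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().appName("hudi-dedupe").getOrCreate()
    val basePath = "s3://<bucket>/<table>"         // placeholder table location
    val stagingPath = "s3://<bucket>/dupe-staging" // placeholder staging location

    val hudiOptions = Map(                         // minimal subset of the ingest job's options
      "hoodie.table.name" -> "my_table",           // placeholder table name
      "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
      "hoodie.datasource.write.recordkey.field" -> "guid",
      "hoodie.datasource.write.partitionpath.field" -> "collectionName",
      "hoodie.datasource.write.precombine.field" -> "operationTime"
    )

    // 1. Find all duplicates via a snapshot read.
    val snapshot = spark.read.format("hudi").load(basePath)
    val dupeKeys = snapshot.groupBy("guid").count().filter(col("count") > 1).select("guid")
    val dupes = snapshot.join(dupeKeys, Seq("guid"))
      .drop(snapshot.columns.filter(_.startsWith("_hoodie")): _*)

    // 2. Stage the duplicate rows before touching the table.
    dupes.write.mode(SaveMode.Overwrite).parquet(stagingPath)

    // 3. Delete every copy of the affected keys from the Hudi table.
    spark.read.parquet(stagingPath).write.format("hudi")
      .options(hudiOptions)
      .option("hoodie.datasource.write.operation", "delete")
      .mode(SaveMode.Append)
      .save(basePath)

    // 4. Keep one version per key (latest operationTime) and upsert it back.
    val w = Window.partitionBy("guid").orderBy(col("operationTime").desc)
    val latest = spark.read.parquet(stagingPath)
      .withColumn("rn", row_number().over(w))
      .filter(col("rn") === 1)
      .drop("rn")

    latest.write.format("hudi")
      .options(hudiOptions)
      .option("hoodie.datasource.write.operation", "upsert")
      .mode(SaveMode.Append)
      .save(basePath)

Both the delete and the re-upsert read from the staged copy, so a crash at any point can be recovered by re-running from the staging data.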

nsivabalan commented on Mar 17 '23

@koochiswathiTR Is the record key field guid some randomly generated id such as a UUID? There have been known issues with non-deterministic id generation schemes, especially when executors are lost.
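To illustrate the concern, a small sketch (sourceDf and sourceId are hypothetical stand-ins, not taken from this issue): a key generated with uuid() is re-evaluated when a task is retried, so the same input row can be written under two different record keys, whereas a key derived from stable business columns is retry-safe.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().appName("keygen-demo").getOrCreate()

    // Hypothetical source columns; stand-ins for the real stream payload.
    val sourceDf = spark.createDataFrame(Seq(
      ("collectionA", "rec-001"),
      ("collectionA", "rec-002")
    )).toDF("collectionName", "sourceId")

    // Non-deterministic: uuid() is re-evaluated on task retry, so the same input
    // row can end up with two different record keys, i.e. a duplicate after upsert.
    val risky = sourceDf.withColumn("guid", expr("uuid()"))

    // Deterministic alternative: derive the key from stable business columns,
    // so retries always produce the same key for the same row.
    val safe = sourceDf.withColumn("guid",
      sha2(concat_ws("||", col("collectionName"), col("sourceId")), 256))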

codope commented on Apr 25 '23

Also, previously our Spark Streaming writes were not idempotent, so there could be duplicates. We have fixed that in https://github.com/apache/hudi/pull/6098. Can you please try upgrading to the latest release?

codope commented on Apr 25 '23

Hey @koochiswathiTR, can you provide us any more details? We are taking another serious look into all data consistency issues, so we are interested in getting to the bottom of this one.

nsivabalan commented on Apr 28 '23

Hey @koochiswathiTR, are you using a global index by any chance, i.e. GLOBAL_BLOOM or GLOBAL_SIMPLE as the index type? We know of a bug that can lead to duplicates when either of these indexes is used, and we already have a fix for it: https://github.com/apache/hudi/pull/8490
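For anyone checking their own setup, the index type is controlled by the hoodie.index.type write config; a minimal sketch, assuming the same hudiOptions map used by the writer above:

    // If this resolves to GLOBAL_BLOOM or GLOBAL_SIMPLE, the bug above may apply;
    // if the key is absent, the writer falls back to the engine's default index.
    val indexType = hudiOptions.getOrElse("hoodie.index.type", "<engine default>")
    println(s"hoodie.index.type = $indexType")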

nsivabalan commented on Apr 28 '23

@nsivabalan I have a similar problem with Hudi 0.13.1. The index type is just BLOOM (not GLOBAL_BLOOM), the table type is MOR, and the operation type is UPSERT.

I created a parallel job that writes the same data for this month, and this fresh parallel table does not have any duplicates. I think the old dataset is somehow corrupted and there is a bug in Hudi that produces duplicates.

The old dataset is huge, so re-ingestion would be expensive and take too much time. Migrating to a higher version of Hudi is not possible at the moment.

eshu commented on Jun 18 '25