Duplicate data in Hudi MOR table
We see duplicate data in our Hudi dataset.
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
We run a Spark Streaming application that reads a Kinesis stream, processes the data, and stores it in Hudi. We started seeing duplicates in our Hudi dataset. Below are our Hudi configs:
DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "guid",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "collectionName",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "operationTime",
HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key() -> CompactionTriggerStrategy.TIME_ELAPSED.name,
HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key() -> String.valueOf(60 * 60),
HoodieCompactionConfig.CLEANER_POLICY.key() -> HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key() -> "624",
HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key() -> "625",
HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key() -> "648",
HoodieCompactionConfig.ASYNC_CLEAN.key() -> "false",
HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
HoodieMetricsConfig.TURN_METRICS_ON.key() -> "true",
HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() -> MetricsReporterType.DATADOG.name(),
HoodieMetricsDatadogConfig.API_SITE_VALUE.key() -> "US",
HoodieMetricsDatadogConfig.METRIC_PREFIX_VALUE.key() -> "tacticalnovusingest.hudi",
HoodieMetadataConfig.ENABLE.key() -> "false",
HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false",
We only use upsert in our code; we never use insert.
dataframe.write.format("org.apache.hudi")
.option("hoodie.insert.shuffle.parallelism", hudiParallelism)
.option("hoodie.upsert.shuffle.parallelism", hudiParallelism)
.option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(HoodieMetricsDatadogConfig.METRIC_TAG_VALUES.key(), s"env:$environment")
.options(hudiOptions).mode(SaveMode.Append)
.save(s3Location)
Please help us with this. Below are the two situations where we see duplicates:
- duplicates with the same Hudi commit time
- duplicates with different commit times
I have attached the JSON files for reference. We tried to delete the duplicate data using the Hudi commit seq num and our primary key, but it deletes both copies of the key. Deleting duplicates with Hudi DELETE:
dataframe.write.format("org.apache.hudi")
.option("hoodie.insert.shuffle.parallelism", hudiParallelism)
.option("hoodie.upsert.shuffle.parallelism", hudiParallelism)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
We also tried to deduplicate with the Hudi CLI command:
repair deduplicate --duplicatedPartitionPath s3:/// --repairedOutputPath s3:/// --sparkMemory 2G --sparkMaster yarn
but we are getting a java.io.FileNotFoundException.
Please help.
Expected behavior
Environment Description
- Hudi version : 0.11.1
- Spark version : 3.2
- Hive version : NA
- Hadoop version : NA
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : no
@nsivabalan can you please check this?
We have found some issues with Spark cache invalidation when tasks are retried, and we made some fixes on that end: https://github.com/apache/hudi/pull/4753 and https://github.com/apache/hudi/pull/4856.
Can you try with 0.12.0 or 0.13.0 and let us know if you see any more issues?
By the way, repair deduplicate does not work for MOR tables :(
So you will have to write some code in the application layer to fix the duplicates, unfortunately. Sorry about that.
Here is what you can probably do:
- query the table to find all duplicates.
- store the dupes to some staging location (maybe df.write.parquet).
- issue deletes for these records against Hudi.
- for the same batch, de-duplicate to pick one version of each record and ingest it back into Hudi using upsert.
If anything crashes in between, you always have the staging data. This is to guard against your process crashing right after deleting from the Hudi table: without the staging copy you would have lost track of those records, because a snapshot query is not going to return them. A rough sketch of this workflow is below.
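A minimal sketch of the steps above, not a drop-in implementation. It assumes the `spark` session, `s3Location`, `hudiTableName`, `hudiParallelism`, and `hudiOptions` values from the ingest job shown earlier in this issue, the `guid` record key and `operationTime` precombine field from its configs, and a hypothetical `stagingPath` of your choosing:

```scala
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// 1. Query the table and keep only the records whose key appears more than once.
val snapshot = spark.read.format("org.apache.hudi").load(s3Location)
val dupes = snapshot
  .withColumn("key_count", count(lit(1)).over(Window.partitionBy("guid")))
  .filter(col("key_count") > 1)
  .drop("key_count")
  .drop(snapshot.columns.filter(_.startsWith("_hoodie_")): _*) // drop Hudi meta columns before rewriting

// 2. Store the dupes to a staging location so they survive a crash in the later steps.
dupes.write.mode(SaveMode.Overwrite).parquet(stagingPath)

// 3. Issue deletes for these records against the Hudi table (removes every copy of each key).
spark.read.parquet(stagingPath)
  .write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
  .option("hoodie.upsert.shuffle.parallelism", hudiParallelism)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save(s3Location)

// 4. Pick one version per key (the latest operationTime here) and upsert it back.
val latestPerKey = spark.read.parquet(stagingPath)
  .withColumn("rn", row_number().over(
    Window.partitionBy("guid").orderBy(col("operationTime").desc)))
  .filter(col("rn") === 1)
  .drop("rn")

latestPerKey.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
  .option("hoodie.upsert.shuffle.parallelism", hudiParallelism)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save(s3Location)
```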
@koochiswathiTR Is the record key field guid some randomly generated id like a UUID? There have been known issues with non-deterministic id generation schemes, especially when there is a loss of executors.
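To illustrate that concern (a hypothetical sketch, not taken from the reporter's job): if the key is minted inside the Spark job, a retried task re-evaluates the expression, so the same input row can be written under two different keys; deriving the key from stable payload fields avoids this. `inputDf`, `sourceId`, and `eventTime` are made-up names here:

```scala
import org.apache.spark.sql.functions._

// Non-deterministic: uuid() is re-evaluated if the task is retried, so the same input
// row can receive a different "guid" on each attempt and show up twice in the table.
val riskyDf = inputDf.withColumn("guid", expr("uuid()"))

// Deterministic: derive the key from stable payload fields (hypothetical column names),
// so retries and replays always produce the same record key.
val safeDf = inputDf.withColumn(
  "guid",
  sha2(concat_ws("|", col("sourceId"), col("eventTime")), 256))
```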
Also, previously our Spark streaming writes were not idempotent, so there could be duplicates. We have fixed that in https://github.com/apache/hudi/pull/6098. Can you please try to upgrade to the latest release?
Hey @koochiswathiTR: can you provide us any more details? We are taking another serious look into all data consistency issues, so we are interested in getting to the bottom of this one.
Hey @koochiswathiTR: are you using a global index by any chance, i.e. GLOBAL_BLOOM or GLOBAL_SIMPLE as the index type? We know there is a bug which could lead to duplicates when either of these indexes is used, and we already have a fix for it: https://github.com/apache/hudi/pull/8490
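For reference, a minimal sketch of where the index type would appear in the writer config (the value shown is only illustrative; check which one your job actually sets, since the bug above applies only to the GLOBAL_* variants):

```scala
// Hypothetical illustration: the index type is controlled by the "hoodie.index.type"
// write config. The bug referenced above applies only to GLOBAL_BLOOM / GLOBAL_SIMPLE.
dataframe.write.format("org.apache.hudi")
  .option("hoodie.index.type", "GLOBAL_BLOOM") // or BLOOM, SIMPLE, GLOBAL_SIMPLE
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save(s3Location)
```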
@nsivabalan I have a similar problem with Hudi 0.13.1. The index type is just BLOOM (not GLOBAL_BLOOM), the table type is MOR, and the operation type is UPSERT.
I created a parallel job that writes the same data for this month, and this fresh parallel dataset does not have any duplicates. I think the old dataset is somehow corrupted, and there is a bug in Hudi that produces duplicates.
The old dataset is huge, so re-ingestion will be expensive and will take too much time. Migration to higher versions of Hudi is not possible at the moment.