[SUPPORT] Duplicate values in HUDI MOR table across partitions, and values not updating in the same partition, with GLOBAL_BLOOM
Description: With GLOBAL_BLOOM, there are duplicate values in a HUDI MOR table across different partitions, and values in the same partition are not being updated.
Steps to reproduce this behavior

Step 1: I created a Hudi table with the following input data and properties.

hudi_options = {
    'hoodie.table.name': 'my_hudi_table',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'creation_date',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.bloom.index.update.partition.path': 'true',
    'hoodie.index.type': 'GLOBAL_BLOOM',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.hive_sync.assume_date_partitioning': 'false',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'pfg_silver_fantasy',
    'hoodie.datasource.hive_sync.table': 'hudi_test1',
    'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}
# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "1", 'a'),
        ("101", "2015-01-01", "1", 'a'),
    ],
    ["id", "creation_date", "last_update_time", "new_col"]
)
# Write a DataFrame as a Hudi dataset
inputDF.write \
.format('org.apache.hudi') \
.options(**hudi_options) \
.mode('overwrite') \
.save('s3://<loc>/hudi_test1')
Output after Step 1 in the _rt table:

_hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | last_update_time | new_col | creation_date
20220615024525 | 20220615024525_0_1 | id:101 | creation_date=2015-01-01 | cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet | 101 | 1 | a | 2015-01-01
20220615024525 | 20220615024525_0_2 | id:100 | creation_date=2015-01-01 | cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet | 100 | 1 | a | 2015-01-01
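For reference, the _rt output above would come from a query along these lines (a sketch; the exact query is not in the report, only the database and table names from the hive_sync options are):

# Hypothetical query that would produce the output above; the database and
# table names come from the hive_sync options, and for a MOR table Hudi's
# Hive sync registers a real-time view with the _rt suffix.
spark.sql("SELECT * FROM pfg_silver_fantasy.hudi_test1_rt").show(truncate=False)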
Step 3: Upserting

inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-02", "2", "b"),
        ("101", "2015-01-01", "2", "b")
    ],
    ["id", "creation_date", "last_update_time", "new_col"]
)
inputDF.write \
    .format('org.apache.hudi') \
    .options(**hudi_options) \
    .mode('append') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .save('s3://<loc>/hudi_test1')
Output after Step 3 in the _rt table:

_hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | last_update_time | new_col | creation_date
20220615024525 | 20220615024525_0_1 | id:101 | creation_date=2015-01-01 | cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet | 101 | 1 | a | 2015-01-01
20220615024525 | 20220615024525_0_2 | id:100 | creation_date=2015-01-01 | cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet | 100 | 1 | a | 2015-01-01
20220615024626 | 20220615024626_1_3 | id:100 | creation_date=2015-01-02 | 6c1dbd2d-5db5-4c65-b180-f1d9561cf637-0_1-92-39217_20220615024626.parquet | 100 | 2 | b | 2015-01-02
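A quick way to surface the duplicate independently of the Hive tables is to group the snapshot view by record key (a sketch, reusing the placeholder path from the writes above):

from pyspark.sql.functions import col

# Count rows per record key in the snapshot view; any key with count > 1 is
# present in more than one partition or file slice.
snapshotDF = spark.read.format("hudi").load("s3://<loc>/hudi_test1")
snapshotDF.groupBy("id").count().filter(col("count") > 1).show()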
Expected behavior
It should not have any duplicate values, and it should also update values within the same partition.
Environment Description

- Hudi version : hudi-spark-bundle_2.11-0.7.0-amzn-1.jar
- Spark version : 2.4.7-amzn-1
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : no
@uvplearn I am unable to reproduce the issue. I am using the latest master of Hudi (with 0.7.0 I'm facing some other issue in my setup). Meanwhile, I would suggest trying out the latest Hudi (0.11.0), or, if you're on AWS, you could use the 0.10.1-amzn-0 version of Hudi.
Also, if you query using spark.read.format("hudi"), do you see the right records? I am trying to figure out if this is an issue with hive _rt tables only.
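Something like the following is what I have in mind (a sketch; the path is the placeholder from the original report):

# Snapshot query straight from the table path, bypassing the Hive-synced
# _rt table; this merges base and log files at read time.
spark.read.format("hudi") \
    .load("s3://<loc>/hudi_test1") \
    .select("id", "creation_date", "last_update_time", "new_col") \
    .show(truncate=False)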
Yeah, I remember we had a fix around this in 0.10.1 or another release. Can you try out 0.11 or a later version and let us know what you see?
If the issue is resolved, can you please close the GitHub issue?
Closing this due to no activity. Feel free to open a new issue if you run into any more problems; we can definitely look into it more deeply.
Reopening. Fix is in progress - https://github.com/apache/hudi/pull/8490
Not reproducible w/ https://github.com/apache/hudi/pull/8490
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable
// spark-shell
val spark = SparkSession.builder.enableHiveSupport.getOrCreate
import spark.implicits._
val tableName = "hudi5869"
val basePath = "/tmp/hudi5869/"
val hudiOptions = mutable.Map(
"hoodie.table.name" -> tableName,
"hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "id",
"hoodie.datasource.write.precombine.field" -> "last_update_time",
"hoodie.datasource.write.partitionpath.field" -> "creation_date",
"hoodie.index.type" -> "GLOBAL_BLOOM",
"hoodie.bloom.index.update.partition.path" -> "true",
"hoodie.compact.inline" -> "true",
"hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator"
)
val df = Seq(
("100", "2015-01-01", "1","a"),
("101", "2015-01-01", "1","a")
).toDF("id", "creation_date", "last_update_time", "new_col")
df.write.format("hudi").
options(hudiOptions).
mode(Append).
save(basePath)
spark.read.format("hudi").load(basePath).show(false)
val df1 = Seq(
("100", "2015-01-02", "2","b"),
("101", "2015-01-01", "2","b")
).toDF("id", "creation_date", "last_update_time", "new_col")
df1.write.format("hudi").
options(hudiOptions).
mode(Append).
save(basePath)
spark.read.format("hudi").load(basePath).show(false)
Output:
scala> spark.read.format("hudi").load(basePath).show(false)
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+---+-------------+----------------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |id |creation_date|last_update_time|new_col|
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+---+-------------+----------------+-------+
|20230427220827516 |20230427220827516_0_1|id:101 |2015-01-01 |f183954a-9d23-4192-a1ed-8efc25e4e77f-0 |101|2015-01-01 |2 |b |
|20230427220827516 |20230427220827516_1_0|id:100 |2015-01-02 |b395a368-8e9a-46c8-8660-c78cfd53d06f-0_1-275-1770_20230427220827516.parquet|100|2015-01-02 |2 |b |
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+---+-------------+----------------+-------+
From what I can glean from the description, it looks like the query is an RO query and update-partition-path is set to true. So, with the second commit, the delete record went to a log file in partition creation_date=2015-01-01, while the new insert for the same record key (100) went to the new partition creation_date=2015-01-02. Hence an RO query will return duplicates. If you trigger compaction, this should be resolved; this is a known limitation of RO queries.
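For example, one way to force that on the writer side (a sketch against the PySpark options from the report; the trigger value of 1 delta commit is an assumption for illustration) is inline compaction:

# Assumed additions to the hudi_options dict from the report: compact inline
# so the delete in the old partition is merged into the base file and the RO
# view stops returning the stale copy of record key 100.
hudi_options.update({
    'hoodie.compact.inline': 'true',
    'hoodie.compact.inline.max.delta.commits': '1',
})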
Also, if you prefer not to update the partition path (for example, if you wish to retain the record with record key 100 in partition 2015-01-01 itself), you should set hoodie.bloom.index.update.partition.path = false.
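In the PySpark options from the report that would look like:

# Keep each record in its original partition; an incoming record with the same
# key but a different partition value updates the existing partition's copy.
hudi_options['hoodie.bloom.index.update.partition.path'] = 'false'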