hudi icon indicating copy to clipboard operation
hudi copied to clipboard

[SUPPORT] There are duplicate values in HUDI MOR table for different partition and not updating values in same partition for GLOBAL_BLOOM

Open uvplearn opened this issue 3 years ago • 2 comments

Desciption There are duplicate values in HUDI MOR table for different partition and not updating values in same partition for GLOBAL_BLOOM.

Steps To Reproduce this behavior STEP 1 I have created a hudi table with follwing input data and properties. hudi_options = { 'hoodie.table.name': 'my_hudi_table', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.write.table.type': 'MERGE_ON_READ' , 'hoodie.bloom.index.update.partition.path': 'true', "hoodie.index.type": "GLOBAL_BLOOM", "hoodie.datasource.write.keygenerator.class" : "org.apache.hudi.keygen.ComplexKeyGenerator", "hoodie.datasource.write.hive_style_partitioning": 'true', 'hoodie.datasource.hive_sync.assume_date_partitioning':'false', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database':'pfg_silver_fantasy', 'hoodie.datasource.hive_sync.table': 'hudi_test1', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.support_timestamp': 'true', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' }

          # Create a DataFrame
          inputDF = spark.createDataFrame(
            [
                ("100", "2015-01-01", "1", 'a'),
                ("101", "2015-01-01", "1", 'a'),
            ],
            ["id", "creation_date", "last_update_time","new_col"]
        )
                      
          # Write a DataFrame as a Hudi dataset
          inputDF.write \
          .format('org.apache.hudi') \
          .options(**hudi_options) \
          .mode('overwrite') \
          .save('s3://<loc>/hudi_test1')
          

Output after step1 in _rt table: "_hoodie_commit_time" "_hoodie_commit_seqno" "_hoodie_record_key" "_hoodie_partition_path" "_hoodie_file_name" id last_update_time new_col creation_date 20220615024525 20220615024525_0_1 id:101 creation_date=2015-01-01 cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet 101 1 a 2015-01-01 20220615024525 20220615024525_0_2 id:100 creation_date=2015-01-01 cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet 100 1 a 2015-01-01

Step3: Upserting inputDF = spark.createDataFrame( [ ("100", "2015-01-02", "2","b"), ("101", "2015-01-01", "2","b") ], ["id", "creation_date", "last_update_time","new_col"] ) inputDF.write
.format('org.apache.hudi')
.options(**hudi_options)
.mode('append')
.option('hoodie.datasource.write.operation', 'upsert')
.save('s3:///hudi_test2')

Output after step3 in _rt table : "_hoodie_commit_time" "_hoodie_commit_seqno" "_hoodie_record_key" "_hoodie_partition_path" "_hoodie_file_name" id last_update_time new_col creation_date 20220615024525 20220615024525_0_1 id:101 creation_date=2015-01-01 cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet 101 1 a 2015-01-01 20220615024525 20220615024525_0_2 id:100 creation_date=2015-01-01 cb8df2b4-1268-48b3-8665-1e4ac1196734-0_0-58-25650_20220615024525.parquet 100 1 a 2015-01-01 20220615024626 20220615024626_1_3 id:100 creation_date=2015-01-02 6c1dbd2d-5db5-4c65-b180-f1d9561cf637-0_1-92-39217_20220615024626.parquet 100 2 b 2015-01-02

Expected behavior

It should not have any duplicate values and also update values in same partition.

Environment Description

  • Hudi version : hudi-spark-bundle_2.11-0.7.0-amzn-1.jar

  • Spark version : version 2.4.7-amzn-1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

uvplearn avatar Jun 15 '22 02:06 uvplearn

@uvplearn I am unable to reproduce the issue. I am using latest master of Hudi (with 0.7.0 i'm facing some other issue in my setup). Meanwhile, I would suggest to try out with the latest Hudi (0.11.0) or if you're on AWS you could use 0.10.1-amzn-0 version of Hudi.

codope avatar Jun 17 '22 15:06 codope

Also, if you query using spark.read.format("hudi"), do you see the right records? I am trying to figure out if this is an issue with hive _rt tables only.

codope avatar Jun 17 '22 15:06 codope

yeah, I remember we had a fix around this in 0.10.1 or some release. Can you try out 0.11 or later versions and let us know what you see.

nsivabalan avatar Aug 16 '22 07:08 nsivabalan

If the issue is resolved, can you please close the github issue

nsivabalan avatar Aug 16 '22 07:08 nsivabalan

Closing this due to no activity. Feel free to open a new issue if you are having any more issues. we can def look into deeply.

nsivabalan avatar Oct 25 '22 03:10 nsivabalan

Reopening. Fix is in progress - https://github.com/apache/hudi/pull/8490

codope avatar Apr 27 '23 06:04 codope

Not reproducible w/ https://github.com/apache/hudi/pull/8490


import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable

val tableName = "hudi5869"
val spark = SparkSession.builder.enableHiveSupport.getOrCreate

val basePath = "/tmp/hudi5869/"

import spark.implicits._
// spark-shell


val hudiOptions = mutable.Map(
      "hoodie.table.name" -> tableName,
      "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
      "hoodie.datasource.write.operation" -> "upsert",
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.precombine.field" -> "last_update_time",
      "hoodie.datasource.write.partitionpath.field" -> "creation_date",
      "hoodie.index.type" -> "GLOBAL_BLOOM",
      "hoodie.bloom.index.update.partition.path" -> "true",
      "hoodie.compact.inline" -> "true",
      "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator"
    )

val df = Seq(
      ("100", "2015-01-01", "1","a"),
      ("101", "2015-01-01", "1","a")
    ).toDF("id", "creation_date", "last_update_time", "new_col")


 df.write.format("hudi").
        options(hudiOptions).
        mode(Append).
        save(basePath)
spark.read.format("hudi").load(basePath).show(false)


val df1 = Seq(
      ("100", "2015-01-02", "2","b"),
      ("101", "2015-01-01", "2","b")
    ).toDF("id", "creation_date", "last_update_time", "new_col")


 df1.write.format("hudi").
        options(hudiOptions).
        mode(Append).
        save(basePath)
spark.read.format("hudi").load(basePath).show(false)


Output:

scala> spark.read.format("hudi").load(basePath).show(false)
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+---+-------------+----------------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                          |id |creation_date|last_update_time|new_col|
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+---+-------------+----------------+-------+
|20230427220827516  |20230427220827516_0_1|id:101            |2015-01-01            |f183954a-9d23-4192-a1ed-8efc25e4e77f-0                                     |101|2015-01-01   |2               |b      |
|20230427220827516  |20230427220827516_1_0|id:100            |2015-01-02            |b395a368-8e9a-46c8-8660-c78cfd53d06f-0_1-275-1770_20230427220827516.parquet|100|2015-01-02   |2               |b      |
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+---+-------------+----------------+-------+

nsivabalan avatar Apr 28 '23 05:04 nsivabalan

From what I can glean from the description, looks like the query is a RO query and update partition path is set to true. So, w/ 2nd commit, the delete record went to a log file in partition creation_date=2015-01-01, while the new insert for same record key (100), went to new partition creation_date=2015-01-02. hence RO query will return dups. If you trigger compaction, this should be resolved. this is a known limitation for RO query.

Also, if you prefer not to update the partition path, for eg, for record with record key 100, if you wish to retain the record in partition 2015-01-01 itself, you should set hoodie.bloom.index.update.partition.path = false.

nsivabalan avatar Apr 28 '23 05:04 nsivabalan