hudi icon indicating copy to clipboard operation
hudi copied to clipboard

[SUPPORT] Setting hoodie.datasource.insert.dup.policy to drop still upserts the record in 0.14

Open keerthiskating opened this issue 1 year ago • 7 comments

Describe the problem you faced

If my incoming dataset already has a record which already exists in the hudi table, hudi is still updating the commit time and treating it as update even after setting 'hoodie.datasource.insert.dup.policy': 'drop',

To Reproduce

Steps to reproduce the behavior:

recordkey = "id,name"
precombine = "uuid"
method = "upsert"
table_type = "COPY_ON_WRITE"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.insert.dup.policy': 'drop',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.table.cdc.enabled':'true',
    'hoodie.table.cdc.supplemental.logging.mode': 'data_before_after',
}

spark_df = spark.createDataFrame(
    data=[
    (1, "John",  1, False),
    (2, "Doe",  2, False),
], 
schema=["id", "name", "val", "_hoodie_is_deleted"])

from pyspark.sql.functions import sha2, concat_ws

record_key_col_array = recordkey.split(",")
record_key_col_array
spark_df = spark_df.withColumn("uuid", sha2(concat_ws("||", *record_key_col_array), 256))

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(path)

df = spark. \
      read. \
      format("hudi"). \
      load(path)

df.select(['_hoodie_commit_time', 'id', 'name', 'val']).show()

+-------------------+---+----+---+
|_hoodie_commit_time| id|name|val|
+-------------------+---+----+---+
|  20240211155820562|  1|John|  1|
|  20240211155820562|  2| Doe|  2|
+-------------------+---+----+---+


spark_df = spark.createDataFrame(
    data=[
    (1, "John",  1, False)
], 
    schema=["id", "name", "val", "_hoodie_is_deleted"])
spark_df = spark_df.withColumn("uuid", sha2(concat_ws("||", *record_key_col_array), 256))

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

# read latest data

df = spark. \
      read. \
      format("hudi"). \
      load(path)

df.select(['_hoodie_commit_time', 'id', 'name', 'val']).show()

+-------------------+---+----+---+
|_hoodie_commit_time| id|name|val|
+-------------------+---+----+---+
|  20240211155914976|  1|John|  1| ---> Commit time has updated even though record did not change.
|  20240211155820562|  2| Doe|  2|
+-------------------+---+----+---+

# query cdc data
cdc_read_options = {
    'hoodie.datasource.query.incremental.format': 'cdc',
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': latest_commmit_ts
    # 'hoodie.datasource.read.end.instanttime': 20240208210952160,
}
df=spark.read.format("hudi"). \
    options(**cdc_read_options). \
    load(path)

df.show(2,False)

+---+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before                                                                                                                                      |after                                                                                                                                       |
+---+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|u  |20240211155914976|{"id": 1, "name": "John", "val": 1, "_hoodie_is_deleted": false, "uuid": "46ca69f145f50f414b7a8cd59656f4935a5162798f093edc708a1ba21c0e9c26"}|{"id": 1, "name": "John", "val": 1, "_hoodie_is_deleted": false, "uuid": "46ca69f145f50f414b7a8cd59656f4935a5162798f093edc708a1ba21c0e9c26"}|
+---+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+

Expected behavior

Since no updates were made to any records, hudi should not report any updates when performing cdc query

Environment Description

  • Hudi version : 0.14

  • Spark version : 3.3.0-amzn-1

  • Storage (HDFS/S3/GCS..) : s3

  • Running on Docker? (yes/no) : no

keerthiskating avatar Feb 11 '24 16:02 keerthiskating

@keerthiskating This setting is only applicable when operation type is insert.

image

ad1happy2go avatar Feb 11 '24 16:02 ad1happy2go

@keerthiskating This setting is only applicable when operation type is insert.

image

Any idea how do I achieve this when doing upsert operation? I want hudi to ignore records that already exist in hudi table and not update those record's commit time.

keerthiskating avatar Feb 11 '24 23:02 keerthiskating

@keerthiskating You may need to write your own Custom payload for the same. Also, We can contribute this feature to hudi code too.

One of the example here - https://gist.github.com/bhasudha/7ea07f2bb9abc5c6eb86dbd914eec4c6

ad1happy2go avatar Feb 12 '24 14:02 ad1happy2go

@ad1happy2go I do not have the bandwidth to contribute. @codope Any idea this will be supported / Do you think this is a valid use case?

keerthiskating avatar Feb 15 '24 19:02 keerthiskating

Despite the initial report being with upsert, I can confirm that the new hoodie.datasource.insert.dup.policy option does not drop dupes as expected with the insert write operation. The deprecated fields work as desired. I have a small example hudi_insert_no_dupes.py demonstrating the behavior. In the interim, I will be using the deprecated fields as a workaround.

jmnatzaganian avatar Feb 20 '24 18:02 jmnatzaganian

@keerthiskating - If you do not intend to update records, but instead merely want to drop them, then you should simply use insert instead of upsert. upsert is designed to update records. If; however, the intention is to upsert when certain fields have changed, but drop otherwise, then as @ad1happy2go mentioned you'll need to roll your own logic. Functionally, the data will be valid with upsert even if you see the changed field. So you can continue as-is with the understanding that you'll have some extra records. Note that with CDC you can compare the original and new and drop before ingesting into the next system.

jmnatzaganian avatar Feb 20 '24 18:02 jmnatzaganian

Thanks @jmnatzaganian . We were made aware of that recently and we are working on document update. For datasource writer we still need to use the old config and this new config only works for sql.

ad1happy2go avatar Feb 21 '24 05:02 ad1happy2go