
Inconsistency in Hudi Table Configuration between Initial Insert and Subsequent Merges

Open prashant462 opened this issue 2 years ago • 10 comments

Issue Summary

When using dbt Spark with Hudi to create a Hudi-format table, there is an inconsistency in the Hudi table configuration between the initial insert and subsequent merge operations. The properties provided in the options of the dbt model are correctly fetched and applied during the first run. However, during the second run, when executing the merge operation, Hudi fetches only a subset of the properties from the Hudi catalog table, leading to default values being applied and the configuration changing.

Steps to Reproduce

  • Execute the dbt model with Hudi options for the initial insert.

    Sample model

    {{
      config(
        materialized = 'incremental',
        file_format = 'hudi',
        pre_hook = "SET spark.sql.legacy.allowNonEmptyLocationInCTAS = true",
        location_root = "file:///Users/B0279627/Downloads/Hudi",
        unique_key = "id",
        incremental_strategy = "merge",
        options = {
          'preCombineField': 'id2',
          'hoodie.index.type': 'GLOBAL_SIMPLE',
          'hoodie.simple.index.update.partition.path': 'true',
          'hoodie.keep.min.commits': '145',
          'hoodie.keep.max.commits': '288',
          'hoodie.cleaner.policy': 'KEEP_LATEST_BY_HOURS',
          'hoodie.cleaner.hours.retained': '72',
          'hoodie.cleaner.fileversions.retained': '144',
          'hoodie.cleaner.commits.retained': '144',
          'hoodie.upsert.shuffle.parallelism': '200',
          'hoodie.insert.shuffle.parallelism': '200',
          'hoodie.bulkinsert.shuffle.parallelism': '200',
          'hoodie.delete.shuffle.parallelism': '200',
          'hoodie.parquet.compression.codec': 'zstd',
          'hoodie.datasource.hive_sync.support_timestamp': 'true',
          'hoodie.datasource.write.reconcile.schema': 'true',
          'hoodie.enable.data.skipping': 'true',
          'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
        }
      )
    }}
    
  • Observe that all specified properties are correctly applied during the first run.

  • To verify, inspect a sample property such as hoodie.index.type=GLOBAL_SIMPLE.

  • Execute the dbt model with Hudi options for a subsequent merge operation.

  • Observe that the Hudi table properties change, with defaults applied for certain configurations; for example, hoodie.index.type changes from GLOBAL_SIMPLE to SIMPLE in the target table.

Expected Behavior

Hudi should consistently set all specified properties in every run, irrespective of whether it is the initial insert or a subsequent merge operation. The properties passed in the options of the dbt model should be retained and applied consistently across all operations.

Environment Description

  • Hudi version : 0.12.1

  • Spark version : 3.3.1

  • Hive version : 3.1.3

  • Hadoop version : 3.1.1

  • DBT version: 1.7.1

  • Storage (HDFS/S3/GCS..) : Checked with s3 , hdfs and local file system.

  • Running on Docker? (yes/no) : no

Additional context

In the second run, MergeIntoHoodieTableCommand.scala executes InsertIntoHoodieTableCommand.run(). In this case Hudi fetches the props from the Hudi catalog table, where it picks up the table configs and catalog properties. These are not the complete set of properties I passed in the first run via the dbt options, so Hudi adds other default properties to the hoodie props that were not fetched from the catalog props. This seems to be why many properties are changing. Below I have attached some images of the properties fetched during subsequent merge operations.
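The failure mode described above amounts to layered config resolution: Hudi merges built-in defaults with whatever subset of properties the catalog returns, so any property missing from the catalog silently falls back to its default. A minimal sketch of that precedence (plain Python with hypothetical names, not Hudi's actual code):

```python
# Hypothetical illustration of layered config resolution: later layers win,
# and any property absent from the catalog layer falls back to the default.
HUDI_DEFAULTS = {
    "hoodie.index.type": "SIMPLE",
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
}

def resolve_write_config(catalog_props, write_options):
    """Merge built-in defaults, catalog-stored table props, and per-write options."""
    resolved = dict(HUDI_DEFAULTS)
    resolved.update(catalog_props)   # subset fetched from the catalog table
    resolved.update(write_options)   # options passed on this particular write
    return resolved

# First run: the dbt options are passed explicitly, so GLOBAL_SIMPLE wins.
first_run = resolve_write_config({}, {"hoodie.index.type": "GLOBAL_SIMPLE"})

# Second run (MERGE): only catalog props are available, the dbt option is not
# re-applied, so the built-in default SIMPLE silently takes over.
second_run = resolve_write_config({"hoodie.table.name": "hudi_test"}, {})

print(first_run["hoodie.index.type"])   # GLOBAL_SIMPLE
print(second_run["hoodie.index.type"])  # SIMPLE
```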

[Attached: MicrosoftTeams-image (21), Screenshot 2024-02-05 at 10 00 20 PM]

prashant462 avatar Feb 05 '24 16:02 prashant462

@codope @nsivabalan can you guys help out here, we are kind of stuck to integrate dbt with hudi in our production use case.

pravin1406 avatar Feb 06 '24 08:02 pravin1406

@pravin1406 Just one question: you are using GLOBAL_SIMPLE, but you don't have any partition column defined. Can you post your table properties?

Which configurations do you see missing in MERGE INTO?

ad1happy2go avatar Feb 06 '24 16:02 ad1happy2go

I just noticed that it is using a non-partitioned key generator in your case (visible in the debug screenshot). That may be why it is using SIMPLE instead of GLOBAL_SIMPLE, as the latter doesn't make sense for a non-partitioned table.
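For intuition, the difference between a partition-scoped and a global index is essentially one of lookup scope; a simplified Python sketch (hypothetical data structures, not Hudi's index implementation):

```python
# Hypothetical sketch: a partition-scoped (SIMPLE) index resolves a record key
# only within the incoming record's partition, while a global (GLOBAL_SIMPLE)
# index resolves it across every partition of the table.
table_index = {
    "name=test2": {"id:1": "file-A"},  # record key id:1 lives in partition name=test2
}

def simple_lookup(index, partition, key):
    """Partition-scoped lookup: misses if the key lives in another partition."""
    return index.get(partition, {}).get(key)

def global_lookup(index, key):
    """Global lookup: scans every partition for the record key."""
    for partition, records in index.items():
        if key in records:
            return partition, records[key]
    return None

# An update for the same record key arrives under a new partition value name=test:
print(simple_lookup(table_index, "name=test", "id:1"))  # None -> treated as insert
print(global_lookup(table_index, "id:1"))               # found in name=test2
```

On a non-partitioned table there is effectively a single partition, so the two lookups behave identically, which is presumably why falling back to SIMPLE is harmless there.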

ad1happy2go avatar Feb 06 '24 16:02 ad1happy2go

Hello @ad1happy2go, I am printing the hoodie configs in the Hudi code just before inserting records. I am attaching the configs we got from the first and second runs.

DBT model executed

    {{
      config(
        materialized = 'incremental',
        file_format = 'hudi',
        pre_hook = "SET spark.sql.legacy.allowNonEmptyLocationInCTAS = true",
        location_root = "file:///Users/B0279627/Downloads/Hudi",
        unique_key = "id",
        partition_by = "name",
        incremental_strategy = "merge",
        options = {
          'preCombineField': 'id2',
          'hoodie.index.type': 'GLOBAL_SIMPLE',
          'hoodie.datasource.write.partitionpath.field': 'name',
          'hoodie.datasource.hive_sync.partition_fields': 'name',
          'hoodie.datasource.hive_sync.table': 'hudi_test_two',
          'hoodie.datasource.hive_sync.database': 'qultyzn1_prepd',
          'hoodie.simple.index.update.partition.path': 'true',
          'hoodie.keep.min.commits': '145',
          'hoodie.keep.max.commits': '288',
          'hoodie.cleaner.policy': 'KEEP_LATEST_BY_HOURS',
          'hoodie.cleaner.hours.retained': '72',
          'hoodie.cleaner.fileversions.retained': '144',
          'hoodie.cleaner.commits.retained': '144',
          'hoodie.upsert.shuffle.parallelism': '200',
          'hoodie.insert.shuffle.parallelism': '200',
          'hoodie.bulkinsert.shuffle.parallelism': '200',
          'hoodie.delete.shuffle.parallelism': '200',
          'hoodie.parquet.compression.codec': 'zstd',
          'hoodie.datasource.hive_sync.support_timestamp': 'true',
          'hoodie.datasource.write.reconcile.schema': 'true',
          'hoodie.enable.data.skipping': 'true',
          'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
        }
      )
    }}

-- first run select 1 as id, 2 as id2,"test2" as name

-- second run select 1 as id, 2 as id2,"test" as name

[Attached: Screenshot 2024-02-06 at 10 49 26 PM]

Attachments: first_run_conf.txt, second_run_conf.txt

prashant462 avatar Feb 06 '24 17:02 prashant462

https://github.com/apache/hudi/issues/9342 @ad1happy2go We were facing this issue as well. Basically, we know which datasource options we want to use, but we want to use them through the Spark SQL support provided by Hudi. In the second run, one of the properties that also changed was "hoodie.datasource.write.payload.class". As described in the issue linked above, this was fixed for InsertInto in the 0.13.1 release. But the MergeInto command will still override PAYLOAD_CLASS_NAME to ExpressionPayload, since that is part of the overriding options in the buildMergeIntoConfig method of the MergeIntoHoodieTableCommand.scala class.

Our original requirement is to UPSERT on a COW/MOR table while using Hudi's DefaultHoodieRecordPayload. On the first run we do CreateTable -> InsertInto; on the second run we do MergeInto. The match condition looks somewhat like this:

 when matched then update set * 
 when not matched then insert *
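The `update set * / insert *` clauses above reduce to plain upsert semantics per record key; a rough Python sketch of what the matched and not-matched branches do (illustrative only, not Hudi's implementation):

```python
# Illustrative upsert: "when matched then update set *" replaces the existing
# row, "when not matched then insert *" adds a new one. With set */insert *,
# both branches end up assigning the full source row to the record key.
def merge_into(target, source_rows, key_field="id"):
    for row in source_rows:
        key = row[key_field]
        if key in target:       # when matched then update set *
            target[key] = row
        else:                   # when not matched then insert *
            target[key] = row
    return target

target = {1: {"id": 1, "id2": 2, "name": "test2"}}
merge_into(target, [
    {"id": 1, "id2": 3, "name": "test"},   # matched -> updated
    {"id": 2, "id2": 1, "name": "test"},   # not matched -> inserted
])
```

Hudi evaluates these branch expressions by compiling them into an ExpressionPayload, which is why MergeInto overrides the configured payload class as described above.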

It would be great if you could clarify this for our understanding. Should we move to version 0.13 or higher; will that solve the issue? Or should we use InsertInto with some additional insert-into behaviour properties?

pravin1406 avatar Feb 06 '24 21:02 pravin1406

@prashant462 I tried the exact same model and it is working as expected with version 0.14.1.

After first run (select 2 as id, 2 as id2, "test" as name): [screenshot attached]

After second run (select 2 as id, 2 as id2, "test2" as name): [screenshot attached]

Can you please try with Hudi version 0.14.X?

ad1happy2go avatar Feb 07 '24 05:02 ad1happy2go

Yes, with pre-0.14.0, Hudi expects all write configs to be passed in with every write. From 0.14.0, at least for table properties, Hudi tries to reuse the properties already serialized as table props. Note this is not applicable to write properties; those are not serialized anywhere.

nsivabalan avatar Feb 08 '24 18:02 nsivabalan

@nsivabalan @ad1happy2go I tried with Hudi 0.14.1 and the configs seem to be working now, thanks. But I am facing another issue with the property 'hoodie.simple.index.update.partition.path': 'true'.

I am running a dbt model with below config.

    {{
      config(
        materialized = 'incremental',
        file_format = 'hudi',
        pre_hook = "SET spark.sql.legacy.allowNonEmptyLocationInCTAS = true",
        location_root = "file:///Users/B0279627/Downloads/Hudi",
        unique_key = "id",
        partition_by = "name",
        incremental_strategy = "merge",
        options = {
          'hoodie.datasource.write.precombine.field': 'id2',
          'hoodie.index.type': 'GLOBAL_SIMPLE',
          'hoodie.datasource.write.partitionpath.field': 'name',
          'hoodie.datasource.hive_sync.partition_fields': 'name',
          'hoodie.datasource.hive_sync.table': 'hudi_test_five',
          'hoodie.datasource.hive_sync.database': 'qultyzn1_prepd',
          'hoodie.simple.index.update.partition.path': 'true',
          'hoodie.keep.min.commits': '145',
          'hoodie.keep.max.commits': '288',
          'hoodie.cleaner.policy': 'KEEP_LATEST_BY_HOURS',
          'hoodie.cleaner.hours.retained': '72',
          'hoodie.cleaner.fileversions.retained': '144',
          'hoodie.cleaner.commits.retained': '144',
          'hoodie.upsert.shuffle.parallelism': '200',
          'hoodie.insert.shuffle.parallelism': '200',
          'hoodie.bulkinsert.shuffle.parallelism': '200',
          'hoodie.delete.shuffle.parallelism': '200',
          'hoodie.parquet.compression.codec': 'zstd',
          'hoodie.datasource.hive_sync.support_timestamp': 'true',
          'hoodie.datasource.write.reconcile.schema': 'true',
          'hoodie.enable.data.skipping': 'true',
          'hoodie.spark.sql.insert.into.operation': 'upsert',
          'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
        }
      )
    }}

1st run -- select 1 as id, 4 as id2, "test5" as name

2nd run -- select 1 as id, 2 as id2, "test4" as name

But the partition path is not updating for the record key.

I am attaching the table result: [Screenshot 2024-02-09 at 10 43 11 AM]

prashant462 avatar Feb 09 '24 05:02 prashant462


@ad1happy2go @nsivabalan It seems like it is taking the preCombineField into consideration; is that a valid case? Does 'hoodie.simple.index.update.partition.path': 'true' honour the preCombineField?

I ask because when I changed the 2nd run to -- select 1 as id, 4 (or any greater value) as id2, "test4" as name, it updated the partition.

prashant462 avatar Feb 12 '24 12:02 prashant462

@prashant462 Yes, this is expected, as the precombine (ordering) field exists for exactly that purpose. An upsert is applied only if the incoming precombine value is greater than the existing data value.
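That rule can be sketched as a simplified ordering-based payload: the incoming record wins only when its precombine value is at least the existing one (plain Python, not Hudi's DefaultHoodieRecordPayload itself; the tie-breaking behaviour below is an assumption based on the observation earlier in the thread, where an equal id2 value still updated the partition):

```python
def combine(existing, incoming, ordering_field="id2"):
    """Pick the record with the higher ordering (precombine) value.

    Ties go to the incoming record here -- a simplifying assumption,
    not a statement about Hudi's exact semantics.
    """
    if incoming[ordering_field] >= existing[ordering_field]:
        return incoming
    return existing

existing = {"id": 1, "id2": 4, "name": "test5"}

# id2=2 < 4: the update (and hence the partition-path move) is rejected.
rejected = combine(existing, {"id": 1, "id2": 2, "name": "test4"})

# id2=4 >= 4: the incoming record wins and the partition path is updated.
accepted = combine(existing, {"id": 1, "id2": 4, "name": "test4"})

print(rejected["name"])  # test5
print(accepted["name"])  # test4
```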

ad1happy2go avatar Feb 13 '24 10:02 ad1happy2go

@prashant462 Feel free to close this issue if you are all good, or let us know how we can help further. Thanks a lot.

ad1happy2go avatar Feb 27 '24 15:02 ad1happy2go