
[SUPPORT] Hudi Delete Not working with EMR, AWS Glue & S3

navbalaraman opened this issue 3 years ago

Describe the problem

I'm using a Spark job running on EMR to insert data using Hudi (0.9.0). The inserts work as expected: parquet files are stored in Amazon S3, and the AWS Glue Data Catalog is used to read that S3 data through Amazon Athena. Now I have a use case where I need to delete some records in the dataset. I tried using a Hudi delete, but the records are not getting deleted (I don't see a new parquet file, without the deleted records, being created). The job does not throw any error either. Any thoughts on what could be missing?

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
      .appName("CCPA Record Deletion")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")
      .config("spark.hadoop.fs.s3a.fast.upload", "true")
      .config("spark.sql.parquet.filterPushdown", "true")
      .config("spark.sql.parquet.mergeSchema", "false")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .config("spark.speculation", "false")
      .enableHiveSupport()
      .getOrCreate()

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

Hudi configuration:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieCleaningPolicy
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor

def hudioptions(): Map[String, String] = {
    Map[String, String](
      HoodieWriteConfig.TABLE_NAME -> "table_name",
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id_field",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition_field",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp_field",
      DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
      DataSourceWriteOptions.HIVE_URL_OPT_KEY -> "jdbc:hive2://masterdns:10000",
      DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> "db_name",
      DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "table_name",
      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition_field",
      DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
      DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
      HoodieCompactionConfig.CLEANER_FILE_VERSIONS_RETAINED_PROP -> "1",
      HoodieCompactionConfig.CLEANER_POLICY_PROP -> HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name()
    )
  }

val hudiOptions = hudioptions()

val dataFrameToDelete = S3DataDataFrame.where("column_name in ('a','b')")

dataFrameToDelete.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(S3_Path + "/")

Spark and Hive config for AWS Glue support:

"Classification" = "spark-hive-site",
              "Properties"     = {
                "hive.metastore.client.factory.class" = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
              }
"Classification" = "hive-site",
              "Properties"     = {
                "hive.metastore.client.factory.class" = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
              }

Additional Info

EMR 6.5, Hudi 0.9.0, Spark 3.1.2, Hive 3.1.2

navbalaraman avatar Jul 13 '22 14:07 navbalaraman

Got an error like this with the latest run (table_name is the actual name of the AWS Glue table): "Got runtime exception when hive syncing table_name"

navbalaraman avatar Jul 13 '22 15:07 navbalaraman

Used the different config below, and now I'm getting a different error: "Failed to upsert for commit time 20220713163717"

.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), "org.apache.hudi.EmptyHoodieRecordPayload")

Tried this per the option in the documentation here: https://hudi.apache.org/docs/0.9.0/writing_data/#deletes
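
A minimal sketch of that doc pattern as a complete write call, reusing hudiOptions, dataFrameToDelete, and S3_Path from the snippets above. One thing worth double-checking: in Hudi 0.9.x the empty payload class ships in the org.apache.hudi.common.model package, so the shorter class path in the snippet above may not resolve.

// Sketch: hard delete by upserting the records with an "empty" payload.
dataFrameToDelete.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  // Fully-qualified payload class for 0.9.x:
  .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
  .mode(SaveMode.Append)
  .save(S3_Path + "/")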

navbalaraman avatar Jul 13 '22 17:07 navbalaraman

I encountered the same problem. I used spark-shell to delete Hudi table data following the official website documentation. When I deleted from a non-partitioned table, the deletion succeeded, but the deletion did not work on a partitioned table.

qianchutao avatar Jul 19 '22 08:07 qianchutao

@navbalaraman : looks like you made a typo with your operation:

.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)

Its "delete" and not "delete_partition".

Or did you intend to actually trigger "delete_partition" ? can you give it a try with "delete" and let us know how it goes. and then we can further investigate.
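
For reference, a minimal sketch with the "delete" operation, reusing hudiOptions, dataFrameToDelete, and S3_Path from the snippets above:

// Hard delete: Hudi matches the incoming rows by record key (and partition
// path) and writes a new commit without them.
dataFrameToDelete.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(S3_Path + "/")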

nsivabalan avatar Aug 10 '22 03:08 nsivabalan

@qianchutao : can you create a new GitHub issue? It could be a different problem from what the OP posted. If we later deduce that both are the same, we can close one, but I'd rather not pollute one GitHub issue with different issues.

nsivabalan avatar Aug 10 '22 03:08 nsivabalan

@navbalaraman : gentle ping

nsivabalan avatar Aug 16 '22 13:08 nsivabalan

@nsivabalan Thanks for your attention to this issue. Here is the current status:

  • Managed to get the deletes working (see the sketch after this list).
  • Was trying to delete with the partition column name "_test_partition", but the parquet file didn't have that column; instead it had "_hoodie_partition_path":"_test_partition=default".
  • So while reading the S3 data into the DataFrame to be deleted, I had to do this: .withColumn("_test_partition", col("_hoodie_partition_path"))
  • We had explicitly made a change to not have Hudi create that additional "_test_partition" partition column, because when we ran a crawler against the S3 data, querying the table would fail with "duplicate column".
  • Also, had to set the operation to delete (as you indicated above): .options(hudioptions).option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
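
Putting those pieces together, a sketch of the flow (hudiOptions and S3_Path as in the earlier snippets; the filter and the single-level path glob are placeholders):

import org.apache.spark.sql.functions.col

// Read the Hudi table, select the rows to remove, and re-create the partition
// column from Hudi's metadata column before issuing the delete.
val dataFrameToDelete = spark.read
  .format("org.apache.hudi")
  .load(S3_Path + "/*")
  .where("column_name in ('a','b')")
  .withColumn("_test_partition", col("_hoodie_partition_path"))

dataFrameToDelete.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(S3_Path + "/")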

Question:

  • Currently the data gets deleted but the partitions continue to exist. Will that get fixed if I also include the below? .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
  • Also, any time a schema change happens we get the error below. Is there anything we can set to ensure Hudi can handle schema changes? org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time

Appreciate your inputs.

navbalaraman avatar Aug 16 '22 14:08 navbalaraman

If you wish to delete an entire partition's data, you can directly issue the "delete_partition" operation, and you don't need to send the entire DF<Recs> for it. For instance, you can set this config https://hudi.apache.org/docs/0.11.1/configurations#hoodiedatasourcewritepartitionstodelete to a comma-separated list of partitions to be deleted; see the sketch below.

If you are not setting this config param, then you need to pass a DF<Recs> containing records in the partitions you wish to delete. Later, when the cleaner comes around, it will delete the physical directories.
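
A minimal sketch of the config-driven variant, assuming a Hudi release that documents this config (it is listed for 0.11.x); hudiOptions and S3_Path are as above, and the partition values shown are placeholders:

// No record-level DataFrame is needed when the partitions are listed
// explicitly; an empty frame just carries the write trigger. Partition values
// are paths relative to the table base path, not full S3 URLs.
spark.emptyDataFrame.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
  .option("hoodie.datasource.write.partitions.to.delete", "partition_field=a,partition_field=b")
  .mode(SaveMode.Append)
  .save(S3_Path + "/")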

nsivabalan avatar Aug 28 '22 00:08 nsivabalan

@navbalaraman : any updates, please?

nsivabalan avatar Sep 06 '22 04:09 nsivabalan

Hi @nsivabalan, I'm still having issues with the deletion. If my delete records are all in a single partition, it succeeds; if they're spread across partitions, I see partial record deletions or sometimes no record deletions. The worst part is that my EMR job also doesn't throw an error in these cases. I know there are some minor schema differences in some of the records (some records don't have some values), but I'm not sure if that's what is causing the deletions to fail. Appreciate your inputs.

navbalaraman avatar Sep 09 '22 21:09 navbalaraman

I assume you are referring to delete_partition, right? How are you triggering delete_partition: are you passing in a regular dataframe as you would for other write operations, or are you setting the config https://hudi.apache.org/docs/configurations#hoodiedatasourcewritepartitionstodelete? You can set a comma-separated list of partition values that need to be deleted.

I might need to reproduce your exact scenario and go from there. In the meantime, if you have a reproducible script, let me know.

nsivabalan avatar Sep 14 '22 22:09 nsivabalan

Hey, I tried to test delete_partition for multiple partitions and could not reproduce. Would you mind giving us a reproducible script? It would help us find the root cause.

nsivabalan avatar Sep 20 '22 01:09 nsivabalan

@navbalaraman : hey, any updates for us? If you could not reproduce, feel free to close this out.

nsivabalan avatar Sep 30 '22 03:09 nsivabalan

Hi, we are also looking for a way to drop older partitions from Hudi, but can't find examples of how to do this. Our data is partitioned as follows:

s3://bucket/<path_to_hudi_metadata>/movement_type=xx/year=YYYY/month=mm/day=dd/provider=yy/qk=00223/*.parquet. We would like to drop all records for a given year, month, and day, i.e.: s3://bucket/<path_to_hudi_metadata>/movement_type=xx/year=2022/month=09/day=30/

Could you tell us how the partition URLs should be constructed? We are passing an empty dataframe to the DELETE_PARTITION operation:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

emptyDF.write.format("hudi")
  .option(HoodieWriteConfig.TBL_NAME.key, HUDI_TABLE_NAME)
  .option(OPERATION.key, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key, partitions.mkString(","))
  .option(TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(RECORDKEY_FIELD.key, HUDI_RECORD_KEY)
  .option(PARTITIONPATH_FIELD.key, partitionCols)
  .option(HIVE_STYLE_PARTITIONING.key, "true")
  .mode(SaveMode.Append)
  .save("s3://bucket/<path_to_hudi_metadata>")

For example, I am trying to drop these partitions: /movement_type=*/{year=2022/month=09/day=28}/provider=*/qk=*/, /movement_type=*/{year=2022/month=09/day=27}/provider=*/qk=*/

However, I see that nothing is dropped. Our environment: EMR 6.5, Spark 3.1.2, Hadoop 3.2.1, Hudi 0.10.1.

Here are the logs from the std console:

2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=unknown/year=2022/month=09/day=29/provider=367/qk=03221
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=stopped/year=2022/month=09/day=29/provider=367/qk=02331
2022-10-03 21:25:01 INFO CleanPlanner:398 - 1 patterns used to delete in partition path:movement_type=stopped/year=2022/month=09/day=29/provider=367/qk=03202
2022-10-03 21:25:01 INFO CleanPlanner:398 - 1 patterns used to delete in partition path:movement_type=moving/year=2022/month=09/day=29/provider=367/qk=03221
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=stopped/year=2022/month=09/day=28/provider=367/qk=03232
2022-10-03 21:25:01 INFO CleanPlanner:398 - 1 patterns used to delete in partition path:movement_type=stopped/year=2022/month=09/day=29/provider=367/qk=03221
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=moving/year=2022/month=09/day=28/provider=367/qk=03201
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=stopped/year=2022/month=09/day=28/provider=367/qk=03221
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=moving/year=2022/month=09/day=29/provider=367/qk=02331
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=moving/year=2022/month=09/day=29/provider=367/qk=03201
2022-10-03 21:25:01 INFO Executor:57 - Finished task 10.0 in stage 9.0 (TID 47). 1250 bytes result sent to driver
2022-10-03 21:25:01 INFO Executor:57 - Finished task 7.0 in stage 9.0 (TID 44). 991 bytes result sent to driver
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=moving/year=2022/month=09/day=28/provider=367/qk=03221
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=stopped/year=2022/month=09/day=28/provider=367/qk=21033
2022-10-03 21:25:01 INFO CleanPlanner:398 - 0 patterns used to delete in partition path:movement_type=moving/year=2022/month=09/day=29/provider=367/qk=03202

INRIX-Trang-Nguyen avatar Oct 04 '22 03:10 INRIX-Trang-Nguyen

Attached sample test files. nrt-delete-partitions.zip

INRIX-Trang-Nguyen avatar Oct 04 '22 04:10 INRIX-Trang-Nguyen