
[SUPPORT] Unsupported schema evolution corrupts Hudi table without error

Open ganczarek opened this issue 2 years ago • 4 comments

Describe the problem you faced

Hudi evolves the schema of nested fields in an unsupported way (bool -> string and timestamp -> string), writes corrupted data, alters the Hive table schema, and later fails to read the table.

To Reproduce

// First write: detail is a nested struct with Boolean and Timestamp fields (full imports are listed further down in the thread).
case class TestClass(bool_value: Boolean, timestamp_value: Timestamp)
Seq(
  ("1", "a", TestClass(true, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC)))),
  ("1", "b", TestClass(false, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC))))
).toDF("partition", "id", "detail")
  .write
  .format("org.apache.hudi")
  .options(
    Map(
      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
      HoodieWriteConfig.TBL_NAME.key -> "test_table",
      HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
      HiveSyncConfig.HIVE_URL.key -> "jdbc:hive2://localhost:10000",
      HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
    )
  )
  .mode(SaveMode.Overwrite)
  .save(s"s3://$s3Bucket/temp/test_db/test_table")

// Second write: the same nested fields are now typed as String and appended to a new partition.
case class TestClass2(bool_value: String, timestamp_value: String)
Seq(
  ("2", "c", TestClass2("str1", LocalDateTime.now().toString)),
  ("2", "d", TestClass2("str2", LocalDateTime.now().toString))
).toDF("partition", "id", "detail")
  .write
  .format("org.apache.hudi")
  .options(
    Map(
      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
      HoodieWriteConfig.TBL_NAME.key -> "test_table",
      HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
      HiveSyncConfig.HIVE_URL.key -> "jdbc:hive2://localhost:10000",
      HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
    )
  )
  .mode(SaveMode.Append)
  .save(s"s3://$s3Bucket/temp/test_db/test_table")

spark.read.format("org.apache.hudi").load(s"s3://$s3Bucket/temp/test_db/test_table").show(false)

Expected behavior

Hudi should not succeed in writing data when field types evolve in an unsupported way. Two things appear to go wrong here: 1) Hudi creates a new partition with incompatible Parquet files, which corrupts the table, and 2) the Hudi Hive sync alters the Hive table schema in a way it cannot handle later (bool -> string, bigint -> string).
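
A possible writer-side guard (a minimal sketch, not something Hudi provides out of the box; assertTypesUnchanged, hudiOptions and tablePath are illustrative names) is to read the current table schema back and refuse to append a batch whose field types differ. A nested change such as detail.bool_value: boolean -> string also surfaces here, because the top-level type of detail changes from one struct to another.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

// Compare incoming field types against the existing Hudi table schema and abort on any change.
def assertTypesUnchanged(spark: SparkSession, df: DataFrame, tablePath: String): Unit = {
  val existing: StructType = spark.read.format("org.apache.hudi").load(tablePath).schema
  val existingTypes = existing.fields.map(f => f.name -> f.dataType).toMap
  val changed = df.schema.fields.collect {
    case f if existingTypes.get(f.name).exists(_ != f.dataType) =>
      s"${f.name}: ${existingTypes(f.name)} -> ${f.dataType}"
  }
  require(changed.isEmpty, s"Refusing to write; field types changed: ${changed.mkString(", ")}")
}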

Environment Description

  • Hudi version : 0.12.2-amzn-0 (AWS EMR-6.10.0)

  • Spark version : 3.3.1

  • Hive version : 3.1.3

  • Hadoop version : 3.3.3

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

I've experimented with this issue a little, and it seems the problem requires a combination of two things: the fields being nested and new partitions being created.

When the fields are at the root level (not nested) and the table is not partitioned, a field type change from bool to string or from timestamp to string makes Hudi fail to merge records; the commit is aborted and Hive sync is never executed.

If the fields are at the root level but the table is partitioned, and new data with the changed field types is written to a new partition, the Hudi write succeeds but the Hive sync fails with the following error:

Caused by: org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing test_table_2
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:145)
  at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:58)
  ... 95 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Could not convert field Type from BIGINT to string for field timestamp_value
  at org.apache.hudi.hive.util.HiveSchemaUtil.getSchemaDifference(HiveSchemaUtil.java:109)
  at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:285)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:217)
  at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:154)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:142)
  ... 96 more 

When both fields are nested and the table is partitioned, the write and the Hive sync both succeed.
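
For reference, the flattened (root-level) variant behind the first two scenarios above looks roughly like this; hudiOptions and tablePath are placeholders for the same options map and path used in the repro, and spark.implicits._ is assumed to be in scope as before.

// Flattened variant: bool_value and timestamp_value are top-level columns instead of a nested struct.
// With an unpartitioned table, the second (Append) write fails to merge records and the commit is rolled back;
// with a partitioned table and a new partition value, the write succeeds and only the Hive sync fails.
Seq(("a", true, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC))))
  .toDF("id", "bool_value", "timestamp_value")
  .write.format("org.apache.hudi")
  .options(hudiOptions)
  .mode(SaveMode.Overwrite)
  .save(tablePath)

Seq(("b", "str1", LocalDateTime.now().toString))
  .toDF("id", "bool_value", "timestamp_value")
  .write.format("org.apache.hudi")
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save(tablePath)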

Stacktrace

Error message when reading from the corrupted table:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 777.0 failed 4 times, most recent failure: Lost task 0.3 in stage 777.0 (TID 866) (ip-10-203-88-126.eu-west-2.compute.internal executor 16): org.apache.spark.sql.execution.QueryExecutionException: Encountered error while reading file s3://REDACTED/temp/test_db/test_table/partition=1/a08715d6-5d14-49ed-b15d-309ba3c09252-0_0-711-793_20230517125708406.parquet. Details:
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:731)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:402)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:383)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://REDACTED/temp/test_db/test_table/partition=1/a08715d6-5d14-49ed-b15d-309ba3c09252-0_0-711-793_20230517125708406.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator$$anon$1.hasNext(RecordReaderIterator.scala:61)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:393)
    ... 20 more
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
    at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.$anonfun$setDictionary$1(ParquetRowConverter.scala:507)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.$anonfun$setDictionary$1$adapted(ParquetRowConverter.scala:506)
    at scala.Array$.tabulate(Array.scala:418)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.setDictionary(ParquetRowConverter.scala:506)
    at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
    ... 25 more 

ganczarek, May 18 '23 11:05

@ganczarek

https://hudi.apache.org/docs/schema_evolution


This looks like a Hive-Sync error. Can you try dropping your hive table (without purging your local data) and recreating your table with the new schema?
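
For example (a sketch; it assumes the table is EXTERNAL, as the SHOW CREATE TABLE output later in this thread shows, so dropping it only removes the metastore entry and leaves the data on S3 in place):

// Drop only the Hive metastore entry; the files on S3 are kept because the table is EXTERNAL.
spark.sql("DROP TABLE IF EXISTS test_db.test_table")

// Any subsequent Hudi write with hive sync enabled (same options as in the repro)
// should recreate the Hive table from the table's current schema.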

[screenshot: Hudi-Full-Schema-Evolution]

FWIU, BIGINT -> STRING should be supported.

voonhous, May 19 '23 08:05

@ganczarek I tried to reproduce your code snippet, but I got an error on the write itself. https://gist.github.com/ad1happy2go/d97f1421d05b4b7573d4b79e25b78f08

Can you please share the code for which you see this issue on read (i.e., the write succeeds and corrupts the table)?

ad1happy2go, May 24 '23 12:05

@ad1happy2go. Thank you for trying to reproduce the problem.

I know this issue is quite old, but I don't have a way to test it with newer versions of Hudi or Hive right now. However, I can still reproduce the problem with Hudi 0.12.2 and Hive 3.1.3 (AWS EMR 6.10.0).

$ hive --version
Hive 3.1.3-amzn-3
Git file:///codebuild/output/src425131955/src/build/hive/rpm/BUILD/apache-hive-3.1.3-amzn-3-src -r Unknown
Compiled by release on Tue Feb 21 19:18:24 UTC 2023
From source with checksum 9c4721677b95382c13a0fc6adb9dc41c

What version of Hive did you use?

The full code with all imports:

import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.{HiveSyncConfig, NonPartitionedExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.spark.sql.SaveMode

import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}

val hiveUrl = "jdbc:hive2://localhost:10000"
val s3TablePath = "s3://<redacted>/test_table"

case class TestClass(bool_value: Boolean, timestamp_value: Timestamp)
case class TestClass2(bool_value: String, timestamp_value: String)

Seq(
  ("1", "a", TestClass(true, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC)))),
  ("1", "b", TestClass(false, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC))))
).toDF("partition", "id", "detail")
  .write
  .format("org.apache.hudi")
  .options(
    Map(
      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
      HoodieWriteConfig.TBL_NAME.key -> "test_table",
      HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
      HiveSyncConfig.HIVE_URL.key -> hiveUrl,
      HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
    )
  )
  .mode(SaveMode.Overwrite)
  .save(s3TablePath)

spark.read.format("org.apache.hudi").load(s3TablePath).show(false)
/**
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+--------------------------------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |id |detail                          |partition|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+--------------------------------+---------+
|20240214124653234  |20240214124653234_0_0|a                 |partition=1           |4a12398f-e664-48af-a397-28d5741a0256-0_0-76-74_20240214124653234.parquet|a  |{true, 2024-02-14 12:46:52.992} |1        |
|20240214124653234  |20240214124653234_0_1|b                 |partition=1           |4a12398f-e664-48af-a397-28d5741a0256-0_0-76-74_20240214124653234.parquet|b  |{false, 2024-02-14 12:46:52.992}|1        |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+--------------------------------+---------+
*/

Seq(
  ("2", "c", TestClass2("str1", LocalDateTime.now().toString)),
  ("2", "d", TestClass2("str2", LocalDateTime.now().toString))
).toDF("partition", "id", "detail")
  .write
  .format("org.apache.hudi")
  .options(
    Map(
      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
      HoodieWriteConfig.TBL_NAME.key -> "test_table",
      HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
      HiveSyncConfig.HIVE_URL.key -> hiveUrl,
      HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
    )
  )
  .mode(SaveMode.Append)
  .save(s3TablePath)

spark.read.format("org.apache.hudi").load(s3TablePath).show(false)
// java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary

The table schema in Hive after executing the above snippet is:

hive> SHOW CREATE TABLE test_db.test_table;
OK
CREATE EXTERNAL TABLE `test_db.test_table`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` string,
  `detail` struct<bool_value:string,timestamp_value:string>)
PARTITIONED BY (
  `partition` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='false',
  'path'='s3://<redacted>/test_table')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<redacted>/test_table'
TBLPROPERTIES (
  'bucketing_version'='2',
  'last_commit_time_sync'='20240214133801211',
  'last_modified_by'='hive',
  'last_modified_time'='1707917907',
  'spark.sql.create.version'='3.3.1-amzn-0',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numPartCols'='1',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"detail","type":{"type":"struct","fields":[{"name":"bool_value","type":"string","nullable":true,"metadata":{}},{"name":"timestamp_value","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"partition","type":"string","nullable":true,"metadata":{}}]}',
  'spark.sql.sources.schema.partCol.0'='partition',
  'transient_lastDdlTime'='1707917907')
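
One way to see the mismatch directly (a sketch; substitute the real path of the first-commit base file from the stack trace, redacted here) is to read that Parquet file without going through Hudi or Hive:

// Reading the original base file directly is expected to show the physical types that the altered
// string/string schema can no longer decode: bool_value as boolean and timestamp_value stored as INT64,
// which is why the reader hits PlainLongDictionary when asked to produce a string.
spark.read
  .parquet("s3://<redacted>/temp/test_db/test_table/partition=1/<first-commit-base-file>.parquet")
  .select("detail")
  .printSchema()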

ganczarek, Feb 14 '24 14:02

Thanks @ganczarek for your contribution.

I have confirmed that the issue was there in version 0.12.2, but with a later version of Hudi (0.14.X) the write itself fails with org.apache.hudi.exception.SchemaCompatibilityException: Incoming batch schema is not compatible with the table's one. So we are good on this.

Code gist - https://gist.github.com/ad1happy2go/72de4d53b497dff80c4d8527ff45934c
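
For anyone stuck on 0.12.x, one possible mitigation (an assumption on my side, not verified against this exact nested-field case) is Hudi's writer-side Avro schema validation, which is meant to fail the write when the incoming schema is incompatible with the table schema; df and hudiOptions below stand for the batch and options from the repro.

// Possible guard on older Hudi versions (assumption: behaviour not verified for this nested-field scenario).
df.write
  .format("org.apache.hudi")
  .options(hudiOptions)                           // same options as in the snippets above
  .option("hoodie.avro.schema.validate", "true")  // writer-side schema compatibility check
  .mode(SaveMode.Append)
  .save(s3TablePath)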

ad1happy2go, Feb 15 '24 06:02