[SUPPORT] Unsupported schema evolution corrupts Hudi table without error
Describe the problem you faced
Hudi evolves schema for nested fields in unsupported way (bool -> str and timestamp -> str), writes corrupted data, alters Hive table schema and later fails to read it.
To Reproduce
case class TestClass(bool_value: Boolean, timestamp_value: Timestamp)
Seq(
("1", "a", TestClass(true, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC)))),
("1", "b", TestClass(false, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC))))
).toDF("partition", "id", "detail")
.write
.format("org.apache.hudi")
.options(
Map(
HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
HoodieWriteConfig.TBL_NAME.key -> "test_table",
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
HiveSyncConfig.HIVE_URL.key -> "jdbc:hive2://localhost:10000",
HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
)
)
.mode(SaveMode.Overwrite)
.save(s"s3://$s3Bucket/temp/test_db/test_table")
case class TestClass2(bool_value: String, timestamp_value: String)
Seq(
("2", "c", TestClass2("str1", LocalDateTime.now().toString)),
("2", "d", TestClass2("str2", LocalDateTime.now().toString))
).toDF("partition", "id", "detail")
.write
.format("org.apache.hudi")
.options(
Map(
HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
HoodieWriteConfig.TBL_NAME.key -> "test_table",
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
HiveSyncConfig.HIVE_URL.key -> "jdbc:hive2://localhost:10000",
HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
)
)
.mode(SaveMode.Append)
.save(s"s3://$s3Bucket/temp/test_db/test_table")
spark.read.format("org.apache.hudi").load(s"s3://$s3Bucket/temp/test_db/test_table").show(false)
Expected behavior
Hudi should not succeed to write data if field types evolve in unsupported way. There are two things that I think go wrong here: 1) Hudi creates a new partition with invalid Parquet files what corrupts the table, and 2) Hudi hive sync alters Hive table schema in a way that it doesn't support later (bool -> str, bigint -> str).
Environment Description
-
Hudi version : 0.12.2-amzn-0 (AWS EMR-6.10.0)
-
Spark version : 3.3.1
-
Hive version : 3.1.3
-
Hadoop version : 3.3.3
-
Storage (HDFS/S3/GCS..) : S3
-
Running on Docker? (yes/no) : no
Additional context
I've played with this issue a little bit and it seems that the problem is a combination of the fact that fields are nested and new partitions are created.
When fields are at root level (not nested) and table is not partitioned, then when field type changes from bool to string or from timestamp to string, Hudi fails to merge records, commit is aborted and hive sync is never executed.
If fields are at root level, but table is partitioned, then when new data with changed field types is written to a new partition, then Hudi write is successful, but Hudi hive sync fails with the following error:
Caused by: org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing test_table_2
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:145)
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:58)
... 95 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Could not convert field Type from BIGINT to string for field timestamp_value
at org.apache.hudi.hive.util.HiveSchemaUtil.getSchemaDifference(HiveSchemaUtil.java:109)
at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:285)
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:217)
at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:154)
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:142)
... 96 more
When both fields are nested and table is partitioned, then write and Hive sync operations are successful.
Stacktrace
Error message when reading from corrupted Hive table:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 777.0 failed 4 times, most recent failure: Lost task 0.3 in stage 777.0 (TID 866) (ip-10-203-88-126.eu-west-2.compute.internal executor 16): org.apache.spark.sql.execution.QueryExecutionException: Encountered error while reading file s3://REDACTED/temp/test_db/test_table/partition=1/a08715d6-5d14-49ed-b15d-309ba3c09252-0_0-711-793_20230517125708406.parquet. Details:
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:731)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:402)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:383)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://REDACTED/temp/test_db/test_table/partition=1/a08715d6-5d14-49ed-b15d-309ba3c09252-0_0-711-793_20230517125708406.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator$$anon$1.hasNext(RecordReaderIterator.scala:61)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:393)
... 20 more
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.$anonfun$setDictionary$1(ParquetRowConverter.scala:507)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.$anonfun$setDictionary$1$adapted(ParquetRowConverter.scala:506)
at scala.Array$.tabulate(Array.scala:418)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.setDictionary(ParquetRowConverter.scala:506)
at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
... 25 more
@ganczarek
https://hudi.apache.org/docs/schema_evolution
This looks like a Hive-Sync error. Can you try dropping your hive table (without purging your local data) and recreating your table with the new schema?
Hudi-Full-Schema-Evolution
FWIU, BIGINT -> STRING should be supported.
@ganczarek I tried to reproduce your code snippet but I was getting error in write itself. https://gist.github.com/ad1happy2go/d97f1421d05b4b7573d4b79e25b78f08
Can you please share us the code for which you are getting this issue in read (write successful and corrupt the table)
@ad1happy2go. Thank you for trying to reproduce the problem.
I know this issue is quite old, but I don't have a way to test it with newer versions of Hudi or Hive right now. However, I can still reproduce the problem with Hudi 0.12.2 and Hive 3.1.3 (AWS EMR 6.10.0).
$ hive --version
Hive 3.1.3-amzn-3
Git file:///codebuild/output/src425131955/src/build/hive/rpm/BUILD/apache-hive-3.1.3-amzn-3-src -r Unknown
Compiled by release on Tue Feb 21 19:18:24 UTC 2023
From source with checksum 9c4721677b95382c13a0fc6adb9dc41c
What version of Hive did you use?
The full code with all imports:
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.{HiveSyncConfig, NonPartitionedExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.spark.sql.SaveMode
import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}
val hiveUrl = "jdbc:hive2://localhost:10000"
val s3TablePath = "s3://<redacted>/test_table"
case class TestClass(bool_value: Boolean, timestamp_value: Timestamp)
case class TestClass2(bool_value: String, timestamp_value: String)
Seq(
("1", "a", TestClass(true, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC)))),
("1", "b", TestClass(false, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC))))
).toDF("partition", "id", "detail")
.write
.format("org.apache.hudi")
.options(
Map(
HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
HoodieWriteConfig.TBL_NAME.key -> "test_table",
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
HiveSyncConfig.HIVE_URL.key -> hiveUrl,
HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
)
)
.mode(SaveMode.Overwrite)
.save(s3TablePath)
spark.read.format("org.apache.hudi").load(s3TablePath).show(false)
/**
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+--------------------------------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |id |detail |partition|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+--------------------------------+---------+
|20240214124653234 |20240214124653234_0_0|a |partition=1 |4a12398f-e664-48af-a397-28d5741a0256-0_0-76-74_20240214124653234.parquet|a |{true, 2024-02-14 12:46:52.992} |1 |
|20240214124653234 |20240214124653234_0_1|b |partition=1 |4a12398f-e664-48af-a397-28d5741a0256-0_0-76-74_20240214124653234.parquet|b |{false, 2024-02-14 12:46:52.992}|1 |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+--------------------------------+---------+
*/
Seq(
("2", "c", TestClass2("str1", LocalDateTime.now().toString)),
("2", "d", TestClass2("str2", LocalDateTime.now().toString))
).toDF("partition", "id", "detail")
.write
.format("org.apache.hudi")
.options(
Map(
HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
HoodieWriteConfig.TBL_NAME.key -> "test_table",
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
HiveSyncConfig.HIVE_URL.key -> hiveUrl,
HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
)
)
.mode(SaveMode.Append)
.save(s3TablePath)
)
spark.read.format("org.apache.hudi").load(s3TablePath).show(false)
// java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
Table schema in Hive after executing above snippet is:
hive> SHOW CREATE TABLE test_db.test_table;
OK
CREATE EXTERNAL TABLE `test_db.test_table`(
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`id` string,
`detail` struct<bool_value:string,timestamp_value:string>)
PARTITIONED BY (
`partition` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='false',
'path'='s3://<redacted>/test_table')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://<redacted>/test_table'
TBLPROPERTIES (
'bucketing_version'='2',
'last_commit_time_sync'='20240214133801211',
'last_modified_by'='hive',
'last_modified_time'='1707917907',
'spark.sql.create.version'='3.3.1-amzn-0',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numPartCols'='1',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"detail","type":{"type":"struct","fields":[{"name":"bool_value","type":"string","nullable":true,"metadata":{}},{"name":"timestamp_value","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"partition","type":"string","nullable":true,"metadata":{}}]}',
'spark.sql.sources.schema.partCol.0'='partition',
'transient_lastDdlTime'='1707917907')
Thanks @ganczarek for your contribution.
I have confirmed that issue with was there in 0.12.2 version but with later version of hudi 0.14.X the write itself fails with org.apache.hudi.exception.SchemaCompatibilityException: Incoming batch schema is not compatible with the table's one . So we are good on this.
Code gist - https://gist.github.com/ad1happy2go/72de4d53b497dff80c4d8527ff45934c