[SUPPORT]: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
Describe the problem you faced
Hello community,
I'm using Hudi for change data capture with Spark Structured Streaming + Kafka + Debezium. My jobs work well, but occasionally a few jobs fail with errors related to parquet file size or format.
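For context, a minimal sketch of the shape of such a pipeline (broker, topic, key fields, and paths below are placeholders, not the actual job):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("cdc-to-hudi").getOrCreate()

// Read the Debezium change events from Kafka (broker and topic are placeholders).
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "dbserver.cdc.my_table")
  .load()

// Debezium payload parsing is omitted; assume `changes` already carries the
// flattened table columns plus an ordering field such as ts_ms.
val changes = kafkaDf.selectExpr("CAST(value AS STRING) AS json")

changes.writeStream
  .format("hudi")
  .option("hoodie.table.name", "my_table")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts_ms")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("checkpointLocation", "/checkpoints/my_table")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start("hdfs://prod/cdc.db/database/my_table")
```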
To Reproduce
Steps to reproduce the behavior:
- start long-running replication streams
Expected behavior
Write / read parquet files with the correct size and format.
Environment Description
- Hudi version : 0.11.0
- Spark version : 3.1.3
- Hive version : 1.2.1000
- Storage (HDFS) : 2.7.3
- Running on Docker? (yes/no) : no
Additional context
This problem occasionally occurs on certain tables. This is my config:
hudi {
  options {
    upsert_parallelisme_value = "1500"
    insert_parallelisme_value = "1500"
    bulk_insert_parallelisme_value = "1500"
    bulk_insert_sort_mode = "NONE"
    parquet_small_file_limit = "104857600"
    streaming_retry_count = "3"
    streaming_retry_interval_ms = "2000"
    parquet_max_file_size = "134217728"
    parquet_block_size = "134217728"
    parquet_page_size = "1048576"
    index_type = "SIMPLE"
    simple.index_use_caching = "true"
    simple.index_input_storage_level = "MEMORY_AND_DISK_SER"
    partition.fields = ""
    generator = "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
    key_generator.hive = "org.apache.hudi.hive.NonPartitionedExtractor"
  }
  compaction {
    inline_compact = "true"
    inline_compact_num_delta_commits = "10"
    cleaner_commits_retained = "4"
    cleaner_policy = "KEEP_LATEST_COMMITS"
    cleaner_fileversions_retained = "3"
    async_clean = "true"
  }
}
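For readers, a hedged guess at the Hudi write options these application-level keys map to (the mapping is inferred from the key names only and is not confirmed by the job code):

```scala
// Hedged sketch: the Hudi option names these application keys appear to
// correspond to. Inferred from the key names above, not from the job code.
val hudiWriteOptions: Map[String, String] = Map(
  "hoodie.upsert.shuffle.parallelism"                    -> "1500",
  "hoodie.insert.shuffle.parallelism"                    -> "1500",
  "hoodie.bulkinsert.shuffle.parallelism"                -> "1500",
  "hoodie.bulkinsert.sort.mode"                          -> "NONE",
  "hoodie.parquet.small.file.limit"                      -> "104857600",  // 100 MB
  "hoodie.datasource.write.streaming.retry.count"        -> "3",
  "hoodie.datasource.write.streaming.retry.interval.ms"  -> "2000",
  "hoodie.parquet.max.file.size"                         -> "134217728",  // 128 MB
  "hoodie.parquet.block.size"                            -> "134217728",
  "hoodie.parquet.page.size"                             -> "1048576",
  "hoodie.index.type"                                    -> "SIMPLE",
  "hoodie.simple.index.use.caching"                      -> "true",
  "hoodie.simple.index.input.storage.level"              -> "MEMORY_AND_DISK_SER",
  "hoodie.datasource.write.partitionpath.field"          -> "",
  "hoodie.datasource.write.keygenerator.class" ->
    "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
  "hoodie.datasource.hive_sync.partition_extractor_class" ->
    "org.apache.hudi.hive.NonPartitionedExtractor",
  "hoodie.compact.inline"                                -> "true",
  "hoodie.compact.inline.max.delta.commits"              -> "10",
  "hoodie.cleaner.policy"                                -> "KEEP_LATEST_COMMITS",
  "hoodie.cleaner.commits.retained"                      -> "4",
  "hoodie.cleaner.fileversions.retained"                 -> "3",
  "hoodie.clean.async"                                   -> "true"
)
```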
MVCC conf:
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
HoodieLockConfig.ZK_CONNECT_URL.key -> "zookeper-poll:2181",
HoodieLockConfig.ZK_PORT.key -> "2181",
HoodieLockConfig.ZK_LOCK_KEY.key -> ( table.table_name),
HoodieLockConfig.ZK_BASE_PATH.key -> ("/"+table.db_name),
HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key -> "15",
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key -> "15",
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key -> "60000",
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key -> "60000",
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key -> "20000",
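For completeness, a minimal sketch of how a lock configuration like this is usually combined with Hudi's concurrency-control options (the two concurrency entries below are assumptions, not shown in the config above; table/db values are placeholders):

```scala
// Hedged sketch: ZooKeeper lock settings plus the concurrency options Hudi
// expects alongside a lock provider. Table/DB values are placeholders.
val lockOptions: Map[String, String] = Map(
  "hoodie.write.concurrency.mode"       -> "optimistic_concurrency_control", // assumption
  "hoodie.cleaner.policy.failed.writes" -> "LAZY",                           // assumption
  "hoodie.write.lock.provider" ->
    "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
  "hoodie.write.lock.zookeeper.url"       -> "zookeper-poll:2181",
  "hoodie.write.lock.zookeeper.port"      -> "2181",
  "hoodie.write.lock.zookeeper.lock_key"  -> "my_table",  // table.table_name
  "hoodie.write.lock.zookeeper.base_path" -> "/my_db"     // "/" + table.db_name
)
```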
Stacktrace
For the small parquet size error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 9.0 failed 4 times, most recent failure: Lost task 20.3 in stage 9.0 (TID 6057) (ocnode46 executor 2): org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
at org.apache.hudi.common.util.ParquetUtils$HoodieKeyIterator.hasNext(ParquetUtils.java:485)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at org.apache.hudi.common.util.ParquetUtils.fetchHoodieKeys(ParquetUtils.java:197)
at org.apache.hudi.common.util.ParquetUtils.fetchHoodieKeys(ParquetUtils.java:147)
at org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:62)
at org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$33972fb4$1(HoodieSimpleIndex.java:155)
at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:117)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hdfs://prod/cdc.db/database/table/723c5d09-573b-4df6-ad41-76ae19ec976f-0_2-16682-7063518_20231024224507047.parquet is not a Parquet file (too small length: 0)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:689)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:152)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
... 22 more
One day I also got this error related to the parquet format:
expected magic number at tail [80, 65, 82, 49] but found [2, -70, -67, -119] at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
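For reference, the expected tail bytes [80, 65, 82, 49] are the ASCII codes of the Parquet magic "PAR1". A quick way to check a suspect file is to read its last four bytes; a minimal sketch using the Hadoop FileSystem API (the path is a placeholder):

```scala
// Hedged sketch: check whether a suspect file ends with the Parquet magic
// "PAR1" (ASCII [80, 65, 82, 49], as in the error above). Path is a placeholder.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val path = new Path("hdfs://prod/cdc.db/database/table/suspect-file.parquet")
val fs   = path.getFileSystem(new Configuration())
val len  = fs.getFileStatus(path).getLen

if (len < 4) {
  println(s"$path is too small to be a parquet file (length: $len)")
} else {
  val in = fs.open(path)
  try {
    val tail = new Array[Byte](4)
    in.readFully(len - 4, tail)  // positional read of the last four bytes
    val hasMagic = tail.sameElements("PAR1".getBytes("US-ASCII"))
    println(s"$path ends with PAR1 magic: $hasMagic")
  } finally {
    in.close()
  }
}
```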
@Armelabdelkbir I recommend you upgrade your Hudi version to 0.12.3, 0.13.1, or 0.14.0. This may happen due to a missing column in later records compared to previous ones. Do you have any such scenario?
Missing column, do you mean schema evolution? Sometimes we have schema evolution, but not for this use case. What is the impact of the upgrade on production? I have hundreds of tables and billions of rows. Do I just need to upgrade the Hudi version and keep the same metadata folders?
@Armelabdelkbir You just need to upgrade the Hudi version; the table should upgrade automatically. Was your metadata table enabled with 0.11.0? I believe it was off by default in 0.11.0. I recommend you upgrade to 0.12.3.
In this use case, is your schema always consistent?
@ad1happy2go My metadata table is disabled in version 0.11.0: "hoodie.metadata.enable" -> "false". Currently I can't upgrade until the client migration is complete. Schema evolution happens sometimes, but it is disabled on my side because of my Hive version 1.2.1. In the meantime, if I run into empty parquet problems, do I just need to delete those files?
You can try deleting these parquet files, although we need to understand how they got created in the first place.
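Before deleting anything, it may help to first list the zero-length base files under the table path; a minimal sketch using the Hadoop FileSystem API (the base path is a placeholder):

```scala
// Hedged sketch: list zero-length *.parquet files under a Hudi table base path
// so they can be inspected before any manual cleanup. Base path is a placeholder.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val basePath = new Path("hdfs://prod/cdc.db/database/table")
val fs = basePath.getFileSystem(new Configuration())

val files = fs.listFiles(basePath, true)  // recursive listing
while (files.hasNext) {
  val status = files.next()
  if (status.getPath.getName.endsWith(".parquet") && status.getLen == 0L) {
    // Candidates only; verify against the timeline before removing anything.
    println(s"zero-length parquet: ${status.getPath}")
  }
}
```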
Maybe compaction produced the broken parquet file when it failed the first time and produced the normal parquet file when it retried successfully. There will be two parquet files with the same file group ID and instant time, like this:
xxx partition
--- 00000000_1-2-3_2023110412345.parquet (broken parquet)
--- 00000000_4-5-6_2023110412345.parquet (normal parquet)
You can see this issue to resolve the problem: https://github.com/apache/hudi/issues/9615
@Armelabdelkbir
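If it helps to confirm that theory, one way is to group base-file names by file ID and instant time and look for duplicates; a rough sketch based on Hudi's {fileId}_{writeToken}_{instantTime}.parquet naming (the file names below are the illustrative ones from the previous comment):

```scala
// Hedged sketch: group base-file names by (fileId, instantTime) to spot file
// groups where a failed attempt left an extra base file for the same instant.
val names = Seq(
  "00000000_1-2-3_2023110412345.parquet",
  "00000000_4-5-6_2023110412345.parquet"
)

val parsed = names.flatMap { name =>
  name.stripSuffix(".parquet").split("_") match {
    case Array(fileId, _, instant) => Some((fileId, instant, name))
    case _                         => None
  }
}

parsed
  .groupBy { case (fileId, instant, _) => (fileId, instant) }
  .filter { case (_, group) => group.size > 1 }
  .foreach { case ((fileId, instant), group) =>
    println(s"file group $fileId has ${group.size} base files for instant $instant:")
    group.foreach { case (_, _, name) => println(s"  $name") }
  }
```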
@watermelon12138 Thanks for the link to that issue, I'll check it. My files are in the same file group, for example:
3e2e9939-71f0-41dc-a5ff-c276ae3cdfc6-0_0-819-355182_20231108134016057.parquet (broken parquet)
ccf19756-bce5-402b-b85e-64232e2f34b2-0_242-819-355161_20231108134016057.parquet (normal parquet)
I did encounter this in Hudi 0.13.0 as well.
@victorxiang30 @Armelabdelkbir @watermelon12138 Can you provide the schema to help me reproduce this?
If it has a complex data type, can you try setting the Spark config spark.hadoop.parquet.avro.write-old-list-structure to false?
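For anyone trying that suggestion, a minimal sketch of one way to pass the config (the app name is a placeholder; it can equally be set via --conf on spark-submit):

```scala
// Hedged sketch: setting the suggested Parquet-Avro config on the SparkSession.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cdc-to-hudi")  // placeholder
  .config("spark.hadoop.parquet.avro.write-old-list-structure", "false")
  .getOrCreate()

// Equivalent spark-submit form:
//   --conf spark.hadoop.parquet.avro.write-old-list-structure=false
```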