hudi icon indicating copy to clipboard operation
hudi copied to clipboard

[SUPPORT]: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file

Open Armelabdelkbir opened this issue 2 years ago • 9 comments

Describe the problem you faced

Hello community,

i'm using Hudi to change data capture with spark structured streaming + kafka + debezium , my jobs works well, sometimes few jobs failed with errors related to parquet size or format

To Reproduce

Steps to reproduce the behavior:

  1. start long running replication streams

Expected behavior

Write / read parquet with correct size / format

Environment Description

  • Hudi version : 0.11.0

  • Spark version : 3.1.3

  • Hive version : 1.2.1000

  • Storage (HDFS) : 2.7.3

  • Running on Docker? (yes/no) : no

Additional context

this problem occasionally occurs on certain tables this is my config:

hudi {
  options{
    upsert_parallelisme_value = "1500"
    insert_parallelisme_value = "1500"
    bulk_insert_parallelisme_value = "1500"
    bulk_insert_sort_mode = "NONE"
    parquet_small_file_limit = "104857600"
    streaming_retry_count = "3"
    streaming_retry_interval_ms ="2000"
    parquet_max_file_size = "134217728"
    parquet_block_size = "134217728"
    parquet_page_size = "1048576"
    index_type = "SIMPLE"
    simple.index_use_caching = "true"
    simple.index_input_storage_level = "MEMORY_AND_DISK_SER"
    partition.fields = ""
    generator = "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
    key_generator.hive = "org.apache.hudi.hive.NonPartitionedExtractor"
  }
  compaction {
    inline_compact = "true"
    inline_compact_num_delta_commits = "10"
    cleaner_commits_retained = "4"
    cleaner_policy = "KEEP_LATEST_COMMITS"
    cleaner_fileversions_retained = "3"
    async_clean = "true"
  }

MVCC conf:

      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
    HoodieLockConfig.ZK_CONNECT_URL.key -> "zookeper-poll:2181",
    HoodieLockConfig.ZK_PORT.key -> "2181",
    HoodieLockConfig.ZK_LOCK_KEY.key -> ( table.table_name),
    HoodieLockConfig.ZK_BASE_PATH.key -> ("/"+table.db_name),
    HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key -> "15",
    HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key -> "15",
    HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key -> "60000",
    HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key -> "60000",
    HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key -> "20000",

Stacktrace

*for small parquet size

 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 9.0 failed 4 times, most recent failure: Lost task 20.3 in stage 9.0 (TID 6057) (ocnode46 executor 2): org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
at org.apache.hudi.common.util.ParquetUtils$HoodieKeyIterator.hasNext(ParquetUtils.java:485)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at org.apache.hudi.common.util.ParquetUtils.fetchHoodieKeys(ParquetUtils.java:197)
at org.apache.hudi.common.util.ParquetUtils.fetchHoodieKeys(ParquetUtils.java:147)
at org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:62)
at org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$33972fb4$1(HoodieSimpleIndex.java:155)
at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:117)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hdfs://prod/cdc.db/database/table/723c5d09-573b-4df6-ad41-76ae19ec976f-0_2-16682-7063518_20231024224507047.parquet is not a Parquet file (too small length: 0)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:689)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:152)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
... 22 more 

one day i had also this error related to parquet format:

expected magic number at tail [80, 65, 82, 49] but found [2, -70, -67, -119] at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)

Armelabdelkbir avatar Oct 25 '23 13:10 Armelabdelkbir

@Armelabdelkbir I recommend you upgrade your Hudi version to 0.12.3 or 0.13.1 or 0.14.0. It may happen due to missing column in later records compared to previous ones. Do you have any such scenario?

ad1happy2go avatar Oct 25 '23 16:10 ad1happy2go

missing columnar, do you mean schema evolution, sometimes we have schema evolution, but not for this usecase. what is the impact of upgrade on production i have hundred of tables and billions of rows, i need just to upgrade the hudi version and keep same metadata folders ?

Armelabdelkbir avatar Oct 25 '23 16:10 Armelabdelkbir

@Armelabdelkbir You just need to upgrade the Hudi version. It should automatically upgrade. Were your metadata was enabled with 0.11.0? I guess it was off by default with 0.11.0. I recommend you to upgrade to 0.12.3 .

In this usecase, is your schema always consistent?

ad1happy2go avatar Oct 26 '23 06:10 ad1happy2go

@ad1happy2go my metadata is disabled in version 0.11.0: "hoodie.metadata.enable" -> "false" , currently I can't upgrade until the client migration is complete. schema evolution happen sometimes, but it disabled on my side due to my hive version 1.2.1 by waiting if i have empty parquets problems i need just to delete them ?

Armelabdelkbir avatar Oct 27 '23 12:10 Armelabdelkbir

You can try deleting these parquet files, although need to understand how they got created at first place.

ad1happy2go avatar Oct 29 '23 17:10 ad1happy2go

maybe compaction produced the broken parquet file when it failed for the first time and produced the normal parquet file when it retried successfully。There will be tow parquet file with same filegroup ID and instance time, like this: xxx partition --- 00000000_1-2-3_2023110412345.parquet (broken parquet) --- 00000000_4-5-6_2023110412345.parquet (normal parquet) you can see this issue to resolve the problem. https://github.com/apache/hudi/issues/9615

watermelon12138 avatar Nov 04 '23 03:11 watermelon12138

@Armelabdelkbir

watermelon12138 avatar Nov 04 '23 03:11 watermelon12138

@watermelon12138 thanks for the link of issue i'll check, my files are in the same filegroup: example 3e2e9939-71f0-41dc-a5ff-c276ae3cdfc6-0_0-819-355182_20231108134016057.parquet (broken parquet) ccf19756-bce5-402b-b85e-64232e2f34b2-0_242-819-355161_20231108134016057.parquet(normal parquet)

Armelabdelkbir avatar Nov 08 '23 14:11 Armelabdelkbir

I did encounter this in hudi 13.0 as well.

victorxiang30 avatar Dec 19 '23 07:12 victorxiang30

@victorxiang30 @Armelabdelkbir @watermelon12138 Can you provide the schema to help me to reproduce this.

If it has complex data type, can you try setting spark config spark.hadoop.parquet.avro.write-old-list-structure as false.

ad1happy2go avatar Jan 17 '24 06:01 ad1happy2go