RLI + Spark + Hudi: "Error occurs when executing map"
I am ingesting data into a Hudi table with the RLI (record-level index) enabled, using Spark structured streaming from Kafka. Unfortunately, after ingesting only 5-10 records the job throws the issue below.
Steps to reproduce the behavior:
- Build the dependencies for Hudi 0.14.0 and Spark 3.4
- Enable the Hudi RLI (record-level index)
Expected behavior
Ingestion should work end to end with the RLI index enabled.
Environment Description
- Hudi version : 0.14.0
- Spark version : 3.4.0
- Hive version : NA
- Hadoop version : 3.3.4
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : Yes
Additional context
Hudi configuration:

val hudiOptions = Map(
  "hoodie.table.name" -> "customer_profile",
  "hoodie.datasource.write.recordkey.field" -> "x,y",
  "hoodie.datasource.write.partitionpath.field" -> "x",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.type" -> "COPY_ON_WRITE",
  "hoodie.clean.max.commits" -> "6",
  "hoodie.clean.trigger.strategy" -> "NUM_COMMITS",
  "hoodie.cleaner.commits.retained" -> "4",
  "hoodie.cleaner.parallelism" -> "50",
  "hoodie.clean.automatic" -> "true",
  "hoodie.clean.async" -> "true",
  "hoodie.parquet.compression.codec" -> "snappy",
  "hoodie.index.type" -> "RECORD_INDEX",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.record.index.min.filegroup.count" -> "20", // in trial
  "hoodie.metadata.record.index.max.filegroup.count" -> "5000"
)
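For reference, options like these are applied to a Spark structured streaming write roughly as in the sketch below. This is a minimal illustration only, not the actual job: the broker, topic, checkpoint and table paths, and the parsing of the Kafka value into the columns x, y, ts are assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-to-hudi-rli").getOrCreate()

// Read the Kafka topic as a stream (broker and topic names are placeholders).
val source = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "customer_profile_events")
  .load()

// In the real job the binary Kafka value would be parsed into the record key,
// partition path, and precombine columns before writing; that step is elided.
val query = source.writeStream
  .format("hudi")
  .options(hudiOptions) // the map defined above
  .option("checkpointLocation", "s3a://bucket/checkpoints/customer_profile") // placeholder
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start("s3a://bucket/tables/customer_profile") // placeholder target path

query.awaitTermination()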
Stacktrace
24/02/02 13:51:46 INFO BlockManagerInfo: Added broadcast_86_piece0 in memory on 10.224.52.183:42743 (size: 161.7 KiB, free: 413.7 MiB)
24/02/02 13:51:46 INFO BlockManagerInfo: Added broadcast_86_piece0 in memory on 10.224.50.139:39367 (size: 161.7 KiB, free: 413.7 MiB)
24/02/02 13:51:46 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 40 to 10.224.50.139:55724
24/02/02 13:51:46 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 40 to 10.224.52.183:34940
24/02/02 13:51:46 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 40 to 10.224.50.159:33310
24/02/02 13:51:47 INFO TaskSetManager: Starting task 9.0 in stage 148.0 (TID 553) (10.224.53.172, executor 3, partition 9, NODE_LOCAL, 7189 bytes)
24/02/02 13:51:47 WARN TaskSetManager: Lost task 1.0 in stage 148.0 (TID 545) (10.224.53.172 executor 3): org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:414)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:160)
at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:108)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:327)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:304)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 13 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @727177d3)
at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:160)
at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:115)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:828)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:403)
... 20 more
24/02/02 13:51:47 INFO TaskSetManager: Starting task 7.0 in stage 148.0 (TID 554) (10.224.51.194, executor 4, partition 7, NODE_LOCAL, 7189 bytes)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 3.0 in stage 148.0 (TID 547) in 587 ms on 10.224.51.194 (executor 4) (1/10)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 2.0 in stage 148.0 (TID 546) in 684 ms on 10.224.52.197 (executor 2) (2/10)
24/02/02 13:51:47 INFO TaskSetManager: Starting task 1.1 in stage 148.0 (TID 555) (10.224.50.139, executor 1, partition 1, NODE_LOCAL, 7189 bytes)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 8.0 in stage 148.0 (TID 552) in 596 ms on 10.224.50.139 (executor 1) (3/10)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 0.0 in stage 148.0 (TID 550) in 690 ms on 10.224.50.159 (executor 6) (4/10)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 4.0 in stage 148.0 (TID 548) in 789 ms on 10.224.53.213 (executor 5) (5/10)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 6.0 in stage 148.0 (TID 551) in 799 ms on 10.224.52.183 (executor 7) (6/10)
24/02/02 13:51:47 WARN TaskSetManager: Lost task 9.0 in stage 148.0 (TID 553) (10.224.53.172 executor 3): org.apache.hudi.exception.HoodieException: Error occurs when executing map
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.helpCC(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.externalHelpComplete(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.tryExternalHelp(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.externalAwaitDone(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:414)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:160)
at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:108)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:327)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:304)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 40 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @727177d3)
at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:160)
at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:115)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:828)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:403)
... 47 more
24/02/02 13:51:47 INFO TaskSetManager: Starting task 9.1 in stage 148.0 (TID 556) (10.224.50.159, executor 6, partition 9, NODE_LOCAL, 7189 bytes)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 7.0 in stage 148.0 (TID 554) in 414 ms on 10.224.51.194 (executor 4) (7/10)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 1.1 in stage 148.0 (TID 555) in 512 ms on 10.224.50.139 (executor 1) (8/10)
24/02/02 13:51:47 INFO TaskSetManager: Finished task 9.1 in stage 148.0 (TID 556) in 403 ms on 10.224.50.159 (executor 6) (9/10)
24/02/02 13:51:48 INFO TaskSetManager: Finished task 5.0 in stage 148.0 (TID 549) in 1906 ms on 10.224.52.235 (executor 8) (10/10)
... 36 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:200)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:174)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
... 36 more
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.avro.specific.SpecificRecordBase (org.apache.avro.generic.GenericData$Record and org.apache.avro.specific.SpecificRecordBase are in unnamed module of loader 'app')
at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieRollbackMetadata(TimelineMetadataUtils.java:177)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.getRollbackedCommits(HoodieTableMetadataUtil.java:1355)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$getValidInstantTimestamps$37(HoodieTableMetadataUtil.java:1284)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1283)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 36 more
24/02/02 13:54:34 INFO HoodieStreamingSink: Retrying the failed micro batch id=6 ...
24/02/02 13:54:34 INFO TransactionManager: Transaction manager closed
24/02/02 13:54:34 INFO AsyncCleanerService: Shutting down async clean service...
24/02/02 13:54:34 INFO TransactionManager: Transaction manager closed
24/02/02 13:54:34 ERROR HoodieStreamingSink: Micro batch id=6 threw following expections,aborting streaming app to avoid data loss:
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240202135415589
at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:74)
at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:431)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:138)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:130)
at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:234)
at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:129)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 256.0 failed 4 times, most recent failure: Lost task 5.3 in stage 256.0 (TID 1220) (10.224.50.139 executor 1): org.apache.hudi.exception.HoodieException: Error occurs when executing map
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.avro.specific.SpecificRecordBase (org.apache.avro.generic.GenericData$Record and org.apache.avro.specific.SpecificRecordBase are in unnamed module of loader 'app')
at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieRollbackMetadata(TimelineMetadataUtils.java:177)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.getRollbackedCommits(HoodieTableMetadataUtil.java:1355)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$getValidInstantTimestamps$37(HoodieTableMetadataUtil.java:1284)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1283)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 36 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:200)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:174)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
... 36 more
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.avro.specific.SpecificRecordBase (org.apache.avro.generic.GenericData$Record and org.apache.avro.specific.SpecificRecordBase are in unnamed module of loader 'app')
at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieRollbackMetadata(TimelineMetadataUtils.java:177)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.getRollbackedCommits(HoodieTableMetadataUtil.java:1355)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$getValidInstantTimestamps$37(HoodieTableMetadataUtil.java:1284)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
at org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1283)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 36 more
24/02/02 13:54:34 INFO SparkContext: Invoking stop() from shutdown hook
24/02/02 13:54:34 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/02/02 13:54:35 INFO SparkUI: Stopped Spark web UI at http://cdp-spark-hudi-poc-bd39e18d6a0fdb60-driver-svc.qbm-cdp-aggregation-spark.svc:4040
24/02/02 13:54:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
24/02/02 13:54:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
24/02/02 13:54:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
24/02/02 13:54:35 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:178)
at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:690)
at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
24/02/02 13:54:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/02/02 13:54:35 INFO MemoryStore: MemoryStore cleared
24/02/02 13:54:35 INFO BlockManager: BlockManager stopped
24/02/02 13:54:35 INFO BlockManagerMaster: BlockManagerMaster stopped
24/02/02 13:54:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/02/02 13:54:35 INFO SparkContext: Successfully stopped SparkContext
24/02/02 13:54:35 INFO ShutdownHookManager: Shutdown hook called
24/02/02 13:54:35 INFO ShutdownHookManager: Deleting directory /var/data/spark-03498e5f-b96e-44c8-bbf1-1eee297285b4/spark-73e9b280-ade7-4575-a541-20f23b0844c2
24/02/02 13:54:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-146b1b36-749b-453a-b4d3-8cfeb9ef192c
Spark UI log (attached)
cc: @codope @ad1happy2go @bhasudha
Had a discussion with @maheshguptags. The issue could be related either to deserialiser configs or to a bug in RLI. He is trying without RLI and will let us know his findings. Thanks a lot for your contribution, @maheshguptags.
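On the deserialiser hypothesis: the root ClassCastException in the stacktrace reports the two Avro classes coming from different class loaders ('app' vs. Spark's MutableURLClassLoader), which typically points at conflicting Avro/Hudi classes on the classpath. One way to check (a minimal diagnostic sketch; the class names are taken from the error message, and this was not run in this thread) is to print which jar each side of the failing cast is loaded from, e.g. in a spark-shell started with the same classpath:

// Hypothetical diagnostic: print the class loader and source jar for both
// sides of the failing cast; differing code sources suggest a classpath conflict.
val generic  = classOf[org.apache.avro.generic.GenericData.Record]
val specific = Class.forName("org.apache.hudi.avro.model.HoodieDeleteRecordList")
Seq[Class[_]](generic, specific).foreach { c =>
  println(s"${c.getName} loaded by ${c.getClassLoader} from ${c.getProtectionDomain.getCodeSource}")
}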
@ad1happy2go I tried without RLI and it works fine; however, as soon as I add the RLI index to the table, it starts failing.
I am not sure why RLI causes errors while ingestion without any index works fine.
Thanks @maheshguptags. As discussed, are you getting the same error with Hudi Streamer?
@ad1happy2go As discussed, I tried Hudi DeltaStreamer, but unfortunately I could not get it running due to heap-space issues, even without sending any data.
Command
spark/bin/spark-submit \
--name customer-event-hudideltaStream \
--num-executors 10 \
--executor-memory 3g \
--driver-memory 6g \
--conf spark.task.cpus=1 \
--conf spark.driver.extraJavaOptions="-XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof" \
--conf spark.executor.extraJavaOptions="-XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_executor.hprof" \
--packages org.apache.hadoop:hadoop-aws:3.3.4 \
--jars /home/mahesh.gupta/hudi-utilities-bundle_2.12-0.14.0.jar,/opt/kafka_2.13-2.8.1/aws-msk-iam-auth-1.1.9-all.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/mahesh.gupta/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
--checkpoint s3a://cdp-offline-xxx/checkpointing/eks/sparkhudipoc/hudistream_rli_3 \
--target-base-path s3a://cdp-offline-xxx/huditream_rli_3 \
--target-table customer_profile \
--table-type COPY_ON_WRITE \
--base-file-format PARQUET \
--props /home/mahesh.gupta/hoodie.properties \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field updated_date \
--payload-class org.apache.hudi.common.model.DefaultHoodieRecordPayload \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.streamer.schemaprovider.source.schema.file=/home/mahesh.gupta/source.avsc \
--hoodie-conf hoodie.streamer.schemaprovider.target.schema.file=/home/mahesh.gupta/source.avsc \
--op UPSERT \
--hoodie-conf hoodie.streamer.source.kafka.topic=spark_hudi_temp \
--hoodie-conf hoodie.datasource.write.partitionpath.field=client_id \
--continuous
Stacktrace for the same:
auto.offset.reset: latest
bootstrap.servers: local:9092
hoodie.auto.adjust.lock.configs: true
hoodie.clean.async: true
hoodie.clean.automatic: true
hoodie.clean.max.commits: 6
hoodie.clean.trigger.strategy: NUM_COMMITS
hoodie.cleaner.commits.retained: 4
hoodie.cleaner.parallelism: 50
hoodie.datasource.write.partitionpath.field: client_id
hoodie.datasource.write.precombine.field: updated_date
hoodie.datasource.write.reconcile.schema: false
hoodie.datasource.write.recordkey.field: customer_id,client_id
hoodie.index.type: RECORD_INDEX
hoodie.metadata.record.index.enable: true
hoodie.metadata.record.index.max.filegroup.count: 5000
hoodie.metadata.record.index.min.filegroup.count: 20
hoodie.parquet.compression.codec: snappy
hoodie.streamer.schemaprovider.source.schema.file: /home/mahesh.gupta/source.avsc
hoodie.streamer.schemaprovider.target.schema.file: /home/mahesh.gupta/source.avsc
hoodie.streamer.source.kafka.topic: spark_hudi_temp
sasl.client.callback.handler.class: SENSITIVE_INFO_MASKED
sasl.jaas.config: SENSITIVE_INFO_MASKED
sasl.mechanism: SENSITIVE_INFO_MASKED
security.protocol: SASL_SSL
24/02/06 07:12:20 INFO FSUtils: Resolving file /home/mahesh.gupta/source.avscto be a remote file.
24/02/06 07:12:20 INFO HoodieSparkKeyGeneratorFactory: The value of hoodie.datasource.write.keygenerator.type is empty; inferred to be COMPLEX
24/02/06 07:12:20 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://cdp-offline-xxx/huditream_rli_3
24/02/06 07:12:20 INFO HoodieTableConfig: Loading table properties from s3a://cdp-offline-xxx/huditream_rli_3/.hoodie/hoodie.properties
24/02/06 07:12:20 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://cdp-offline-xxx/huditream_rli_3
24/02/06 07:12:20 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20240205111704768__commit__COMPLETED__20240205111748000]}
24/02/06 07:12:21 INFO HoodieWriteConfig: Automatically set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider since user has not set the lock provider for single writer with async table services
24/02/06 07:12:21 INFO HoodieWriteConfig: Automatically set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider since user has not set the lock provider for single writer with async table services
24/02/06 07:12:21 INFO HoodieIngestionService: Ingestion service starts running in continuous mode
24/02/06 07:12:21 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://cdp-offline-xxx/huditream_rli_3
24/02/06 07:12:21 INFO HoodieTableConfig: Loading table properties from s3a://cdp-offline-xxxdev/huditream_rli_3/.hoodie/hoodie.properties
24/02/06 07:12:21 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://cdp-offline-xxx/huditream_rli_3
24/02/06 07:12:21 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20240205111704768__commit__COMPLETED__20240205111748000]}
24/02/06 07:12:21 INFO StreamSync: Checkpoint to resume from : Option{val=spark_hudi_temp,0:732979,1:727818,2:725765,3:719464,4:721968,5:727487,6:737757,7:727566,8:736890,9:722032,10:723030,11:724587,12:723768,13:732789,14:721004,15:721541,16:734303,17:717704,18:734645,19:721914}
24/02/06 07:12:21 INFO ConsumerConfig: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = local:9092
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-null-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = class software.amazon.msk.auth.iam.IAMClientCallbackHandler
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = AWS_MSK_IAM
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
24/02/06 07:12:22 INFO AbstractLogin: Successfully logged in.
24/02/06 07:12:22 INFO AppInfoParser: Kafka version: 2.8.0
24/02/06 07:12:22 INFO AppInfoParser: Kafka commitId: ebb1d6e21cc92130
24/02/06 07:12:22 INFO AppInfoParser: Kafka startTimeMs: 1707203542234
24/02/06 07:12:23 INFO Metadata: [Consumer clientId=consumer-null-1, groupId=null] Cluster ID: H-NPJc0UTZ6XH3XCAnEDOw
24/02/06 07:12:25 INFO Metrics: Metrics scheduler closed
24/02/06 07:12:25 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
24/02/06 07:12:25 INFO Metrics: Metrics reporters closed
24/02/06 07:12:25 INFO AppInfoParser: App info kafka.consumer for consumer-null-1 unregistered
24/02/06 07:12:25 INFO KafkaOffsetGen: SourceLimit not configured, set numEvents to default value : 5000000
24/02/06 07:12:25 INFO KafkaOffsetGen: getNextOffsetRanges set config hoodie.streamer.source.kafka.minPartitions to 0
Killed
@maheshguptags I tried to reproduce the issue but couldn't. The artefacts are below.
Kafka-source.props
hoodie.datasource.write.recordkey.field=volume
hoodie.datasource.write.partitionpath.field=year
hoodie.datasource.write.precombine.field=ts
hoodie.clean.max.commits=6
hoodie.clean.trigger.strategy=NUM_COMMITS
hoodie.cleaner.commits.retained=4
hoodie.cleaner.parallelism=50
hoodie.clean.automatic=true
hoodie.clean.async=true
hoodie.parquet.compression.codec=snappy
hoodie.index.type=RECORD_INDEX
hoodie.metadata.record.index.enable=true
hoodie.metadata.record.index.min.filegroup.count=20
hoodie.metadata.record.index.max.filegroup.count=5000
hoodie.datasource.write.new.columns.nullable=true
hoodie.datasource.write.reconcile.schema=true
bootstrap.servers=localhost:9092
auto.offset.reset=latest
Command -
${SPARK_HOME}/bin/spark-submit --name customer-event-hudideltaStream \
--jars ${HOME_DIR}/jars/0.14.1/spark32/hudi-spark3.4-bundle_2.12-0.14.1.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
${HOME_DIR}/jars/0.14.1/spark32/hudi-utilities-slim-bundle_2.12-0.14.1.jar \
--checkpoint file:///tmp/hudistreamer/test/checkpoint1 \
--target-base-path file:///tmp/hudistreamer/test/output1 \
--target-table customer_profile --table-type COPY_ON_WRITE \
--base-file-format PARQUET \
--props kafka-source.props \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts \
--payload-class org.apache.hudi.common.model.DefaultHoodieRecordPayload \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.streamer.schemaprovider.source.schema.file=${HOME_DIR}/docker_demo/conf/schema.avsc \
--hoodie-conf hoodie.streamer.schemaprovider.target.schema.file=${HOME_DIR}/docker_demo/conf/schema.avsc \
--op UPSERT --hoodie-conf hoodie.streamer.source.kafka.topic=stock_ticks \
--hoodie-conf hoodie.datasource.write.partitionpath.field=year \
--continuous
Had a working session with @maheshguptags. We were able to consistently reproduce the issue with a composite key in his setup, although I couldn't reproduce it in my own setup. So this issue is intermittent.
@yihua Can you please check the .hoodie directory (attached), as you requested?
@ad1happy2go and @yihua any update on this?
Facing the same issue, waiting for updates.
@michael1991 Just to check, are you also using a composite key? Can you post your table configuration?
@ad1happy2go please check below:

#Updated at 2024-02-27T07:34:03.809265Z
#Tue Feb 27 07:34:03 UTC 2024
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.table.type=MERGE_ON_READ
hoodie.table.metadata.partitions=files,record_index
hoodie.table.precombine.field=pk
hoodie.table.partition.fields=req_date,req_hour
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.timeline.layout.version=1
hoodie.table.checksum=50135889
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.timeline.timezone=LOCAL
hoodie.table.name=hudi_test_0141
hoodie.table.recordkey.fields=pk
hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
hoodie.datasource.write.hive_style_partitioning=false
hoodie.partition.metafile.use.base.format=false
hoodie.table.metadata.partitions.inflight=
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.populate.meta.fields=true
hoodie.table.base.file.format=PARQUET
hoodie.database.name=dxx_prod
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.version=6
@michael1991 The above one is hoodie.properties; @ad1happy2go is asking for the table properties you used during table creation. Thanks.
Thanks for the reminder. I'm using Dataproc 2.1 with Spark 3.3.2 and Hudi 0.14.1. Do you mean the following configurations?

COMMON_HUDI_CONF_MAP = {
    "hoodie.database.name": "dxx_prod",
    "hoodie.table.name": "hudi_test_0141",
    "hoodie.datasource.write.recordkey.field": "pk",
    "hoodie.datasource.write.precombine.field": "pk",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.record.index.enable": "true",
    "hoodie.index.type": "RECORD_INDEX",
    "hoodie.schema.on.read.enable": "true",
    "hoodie.combine.before.upsert": "false",
    "hoodie.datasource.write.partitionpath.field": "req_date,req_hour",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.clean.automatic": "true",
    "hoodie.clean.async": "true",
    "hoodie.parquet.compression.codec": "snappy"
}
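For reference, a minimal sketch of how a conf map like this would typically be applied on the write path; the DataFrame df and the target path below are hypothetical placeholders, not taken from this thread:

# Hypothetical usage sketch for the conf map above; `df` and the
# target path are placeholders, not from this thread.
(df.write
    .format("hudi")
    .options(**COMMON_HUDI_CONF_MAP)
    .mode("append")
    .save("gs://bucket/path/hudi_test_0141"))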
@maheshguptags I noticed in your timeline that there is a multi-writer kind of scenario -
We will connect tomorrow to review why that is happening. I was under the impression that we are using just one writer.
Sure, let me schedule some time and we will discuss it. Here is the meet link:

RLI deltastream
Friday, March 1 · 1:00 – 1:30pm
Time zone: Asia/Kolkata
Google Meet joining info
Video call link: https://meet.google.com/eag-fhus-fgi
Or dial: (US) +1 470-273-8839 PIN: 148 607 184#
More phone numbers: https://tel.meet/eag-fhus-fgi?pin=9295130338114

Please let me know your thoughts on this.
Any conclusion on this issue? I am facing the same issue too.
10:29:32.481 [qtp264384338-719] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader - Got exception when reading log file
java.lang.ClassCastException: org.apache.hudi.avro.model.HoodieDeleteRecordList cannot be cast to org.apache.hudi.avro.model.HoodieDeleteRecordList
at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:160) ~[org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar:0.14.0]
at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:115) ~[org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar:0.14.0]
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:828) ~[org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar:0.14.0]
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:403) ~[org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar:0.14.0]
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220) ~[org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar:0.14.0]
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:201) ~[org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar:0.14.0]
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.
Your help is highly appreciated.
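(Side note: a ClassCastException saying a class cannot be cast to itself, as in the trace above, is a classic symptom of the same class being loaded by two different classloaders, for example when the Hudi bundle jar is visible on the classpath more than once or only on part of the cluster; this ties in with the extraClassPath workaround discussed later in this thread.)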
@bksrepo Which version did you use to load the data? Is it an upgraded table? The original issue here is different from your stack trace. Can you share all the table/writer configs, or maybe a reproducible code snippet if possible?
@ad1happy2go I am using Spark 3.4.1 with the Hudi bundle 'hudi-spark3.4-bundle_2.12-0.14.0.jar'; Hadoop is 3.3.6 and the source database is MySQL version 8.0.36.
The reported error occurs at the time of saving the DataFrame; the code works fine up to df.show().
Thank you for your help.
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType, DateType, TimestampType, BooleanType
# SparkSession
spark = SparkSession.builder \
    .appName('Sample_COW') \
    .config("spark.yarn.jars", "/opt/spark-3.4.1-bin-hadoop3/jars/*.jar") \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
    .config('spark.kryo.registrator', 'org.apache.spark.HoodieSparkKryoRegistrar') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .config('spark.sql.warehouse.dir', 'hdfs://nn:8020/mnt/hive/warehouse') \
    .config('spark.sql.debug.maxToStringFields', '200') \
    .config('spark.hadoop.fs.defaultFS', 'hdfs://Name-Node-Server:8020') \
    .config('spark.executor.extraClassPath', '/opt/spark-3.4.1-bin-hadoop3/jars/jackson-databind-2.14.2.jar') \
    .config('spark.driver.extraClassPath', '/opt/spark-3.4.1-bin-hadoop3/jars/jackson-databind-2.14.2.jar') \
    .config('spark.hadoop.yarn.resourcemanager.hostname', 'Name-Node-Server') \
    .config("spark.sql.hive.convertMetastoreParquet", "true") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.fs.replication", "1") \
    .enableHiveSupport() \
    .getOrCreate()
# Define MySQL connection properties along with selective columns with a where clause.
mysql_props = {
    "url": "jdbc:mysql://localhost:3306/XXXX",
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": "XXXXXx",
    "password": "XXXXX",
    "dbtable": "(select id, pid, center_id, center_code, visit_type, create_price_list_id, gender, age, age_frequency, clinical_detail, clinical_history_file, sample_drawn_date, sample_drawn_time_hrs, sample_drawn_time_min, referal_doctor_id, referal_doctor, referal_customer_id, referal_customer, department_id, profile_ids, test_ids, amount, discount, total_amount, mrp, payment_mode, amount_paid, amount_balance, test_status_code, UNIX_TIMESTAMP(log_date_created) AS log_date_created, created_by, deleted, sample_status, other_comments, team_lead_id, tech_lead_id, pathologist_id, tele_pathologist_id, Graph_path, UNIX_TIMESTAMP(CONVERT_TZ(authentication_date,'+05:30','+00:00')) AS authentication_date, reference_patient_id, protocol_id, visit_info, ref_center, investigator_details, month_year, UNIX_TIMESTAMP(CONVERT_TZ(sample_collection_datetime_at_source,'+05:30', '+00:00')) AS sample_collection_timestamp FROM sample) as sample"
}
# Read data from MySQL
df = spark.read.format("jdbc").options(**mysql_props).load()
# Define the Hudi table schema to avoid any automatic FieldType conversion and casting issues.
hoodie_schema = StructType([
    StructField("id", IntegerType(), True), StructField("pid", StringType(), True),
    StructField("center_id", IntegerType(), True), StructField("center_code", StringType(), True),
    StructField("visit_type", StringType(), True), StructField("create_price_list_id", IntegerType(), True),
    StructField("gender", StringType(), True), StructField("age", IntegerType(), True),
    StructField("age_frequency", StringType(), True), StructField("clinical_detail", StringType(), True),
    StructField("clinical_history_file", StringType(), True), StructField("sample_drawn_date", DateType(), True),
    StructField("sample_drawn_time_hrs", StringType(), True), StructField("sample_drawn_time_min", StringType(), True),
    StructField("referal_doctor_id", StringType(), True), StructField("referal_doctor", StringType(), True),
    StructField("referal_customer_id", StringType(), True), StructField("referal_customer", StringType(), True),
    StructField("department_id", IntegerType(), True), StructField("profile_ids", StringType(), True),
    StructField("test_ids", StringType(), True), StructField("amount", DecimalType(precision=11, scale=2), True),
    StructField("discount", DecimalType(precision=11, scale=2), True), StructField("total_amount", DecimalType(precision=11, scale=2), True),
    StructField("mrp", DecimalType(precision=11, scale=2), True), StructField("payment_mode", StringType(), True),
    StructField("amount_paid", DecimalType(precision=11, scale=2), True), StructField("amount_balance", DecimalType(precision=11, scale=2), True),
    StructField("test_status_code", StringType(), True), StructField("log_date_created", IntegerType(), True),
    StructField("created_by", StringType(), True), StructField("deleted", BooleanType(), True),
    StructField("sample_status", StringType(), True), StructField("other_comments", StringType(), True),
    StructField("team_lead_id", IntegerType(), True), StructField("tech_lead_id", IntegerType(), True),
    StructField("pathologist_id", IntegerType(), True), StructField("tele_pathologist_id", IntegerType(), True),
    StructField("graph_path", StringType(), True), StructField("authentication_date", IntegerType(), True),
    StructField("reference_patient_id", StringType(), True), StructField("protocol_id", StringType(), True),
    StructField("visit_info", StringType(), True), StructField("ref_center", StringType(), True),
    StructField("investigator_details", StringType(), True), StructField("month_year", StringType(), True),
    StructField("sample_collection_timestamp", IntegerType(), True)
])
hudi_options = {
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.table.name': 'sample_cow',
    'hoodie.datasource.write.schema': hoodie_schema.json(),
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'bulk_insert',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.table.name': 'sample_cow',
    'path': '/datalake/sample/etl/sample_cow/np/cow/'
}
df.show()
# Write data to Hudi COW table in Parquet format
(df
    .write
    .format("org.apache.hudi")
    .options(**hudi_options)
    .mode('overwrite')
    .save())  # Save to HDFS
spark.stop()
Hey @bksrepo: can you file a new issue? Hey @ad1happy2go: if the original issue is resolved, can we close it out?
And @ad1happy2go: if you encounter any bugs w.r.t. MDT or RLI, do keep me posted.
@nsivabalan We haven't resolved the original issue and it is still open.
@nsivabalan We were not able to reproduce this error in our setup. I went on multiple calls with @maheshguptags and set up the exact same environment locally, but he can consistently reproduce this issue. I have also discussed this with @yihua before. Can you or @yihua also review the hoodie.properties (attached here - https://github.com/apache/hudi/issues/10609#issuecomment-1966687166) and see if you have any insights here?
I hit the same error when I try to use record indexing:
hoodie.metadata.record.index.enable=true
hoodie.index.type=RECORD_INDEX
Are there additional configs/jars that are needed?
Hey @jayakasadev, I've resolved this issue by setting the Spark configs spark.driver.extraClassPath and spark.executor.extraClassPath to the Hudi Spark bundle jar.
Hey @ad1happy2go @nsivabalan, it seems we could add this config as a solution in the FAQ, right?
@michael1991 Can you share the values that you pass for spark.executor.extraClassPath and spark.driver.extraClassPath, so that I can try it at my end as well?
Sure @maheshguptags. Since I'm using GCP Dataproc, I just set --jars to the GCS path of the hudi-spark-bundle jar, and then set these two properties.
Dataproc command example:
gcloud dataproc jobs submit spark --cluster=spark-test --region=us-east1 \
  --driver-log-levels=root=WARN \
  --jars=gs://bucket/jobs/test-batch-0.1.0.jar,gs://bucket/jars/hudi/hudi-spark3.5-bundle_2.12-0.15.0.jar \
  --class=com.test.job.LogAppender \
  --properties=^#^spark.driver.extraClassPath=hudi-spark3.5-bundle_2.12-0.15.0.jar#spark.executor.extraClassPath=hudi-spark3.5-bundle_2.12-0.15.0.jar
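(In the gcloud command above, the leading ^#^ in --properties is gcloud's escaping syntax: it switches the flag's list delimiter from a comma to #, so both classpath properties can be passed in one flag.) For readers not on Dataproc, here is a minimal PySpark sketch of the same workaround, assuming the bundle jar is already available at a local path on every node; the path and app name are hypothetical placeholders. Consistent with the classloader note earlier, the fix boils down to putting the Hudi bundle on the JVM classpath itself rather than only distributing it via --jars:

from pyspark.sql import SparkSession

# Hypothetical local path where the Hudi bundle jar has been placed on
# every node; adjust to your deployment.
HUDI_BUNDLE = "/opt/jars/hudi-spark3.5-bundle_2.12-0.15.0.jar"

# Equivalent to passing
#   --conf spark.driver.extraClassPath=<jar> --conf spark.executor.extraClassPath=<jar>
# on the spark-submit command line. Note that spark.driver.extraClassPath is
# only honored if set before the driver JVM starts, so with spark-submit it
# must go on the command line rather than inside the application.
spark = (SparkSession.builder
    .appName("hudi-rli-classpath-workaround")
    .config("spark.driver.extraClassPath", HUDI_BUNDLE)
    .config("spark.executor.extraClassPath", HUDI_BUNDLE)
    .getOrCreate())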
Hi @michael1991, thank you for solving this; I can run the Deltastreamer with RLI now. Out of curiosity, how did you figure out that we need to pass the jar in extraClassPath? For reference, here is the spark-submit command I am using:

spark-submit \
--name customer-event-hudideltaStream \
--num-executors 10 \
--executor-memory 2g \
--driver-memory 3g \
--packages org.apache.hadoop:hadoop-aws:3.3.4 \
--jars /home/mahesh.gupta/aws-msk-iam-auth-1.1.9-all.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/mahesh.gupta/hudi-utilities-bundle_2.12-0.14.1.jar \
--checkpoint s3a://cdp-offline-store-perf2/checkpointing/eks/sparkhudipoc/hudistream_rli_4 \
--target-base-path s3a://cdp-offline-store-perf2/customer_event_temp_hudi_delta/ \
--target-table customer_event_temp \
--table-type COPY_ON_WRITE \
--base-file-format PARQUET \
--props /home/mahesh.gupta/deltaHoodie.properties \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field updated_date \
--payload-class org.apache.hudi.common.model.DefaultHoodieRecordPayload \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.streamer.schemaprovider.source.schema.file=/home/mahesh.gupta/source.avsc \
--hoodie-conf hoodie.streamer.schemaprovider.target.schema.file=/home/mahesh.gupta/source.avsc \
--op UPSERT \
--hoodie-conf hoodie.streamer.source.kafka.topic=cdp_track_temp_perf \
--hoodie-conf hoodie.datasource.write.partitionpath.field=client_id \
--continuous
@ad1happy2go I will need some help with memory tuning for the Deltastreamer. Please let me know if there is any doc for it.
Hey @maheshguptags, I just got inspired by the GCP Dataproc doc here: https://cloud.google.com/dataproc/docs/concepts/components/hudi
Actually, I don't know why we need to pass the jar in extraClassPath; I guess maybe some classes are missing on the executors in a cloud environment.
Thank you very much @michael1991!!