[SUPPORT] Hudi: how to upsert non-null array data into an existing column with an array of nulls (optional binary)? java.lang.ClassCastException: optional binary element (UTF8) is not a group
Describe the problem you faced
We are trying to update an existing column, col1, which so far held only empty arrays, so its schema was inferred by default as array<string>. The issue is that the new incoming records have data in this existing column col1,
that is, an array of non-null values. The upsert then throws the error "optional binary element (UTF8) is not a group". We don't have any predefined schema for these records; it is all inferred by default. Hence, during insert, this column's schema becomes array<string>.
In short, this issue occurs whenever we try to update the schema of a column from array<string> to array<struct<>> or array<array<>>. Kindly let me know if there is a workaround or solution for it.
To Reproduce
Steps to reproduce the behavior:
- Insert records that have a column with only empty arrays as values.
- Upsert records with at least one non-empty array as the value in that column, which previously had only empty arrays.
Expected behavior
Expected behaviour would be to upgrade the schema of columns that had the default schema for an empty array (i.e. array<string>) to the schema of the incoming non-empty records.
Environment Description
- AWS Glue 3.0
- Hudi version: 0.10.1
- Spark version: 3.1.2
- Running on Docker? (yes/no): no, we are running Glue jobs using PySpark
Additional context
Stacktrace
java.lang.ClassCastException: optional binary element (UTF8) is not a group
@nsivabalan @n3nash @umehrot2 Kindly suggest what should be done in this use case; we have been stuck on this issue for a month now.
Existing column schema in the hudi table created via bulk-insert.
Here the value for this array column was like:
.....
{ "id":1, "NWDepStatus": [] }
{ "id":2, "NWDepStatus": null }
....
This resulted in the schema below for this column in the Hudi table during bulk insert:
root
 |-- NWDepStatus: array (nullable = true)
 |    |-- element: string (containsNull = true)
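The array<string> result is most likely a consequence of JSON schema inference having no elements to inspect: as far as I understand, Spark's JSON reader falls back to string when an element type cannot be determined. The toy function below illustrates that fallback in plain Python. It is a hypothetical sketch, not Spark's implementation; `infer_type` and the type names it returns are invented for this example, and it only looks at the first element of an array.

```python
def infer_type(value):
    """Toy JSON type inference: anything unknown falls back to 'string'."""
    if value is None:
        return "string"  # nothing to inspect, so assume string
    if isinstance(value, bool):  # check bool before int (bool is an int subclass)
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        if not value:
            return "array<string>"  # empty array: element type unknown
        # Simplification: only the first element decides the element type.
        return "array<" + infer_type(value[0]) + ">"
    if isinstance(value, dict):
        fields = ", ".join(f"{k}: {infer_type(v)}" for k, v in value.items())
        return "struct<" + fields + ">"
    raise TypeError(f"unsupported JSON value: {value!r}")


# First batch: only empty arrays, so the column looks like array<string>.
print(infer_type([]))
# Later batch: real elements, so the same column now looks like array<struct<...>>.
print(infer_type([{"Id": 21, "Status": "NA"}]))
```

The two calls return different types for the same column, which is the mismatch the upsert later runs into.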
The new incoming record for the same column is shown below. It is meant to be saved via upsert,
with this value:
{
  "id": 1,
  "NWDepCount": 0,
  "NWDepStatus": [
    {
      "ClassId": "metric.DepStatus",
      "Id": 21,
      "Name": "MyNW_3",
      "ObjectType": "metric.DepStatus",
      "Status": "NA"
    },
    {
      "ClassId": "metric.DepStatus",
      "Id": 22,
      "Name": "MyNW2",
      "ObjectType": "metric.DepStatus",
      "Status": "NA"
    }
  ]
}
This results in the schema below, which differs from the existing schema saved in Hudi:
root
 |-- NWDepStatus: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ClassId: string (nullable = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- ObjectType: string (nullable = true)
 |    |    |-- Status: string (nullable = true)
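One way to catch this kind of mismatch before the write is to compare the two schemas and flag array columns whose element type changed. The sketch below works on plain dictionaries in the layout that, to my knowledge, PySpark's `StructType.jsonValue()` produces. The helper names `element_type_name` and `array_element_conflicts` are hypothetical, and only top-level columns are checked.

```python
def element_type_name(data_type):
    """Short name of a data type in Spark's JSON schema layout."""
    if isinstance(data_type, dict):
        return data_type["type"]  # complex types: 'struct', 'array', 'map'
    return data_type  # primitive types are plain strings such as 'string'


def array_element_conflicts(old_schema, new_schema):
    """List (column, old_element, new_element) for top-level array columns
    whose element type differs between the two schemas."""
    old_fields = {f["name"]: f["type"] for f in old_schema["fields"]}
    conflicts = []
    for field in new_schema["fields"]:
        old_type = old_fields.get(field["name"])
        new_type = field["type"]
        both_arrays = (
            isinstance(old_type, dict) and old_type.get("type") == "array"
            and isinstance(new_type, dict) and new_type.get("type") == "array"
        )
        if both_arrays:
            old_elem = element_type_name(old_type["elementType"])
            new_elem = element_type_name(new_type["elementType"])
            if old_elem != new_elem:
                conflicts.append((field["name"], old_elem, new_elem))
    return conflicts


# Schemas from this issue, trimmed to the one problematic column.
existing = {"type": "struct", "fields": [
    {"name": "NWDepStatus", "nullable": True, "metadata": {},
     "type": {"type": "array", "elementType": "string", "containsNull": True}},
]}
incoming = {"type": "struct", "fields": [
    {"name": "NWDepStatus", "nullable": True, "metadata": {},
     "type": {"type": "array", "containsNull": True,
              "elementType": {"type": "struct", "fields": [
                  {"name": "ClassId", "type": "string", "nullable": True, "metadata": {}},
                  {"name": "Id", "type": "long", "nullable": True, "metadata": {}},
              ]}}},
]}

print(array_element_conflicts(existing, incoming))
# [('NWDepStatus', 'string', 'struct')]
```

A non-empty result means the incoming batch cannot simply be merged into the existing files for that column.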
I even tried altering the existing column schema before writing the new records, making it match the schema of the new records with non-empty arrays while retaining the nulls, but with no success.
+------------------------+
|NWDepStatus             |
+------------------------+
|null                    |
|null                    |
+------------------------+
Configs are as follows:
commonConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'MdTimestamp',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.table.name': 'hudi-table',
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': args['database_name'],
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.datasource.hive_sync.table': 'hudi' + prefix.replace("/", "_").lower(),
    'hoodie.datasource.hive_sync.enable': 'true',
    'path': 's3://' + args['curated_bucket'] + '/hudi' + prefix,
    'hoodie.parquet.small.file.limit': '134217728'  # 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
}
unpartitionDataConfig = {
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
}
initLoadConfig = {
    'hoodie.bulkinsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'bulk_insert'
}
incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10
}
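The write call further down passes `combinedConf`, which is never defined in this post. Presumably it is a merge of the dictionaries above. The sketch below shows one way that merge might look; `build_conf`, the placeholder `args` and `prefix` values, and the trimmed-down dictionaries are assumptions for illustration only, not the actual job code.

```python
# Placeholder inputs (assumed); in the Glue job these come from job arguments.
args = {"database_name": "example_db", "curated_bucket": "example-bucket"}
prefix = "/Example/Table"

# Trimmed versions of the dictionaries from the post.
commonConfig = {
    "hoodie.table.name": "hudi-table",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.hive_sync.table": "hudi" + prefix.replace("/", "_").lower(),
    "path": "s3://" + args["curated_bucket"] + "/hudi" + prefix,
}
unpartitionDataConfig = {
    "hoodie.datasource.write.keygenerator.class":
        "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
}
initLoadConfig = {"hoodie.datasource.write.operation": "bulk_insert"}
incrementalConfig = {"hoodie.datasource.write.operation": "upsert"}


def build_conf(*parts):
    """Merge option dictionaries left to right; later ones win on key clashes."""
    combined = {}
    for part in parts:
        combined.update(part)
    return combined


# Initial load and incremental runs differ only in the last dictionary.
initialConf = build_conf(commonConfig, unpartitionDataConfig, initLoadConfig)
combinedConf = build_conf(commonConfig, unpartitionDataConfig, incrementalConfig)

print(combinedConf["hoodie.datasource.write.operation"])
print(combinedConf["hoodie.datasource.hive_sync.table"])
```

Keeping the merge in one helper makes it easy to see which operation a given run uses, since the operation key is the only one that differs between the two results.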
I checked issue #2265 and the fix in #2927, but even with the configs given as the solution there, it does not work and fails with the same error:
(
    inputDf.write
    .format('org.apache.hudi')
    .option('hoodie.datasource.write.operation', 'upsert')
    .option("spark.hadoop.parquet.avro.write-old-list-structure", "false")
    .option("parquet.avro.write-old-list-structure", "false")
    .option("hoodie.parquet.avro.write-old-list-structure", "false")
    .option("hoodie.datasource.write.reconcile.schema", "true")
    .options(**combinedConf)
    .mode('append')
    .save()
)

2022-05-27 18:22:12,568 WARN [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(69)): Lost task 0.0 in stage 363.0 (TID 8061) (172.36.166.181 executor 24): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:174)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:342)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:315)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
    at org.apache.parquet.schema.Type.asGroupType(Type.java:207)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
    at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    ... 8 more
2022-05-27 18:22:12,571 INFO [dispatcher-CoarseGrainedScheduler] scheduler.TaskSetManager (Logging.scala:logInfo(57)): Starting task 0.1 in stage 363.0 (TID 8062) (172.36.140.28, executor 15, partition 0, PROCESS_LOCAL, 4444 bytes) taskResourceAssignments Map()
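In a trace this long, the part that matters is the innermost "Caused by:" entry, here the ClassCastException raised while the existing Parquet file is being read. The helper below is a small sketch for pulling the chain of causes out of a trace that has been flattened onto one line; `cause_chain` is a hypothetical name and the regular expression is a rough heuristic for where the first stack frame starts, not a full parser.

```python
import re


def cause_chain(trace_text):
    """Return the message after each 'Caused by:' marker, outermost first."""
    causes = []
    for chunk in trace_text.split("Caused by: ")[1:]:
        # The message ends where the first frame (' at pkg.Class.method(') begins.
        message = re.split(r"\s+at\s+[\w$.<>]+\(", chunk, maxsplit=1)[0]
        causes.append(message.strip())
    return causes


# A shortened excerpt of the trace above, flattened onto one line.
sample = (
    "org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE "
    "for partition :0 at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor"
    ".handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) "
    "Caused by: org.apache.hudi.exception.HoodieException: unable to read next record "
    "from parquet file at org.apache.hudi.common.util.ParquetReaderIterator"
    ".hasNext(ParquetReaderIterator.java:54) ... 4 more "
    "Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group "
    "at org.apache.parquet.schema.Type.asGroupType(Type.java:207)"
)

for cause in cause_chain(sample):
    print(cause)
```

The last entry of the returned list is the root cause, which is the line worth searching for in issue trackers.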
Small update: I tried to drop the column with nulls during upsert. Scenario:
- Drop the column containing only empty arrays during upsert, then update the table with the same column name and non-empty array data. This fails with the error below.
Please suggest or correct what I am doing wrong here.
"error during schema update:" An error occurred while calling o615.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220528225756057
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:63)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:119)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:160)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:217)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:277)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 253.0 failed 4 times, most recent failure: Lost task 1.3 in stage 253.0 (TID 5698) (172.34.21.0 executor 24): java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.Seq
    at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$10(AvroConversionHelper.scala:327)
    at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$14(AvroConversionHelper.scala:373)
    at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$3(HoodieSparkUtils.scala:157)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:103)
    at org.apache.hudi.index.bloom.HoodieBloomIndex.lookupIndex(HoodieBloomIndex.java:115)
    at org.apache.hudi.index.bloom.HoodieBloomIndex.tagLocation(HoodieBloomIndex.java:85)
    at org.apache.hudi.table.action.commit.SparkWriteHelper.tag(SparkWriteHelper.java:56)
    at org.apache.hudi.table.action.commit.SparkWriteHelper.tag(SparkWriteHelper.java:39)
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:52)
    ... 45 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.Seq
    at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$10(AvroConversionHelper.scala:327)
    at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$14(AvroConversionHelper.scala:373)
    at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$3(HoodieSparkUtils.scala:157)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Are you able to try Spark 3.2, which has a major Parquet upgrade to 1.12?
Thanks for getting back, @xushiyan. AWS Glue supports Spark 3.1, but I suppose with the Hudi 0.11.0 bundle we get Parquet upgraded to 1.12. Unfortunately, we are not able to upgrade to the latest version of Hudi (issue #5636), and I haven't yet raised the AWS ticket. I am also wondering how this is working for others, if that is the case.
From aws console:
Glue 3.0 – Supports Spark 3.1.1 and Python 3.7. Also includes new AWS Glue Spark runtime optimizations for performance and reliability. Upgraded several dependencies that were required for the new Spark version.
@gtwuser For the first bulk insert, are the values of NWDepStatus all empty? If so, you could try writing only the other columns into the table.
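A rough sketch of that suggestion: before the first bulk insert, find the columns that carry no data at all (only nulls or empty arrays) and leave them out, so that no array<string> schema is ever written for them. The example below uses plain Python dictionaries to keep it self-contained; `columns_without_data` is a hypothetical helper, and in a PySpark job the equivalent step would be dropping those columns from the DataFrame with `DataFrame.drop` before writing.

```python
def columns_without_data(records):
    """Return the names of columns that are null or an empty list in every record."""
    columns = set()
    for record in records:
        columns.update(record)
    empty = []
    for name in sorted(columns):
        values = [record.get(name) for record in records]
        if all(v is None or v == [] for v in values):
            empty.append(name)
    return empty


# The first batch from this issue: NWDepStatus never has any elements.
first_batch = [
    {"id": 1, "NWDepStatus": []},
    {"id": 2, "NWDepStatus": None},
]

to_skip = columns_without_data(first_batch)
trimmed = [
    {key: value for key, value in record.items() if key not in to_skip}
    for record in first_batch
]

print(to_skip)   # ['NWDepStatus']
print(trimmed)   # [{'id': 1}, {'id': 2}]
```

With the column absent from the initial load, the later upsert would add it as a new column with its real element type, which is the idea behind the suggestion above. Whether Hudi accepts that as a compatible schema change still depends on the schema reconciliation settings in use.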
I have opened a similar issue that throws the same exception during update, with a spark-shell example in it. In my example, the schema has an array of structs with a single field, which causes this issue. https://github.com/apache/hudi/issues/5985
@phillycoder It looks like we have a workaround in the other issue.
It looks like this is an open issue in the Parquet format itself that has not yet been resolved: https://issues.apache.org/jira/browse/PARQUET-1681 It seems to affect even the latest version, 1.12.0, and the Parquet community is still discussing the solution: https://issues.apache.org/jira/browse/PARQUET-2069 Let's keep an eye on these issues. There isn't much that can be done from the Hudi side.
We need to upgrade parquet-avro once the above issues are fixed. Closing this as it is not related to Hudi. Created HUDI-4798 to track parquet upgrade.