[SUPPORT] How to upsert non-null array data into an existing column that holds only empty or null arrays. java.lang.ClassCastException: optional binary element (UTF8) is not a group

Open gtwuser opened this issue 3 years ago • 8 comments

Describe the problem you faced

We are trying to update an existing column, col1, whose values so far were all empty arrays, so its schema was inferred as array<string> by default. The new incoming records contain non-null values in col1, that is, non-empty arrays. Upserting them fails with the error "optional binary element (UTF8) is not a group". We don't have a predefined schema for these records; it is all inferred. During the initial insert the schema of col1 therefore becomes array<string> by default, and because the new records hold non-null, non-empty array values, upserting them into this column fails.

In short, this issue occurs whenever we try to change the schema of a column from array<string> to array<struct<>> or array<array<>>. Kindly let me know if there is a workaround or solution for it.

To Reproduce

Steps to reproduce the behavior:

  1. Insert records in which a column has only empty arrays as values.
  2. Upsert records with at least one non-empty array value in that column, which previously held only empty arrays.
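
The two steps can be illustrated without Spark or Hudi. The sketch below is a deliberately simplified, hypothetical model of schema inference (`infer_type` is an illustrative helper, not a Spark or Hudi API). It only assumes what is reported above: an array with no elements falls back to `array<string>`.

```python
def infer_type(value):
    """Toy type inference for one JSON value (hypothetical helper, not Spark's code)."""
    if value is None:
        return "null"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        fields = ",".join(f"{k}:{infer_type(v)}" for k, v in sorted(value.items()))
        return f"struct<{fields}>"
    if isinstance(value, list):
        if not value:
            # Assumption taken from the report: with no elements to inspect,
            # the element type defaults to string.
            return "array<string>"
        return f"array<{infer_type(value[0])}>"
    raise TypeError(f"unsupported value: {value!r}")


# Step 1: the first batch only ever sees an empty array.
first_batch = {"id": 1, "NWDepStatus": []}
# Step 2: the upsert batch carries structs in the same column.
second_batch = {"id": 1, "NWDepStatus": [{"Id": 21, "Status": "NA"}]}

old_type = infer_type(first_batch["NWDepStatus"])
new_type = infer_type(second_batch["NWDepStatus"])
print(old_type)
print(new_type)
print("same type:", old_type == new_type)
```

The two inferred types differ in their element type, which is the mismatch the upsert runs into.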

Expected behavior

The schema of a column that received the default schema for an empty array (i.e. array<string>) should be upgraded to the schema of the newly received non-empty array values. That is, an array column should be upgraded from the default array<string> to the more complex schema of the data that the non-empty array holds.

Environment Description

  • AWS glue 3.0

  • Hudi version : 0.10.1

  • Spark version : 3.1.2

  • Running on Docker? (yes/no) : no, we are running glue jobs using pyspark

Stacktrace

java.lang.ClassCastException: optional binary element (UTF8) is not a group

gtwuser avatar May 27 '22 06:05 gtwuser

@nsivabalan @n3nash @umehrot2 Kindly suggest what should be done in this use case; we have been stuck on this issue for a month now. The existing column schema in the Hudi table was created via bulk insert. The values for this array column looked like:

.....
{ "id":1, "NWDepStatus": [] }
{ "id":2, "NWDepStatus": null }
....

This resulted in the schema below for this column in the Hudi table during bulk insert:

root
 |-- NWDepStatus: array (nullable = true)
 |    |-- element: string (containsNull = true)

The new incoming record, which is meant to be saved via upsert, has a different schema for the same column. Its value is:

{
    "id": 1,
    "NWDepCount": 0,
    "NWDepStatus": [
        {
            "ClassId": "metric.DepStatus",
            "Id": 21,
            "Name": "MyNW_3",
            "ObjectType": "metric.DepStatus",
            "Status": "NA"
        },
        {
            "ClassId": "metric.DepStatus",
            "Id": 22,
            "Name": "MyNW2",
            "ObjectType": "metric.DepStatus",
            "Status": "NA"
        }
    ]
}

This results in the schema below, which differs from the existing schema saved in Hudi:

root
 |-- NWDepStatus: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ClassId: string (nullable = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- ObjectType: string (nullable = true)
 |    |    |-- Status: string (nullable = true)
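
A mismatch like this can be caught before the write is attempted. The sketch below is one possible pre-check, not something Hudi provides: `type_conflicts` and `_compare` are hypothetical helpers, and the input dicts are assumed to follow the layout produced by PySpark's `StructType.jsonValue()`. Map types are not inspected in this sketch.

```python
def _kind(spark_type):
    # Primitive types are plain strings; complex types are dicts with a "type" key.
    return spark_type if isinstance(spark_type, str) else spark_type["type"]


def _compare(old_type, new_type, path):
    if _kind(old_type) != _kind(new_type):
        return [(path, _kind(old_type), _kind(new_type))]
    if _kind(old_type) == "array":
        return _compare(old_type["elementType"], new_type["elementType"], path + ".element")
    if _kind(old_type) == "struct":
        return type_conflicts(old_type, new_type, path + ".")
    return []


def type_conflicts(old_schema, new_schema, path=""):
    """Return (column_path, old_kind, new_kind) for columns whose types differ."""
    conflicts = []
    old_fields = {f["name"]: f["type"] for f in old_schema["fields"]}
    for field in new_schema["fields"]:
        if field["name"] not in old_fields:
            continue  # a brand-new column is not a type conflict
        conflicts += _compare(old_fields[field["name"]], field["type"], path + field["name"])
    return conflicts


def column(name, spark_type):
    return {"name": name, "type": spark_type, "nullable": True, "metadata": {}}


# Existing table schema: NWDepStatus is array<string>.
old_schema = {"type": "struct", "fields": [
    column("NWDepStatus", {"type": "array", "elementType": "string", "containsNull": True}),
]}
# Incoming batch schema: NWDepStatus is array<struct<...>> (two fields shown for brevity).
new_schema = {"type": "struct", "fields": [
    column("NWDepStatus", {"type": "array", "containsNull": True, "elementType": {
        "type": "struct", "fields": [column("ClassId", "string"), column("Id", "long")]}}),
]}

conflicts = type_conflicts(old_schema, new_schema)
print(conflicts)
```

In a PySpark job the two dicts could be obtained with `df.schema.jsonValue()` on the existing table and on the incoming batch.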

I even tried altering the existing column's schema before writing the new records, making it match the schema of the new records (non-empty arrays) while retaining the nulls, but with no success.

+-----------+
|NWDepStatus|
+-----------+
|null       |
|null       |
+-----------+

Configs are as follows:

        # Options shared by the initial load and the incremental upserts
        commonConfig = {
            'className': 'org.apache.hudi',
            'hoodie.datasource.hive_sync.use_jdbc': 'false',
            'hoodie.datasource.write.precombine.field': 'MdTimestamp',
            'hoodie.datasource.write.recordkey.field': 'id',
            'hoodie.table.name': 'hudi-table',
            'hoodie.consistency.check.enabled': 'true',
            'hoodie.datasource.hive_sync.database': args['database_name'],
            'hoodie.datasource.write.reconcile.schema': 'true',
            'hoodie.datasource.hive_sync.table': 'hudi' + prefix.replace("/", "_").lower(),
            'hoodie.datasource.hive_sync.enable': 'true',
            'path': 's3://' + args['curated_bucket'] + '/hudi' + prefix,
            'hoodie.parquet.small.file.limit': '134217728'  # 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
        }
        # The table is not partitioned
        unpartitionDataConfig = {
            'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
            'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
        }
        # Used for the first load (bulk insert)
        initLoadConfig = {
            'hoodie.bulkinsert.shuffle.parallelism': 68,
            'hoodie.datasource.write.operation': 'bulk_insert'
        }
        # Used for subsequent loads (upsert)
        incrementalConfig = {
            'hoodie.upsert.shuffle.parallelism': 68,
            'hoodie.datasource.write.operation': 'upsert',
            'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
            'hoodie.cleaner.commits.retained': 10
        }
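
The write call further down passes a `combinedConf` dict that is never shown in this thread. One plausible way to build it, purely as an assumption, is to merge the dicts above so that later ones win on duplicate keys. `build_write_config` is a hypothetical helper and the dicts here are shortened stand-ins for the real ones.

```python
def build_write_config(*parts):
    """Merge option dicts left to right; later dicts override earlier ones.

    Values are converted to strings so that every option has a uniform type.
    """
    merged = {}
    for part in parts:
        merged.update(part)
    return {key: str(value) for key, value in merged.items()}


# Shortened stand-ins for commonConfig, unpartitionDataConfig and incrementalConfig.
common = {
    'hoodie.table.name': 'hudi-table',
    'hoodie.datasource.write.recordkey.field': 'id',
}
unpartitioned = {
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
}
incremental = {
    'hoodie.upsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'upsert',
}

combinedConf = build_write_config(common, unpartitioned, incremental)
for key in sorted(combinedConf):
    print(key, '=', combinedConf[key])
```

For the initial load, the same helper would take the bulk-insert dict in place of the incremental one.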

I checked issue #2265 and the fix in #2927, but even with the configs given there as the solution, the upsert still fails with the same error:

# Parentheses let the chained calls span several lines
(
    inputDf.write
    .format('org.apache.hudi')
    .option('hoodie.datasource.write.operation', 'upsert')
    .option("spark.hadoop.parquet.avro.write-old-list-structure", "false")
    .option("parquet.avro.write-old-list-structure", "false")
    .option("hoodie.parquet.avro.write-old-list-structure", "false")
    .option("hoodie.datasource.write.reconcile.schema", "true")
    .options(**combinedConf)
    .mode('append')
    .save()
)

2022-05-27 18:22:12,568 WARN [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(69)): Lost task 0.0 in stage 363.0 (TID 8061) (172.36.166.181 executor 24): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:174)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:342)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:315)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
    at org.apache.parquet.schema.Type.asGroupType(Type.java:207)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
    at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    ... 8 more
2022-05-27 18:22:12,571 INFO [dispatcher-CoarseGrainedScheduler] scheduler.TaskSetManager (Logging.scala:logInfo(57)): Starting task 0.1 in stage 363.0 (TID 8062) (172.36.140.28, executor 15, partition 0, PROCESS_LOCAL, 4444 bytes) taskResourceAssignments Map()
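
In a trace this long, the innermost `Caused by:` entry is the part that names the actual failure. As a small convenience sketch (the regex and the `root_cause` helper are illustrative assumptions, not part of Hudi or Spark), the last cause can be pulled out of a log string, whether the frames are on separate lines or run together:

```python
import re

# A cause runs from "Caused by: " up to the first stack frame ("at pkg.Class.method(").
CAUSE_PATTERN = re.compile(r"Caused by: (.+?)(?=\s+at [\w.$<>]+\()", re.DOTALL)


def root_cause(trace):
    """Return the text of the last 'Caused by:' entry, or None if there is none."""
    causes = CAUSE_PATTERN.findall(trace)
    return causes[-1] if causes else None


# Shortened excerpt of the trace above, flattened onto one line.
trace = (
    "org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 "
    "at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) "
    "Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file "
    "at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54) "
    "Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group "
    "at org.apache.parquet.schema.Type.asGroupType(Type.java:207)"
)

cause = root_cause(trace)
print(cause)
```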

gtwuser avatar May 27 '22 21:05 gtwuser

Small update: I tried to drop the column with nulls during upsert. Scenario:

  1. Drop the column that contains only empty arrays during upsert, then update the table with the same column name and non-empty array data. However, this fails:

Please suggest or correct what I am doing wrong here.

"error during schema update:" An error occurred while calling o615.save. : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220528225756057
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:63)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:119)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:160)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:217)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:277)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 253.0 failed 4 times, most recent failure: Lost task 1.3 in stage 253.0 (TID 5698) (172.34.21.0 executor 24): java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.Seq
    at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$10(AvroConversionHelper.scala:327)
    at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$14(AvroConversionHelper.scala:373)
    at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$3(HoodieSparkUtils.scala:157)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:103)
    at org.apache.hudi.index.bloom.HoodieBloomIndex.lookupIndex(HoodieBloomIndex.java:115)
    at org.apache.hudi.index.bloom.HoodieBloomIndex.tagLocation(HoodieBloomIndex.java:85)
    at org.apache.hudi.table.action.commit.SparkWriteHelper.tag(SparkWriteHelper.java:56)
    at org.apache.hudi.table.action.commit.SparkWriteHelper.tag(SparkWriteHelper.java:39)
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:52)
    ... 45 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.Seq
    at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$10(AvroConversionHelper.scala:327)
    at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$14(AvroConversionHelper.scala:373)
    at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$3(HoodieSparkUtils.scala:157)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

gtwuser avatar May 28 '22 23:05 gtwuser

Are you able to try Spark 3.2, which has a major Parquet upgrade to 1.12?

xushiyan avatar May 30 '22 12:05 xushiyan

Thanks for getting back, @xushiyan. AWS Glue supports Spark 3.1, but I suppose that with the Hudi 0.11.0 bundle we would get Parquet upgraded to 1.12. Unfortunately, we are not able to upgrade to the latest version of Hudi (see issue #5636), and I haven't yet raised the AWS ticket. I am also wondering how it is working for others if that is the case. From the AWS console:

Glue 3.0 – Supports Spark 3.1.1 and Python 3.7. Also includes new AWS Glue Spark runtime optimizations for performance and reliability. Upgraded several dependencies that were required for the new Spark version.

gtwuser avatar May 30 '22 14:05 gtwuser

@gtwuser For the first bulk insert, are the values of NWDepStatus empty? If so, you could try writing only the other columns into the table.
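
A minimal sketch of that suggestion, using plain Python dicts in place of a DataFrame: find the columns that carry no data in the first batch and leave them out of the initial write. `columns_without_data` and `drop_columns` are hypothetical helper names, and treating both `None` and `[]` as "no data" is an assumption based on the sample records earlier in this thread.

```python
def columns_without_data(records):
    """Return sorted names of columns that are None or an empty list in every record."""
    seen = set()
    with_data = set()
    for record in records:
        for name, value in record.items():
            seen.add(name)
            if value is not None and value != []:
                with_data.add(name)
    return sorted(seen - with_data)


def drop_columns(records, names):
    """Return copies of the records without the given columns."""
    return [{k: v for k, v in record.items() if k not in names} for record in records]


# The two sample records from the first bulk insert.
first_batch = [
    {"id": 1, "NWDepStatus": []},
    {"id": 2, "NWDepStatus": None},
]

empty_columns = columns_without_data(first_batch)
trimmed = drop_columns(first_batch, empty_columns)
print(empty_columns)
print(trimmed)
```

With a PySpark DataFrame, the equivalent last step would be `df.drop(*empty_columns)` before the bulk insert, so that the column's type is only fixed once real data arrives.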

minihippo avatar Jun 27 '22 15:06 minihippo

I have opened a similar issue that throws the same exception during update, with a spark-shell example in it. In my example, the schema has an array of structs with one field, which causes this issue. https://github.com/apache/hudi/issues/5985

phillycoder avatar Jun 28 '22 12:06 phillycoder

@phillycoder It looks like we have a workaround in the other issue.

minihippo avatar Jul 13 '22 19:07 minihippo

It looks like this is an open issue in the Parquet format itself that has not yet been resolved: https://issues.apache.org/jira/browse/PARQUET-1681 It seems to affect even the latest version, 1.12.0, and the Parquet community is still discussing the solution: https://issues.apache.org/jira/browse/PARQUET-2069 Let's keep an eye on the above issues. There isn't much that can be done from Hudi.

codope avatar Aug 01 '22 09:08 codope

We need to upgrade parquet-avro once the above issues are fixed. Closing this as it is not related to Hudi. Created HUDI-4798 to track parquet upgrade.

codope avatar Sep 07 '22 10:09 codope