How to read existing Hudi-written data from `S3` using the `AWS Glue DynamicFrame` class? Reading fails with the error below: An error occurred while calling o84.getDynamicFrame. s3://xxxx/.hoodie/202212312312.commit is not a Parquet file. expected magic number at tail
Describe the problem you faced
An error occurs when reading data that was written with Hudi under an S3 prefix.
We write data to S3 using AWS Glue with the Hudi libraries. When we try to read the Hudi-written data back from the same S3 prefix, the read fails.
To Reproduce
Steps to reproduce the behavior:
- Write some data to S3 from an AWS Glue job script using the Hudi APIs and libraries.
- Try to read it back using the DynamicFrame class with the code shown below.
- The read fails with the error shown in the stacktrace.
import sys
from awsglue.transforms import Join
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
data = glueContext.create_dynamic_frame.from_catalog(database="ingestion_hudi_db_392d4f60", table_name="ingestion_details")
print("Count: ", data.count())
data.printSchema()
Expected behavior
We should be able to read back the Hudi-written data using the AWS Glue DynamicFrame class, just as we are able to write it. Sample code showing how the data is written to S3 in Hudi format using the DynamicFrame class:
glueContext.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
connection_type="marketplace.spark",
connection_options=combinedConf)
Environment Description
- Hudi version : 0.10.0
- Spark version : 3.1
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : no
Additional context
If we find a solution to this issue, it will also resolve #5880.
Stacktrace
An error occurred while calling o84.getDynamicFrame. s3://hudi-bucket-392d4f60/ingestion/.hoodie/20220616185638342.commit is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [32, 125, 10, 125]
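The byte values in this message can be decoded to see what the reader actually found. The sketch below is a minimal illustration that uses only the numbers from the error above; the variable names are illustrative. The expected tail is the Parquet magic marker, while the tail that was found looks like the end of a JSON document, which suggests that the `.commit` file under `.hoodie` is metadata rather than a data file (this reading is an inference from the bytes, not something stated in the stacktrace).

```python
# Byte values copied verbatim from the error message above.
expected_tail = bytes([80, 65, 82, 49])    # what the Parquet reader looks for
found_tail = bytes([32, 125, 10, 125])     # what the .commit file ends with

# Decode both as ASCII to make them human-readable.
expected_text = expected_tail.decode("ascii")
found_text = found_tail.decode("ascii")

print(repr(expected_text))  # 'PAR1'
print(repr(found_text))     # ' }\n}'
```

In other words, the reader is being pointed at every file under the table prefix, including the files in `.hoodie`, and it fails on the first one that does not end in `PAR1`.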
@gtwuser I am not very familiar with Glue APIs. But, can you share the full stacktrace? Also, don't you need to give connection_type while creating dataframe? I am referring to https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader-from_options
@umehrot2 @zhedoubushishi @xushiyan Have you come across such an issue?
@codope Sorry for the delay, and thanks for your response. You are right that connection_type is needed, but only when creating a DynamicFrame instance with the from_options method. Here I am using from_catalog, so it is not needed. Ref_link
@umehrot2 @zhedoubushishi : Can you guys please assist here.
Can confirm having the same issue (see full stack trace from CloudWatch below). Is there a viable workaround?
"Event": "GlueETLJobExceptionEvent",
"Timestamp": 1663789368616,
"Failure Reason": "Traceback (most recent call last):\n File \"/tmp/some_glue_job.py\", line 29, in <module>\n datasource_some_table_name = glueContext.create_dynamic_frame.from_catalog(database = args['ENV']+\"_some_datawarehouse\", table_name = \"some_table_name\", transformation_ctx = \"datasource_cdhr0\")\n File \"/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py\", line 642, in from_catalog\n return self._glue_context.create_dynamic_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)\n File \"/opt/amazon/lib/python3.6/site-packages/awsglue/context.py\", line 186, in create_dynamic_frame_from_catalog\n return source.getFrame(**kwargs)\n File \"/opt/amazon/lib/python3.6/site-packages/awsglue/data_source.py\", line 36, in getFrame\n jframe = self._jsource.getDynamicFrame()\n File \"/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py\", line 1257, in __call__\n answer, self.gateway_client, self.target_id, self.name)\n File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 63, in deco\n return f(*a, **kw)\n File \"/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py\", line 328, in get_return_value\n format(target_id, \".\", name), value)\npy4j.protocol.Py4JJavaError: An error occurred while calling o82.getDynamicFrame.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 127.0.0.1, executor 1): org.apache.spark.SparkException: Exception thrown in awaitResult: \n\tat org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)\n\tat org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:538)\n\tat 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:611)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:603)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:288)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:121)\n\tat org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.io.IOException: Could not read footer for file: FileStatus{path=s3://clx-datawarehouse-qa/dwh/cardholder/some_table_name/.hoodie/20220831153536771.commit; isDirectory=false; length=15280; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)\n\tat org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)\n\tat 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)\n\tat scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)\n\tat scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)\n\tat scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: java.lang.RuntimeException: s3://clx-datawarehouse-qa/dwh/cardholder/some_table_name/.hoodie/20220831153536771.commit is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [32, 125, 10, 125]\n\tat org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)\n\tat org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)\n\tat org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)\n\tat org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)\n\t... 
9 more\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n\tat 
org.apache.spark.rdd.RDD.collect(RDD.scala:944)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:633)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)\n\tat org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource$$anonfun$2.apply(SparkSqlDecoratorDataSource.scala:33)\n\tat org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource$$anonfun$2.apply(SparkSqlDecoratorDataSource.scala:33)\n\tat scala.Option.orElse(Option.scala:289)\n\tat org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource.getOrInferFileFormatSchema(SparkSqlDecoratorDataSource.scala:32)\n\tat org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource.resolveRelation(SparkSqlDecoratorDataSource.scala:53)\n\tat com.amazonaws.services.glue.SparkSQLDataSource$$anonfun$getDynamicFrame$9.apply(DataSource.scala:761)\n\tat com.amazonaws.services.glue.SparkSQLDataSource$$anonfun$getDynamicFrame$9.apply(DataSource.scala:732)\n\tat com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89)\n\tat com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89)\n\tat com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:82)\n\tat com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:89)\n\tat com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:731)\n\tat com.amazonaws.services.glue.DataSource$class.getDynamicFrame(DataSource.scala:97)\n\tat com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:709)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.spark.SparkException: Exception thrown in awaitResult: \n\tat org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)\n\tat org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:538)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:611)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:603)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:288)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:121)\n\tat org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t... 1 more\nCaused by: java.io.IOException: Could not read footer for file: FileStatus{path=s3://clx-datawarehouse-qa/dwh/cardholder/some_table_name/.hoodie/20220831153536771.commit; isDirectory=false; length=15280; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)\n\tat org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)\n\tat scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)\n\tat scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)\n\tat scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)\n\tat scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: java.lang.RuntimeException: s3://clx-datawarehouse-qa/dwh/cardholder/some_table_name/.hoodie/20220831153536771.commit is not a Parquet file. 
expected magic number at tail [80, 65, 82, 49] but found [32, 125, 10, 125]\n\tat org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)\n\tat org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)\n\tat org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)\n\tat org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)\n\t... 9 more\n",
"Stack Trace": [
{
"Declaring Class": "get_return_value",
"Method Name": "format(target_id, \".\", name), value)",
"File Name": "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
"Line Number": 328
},
{
"Declaring Class": "deco",
"Method Name": "return f(*a, **kw)",
"File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
"Line Number": 63
},
{
"Declaring Class": "__call__",
"Method Name": "answer, self.gateway_client, self.target_id, self.name)",
"File Name": "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
"Line Number": 1257
},
{
"Declaring Class": "getFrame",
"Method Name": "jframe = self._jsource.getDynamicFrame()",
"File Name": "/opt/amazon/lib/python3.6/site-packages/awsglue/data_source.py",
"Line Number": 36
},
{
"Declaring Class": "create_dynamic_frame_from_catalog",
"Method Name": "return source.getFrame(**kwargs)",
"File Name": "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py",
"Line Number": 186
},
{
"Declaring Class": "from_catalog",
"Method Name": "return self._glue_context.create_dynamic_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)",
"File Name": "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py",
"Line Number": 642
},
{
"Declaring Class": "<module>",
"Method Name": "datasource_some_table_name = glueContext.create_dynamic_frame.from_catalog(database = args['ENV']+\"_some_datawarehouse\", table_name = \"some_table_name\", transformation_ctx = \"datasource_cdhr0\")",
"File Name": "/tmp/some_glue_job.py",
"Line Number": 29
}
],
"Last Executed Line number": 29,
"script": "some_glue_job.py"
}
Below are two workable solutions to get around this problem. They both involve adding additional_options to the S3 connectionType.
- Exclude folder:
additional_options={"exclusions": "[\"s3://bucket/folder/.hoodie/*\"]"}
- Exclude file(s):
additional_options={"exclusions": "[\"**.commit\",\"**.inflight\",\"**.properties\"]"}
@gtwuser
glueContext.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
connection_type="marketplace.spark",
connection_options=combinedConf,
additional_options={"exclusions": "[\"**.commit\",\"**.inflight\",\"**.properties\"]"})
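Because the failure in this thread happens on the read path (`create_dynamic_frame.from_catalog`), the same exclusions can be sketched on that call as well. This is a minimal sketch, not a confirmed fix: `build_exclusions` and `read_without_hudi_metadata` are hypothetical helper names, and it assumes the catalog table is backed by S3 so that the `exclusions` option applies. The `additional_options` parameter of `from_catalog` appears in the stacktrace above.

```python
import json


def build_exclusions(patterns):
    """Return an options dict whose "exclusions" value is a JSON-encoded list of glob patterns."""
    return {"exclusions": json.dumps(list(patterns))}


# Glob patterns taken from the comment above.
hudi_metadata_exclusions = build_exclusions(
    ["**.commit", "**.inflight", "**.properties"]
)


def read_without_hudi_metadata(glue_context, database, table_name):
    """Hypothetical helper: read a catalog table while skipping Hudi metadata files.

    glue_context is expected to be an awsglue.context.GlueContext instance.
    """
    return glue_context.create_dynamic_frame.from_catalog(
        database=database,
        table_name=table_name,
        additional_options=hudi_metadata_exclusions,
    )
```

Inside a Glue job this would be called as `read_without_hudi_metadata(glueContext, "ingestion_hudi_db_392d4f60", "ingestion_details")`, using the database and table names from the original report. Building the string with `json.dumps` avoids hand-escaping the quotes; the result differs from the hand-written string only in whitespace.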
Thanks @jharringtonCoupons, closing this for now.