[SUPPORT] Caused by: java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource after Hudi upgrade to 0.14.0 (EMR 6.15)
Describe the problem you faced

We are creating empty Hudi tables from Java as follows:
Dataset<Row> emptyDF = spark.createDataFrame(new ArrayList<Row>(), schemaStruct);
emptyDF.write()
.format("org.apache.hudi")
.options(tableConf.getHudiOptions())
.mode(SaveMode.Append)
.save();
Spark conf:
entryPoint: /hudi/hudi-addon-edfx.jar
sparkParamsArguments = ["--class com.edifecs.em.cloud.hudi.setup.PreCreateEmptyTablesInHudi",
"--conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar",
"--conf spark.executor.instances=0",
"--conf spark.executor.memory=4g",
"--conf spark.driver.memory=4g",
"--conf spark.driver.cores=4",
"--conf spark.dynamicAllocation.initialExecutors=1"
This used to work fine but suddenly stopped working after Hudi was upgraded from 0.13.1 to 0.14.0 (EMR upgraded from 6.12 to 6.15).
I referred to a similar issue: https://github.com/apache/hudi/issues/2997. I also added hudi-spark3-bundle_2.12-0.14.0.jar to spark.jars, but it didn't work. I don't know why it is not able to find this class.
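For what it's worth, a minimal sketch of how the Spark bundle (the jar that contains org.apache.hudi.DefaultSource) might be listed in the same job configuration; the /usr/lib/hudi/hudi-spark-bundle.jar path is an assumption based on the usual EMR layout and should be adjusted to whatever is actually staged on the image:

```python
# Hypothetical sketch: include the Hudi Spark bundle in spark.jars alongside
# the utilities bundle (paths are assumptions; verify what exists on the image).
sparkParamsArguments = [
    "--class com.edifecs.em.cloud.hudi.setup.PreCreateEmptyTablesInHudi",
    "--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hudi/hudi-utilities-bundle.jar",
    "--conf spark.executor.memory=4g",
    "--conf spark.driver.memory=4g",
    "--conf spark.driver.cores=4",
]
```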
Environment Description

- Hudi version : 0.14.0
- AWS EMR version : 6.15
Stacktrace
org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.apache.hudi. Please find packages at `https://spark.apache.org/third-party-projects.html`.
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:739) ~[spark-catalyst_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:860) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at com.edifecs.em.cloud.hudi.setup.PreCreateEmptyTablesInHudi.lambda$main$0(PreCreateEmptyTablesInHudi.java:170) ~[?:?]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) ~[?:?]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754) ~[?:?]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[?:?]
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[?:?]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource
at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:525) ~[?:?]
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.util.Failure.orElse(Try.scala:224) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
... 15 more
For EMR, here is a guide to follow: "Running Apache Hudi Delta Streamer on EMR Serverless" - a hands-on lab with a step-by-step guide for beginners.
Video-based guide:
- https://www.youtube.com/watch?v=jvbHUl9A4tQ
Steps
Step 1: Download the sample Parquet files from the link below
- https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link
Upload them to the S3 folder as shown in the diagram.

Step 2: Start EMR Serverless Cluster
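If you prefer to create and start the application from code rather than the console, a minimal sketch with the same boto3 emr-serverless client could look like this (the application name, region, and release label are assumptions to adjust to your account):

```python
import uuid
import boto3

# Hypothetical sketch: create and start an EMR Serverless Spark application.
# Name, region and release label are placeholders; pick the release that
# matches your EMR version.
client = boto3.client("emr-serverless", region_name="us-east-1")

app = client.create_application(
    name="hudi-delta-streamer-app",
    releaseLabel="emr-6.15.0",
    type="SPARK",
    clientToken=str(uuid.uuid4()),
)
client.start_application(applicationId=app["applicationId"])
print("applicationId:", app["applicationId"])
```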

Step 3: Run the Python code to submit the job
- Please change and edit the variables.
try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv(".env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = os.getenv("DEV_REGION")

client = boto3.client(
    "emr-serverless",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION_NAME,
)


def lambda_handler_test_emr(event, context):
    # ------------------ Hudi settings ------------------
    glue_db = "hudi_db"
    table_name = "invoice"
    op = "UPSERT"
    table_type = "COPY_ON_WRITE"
    record_key = "invoiceid"
    precombine = "replicadmstimestamp"
    partition_field = "destinationstate"
    source_ordering_field = "replicadmstimestamp"
    delta_streamer_source = "s3://XXXXXXXXXXXX/raw"
    hudi_target_path = "s3://XXXXXXXXX/hudi"

    # ------------------ EMR ------------------
    ApplicationId = "XXXXXXXXXXXXXXX"
    ExecutionTime = 600
    ExecutionArn = "XXXXXXXXXXXXXXXXXXXXXX"
    JobName = "delta_streamer_{}".format(table_name)

    # ----------------------------------------------------
    spark_submit_parameters = " --conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar"
    spark_submit_parameters += " --conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
    spark_submit_parameters += " --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    spark_submit_parameters += " --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"
    spark_submit_parameters += " --conf spark.sql.hive.convertMetastoreParquet=false"
    spark_submit_parameters += " --conf mapreduce.fileoutputcommitter.marksuccessfuljobs=false"
    spark_submit_parameters += " --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    spark_submit_parameters += " --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer"

    arguments = [
        "--table-type", table_type,
        "--op", op,
        "--enable-sync",
        "--source-ordering-field", source_ordering_field,
        "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "--target-table", table_name,
        "--target-base-path", hudi_target_path,
        "--payload-class", "org.apache.hudi.common.model.AWSDmsAvroPayload",
        "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(record_key),
        "--hoodie-conf", "hoodie.datasource.write.partitionpath.field={}".format(partition_field),
        "--hoodie-conf", "hoodie.deltastreamer.source.dfs.root={}".format(delta_streamer_source),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.database.name={}".format(glue_db),
        "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
        "--hoodie-conf", "hoodie.datasource.hive_sync.table={}".format(table_name),
        "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields={}".format(partition_field),
    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=str(uuid.uuid4()),
        executionRoleArn=ExecutionArn,
        jobDriver={
            "sparkSubmit": {
                "entryPoint": "command-runner.jar",
                "entryPointArguments": arguments,
                "sparkSubmitParameters": spark_submit_parameters,
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )

    print("response", end="\n")
    print(response)


lambda_handler_test_emr(context=None, event=None)
Adhoc Query
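For an ad hoc check, the table can be read back with the Hudi data source from a Spark shell or notebook (where a spark session already exists); a minimal sketch using the placeholder target path and fields from the job above:

```python
# Hypothetical sketch: read the Hudi table back and run an ad hoc query.
# The path is the placeholder target path used in the job arguments.
df = spark.read.format("hudi").load("s3://XXXXXXXXX/hudi")
df.createOrReplaceTempView("invoice")
spark.sql("SELECT invoiceid, destinationstate FROM invoice LIMIT 10").show()
```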

@soumilshah1995 thanks for replying. I know how to use the streamer in EMR Serverless; I don't need a tutorial. Can you please help me regarding this particular exception?
The error indicates that Spark cannot find the Hudi data source (org.apache.hudi.DefaultSource), which typically means the required Hudi jar is not properly included or recognized by Spark. Please ensure you are using the right jar files for your version of Spark.
@ROOBALJINDAL Also, are you setting these Spark configs? I can see you are adding the utilities bundle in spark.jars. Is it EMR Serverless? Adding a jar after the Spark session is initialised might not help you.
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
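In an EMR Serverless submission script like the one above, these could be appended to the spark-submit parameters string before the job is started; a minimal sketch (spark_submit_parameters refers to the variable built in that script):

```python
# Hypothetical sketch: append the suggested configs to the spark-submit
# parameters, so they are in place before the Spark session is created.
spark_submit_parameters += " --conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
spark_submit_parameters += " --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
spark_submit_parameters += " --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"
spark_submit_parameters += " --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar"
```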
@ROOBALJINDAL Were you able to make it work? Please let us know.
No, I haven't passed these configs. Yes, it is EMR Serverless and we use the hudi-utilities-bundle jar, and it works, but I don't know why it is not able to find the DefaultSource class. @ad1happy2go
@ROOBALJINDAL Did you try adding it to the driver/executor extra classpath?
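For completeness, a sketch of what passing the bundle on the driver/executor extra classpath could look like in the submission parameters; the jar path is an assumption based on the standard EMR layout:

```python
# Hypothetical sketch: put the Hudi Spark bundle on the driver and executor
# classpaths in addition to spark.jars, so it is visible before the session starts.
spark_submit_parameters += " --conf spark.driver.extraClassPath=/usr/lib/hudi/hudi-spark-bundle.jar"
spark_submit_parameters += " --conf spark.executor.extraClassPath=/usr/lib/hudi/hudi-spark-bundle.jar"
```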