
[SUPPORT] Caused by: java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource after Hudi upgrade on EMR 6.15

Open ROOBALJINDAL opened this issue 1 year ago • 7 comments

Describe the problem you faced

We are creating empty Hudi tables from Java as follows:

Dataset<Row> emptyDF = spark.createDataFrame(new ArrayList<Row>(), schemaStruct);
emptyDF.write()
        .format("org.apache.hudi")
        .options(tableConf.getHudiOptions())
        .mode(SaveMode.Append)
        .save();

Spark conf:

entryPoint: /hudi/hudi-addon-edfx.jar
sparkParamsArguments = ["--class com.edifecs.em.cloud.hudi.setup.PreCreateEmptyTablesInHudi",
            "--conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar",
            "--conf spark.executor.instances=0",
            "--conf spark.executor.memory=4g",
            "--conf spark.driver.memory=4g",
            "--conf spark.driver.cores=4",
            "--conf spark.dynamicAllocation.initialExecutors=1"

This used to work fine but suddenly stopped working after Hudi was upgraded from 0.13.1 to 0.14.0 (EMR upgraded from 6.12 to 6.15).

I referred to a similar issue: https://github.com/apache/hudi/issues/2997. I also added hudi-spark3-bundle_2.12-0.14.0.jar to spark.jars, but it didn't work. I don't know why it is not able to find this class.
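One thing that may help on EMR 6.15 (Spark 3.4.1): org.apache.hudi.DefaultSource lives in the Hudi Spark bundle, so putting a Spark-specific bundle on spark.jars alongside the utilities bundle is worth trying. A sketch in the same format as the job config above; the /usr/lib/hudi/hudi-spark-bundle.jar path is an assumption about what the EMR image ships, and for an external jar the artifact built against Spark 3.4 would be hudi-spark3.4-bundle_2.12-0.14.0:

sparkParamsArguments = ["--class com.edifecs.em.cloud.hudi.setup.PreCreateEmptyTablesInHudi",
            "--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hudi/hudi-utilities-bundle.jar",
            ... (remaining options unchanged)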

Environment Description

  • Hudi version : 0.14.0

  • AWS EMR version : 6.15

Stacktrace

org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.apache.hudi. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:739) ~[spark-catalyst_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:860) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at com.edifecs.em.cloud.hudi.setup.PreCreateEmptyTablesInHudi.lambda$main$0(PreCreateEmptyTablesInHudi.java:170) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) ~[?:?]
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754) ~[?:?]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[?:?]
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource
	at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[?:?]
	at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[?:?]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:525) ~[?:?]
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at scala.util.Failure.orElse(Try.scala:224) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	... 15 more

ROOBALJINDAL avatar Jun 18 '24 12:06 ROOBALJINDAL

For EMR, here is a guide to follow:

Running Apache Hudi Delta Streamer on EMR Serverless: a hands-on lab with a step-by-step guide for beginners.

Video-based guide

  • https://www.youtube.com/watch?v=jvbHUl9A4tQ&feature=youtu.be

Steps

Step 1: Download the sample Parquet files from the links

  • https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link

Upload them to the S3 folder as shown in the diagram.

Step 2: Start EMR Serverless Cluster


Step 3: Run the Python code to submit the job

  • Please change and edit the variables below.
try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv(".env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = os.getenv("DEV_REGION")

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)


def lambda_handler_test_emr(event, context):
    # ------------------Hudi settings ---------------------------------------------
    glue_db = "hudi_db"
    table_name = "invoice"
    op = "UPSERT"
    table_type = "COPY_ON_WRITE"

    record_key = 'invoiceid'
    precombine = "replicadmstimestamp"
    partition_field = 'destinationstate'
    source_ordering_field = 'replicadmstimestamp'

    delta_streamer_source = 's3://XXXXXXXXXXXX/raw'
    hudi_target_path = 's3://XXXXXXXXX/hudi'

    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = "XXXXXXXXXXXXXXX"
    ExecutionTime = 600
    ExecutionArn = "XXXXXXXXXXXXXXXXXXXXXX"
    JobName = 'delta_streamer_{}'.format(table_name)

    # --------------------------------------------------------------------------------

    spark_submit_parameters = ' --conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar'
    spark_submit_parameters += ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
    spark_submit_parameters += ' --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    spark_submit_parameters += ' --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    spark_submit_parameters += ' --conf spark.sql.hive.convertMetastoreParquet=false'
    spark_submit_parameters += ' --conf mapreduce.fileoutputcommitter.marksuccessfuljobs=false'
    spark_submit_parameters += ' --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer'

    arguments = [
        "--table-type", table_type,
        "--op", op,
        "--enable-sync",
        "--source-ordering-field", source_ordering_field,
        "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "--target-table", table_name,
        "--target-base-path", hudi_target_path,
        "--payload-class", "org.apache.hudi.common.model.AWSDmsAvroPayload",
        "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(record_key),
        "--hoodie-conf", "hoodie.datasource.write.partitionpath.field={}".format(partition_feild),
        "--hoodie-conf", "hoodie.deltastreamer.source.dfs.root={}".format(delta_streamer_source),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.database.name={}".format(glue_db),
        "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
        "--hoodie-conf", "hoodie.datasource.hive_sync.table={}".format(table_name),
        "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields={}".format(partition_feild),
    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=str(uuid.uuid4()),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': "command-runner.jar",
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)


lambda_handler_test_emr(context=None, event=None)

Adhoc Query


soumilshah1995 avatar Jun 18 '24 13:06 soumilshah1995

@soumilshah1995 thanks for replying. I know how to use the streamer on EMR Serverless; I don't need the tutorial. Can you please help me regarding this particular exception?

ROOBALJINDAL avatar Jun 18 '24 14:06 ROOBALJINDAL

The error indicates that Spark cannot find the Hudi data source (org.apache.hudi.DefaultSource), which typically means the required Hudi jar is not properly included or recognized by Spark. Please ensure you are using the right jar files for your version of Spark.
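A quick way to confirm whether the jar actually reached the driver is to probe for the class and print spark.jars from inside the job before the write. A minimal debug sketch in Java, assuming spark is the existing SparkSession from the failing job (the snippet is illustrative only, not part of the original job):

// Check whether the Hudi datasource class is visible on the driver classpath.
try {
    Class.forName("org.apache.hudi.DefaultSource");
    System.out.println("org.apache.hudi.DefaultSource found on driver classpath");
} catch (ClassNotFoundException e) {
    System.out.println("org.apache.hudi.DefaultSource NOT found on driver classpath");
}
// Print the jars Spark was actually given (empty if spark.jars was never applied).
System.out.println("spark.jars = " + spark.conf().get("spark.jars", ""));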

soumilshah1995 avatar Jun 18 '24 16:06 soumilshah1995

@ROOBALJINDAL Also, are you setting these Spark configs? I can see you are adding the utilities bundle in spark.jars. Is it EMR Serverless? Adding a jar after the Spark session is initialised might not help you.

--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
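Since the table creation runs from Java, a minimal sketch of setting these options when the session is constructed rather than afterwards; the app name is just a placeholder, and the bundle jar itself still has to be supplied at submit time (spark.jars / --jars), not through the builder:

SparkSession spark = SparkSession.builder()
        .appName("PreCreateEmptyTablesInHudi")
        // These need to be in place before the session exists; setting them on an
        // already initialised session has no effect.
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
        .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
        .getOrCreate();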

ad1happy2go avatar Jun 19 '24 11:06 ad1happy2go

@ROOBALJINDAL Were you able to make it work? Please let us know.

ad1happy2go avatar Jun 20 '24 15:06 ad1happy2go

No, I haven't passed these configs. Yes, it is EMR Serverless; we use the hudi-utilities-bundle jar and it works, but I don't know why it is not able to find the DefaultSource class. @ad1happy2go

ROOBALJINDAL avatar Jun 24 '24 06:06 ROOBALJINDAL

@ROOBALJINDAL Did you try adding the bundle to the driver/executor extra classpath?
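For example, in the same style as the original spark parameters, something like the lines below could be added; the bundle path is an assumption about the EMR Serverless image, and these properties may already carry EMR defaults that would need to be preserved:

            "--conf spark.driver.extraClassPath=/usr/lib/hudi/hudi-spark-bundle.jar",
            "--conf spark.executor.extraClassPath=/usr/lib/hudi/hudi-spark-bundle.jar",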

ad1happy2go avatar Aug 22 '24 12:08 ad1happy2go