
[SUPPORT] NullPointerException in merge into Spark Sql HoodieSparkSqlWriter$.mergeParamsAndGetHoodieConfig

Open vicuna96 opened this issue 3 years ago • 3 comments

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

Hi team, we are getting a NullPointerException when trying to use a MERGE INTO statement to update columns in a table that is registered in Hive. We perform the initial load of the table using the Hive sync options, but we do not use those Hive sync options in subsequent runs, as doing so leads to a java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.IMetaStoreClient.alter_table_with_environmentContext(Ljava/lang/String;Ljava/lang/String;Lorg/apache/hadoop/hive/metastore/api/Table;Lorg/apache/hadoop/hive/metastore/api/EnvironmentContext;)V.

To Reproduce

Steps to reproduce the behavior:

  1. Created a table with HMS Hive sync using the following syntax:
  sfoSubDF.write.format("hudi").
    options(hudiOptions).
    option(TABLE_TYPE.key(), "COPY_ON_WRITE").
    option(OPERATION.key(), "bulk_insert").
    option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
    option(PRECOMBINE_FIELD.key(), "PROCESSING_TS").
    option(RECORDKEY_FIELD.key(), "KEY1,KEY2").
    option(PARTITIONPATH_FIELD.key(), "PARTITION_DT").
    option(HIVE_STYLE_PARTITIONING.key(), "true").
    option(HIVE_SYNC_MODE.key(), "hms").
    option(HIVE_DATABASE.key(), database).
    option(HIVE_TABLE.key(), tableName).
    option(HIVE_SYNC_ENABLED.key(), "true").
    option(TBL_NAME.key(), tableName).
    mode(Overwrite).
    save(toPath)

Then we attempt to run a partial update on top of the table, using the Spark SQL MERGE INTO syntax:

 merge into $HIVE_DB.$datasetName as target
 using $sourceAliasOrder as source
 on ${getDefaultMergeCondition()}
 when matched and ${PTC.RECORD_TS} <> source.${PTC.RECORD_TS} then update set
  ${PTC.RECORD_TS} = source.${PTC.RECORD_TS}

Running this merge immediately raises the NullPointerException, on a call to the parametersWithWriteDefaults function, as detailed in the stack trace included below.
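
For reference, a minimal sketch of that merge with the template variables filled in (the source alias, the target table name, and the column names below are assumptions; ${PTC.RECORD_TS} is shown here as a hypothetical RECORD_TS column, and the merge condition is assumed to join on the record key and partition fields):

// Sketch only -- table, alias, and column names are assumptions, not the exact production statement.
spark.sql(
  """
    |merge into testing.test_hudi_hive_sync as target
    |using source_updates as source
    |on  target.KEY1 = source.KEY1
    |and target.KEY2 = source.KEY2
    |and target.PARTITION_DT = source.PARTITION_DT
    |when matched and target.RECORD_TS <> source.RECORD_TS then update set
    |  RECORD_TS = source.RECORD_TS
    |""".stripMargin)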

Expected behavior

We expect the partial-update MERGE INTO statement to update the corresponding columns in the base table. Note that since we are not able to use Hive sync with HMS on Hudi (as described in https://github.com/apache/hudi/issues/4700), we would then run MSCK REPAIR to update any necessary table metadata, as sketched below.
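
A rough sketch of what that MSCK REPAIR step could look like (the database and table names are placeholders):

// Placeholder names -- run after the write so Hive picks up any new partitions.
spark.sql("MSCK REPAIR TABLE testing.test_hudi_hive_sync")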

Environment Description

  • Hudi version : 0.10.0

  • Spark version : 2.4.7

  • Hive version : 2.3.7

  • Hadoop version : 2.10.1

  • Storage (HDFS/S3/GCS..) : GCS

  • Running on Docker? (yes/no) : No

Stacktrace

22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableMetaClient: Loading HoodieTableMetaClient from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableConfig: Loading table properties from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting/.hoodie/hoodie.properties
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting
22/05/14 00:50:12 INFO org.apache.hudi.common.table.timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20220513213440105__commit__COMPLETED]}
22/05/14 00:50:12 WARN org.apache.hudi.common.config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
22/05/14 00:50:12 WARN org.apache.hudi.common.config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableMetaClient: Loading HoodieTableMetaClient from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableConfig: Loading table properties from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting/.hoodie/hoodie.properties
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting
22/05/14 00:50:12 WARN com.google.cloud.hadoop.fs.gcs.GoogleHadoopSyncableOutputStream: hflush(): No-op due to rate limit (RateLimiter[stableRate=0.2qps]): readers will *not* yet see flushed data for gs://opddev-dev-dpaas-phs-logs/history-server/spark-events/ghs-gif-streaming/application_1652454321869_0247_1.lz4.inprogress
22/05/14 00:50:12 ERROR com.walmart.archetype.core.WorkFlowManager: Exception while running Some(WorkflowPublish) Exception = null
22/05/14 00:50:12 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException
	at java.util.Hashtable.put(Hashtable.java:460)
	at java.util.Hashtable.putAll(Hashtable.java:524)
	at org.apache.hudi.HoodieWriterUtils$.parametersWithWriteDefaults(HoodieWriterUtils.scala:52)
	at org.apache.hudi.HoodieSparkSqlWriter$.mergeParamsAndGetHoodieConfig(HoodieSparkSqlWriter.scala:722)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:91)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:194)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3369)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)

vicuna96 avatar May 14 '22 01:05 vicuna96

Hi team, just to update - it looks like this HMS table create may not be setting fields such as primaryKey, type, and preCombineField, since they are optional. I'm assuming we need to specify them in the spark-sql environment itself during execution?

spark.sql("show create table testing.test_hudi_hive_sync").show(false)

CREATE TABLE `testing`.`test_hudi_hive_sync` (`_hoodie_commit_time` STRING, `_hoodie_commit_seqno` STRING, `_hoodie_record_key` STRING, `_hoodie_partition_path` STRING, `_hoodie_file_name` STRING, `KEY1` INT, `KEY2` STRING, `PROCESSING_TS` TIMESTAMP, `PARTITION_DT` DATE)
USING hudi
OPTIONS (
  `serialization.format` '1',
  path 'gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting'
)
PARTITIONED BY (PARTITION_DT)
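
For comparison, a sketch of a Hudi Spark SQL DDL that declares these fields explicitly as table properties (column names are taken from the schema above; the table name and location are hypothetical, and whether adding the properties after the fact via ALTER TABLE ... SET TBLPROPERTIES has the same effect is an assumption):

// Sketch only -- hypothetical table name and location; column names from the schema above.
spark.sql(
  """
    |create table if not exists testing.test_hudi_hive_sync_v2 (
    |  KEY1 int, KEY2 string, PROCESSING_TS timestamp, PARTITION_DT date
    |) using hudi
    |partitioned by (PARTITION_DT)
    |location 'gs://my_hudi_bucket/...'
    |tblproperties (
    |  type = 'cow',
    |  primaryKey = 'KEY1,KEY2',
    |  preCombineField = 'PROCESSING_TS'
    |)
    |""".stripMargin)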

vicuna96 avatar May 19 '22 20:05 vicuna96

Hi @vicuna96 Could you print the following configurations?

import org.apache.spark.sql.catalyst.TableIdentifier

val table = sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier("table", Option("database")))
// check for options that start with "hoodie."
println(sparkSession.sessionState.conf.getAllConfs)
println(table.storage.properties ++ table.properties)

minihippo avatar Jun 16 '22 03:06 minihippo

@vicuna96 Is this still a problem? Can you check the table props as suggested by @minihippo above? In my local test with the latest master, I am able to see the PK and partition field.

codope avatar Aug 01 '22 08:08 codope

@vicuna96 : any updates please.

nsivabalan avatar Aug 28 '22 02:08 nsivabalan

@vicuna96 : gentle ping.

nsivabalan avatar Sep 12 '22 22:09 nsivabalan

Any update on the issue? @nsivabalan @codope I am facing exactly the same issue, the difference being that I wrote a bootstrap job to do the bulk_insert using a Hudi Spark job, and then I was running the incremental run using dbt-spark with Hudi.

Hudi version : 0.10.0

Spark version : 3.1.2

Hive version : 3.1.2

Hadoop version : Amazon 3.2.1

Storage (HDFS/S3/GCS..) : S3

Running on Docker? (yes/no) : No

Output of the commands suggested by @minihippo:

Database: data_model
Table: click_fact
Owner: hive
Created Time: Tue Sep 13 22:17:07 UTC 2022
Last Access: UNKNOWN
Created By: Spark 2.2 or prior
Type: EXTERNAL
Provider: hudi
Table Properties: [bucketing_version=2, last_commit_time_sync=20220914060931454]
Statistics: 823082628 bytes
Location: <masked>
Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hudi.hadoop.HoodieParquetInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Storage Properties: [hoodie.query.as.ro.table=false]
Schema: root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- user_customer_id: string (nullable = true)
 |-- Permissions_Min: string (nullable = true)
 |-- Permissions_Max: string (nullable = true)
)

Hudi params used for bulk_insert:

.option("hoodie.bulkinsert.shuffle.parallelism", appConfig.getNumPartitions())
                .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), "COPY_ON_WRITE")
                .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())
                .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), appConfig.getRecordKey()) //"user_customer_id"
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(),appConfig.getPreCombineKey()) //"user_customer_id"
                .option(HoodieWriteConfig.TABLE_NAME, appConfig.getTblName()) //"click_fact"
                .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), appConfig.getDbName()) //"data_model"
                .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), appConfig.getTblName()) //"click_fact"
                .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
                .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
                .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), "org.apache.hudi.hive.NonPartitionedExtractor")
                .option("hoodie.parquet.compression.codec", "snappy")
                .format("org.apache.hudi")
                .mode(SaveMode.Append)
                .save(appConfig.getOutputPath());

Following is the stack trace:

java.lang.NullPointerException
	at java.util.Hashtable.put(Hashtable.java:460)
	at java.util.Hashtable.putAll(Hashtable.java:524)
	at org.apache.hudi.HoodieWriterUtils$.parametersWithWriteDefaults(HoodieWriterUtils.scala:52)
	at org.apache.hudi.HoodieSparkSqlWriter$.mergeParamsAndGetHoodieConfig(HoodieSparkSqlWriter.scala:722)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:91)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:230)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3751)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3749)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:230)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)

nitinkul avatar Sep 14 '22 11:09 nitinkul

@nitinkul @vicuna96 Since primary key and precombine field rarely change, could you try setting these in hudi-defaults.conf and retry?
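
For illustration, a minimal hudi-defaults.conf sketch (field names follow vicuna96's schema above and are assumptions; the file has to sit in the directory pointed to by HUDI_CONF_DIR, per the DFSPropertiesConfiguration warning in the log earlier in this thread):

# Sketch only -- record key / precombine field names are assumptions
hoodie.datasource.write.recordkey.field=KEY1,KEY2
hoodie.datasource.write.precombine.field=PROCESSING_TS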

codope avatar Sep 19 '22 13:09 codope

@nitinkul @vicuna96 : gentle ping.

nsivabalan avatar Sep 30 '22 05:09 nsivabalan

I could not reproduce with the latest master. Sharing the spark-sql and spark-shell scripts in this gist: https://gist.github.com/codope/f9e89093b98e58ad097820a35376caed

codope avatar Sep 30 '22 10:09 codope

Hi @nsivabalan, I see that in Hudi version 0.11.0, with the rest of the runtime environment unchanged, the "show create table ${tableName}" command in spark-sql still doesn't register these properties. However, the upsert in this version (0.11.0) goes through without raising a NullPointerException, and HiveSyncTool is able to successfully sync the changes.

scala> println(table.storage.properties ++ table.properties)
Map(hoodie.query.as.ro.table -> false, serialization.format -> 1, transient_lastDdlTime -> 1663200570, last_commit_time_sync -> 20220915003838063)

vicuna96 avatar Oct 09 '22 23:10 vicuna96

@vicuna96 This is expected and I believe the NPE was fixed by https://github.com/apache/hudi/commit/1f173127e4980eff0b6ad99ce9db1bda8dae0d32

codope avatar Oct 12 '22 15:10 codope

@vicuna96 : did you get a chance to try out that patch? If the issue is resolved, can we close out the GitHub issue?

nsivabalan avatar Oct 19 '22 17:10 nsivabalan

@vicuna96 : gentle ping.

nsivabalan avatar Oct 25 '22 03:10 nsivabalan

Since the fix has landed, closing out the GitHub issue. Feel free to raise a new issue if you are looking for further assistance.

nsivabalan avatar Nov 03 '22 03:11 nsivabalan