[SUPPORT] NullPointerException in merge into Spark Sql HoodieSparkSqlWriter$.mergeParamsAndGetHoodieConfig
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
Hi team, we are getting a NullPointerException when trying to use a MERGE statement to update columns in a table that is synced to Hive. We perform the initial load of the table using the Hive sync options, but we do not use these Hive sync options in subsequent runs, as that would lead to a java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.IMetaStoreClient.alter_table_with_environmentContext(Ljava/lang/String;Ljava/lang/String;Lorg/apache/hadoop/hive/metastore/api/Table;Lorg/apache/hadoop/hive/metastore/api/EnvironmentContext;)V.
To Reproduce
Steps to reproduce the behavior:
- Created a table with hms Hive sync using the following syntax:
sfoSubDF.write.format("hudi").
options(hudiOptions).
option(TABLE_TYPE.key(), "COPY_ON_WRITE").
option(OPERATION.key(), "bulk_insert").
option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
option(PRECOMBINE_FIELD.key(), "PROCESSING_TS").
option(RECORDKEY_FIELD.key(), "KEY1,KEY2").
option(PARTITIONPATH_FIELD.key(), "PARTITION_DT").
option(HIVE_STYLE_PARTITIONING.key(), "true").
option(HIVE_SYNC_MODE.key(), "hms").
option(HIVE_DATABASE.key(), database).
option(HIVE_TABLE.key(), tableName).
option(HIVE_SYNC_ENABLED.key(), "true").
option(TBL_NAME.key(), tableName).
mode(Overwrite).
save(toPath)
- Then we attempted to run a partial update on top of the table, using the Spark SQL MERGE INTO syntax:
merge into $HIVE_DB.$datasetName as target
using $sourceAliasOrder as source
on ${getDefaultMergeCondition()}
when matched and ${PTC.RECORD_TS} <> source.${PTC.RECORD_TS} then update set
${PTC.RECORD_TS} = source.${PTC.RECORD_TS}
Running this statement immediately raises the NullPointerException on the call to the parametersWithWriteDefaults function, as detailed in the stack trace included below.
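For context, the fully interpolated statement looks roughly like the following; the database, table, source view, and column names here are hypothetical stand-ins for the values produced by getDefaultMergeCondition() and PTC in our codebase:
// Hypothetical expansion of the templated merge above; all names are illustrative only.
spark.sql(
  """merge into my_db.order_table as target
    |using order_updates as source
    |on target.KEY1 = source.KEY1 and target.KEY2 = source.KEY2
    |when matched and RECORD_TS <> source.RECORD_TS then update set
    |  RECORD_TS = source.RECORD_TS""".stripMargin)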
Expected behavior
We expect the partial-update MERGE INTO statement to update the corresponding columns in the base table. Note that since we are not able to use Hive sync in hms mode with Hudi (as described in https://github.com/apache/hudi/issues/4700), we then run MSCK REPAIR to update any necessary table metadata.
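A sketch of that metadata refresh step (the database and table names are placeholders):
// Run after new partitions are written so that Hive picks them up.
spark.sql("MSCK REPAIR TABLE my_db.order_table")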
Environment Description
- Hudi version : 0.10.0
- Spark version : 2.4.7
- Hive version : 2.3.7
- Hadoop version : 2.10.1
- Storage (HDFS/S3/GCS..) : GCS
- Running on Docker? (yes/no) : No
Stacktrace
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableMetaClient: Loading HoodieTableMetaClient from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableConfig: Loading table properties from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting/.hoodie/hoodie.properties
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting
22/05/14 00:50:12 INFO org.apache.hudi.common.table.timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20220513213440105__commit__COMPLETED]}
22/05/14 00:50:12 WARN org.apache.hudi.common.config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
22/05/14 00:50:12 WARN org.apache.hudi.common.config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableMetaClient: Loading HoodieTableMetaClient from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableConfig: Loading table properties from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting/.hoodie/hoodie.properties
22/05/14 00:50:12 INFO org.apache.hudi.common.table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting
22/05/14 00:50:12 WARN com.google.cloud.hadoop.fs.gcs.GoogleHadoopSyncableOutputStream: hflush(): No-op due to rate limit (RateLimiter[stableRate=0.2qps]): readers will *not* yet see flushed data for gs://opddev-dev-dpaas-phs-logs/history-server/spark-events/ghs-gif-streaming/application_1652454321869_0247_1.lz4.inprogress
22/05/14 00:50:12 ERROR com.walmart.archetype.core.WorkFlowManager: Exception while running Some(WorkflowPublish) Exception = null
22/05/14 00:50:12 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:460)
at java.util.Hashtable.putAll(Hashtable.java:524)
at org.apache.hudi.HoodieWriterUtils$.parametersWithWriteDefaults(HoodieWriterUtils.scala:52)
at org.apache.hudi.HoodieSparkSqlWriter$.mergeParamsAndGetHoodieConfig(HoodieSparkSqlWriter.scala:722)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:91)
at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:194)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3369)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
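The top two frames are in java.util.Hashtable, which rejects null keys and values, so the failure is consistent with one of the merged write options resolving to null. A minimal standalone sketch (not Hudi code, and the option name used here is just an assumed example) of how a single null value produces exactly this trace:
import java.util.{HashMap => JHashMap, Hashtable}

val opts = new JHashMap[String, String]()
// HashMap tolerates a null value, e.g. an option that was never resolved...
opts.put("hoodie.datasource.write.precombine.field", null)

val merged = new Hashtable[String, String]()
// ...but Hashtable.putAll -> Hashtable.put throws NullPointerException on it,
// matching the Hashtable.put / Hashtable.putAll frames above.
merged.putAll(opts)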
Hi team, just to update: it looks like this hms table creation may not be setting fields such as primaryKey, type, and preCombineField, since they are optional. I'm assuming we need to specify them in the Spark SQL environment itself during execution?
spark.sql("show create table testing.test_hudi_hive_sync").show(false)
CREATE TABLE `testing`.`test_hudi_hive_sync` (`_hoodie_commit_time` STRING, `_hoodie_commit_seqno` STRING, `_hoodie_record_key` STRING, `_hoodie_partition_path` STRING, `_hoodie_file_name` STRING, `KEY1` INT, `KEY2` STRING, `PROCESSING_TS` TIMESTAMP, `PARTITION_DT` DATE)
USING hudi
OPTIONS (
`serialization.format` '1',
path 'gs://my_hudi_bucket/staging_zone/WorkflowPublish/orderTableTesting'
)
PARTITIONED BY (PARTITION_DT)
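For comparison, this is a sketch of how the key fields could be declared up front when creating the table through Spark SQL DDL, using Hudi's documented primaryKey / preCombineField / type table properties. The table name below is hypothetical, and whether this avoids the NPE on 0.10.0 is an assumption on my part:
// Hypothetical table created via Spark SQL DDL with the key fields declared explicitly.
spark.sql(
  """create table testing.test_hudi_hive_sync_sql (
    |  KEY1 int, KEY2 string, PROCESSING_TS timestamp, PARTITION_DT date
    |) using hudi
    |partitioned by (PARTITION_DT)
    |tblproperties (
    |  type = 'cow',
    |  primaryKey = 'KEY1,KEY2',
    |  preCombineField = 'PROCESSING_TS'
    |)""".stripMargin)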
Hi @vicuna96 Could you print the following configurations?
val table = sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier("table", Option("database")))
// check options that start with hoodie.
println(sparkSession.sessionState.conf.getAllConfs)
println(table.storage.properties ++ table.properties)
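An illustrative variant of the first println that narrows the output to just the hoodie.* entries:
// Same session confs as above, filtered to Hudi-related keys for readability.
sparkSession.sessionState.conf.getAllConfs
  .filter { case (key, _) => key.startsWith("hoodie.") }
  .foreach(println)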
@vicuna96 Is this still a problem? Can you check the table props as suggested by @minihippo above? In my local test with the latest master, I am able to see the PK and partition field.
@vicuna96 : any updates please.
@vicuna96 : gentle ping.
Any update on the issue? @nsivabalan @codope
I am facing exactly the same issue. The difference is that I wrote a bootstrap job to do the bulk_insert using a hudi-spark job, and then I was running incremental runs using dbt-spark with Hudi.
- Hudi version : 0.10.0
- Spark version : 3.1.2
- Hive version : 3.1.2
- Hadoop version : Amazon 3.2.1
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : No
Output of the commands suggested by @minihippo:
Database: data_model
Table: click_fact
Owner: hive
Created Time: Tue Sep 13 22:17:07 UTC 2022
Last Access: UNKNOWN
Created By: Spark 2.2 or prior
Type: EXTERNAL
Provider: hudi
Table Properties: [bucketing_version=2, last_commit_time_sync=20220914060931454]
Statistics: 823082628 bytes
Location: <masked>
Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hudi.hadoop.HoodieParquetInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Storage Properties: [hoodie.query.as.ro.table=false]
Schema: root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- user_customer_id: string (nullable = true)
|-- Permissions_Min: string (nullable = true)
|-- Permissions_Max: string (nullable = true)
)
Hudi params used for the bulk_insert:
.option("hoodie.bulkinsert.shuffle.parallelism", appConfig.getNumPartitions())
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), "COPY_ON_WRITE")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), appConfig.getRecordKey()) //"user_customer_id"
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(),appConfig.getPreCombineKey()) //"user_customer_id"
.option(HoodieWriteConfig.TABLE_NAME, appConfig.getTblName()) //"click_fact"
.option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), appConfig.getDbName()) //"data_model"
.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), appConfig.getTblName()) //"click_fact"
.option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), "org.apache.hudi.hive.NonPartitionedExtractor")
.option("hoodie.parquet.compression.codec", "snappy")
.format("org.apache.hudi")
.mode(SaveMode.Append)
.save(appConfig.getOutputPath());
Following is the stack trace:
java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:460)
at java.util.Hashtable.putAll(Hashtable.java:524)
at org.apache.hudi.HoodieWriterUtils$.parametersWithWriteDefaults(HoodieWriterUtils.scala:52)
at org.apache.hudi.HoodieSparkSqlWriter$.mergeParamsAndGetHoodieConfig(HoodieSparkSqlWriter.scala:722)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:91)
at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:230)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3751)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3749)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:230)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
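Since the catalog's Table Properties above only show bucketing_version and last_commit_time_sync, it may also help to look at what the writer persisted in the table's own hoodie.properties (the path below is a placeholder, and which exact keys appear there, e.g. hoodie.table.precombine.field, can vary by release):
// Read the Hudi table config file directly; replace the path with the real table location.
spark.read.textFile("s3://<bucket>/<table-path>/.hoodie/hoodie.properties").show(100, false)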
@nitinkul @vicuna96 Since primary key and precombine field rarely change, could you try setting these in hudi-defaults.conf and retry?
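For illustration, such entries in hudi-defaults.conf might look like the following, reusing the record key / precombine field from the bulk_insert config above; HUDI_CONF_DIR must point at the directory containing the file (see the DFSPropertiesConfiguration warning in the first log), and the key names are the standard datasource write options:
# Illustrative hudi-defaults.conf entries; values are placeholders for your table.
hoodie.datasource.write.recordkey.field=user_customer_id
hoodie.datasource.write.precombine.field=user_customer_id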
@nitinkul @vicuna96 : gentle ping.
I could not reproduce with the latest master. Sharing the spark-sql and spark-shell scripts in this gist: https://gist.github.com/codope/f9e89093b98e58ad097820a35376caed
Hi @nsivabalan, I see that in Hudi version 0.11.0, with the rest of the runtime environment unchanged, the "show create table ${tableName}" command in spark-sql still doesn't register these properties. However, the upsert in this version (0.11.0) goes through without raising a NullPointerException, and HiveSyncTool is able to successfully sync the changes.
scala> println(table.storage.properties ++ table.properties)
Map(hoodie.query.as.ro.table -> false, serialization.format -> 1, transient_lastDdlTime -> 1663200570, last_commit_time_sync -> 20220915003838063)
@vicuna96 This is expected and I believe the NPE was fixed by https://github.com/apache/hudi/commit/1f173127e4980eff0b6ad99ce9db1bda8dae0d32
@vicuna96 : did you get a chance to try out that patch? If the issue is resolved, can we close out the GitHub issue?
@vicuna96 : gentle ping.
Since the fix has landed, closing out the GitHub issue. Feel free to raise a new issue if you are looking for further assistance.