Issue: NullPointerException When Using PermissiveRecordExceptionHandler in Structured Streaming Job
Description

I have a Spark Structured Streaming job where I read Avro messages from Kafka, convert them to a Spark DataFrame using Abris, and then write the stream as a Delta table to S3. Below are the relevant configuration and code snippets.
Abris Configuration
val abrisConfig = AbrisConfig.fromConfluentAvro
  .downloadReaderSchemaByVersion(1)
  .andRecordNameStrategy(schemaName, schemaNamespace)
  .usingSchemaRegistry(schemaRegistryUrl)
  .withExceptionHandler(new PermissiveRecordExceptionHandler())
Code Example

Here's an example where I write the stream to the console for demonstration purposes:
val df = reader
  .readKafkaStream(topic, kafkaConfig, options)
  .withColumnRenamed("headers", KAFKA_HEADERS)
  .withColumn("kafka_meta", map_from_entries(col(KAFKA_HEADERS)))
  .withColumn("kafka_meta_column", col("kafka_meta.meta"))
  .withColumn(KAFKA_HEADERS_META, from_avro(col("kafka_meta_column"), abrisConfig))

df.select(KAFKA_HEADERS_META).writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "true")
  .start()
The kafka_meta.meta column contains the Avro message I want to deserialize using from_avro. The schema for the Avro message is as follows:
{
  "type": "record",
  "name": "Example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "timestamp", "type": "long" }
  ]
}
Problem

I encounter the following error during execution:

Caused by: java.lang.NullPointerException
Upon investigation, I concluded that this happens because the PermissiveRecordExceptionHandler returns a null record for malformed data, which is the expected behavior. Downstream transformations then attempt to access the nested field meta (the column containing the Avro message) on that null record, resulting in the NullPointerException.
Note: None of the columns originally contain null values, so the issue is not caused by the source data.
Question

How can I handle this scenario to avoid the NullPointerException while keeping the permissive behavior of the exception handler? Any guidance would be appreciated.
Thank you!
Hi @talperetz1
I'd need a test or a runnable program to reproduce the issue. In the snippet you posted, I don't see any transformation downstream of from_avro(col("kafka_meta_column"), abrisConfig) that tries to access nested fields of a null value.
In general, you can write your own DeserializationExceptionHandler that, for example, emits a record with default values instead of nulls, or you can coalesce the null values wherever you might access a nested field of a null record. A sketch of the second approach follows below.
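For illustration, here is a minimal sketch of the coalesce idea, assuming df and KAFKA_HEADERS_META are the ones from your snippet and that the reader schema is the two-field Example record from the issue; the default values and the output column names are purely illustrative:

import org.apache.spark.sql.functions.{coalesce, col, lit}

// Accessing a nested field of a null struct yields null in Spark SQL, so the
// defaults below replace those nulls before anything downstream touches them.
// `df` and KAFKA_HEADERS_META are taken from the snippet above; the default
// values and output column names are illustrative only.
val withDefaults = df
  .withColumn("example_id", coalesce(col(s"$KAFKA_HEADERS_META.id"), lit("unknown")))
  .withColumn("example_timestamp", coalesce(col(s"$KAFKA_HEADERS_META.timestamp"), lit(0L)))

If you would rather drop malformed messages altogether, a simple df.filter(col(KAFKA_HEADERS_META).isNotNull) before the write achieves that, at the cost of losing those records.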