
Issue: NullPointerException When Using PermissiveRecordExceptionHandler in Structured Streaming Job

Open talperetz1 opened this issue 1 year ago • 1 comments

Description

I have a Spark Structured Streaming job where I read Avro messages from Kafka, convert them to a Spark DataFrame using Abris, and then write the stream as a Delta table to S3. Below are the relevant configuration and code snippets.

Abris Configuration

val abrisConfig = AbrisConfig.fromConfluentAvro
  .downloadReaderSchemaByVersion(1)
  .andRecordNameStrategy(schemaName, schemaNamespace)
  .usingSchemaRegistry(schemaRegistryUrl)
  .withExceptionHandler(new PermissiveRecordExceptionHandler())

Code Example

Here’s an example that writes the stream to the console for demonstration purposes:

val df = reader
  .readKafkaStream(topic, kafkaConfig, options)
  .withColumnRenamed("headers", KAFKA_HEADERS)
  .withColumn("kafka_meta", map_from_entries(col(KAFKA_HEADERS)))
  .withColumn("kafka_meta_column", col("kafka_meta.meta"))
  .withColumn(KAFKA_HEADERS_META, from_avro(col("kafka_meta_column"), abrisConfig))

df.select(KAFKA_HEADERS_META).writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "true")
  .start()

The kafka_meta.meta column contains the Avro message I want to deserialize using from_avro. The schema for the Avro message is as follows:

{
  "type": "record",
  "name": "Example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "timestamp", "type": "long" }
  ]
}

Problem

I encounter the following error during execution:

Caused by: java.lang.NullPointerException

Upon investigation, I concluded that this occurs because the PermissiveRecordExceptionHandler returns a null record for malformed data, as expected. However, downstream transformations then attempt to access the nested field meta (the column containing the Avro message), which results in the NullPointerException.

Note: None of the columns originally contain null values, so the issue is not caused by the source data.

Question

How can I avoid the NullPointerException while keeping the permissive behavior of the exception handler? Any guidance would be appreciated.

Thank you!

talperetz1 avatar Jan 07 '25 09:01 talperetz1

Hi @talperetz1 I'd need a test or a runnable program to reproduce the issue. I don't see any transformations downstream of from_avro(col("kafka_meta_column"), abrisConfig) that would try to access nested fields of a null value.

In general, you can write your own DeserializationExceptionHandler that, for example, emits a record with default values instead of null, or you can coalesce null values wherever you might access a nested field of a potentially null value.
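The coalescing approach can be sketched with plain Spark SQL functions, using the record schema from the issue. This is only an illustration: the column name "kafka_headers_meta" (standing in for the KAFKA_HEADERS_META constant) and the default values "unknown" / 0L are assumptions, and coalescing a struct column against a struct literal requires the two struct types to line up field-for-field.

```scala
import org.apache.spark.sql.functions.{coalesce, col, lit, struct}

// Sketch: replace a null deserialized record with a struct of default
// values so that downstream field access never dereferences null.
// "kafka_headers_meta" and the defaults are hypothetical placeholders.
val safeDf = df.withColumn(
  "kafka_headers_meta",
  coalesce(
    col("kafka_headers_meta"),
    struct(
      lit("unknown").as("id"),   // default for the "id" field
      lit(0L).as("timestamp")    // default for the "timestamp" field
    )
  )
)

// Alternatively, simply drop the malformed records entirely:
val cleanDf = df.filter(col("kafka_headers_meta").isNotNull)
```

Filtering is the simpler choice if malformed records can be discarded; coalescing with defaults keeps them visible (e.g. for dead-letter handling) while remaining safe to traverse.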

kevinwallimann avatar Jan 14 '25 15:01 kevinwallimann