ClassCastException when the query contains a UDF
Describe the bug
I hit this exception when my program uses a custom UDF:
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.sql.blaze.NativeConverters$.$anonfun$deserializeExpression$4(NativeConverters.scala:1095)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)
at org.apache.spark.sql.blaze.NativeConverters$.$anonfun$deserializeExpression$2(NativeConverters.scala:1094)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)
at org.apache.spark.sql.blaze.NativeConverters$.deserializeExpression(NativeConverters.scala:1093)
at org.apache.spark.sql.blaze.SparkUDFWrapperContext.<init>
To Reproduce
Steps to reproduce the behavior:
- use Delta Lake 2.3.0
- load data with the delta format (Delta Lake is built on Parquet); a minimal sketch follows this list
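Here is a minimal sketch of the kind of job that fails for me. The table path and column name are hypothetical placeholders, not my real job:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object UdfRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("blaze-udf-repro")
      .getOrCreate()

    // Any Scala UDF compiles to a serializable lambda,
    // which Spark stores in ScalaUDF.f as a scala.Function1
    val toUpper = udf((s: String) => if (s == null) null else s.toUpperCase)

    // "/tmp/events" and "name" are hypothetical; any Delta table triggers it
    spark.read.format("delta").load("/tmp/events")
      .select(toUpper(col("name")))
      .show()
  }
}
```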
Expected behavior
It should work regardless of which UDF is used.
Desktop (please complete the following information):
- OS: CentOS 7
- Version: compiled from the latest code
I found that SparkUDFWrapper comes from the Project conversion, so the job worked once I disabled that conversion by setting spark.blaze.enable.project=false. The plan below shows SparkUDFWrapper appearing inside ProjectExec:
21/01/2024 21:38:12 [INFO] Blaze - execution plan: ShuffleWriterExec: partitioning=Hash([ScalarFunctionExpr { fun: "<FUNC>", name: "Coalesce", args: [Column { name: "#187", index: 15 }, Column { name: "#203", index: 16 }], return_type: Utf8 }], 50) ProjectExec [#152@0 AS #152, (#153@1).[0] AS #299, (#153@1).[1] AS #300, (#153@1).[2] AS #301, (#153@1).[3] AS #302, (#153@1).[4] AS #303, (#153@1).[6] AS #304, (#153@1).[7] AS #305, #154@2 AS #154, #155@3 AS #155, #156@4 AS #156, #157@5 AS #157, #158@6 AS #158, #166@7 AS #166, (#153@1).[5] AS #176, CASE WHEN (#153@1).[0] IS NOT NULL THEN SparkUDFWrapper END AS #187, CASE WHEN (#154@2).[0] IS NOT NULL THEN SparkUDFWrapper END AS #203] ProjectExec [#152@0 AS #152, #153@1 AS #153, #154@2 AS #154, #155@3 AS #155, #156@4 AS #156, #157@5 AS #157, #158@6 AS #158, SparkUDFWrapper AS #166] FFIReader
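For anyone hitting the same issue, the workaround can be applied through the standard Spark config API with the flag mentioned above; a sketch:

```scala
import org.apache.spark.sql.SparkSession

// Workaround sketch: disable blaze's Project conversion so UDF
// expressions are not serialized into the native plan.
val spark = SparkSession.builder()
  .appName("blaze-udf-workaround")
  .config("spark.blaze.enable.project", "false")
  .getOrCreate()
```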
SparkUDFWrapperContext is constructed on the executor side (through native code and JNI). It deserializes the UDF expression that was serialized on the driver side, and the exception happens during that deserialization.
The expression should implement Serializable, but I have no idea why deserialization still fails.
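I have not confirmed the root cause, but a common reason for "cannot assign instance of java.lang.invoke.SerializedLambda" is that the deserializing JVM cannot load the class that captured the lambda (or loads it through the wrong classloader), so the SerializedLambda placeholder is never swapped back into a real scala.Function1. Since deserialization here starts on a JNI thread, that thread's classloader may not see the session/user jars. Below is a minimal sketch of a classloader-aware ObjectInputStream that could be used to test this theory; the class and helper names are my own, not blaze's API:

```scala
import java.io.{ByteArrayInputStream, InputStream, ObjectInputStream, ObjectStreamClass}

// Hypothetical helper, not blaze code: resolve classes through an explicit
// classloader (e.g. one that can see the user/session jars) instead of the
// default loader picked up on the calling (JNI) thread.
class ClassLoaderAwareObjectInputStream(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override def resolveClass(desc: ObjectStreamClass): Class[_] =
    try Class.forName(desc.getName, false, loader)
    catch { case _: ClassNotFoundException => super.resolveClass(desc) }
}

// Usage sketch: deserialize the expression bytes with a chosen classloader,
// e.g. Thread.currentThread().getContextClassLoader on an executor task thread.
def deserializeWithLoader(bytes: Array[Byte], loader: ClassLoader): AnyRef = {
  val ois = new ClassLoaderAwareObjectInputStream(new ByteArrayInputStream(bytes), loader)
  try ois.readObject()
  finally ois.close()
}
```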