
ClassCastException when a UDF is specified

Open kettlelinna opened this issue 2 years ago • 2 comments

Describe the bug

I hit this exception when my program specifies a UDF:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.spark.sql.blaze.NativeConverters$.$anonfun$deserializeExpression$4(NativeConverters.scala:1095)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)
	at org.apache.spark.sql.blaze.NativeConverters$.$anonfun$deserializeExpression$2(NativeConverters.scala:1094)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)
	at org.apache.spark.sql.blaze.NativeConverters$.deserializeExpression(NativeConverters.scala:1093)
	at org.apache.spark.sql.blaze.SparkUDFWrapperContext.<init>(SparkUDFWrapperContext.scala:42)

To Reproduce

Steps to reproduce the behavior:

  1. Add a dependency on Delta 2.3.0.
  2. Load data using the delta format (Delta is based on Parquet).
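For anyone trying to reproduce, here is a minimal sketch of the kind of job involved. The table path, column name, and UDF are hypothetical, and it assumes a SparkSession with the Blaze extension enabled and delta-core 2.3.0 on the classpath, so treat it as an illustration rather than a verified reproduction:

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

public class BlazeUdfRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("blaze-udf-repro")
                .getOrCreate();

        // Any registered UDF should do; under Blaze it gets wrapped as
        // SparkUDFWrapper inside the converted ProjectExec.
        spark.udf().register("upper_udf",
                (UDF1<String, String>) s -> s == null ? null : s.toUpperCase(),
                DataTypes.StringType);

        // Reading via the delta format (Parquet-based) and projecting through
        // the UDF exercises the native Project conversion.
        spark.read().format("delta").load("/path/to/delta/table")
                .select(callUDF("upper_udf", col("name")))
                .show();
    }
}
```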

Expected behavior

It should work regardless of which UDF is used.

Desktop (please complete the following information):

  • OS: CentOS 7
  • Version: compiled from the latest code

kettlelinna avatar Jan 21 '24 11:01 kettlelinna

I found that SparkUDFWrapper comes from the Project conversion, and the job worked once I disabled Project conversion by setting the parameter spark.blaze.enable.project=false:

21/01/2024 21:38:12 [INFO] Blaze - execution plan:
ShuffleWriterExec: partitioning=Hash([ScalarFunctionExpr { fun: "<FUNC>", name: "Coalesce", args: [Column { name: "#187", index: 15 }, Column { name: "#203", index: 16 }], return_type: Utf8 }], 50)
  ProjectExec [#152@0 AS #152, (#153@1).[0] AS #299, (#153@1).[1] AS #300, (#153@1).[2] AS #301, (#153@1).[3] AS #302, (#153@1).[4] AS #303, (#153@1).[6] AS #304, (#153@1).[7] AS #305, #154@2 AS #154, #155@3 AS #155, #156@4 AS #156, #157@5 AS #157, #158@6 AS #158, #166@7 AS #166, (#153@1).[5] AS #176, CASE WHEN (#153@1).[0] IS NOT NULL THEN SparkUDFWrapper END AS #187, CASE WHEN (#154@2).[0] IS NOT NULL THEN SparkUDFWrapper END AS #203]
    ProjectExec [#152@0 AS #152, #153@1 AS #153, #154@2 AS #154, #155@3 AS #155, #156@4 AS #156, #157@5 AS #157, #158@6 AS #158, SparkUDFWrapper AS #166]
      FFIReader
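For anyone hitting the same issue, the workaround can be applied in spark-defaults.conf (or via --conf at submit time); the property name is taken as given in this thread:

```
# Workaround: disable Blaze's native Project conversion so UDF projections stay on the JVM side
spark.blaze.enable.project=false
```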

kettlelinna avatar Jan 21 '24 14:01 kettlelinna

SparkUDFWrapperContext is constructed on the executor side (through native code and JNI). It deserializes the UDF expression that was serialized on the driver side, and the exception happens during that deserialization. The expression should have implemented Serializable, but I have no idea why it fails to deserialize.
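For reference, here is a self-contained Java sketch of the plain JDK mechanism the stack trace goes through (this is JDK behavior, not Blaze code): a serializable lambda is written out as a java.lang.invoke.SerializedLambda, and on read its readResolve asks the capturing class's `$deserializeLambda$` to rebuild the function object. The ClassCastException above means that replacement step did not happen on the executor side, so the raw SerializedLambda was assigned to ScalaUDF.f; a classloader mismatch in the JNI-created deserialization context is one possible suspect, though that is speculation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaRoundTrip {
    // A lambda is serializable only when its target interface is Serializable.
    interface SerFn extends Function<Integer, Integer>, Serializable {}

    // Serialize a lambda and read it back in the same JVM/classloader,
    // where SerializedLambda.readResolve can find the capturing class.
    static int roundTrip() throws Exception {
        SerFn inc = x -> x + 1;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(inc); // written out as a SerializedLambda
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            // readResolve has already swapped the SerializedLambda back into a
            // real function instance, so this cast succeeds here.
            SerFn back = (SerFn) ois.readObject();
            return back.apply(41);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip()); // prints 42
    }
}
```

In the failing case, the deserializing side never performs that swap, so the object graph still contains the intermediate SerializedLambda when ObjectInputStream tries to assign it to the scala.Function1-typed field.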

richox avatar Jan 22 '24 14:01 richox