Unexpected null equality
In SQL,
SELECT NULL = NULL;
results in NULL, and NULL is treated as false in a WHERE clause, so e.g.
SELECT 1 WHERE NULL = NULL
returns an empty table.
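This is easy to reproduce through Spark SQL as well (a minimal sketch, assuming an active SparkSession named spark; range(1) stands in for a one-row table since a WHERE clause needs something to filter):
spark.sql("SELECT NULL = NULL").show()                         // the value shown is null, not false
spark.sql("SELECT 1 FROM range(1) WHERE NULL = NULL").count()  // 0: the NULL condition drops the row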
However, in Scala both null == null and None == None are true, so the SQL and Scala semantics disagree.
Also, frameless's current behavior (which follows the Scala semantics) is incompatible with vanilla Datasets:
import spark.implicits._
val a: Option[Int] = None
val b: Option[Int] = None
val data = X2(a, b) :: X2(a, a) :: Nil
val dataset = TypedDataset.create(data)
val untyped = spark.createDataset(data)
val A = dataset.col('a)
val B = dataset.col('b)
data.filter(x => x.a == x.b).toSeq // List(X2(None,None), X2(None,None))
// the two below should be equal
dataset.filter(A === B).collect().run().toSeq // WrappedArray(X2(None,None), X2(None,None))
untyped.filter($"a" === $"b").collect().toSeq // WrappedArray()
Strangely, @kmate's PR #267 seems to resolve this problem within case classes.
Sorry, not within case classes, but when the Option wraps a case class.
Hi @dszakallas, thank you for bringing this up. I remember working on this maybe a year ago. We made some changes so you can compare a column with an optional literal.
For example, these would work in Frameless land:
scala> dataset.filter(A === None).collect().run
res1: Seq[X2] = WrappedArray(X2(None,None), X2(None,None))
scala> dataset.filter(A =!= None).collect().run
res2: Seq[X2] = WrappedArray()
scala> dataset.filter(A =!= Some(2)).collect().run
res3: Seq[X2] = WrappedArray(X2(None,None), X2(None,None))
But they would not work for DataFrames:
scala> untyped.filter($"a" =!= None).show()
java.lang.RuntimeException: Unsupported literal type class scala.None$ None
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:77)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:163)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:163)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:162)
at org.apache.spark.sql.functions$.typedLit(functions.scala:112)
at org.apache.spark.sql.functions$.lit(functions.scala:95)
at org.apache.spark.sql.Column.$eq$bang$eq(Column.scala:302)
... 42 elided
scala> untyped.filter($"a" === null).collect()
res31: Array[X2] = Array()
I see now how this might contradict the SQL standard, but I think most Frameless users would expect dataset.filter(A =!= None) to behave as it does now, not the way SELECT 1 WHERE NULL = NULL works ... not sure if there is an easy choice here. Did this cause any unforeseen issues for you?
From the user's perspective, I think the most natural mapping of untyped.filter($"a" === null) to frameless is dataset.filter(A === None), which I would expect to give the same results. One can argue that such an expression doesn't really make sense, but in cases like dataset.filter(A === B) it can really trip up the user. It's very unconventional to retain null (None) values here, and I think most users don't want that.
If we want to retain the current, non-SQL-compatible behavior, I think we should at least state explicitly in the documentation that Options do not follow SQL's three-valued (Kleene) logic, and that the correct rewrite of untyped.filter($"a" === $"b"), where a and b are optional, is dataset.filter(A.isNotNone & B.isNotNone & (A === B)).
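Applied to the running example, the rewrite would look like the sketch below (using &&, frameless's boolean AND, and the A and B columns defined earlier):
dataset.filter(A.isNotNone && B.isNotNone && (A === B)).collect().run()
// empty, matching untyped.filter($"a" === $"b"): isNotNone filters out the None rows first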
Again, thank you for bringing this up. It's great that you looked into this in such detail.
I personally believe that when A and B are Option[Int], dataset.filter(A === B) should behave the same way a Scala collection would.
I am all in favor of adding this to the documentation: "If you want to achieve what vanilla Spark gives you for untyped.filter($"a" === $"b"), then you have to rewrite it as dataset.filter(A.isNotNone & B.isNotNone & (A === B))"
What's the behavior in joins? That seems like the most problematic case: if I do an inner join on two columns containing nulls, the result will contain the cross product of the rows with nulls.
It breaks as well, of course; joins are no exception.
scala> case class A(a: Option[String])
defined class A
scala> case class B(b: Option[String])
defined class B
scala> val a = spark.createDataset(Seq(A(None)))
a: org.apache.spark.sql.Dataset[A] = [a: string]
scala> val b = spark.createDataset(Seq(B(None)))
b: org.apache.spark.sql.Dataset[B] = [b: string]
scala> val x = a.join(b, $"a" === $"b")
x: org.apache.spark.sql.DataFrame = [a: string, b: string]
scala> x.show()
+---+---+
| a| b|
+---+---+
+---+---+
scala> x.count()
res12: Long = 0
scala> val ta = TypedDataset.create(Seq(A(None)))
ta: frameless.TypedDataset[A] = [a: string]
scala> val tb = TypedDataset.create(Seq(B(None)))
tb: frameless.TypedDataset[B] = [b: string]
scala> val ca = ta.col('a)
ca: frameless.TypedColumn[A,Option[String]] = a#76
scala> val cb = tb.col('b)
cb: frameless.TypedColumn[B,Option[String]] = b#82
scala> val tx = ta.joinInner(tb)(ca === cb)
tx: frameless.TypedDataset[(A, B)] = [_1: struct<a: string>, _2: struct<b: string>]
scala> tx.show().run()
+---+---+
| _1| _2|
+---+---+
| []| []|
+---+---+
scala> tx.count().run()
res14: Long = 1
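For comparison, vanilla Spark reproduces the frameless count when the join condition uses the null-safe operator (a sketch reusing the same a and b as above):
a.join(b, $"a" <=> $"b").count() // 1: NULL <=> NULL is true, matching the joinInner result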
OK, so this is starting to look more like a real problem. I didn't think about the implications for joins. We can start treating this as a bug.
@dszakallas Were you interested in looking into fixing this? I'm poking around for an issue to work on, and this one seems pretty tractable for someone new to the project. If you are not going to take it up, I might give it a try.
@imarios I'm impressed with your open-mindedness on this one.
@sullivan- feel free to pick up this issue.
Thank you both for looking at this in such depth. @sullivan- feel free to send a PR for this; that would be awesome.
Thanks @imarios. I've started looking into it but got pulled away by other commitments. I'll get back to it soon.