Support for UDAF
Currently we only have support for UDFs. UDAFs are really useful and it would be nice if we could support them.
Relevant Scala docs and this. The second seems to be the preferred way to do this with Datasets.
The Aggregator:
trait Aggregator[IN, BUF, OUT]
- IN: the input type for the aggregation.
- BUF: the type of the intermediate value of the reduction.
- OUT: the type of the final output result.
Example from SO:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  val zero = false
  def reduce(acc: Boolean, x: I) = acc || f(x)
  def merge(acc1: Boolean, acc2: Boolean) = acc1 || acc2
  def finish(acc: Boolean) = acc
  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < -40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
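To see how zero, reduce, merge, and finish fit together, here is a plain-Scala simulation of the same logic over two "partitions", with no SparkSession needed (belowThresholdSim is an illustrative helper, not part of any API):

```scala
// Simulate the Aggregator lifecycle on plain collections: each partition
// folds its rows into a buffer with `reduce` starting from `zero`, the
// partial buffers are combined with `merge`, and `finish` yields the result.
def belowThresholdSim[I](f: I => Boolean)(partitions: Seq[Seq[I]]): Boolean = {
  val zero = false
  def reduce(acc: Boolean, x: I): Boolean = acc || f(x)
  def merge(b1: Boolean, b2: Boolean): Boolean = b1 || b2
  def finish(acc: Boolean): Boolean = acc

  val partials = partitions.map(_.foldLeft(zero)(reduce))
  finish(partials.foldLeft(zero)(merge))
}

// Two "partitions" of (String, Int) rows; one value is below -40.
val rows = Seq(
  Seq(("a", 10), ("a", -50)),
  Seq(("b", 0),  ("b", 5))
)

belowThresholdSim[(String, Int)](_._2 < -40)(rows)  // true
```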
Examples from Spark.
Another example, from Databricks:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

val simpleSum = new Aggregator[Int, Int, Int] with Serializable {
  def zero: Int = 0                      // The initial value.
  def reduce(b: Int, a: Int) = b + a     // Add an element to the running total.
  def merge(b1: Int, b2: Int) = b1 + b2  // Merge intermediate values.
  def finish(b: Int) = b                 // Return the final result.
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt  // Required since Spark 2.0.
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}.toColumn

val ds = Seq(1, 2, 3, 4).toDS()
ds.select(simpleSum).collect()
todo: Create a TypedAggregator that returns a TypedColumn.
The entire construct seems to be quite type-safe, so this could be really simple.
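A minimal sketch of what that wrapper could look like, assuming hypothetical names (TypedAggregator and a stub TypedColumn are illustrative only; a real implementation would delegate to Spark's Aggregator#toColumn and carry real Encoders):

```scala
// Stand-in for Spark's TypedColumn, just to show the types threading through.
final case class TypedColumn[T, U](name: String)

// Hypothetical typed wrapper mirroring Spark's Aggregator shape: the
// IN/BUF/OUT parameters flow from the aggregation into the typed column.
trait TypedAggregator[IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(b: BUF): OUT

  // In a real implementation this would wrap
  // org.apache.spark.sql.expressions.Aggregator#toColumn.
  def toColumn(name: String): TypedColumn[IN, OUT] = TypedColumn[IN, OUT](name)

  // Local evaluation, handy for unit tests without a SparkSession.
  def apply(xs: Seq[IN]): OUT = finish(xs.foldLeft(zero)(reduce))
}

object SumAgg extends TypedAggregator[Int, Int, Int] {
  def zero = 0
  def reduce(b: Int, a: Int) = b + a
  def merge(b1: Int, b2: Int) = b1 + b2
  def finish(b: Int) = b
}

SumAgg(Seq(1, 2, 3, 4))  // 10
```

The point of the sketch is that mismatched input or output types fail at compile time, which is what makes the construct attractive here.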
Adding myself as a placeholder. I would love to get help if anyone is interested.
I'd like to give this a go, if no one else has already started on it.
Hi @jacobBaumbach! Sorry for the delay! We would love your help. Feel free to ask if you need anything. Best!