[SPARK-48008][WIP] Support UDAFs in Spark Connect

Open xupefei opened this issue 1 year ago • 0 comments

What changes were proposed in this pull request?

This PR changes Spark Connect to support defining and registering Aggregator[IN, BUF, OUT] UDAFs. The mechanism is similar to the one used for scalar UDFs: the client serializes the Aggregator instance and sends it to the server, where the data is deserialized into an Aggregator instance recognized by Spark Core. With this PR there are two Aggregator interfaces, one in the Connect API and one in Core. They define exactly the same abstract methods and share the same serialVersionUID, so the Java serialization engine can map one to the other. It is very important to keep these two definitions in sync.
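The serialize-then-deserialize step described above can be sketched with plain Java serialization. This is only an illustration under stated assumptions: `DemoAggregator`, `SumAgg`, and `RoundTrip` are hypothetical names, the `serialVersionUID` value is a placeholder, and both "sides" here share one class definition, whereas in this PR the client and server hold separate definitions that must agree. If the UIDs differed, Java deserialization would fail with an `InvalidClassException`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for an Aggregator-like interface; this is NOT Spark's class.
@SerialVersionUID(1L) // placeholder value, not the UID Spark uses
abstract class DemoAggregator[IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// A concrete (hypothetical) aggregator that sums integers.
@SerialVersionUID(1L)
class SumAgg extends DemoAggregator[Int, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Int): Int = b + a
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction
}

object RoundTrip {
  // "Client" side: turn the instance into bytes.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // "Server" side: rebuild an instance from the received bytes.
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The restored object behaves like the original, for example `RoundTrip.deserialize[DemoAggregator[Int, Int, Int]](RoundTrip.serialize(new SumAgg))` can be used to reduce and merge values as usual.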

A follow-up to this PR is to add the Aggregator.toColumn API (currently NotImplemented because it depends on Spark Core).

Why are the changes needed?

Spark Connect does not have UDAF support. We need to fix that.

Does this PR introduce any user-facing change?

Yes. Connect users can now define an Aggregator and register it:

val agg = new Aggregator[Int, Int, Int] { ... }
spark.udf.register("agg", udaf(agg))
val ds: Dataset[Data] = ...
val aggregated = ds.selectExpr("agg(i)")
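For reference, a complete Aggregator body might look like the sketch below. It is an assumed example, not the aggregator used in this PR's tests: `SumInts` is a hypothetical name, and the code presumes the Spark SQL API (`Aggregator`, `Encoder`, `Encoders`) is on the classpath.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical Aggregator[IN, BUF, OUT] that sums Int inputs.
object SumInts extends Aggregator[Int, Int, Int] {
  // Initial value of the aggregation buffer.
  def zero: Int = 0
  // Fold one input value into the buffer.
  def reduce(b: Int, a: Int): Int = b + a
  // Combine two partial buffers (e.g. from different partitions).
  def merge(b1: Int, b2: Int): Int = b1 + b2
  // Turn the final buffer into the output value.
  def finish(reduction: Int): Int = reduction
  // Encoders for the buffer and output types.
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
```

Such an object would be registered in the same way as `agg` in the snippet above, by wrapping it with `udaf(...)` and passing it to `spark.udf.register`.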

How was this patch tested?

Added new tests.

Was this patch authored or co-authored using generative AI tooling?

Nope.

xupefei, Apr 26 '24 11:04