
[SPARK-48008][WIP] Support UDAFs in Spark Connect #46245

Open · wants to merge 12 commits into base: master
Conversation

@xupefei (Contributor) commented Apr 26, 2024

What changes were proposed in this pull request?

This PR changes Spark Connect to support defining and registering Aggregator[IN, BUF, OUT] UDAFs.
The mechanism is similar to that used for Scalar UDFs: on the client side, we serialize the Aggregator instance and send it to the server, where the data is deserialized into an Aggregator instance recognized by Spark Core.
With this PR we now have two Aggregator interfaces defined, one in the Connect API and one in Core. They declare exactly the same abstract methods and share the same SerialVersionUID, so the Java serialization engine can map one to the other. It is therefore critical to keep these two definitions in sync at all times.
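To illustrate the shared-UID mechanism, here is a minimal, Spark-free sketch (the class names ConnectSideAgg and CoreSideAgg are hypothetical stand-ins, not the actual classes in this PR). Pinning the same SerialVersionUID on two structurally identical classes makes the JVM treat their serialized forms as compatible; without the annotation, the JVM would compute different default UIDs because the derivation includes the class name:

```scala
import java.io.ObjectStreamClass

// Hypothetical stand-in for the Connect-side Aggregator definition.
@SerialVersionUID(2093413866369130093L)
abstract class ConnectSideAgg[-IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// Hypothetical stand-in for the Core-side Aggregator definition:
// same abstract methods, same pinned UID.
@SerialVersionUID(2093413866369130093L)
abstract class CoreSideAgg[-IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

val connectUid =
  ObjectStreamClass.lookup(classOf[ConnectSideAgg[Int, Int, Int]]).getSerialVersionUID
val coreUid =
  ObjectStreamClass.lookup(classOf[CoreSideAgg[Int, Int, Int]]).getSerialVersionUID

// Both UIDs are the pinned value, so the serialization engine
// considers the two class descriptors compatible.
println(connectUid == coreUid) // true
```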

A follow-up to this PR is to add the Aggregator.toColumn API (currently NotImplemented due to a dependency on Spark Core).

Why are the changes needed?

Spark Connect does not have UDAF support. We need to fix that.

Does this PR introduce any user-facing change?

Yes. Connect users can now define an Aggregator and register it:

val agg = new Aggregator[Int, Int, Int] { ... }
spark.udf.register("agg", udaf(agg))
val ds: Dataset[Data] = ...
val aggregated = ds.selectExpr("agg(i)")
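For context, a fleshed-out version of the snippet above might look like the following sketch. This is an assumed example, not code from the PR: the sum semantics, the Data case class, the column name i, and the connect endpoint sc://localhost are all illustrative.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

case class Data(i: Int)

// A minimal sum Aggregator matching the [Int, Int, Int] shape above.
val agg = new Aggregator[Int, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Int): Int = b + a
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

// Assumes a Spark Connect session; the endpoint is illustrative.
val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()
import spark.implicits._

spark.udf.register("agg", udaf(agg))
val ds: Dataset[Data] = Seq(Data(1), Data(2)).toDS()
ds.selectExpr("agg(i)").show() // a single row with the sum of column i
```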

How was this patch tested?

Added new tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@xupefei xupefei marked this pull request as ready for review April 30, 2024 12:42
@@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
* @tparam OUT The type of the final output result.
* @since 1.6.0
*/
@SerialVersionUID(2093413866369130093L)
Contributor:
Why is this needed?

Contributor:
TypedColumn?

@xupefei (Author) commented May 10, 2024:
It's for mapping the client's Aggregator class to this one. This is required because we now serialise the whole Aggregator instance on the client side.

@xupefei (Author) commented May 10, 2024:
I tried multiple approaches and none succeeded without this UID. As long as we want users to define a UDAF like this:

new Aggregator {
   def merge(...) = {...}
   ...
}

The serialized payload (either the whole Aggregator instance or individual methods, e.g. agg.merge _) will carry a reference to the Aggregator class that needs to be decoded on the server side. Without this UID, decoding fails with an error saying the class UID does not match.
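The failure mode described here is standard Java serialization behaviour: the stream for an anonymous subclass records the synthetic class name plus the UID of every serializable superclass, and readObject throws InvalidClassException when a UID does not match. A minimal, Spark-free sketch of the round trip (MiniAgg is an illustrative stand-in, not a Spark class):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

@SerialVersionUID(1L)
abstract class MiniAgg extends Serializable {
  def merge(b1: Int, b2: Int): Int
}

// Written the way a user writes `new Aggregator { ... }`:
// an anonymous subclass with a compiler-generated (synthetic) name.
val agg: MiniAgg = new MiniAgg {
  def merge(b1: Int, b2: Int): Int = b1 + b2
}

// Serialize: the payload references the concrete synthetic class
// (something like ...$$anon$1) and the UID of MiniAgg.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(agg)
oos.close()

// Deserialization succeeds here only because the receiving side resolves
// the same classes with matching UIDs; a mismatched UID on MiniAgg would
// raise InvalidClassException instead.
val back = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  .readObject().asInstanceOf[MiniAgg]
println(back.merge(2, 3)) // 5
```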

* @since 4.0.0
*/
@SerialVersionUID(2093413866369130093L)
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
Contributor:
Can we move this to a common module instead of having two abstract classes?

@xupefei (Author):
Yes, that would be ideal. I was doing that until I found that the Connect client needs its own docstring (@since 4.0.0). Could you suggest how we could document this on the client side if the class is moved to a common module?

* Returns this `Aggregator` as a `TypedColumn` that can be used in `Dataset`
* operations.
*/
def toColumn: TypedColumn[IN, OUT] = {
Contributor:
How should this work on the connect side?

Contributor:
I think it should take a wildcard; we have done similar things before.

@xupefei xupefei requested a review from hvanhovell May 15, 2024 14:12