[SPARK-48008][WIP] Support UDAFs in Spark Connect #46245
base: master
Conversation
@@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 * @tparam OUT The type of the final output result.
 * @since 1.6.0
 */
+@SerialVersionUID(2093413866369130093L)
Why is this needed?
`TypedColumn`?
It's for mapping the client's `Aggregator` class to this one. This is required because we now serialize the whole `Aggregator` instance on the client side.
I tried multiple approaches and still can't succeed without this UID. As long as we want users to define a UDAF like this:

```scala
new Aggregator {
  def merge(...) = {...}
  ...
}
```

the serialized payload (either the whole `Aggregator` instance or individual methods, e.g., `agg.merge _`) will carry a reference to the `Aggregator` instance that needs to be decoded on the server side. Without this UID, the decode fails with a `class UID not match` error.
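For context, a sketch of the class shape that has to match on both sides (method names follow Spark's documented `Aggregator` contract; this is illustrative, not the exact PR source):

```scala
import org.apache.spark.sql.Encoder

// Sketch: the Connect-client and Core copies of this class must declare the
// same @SerialVersionUID so that an instance serialized against one
// definition can be deserialized against the other by Java serialization.
@SerialVersionUID(2093413866369130093L)
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  def zero: BUF                    // initial (empty) aggregation buffer
  def reduce(b: BUF, a: IN): BUF   // fold one input row into the buffer
  def merge(b1: BUF, b2: BUF): BUF // combine two partial buffers
  def finish(reduction: BUF): OUT  // produce the final output value
  def bufferEncoder: Encoder[BUF]
  def outputEncoder: Encoder[OUT]
}
```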
 * @since 4.0.0
 */
@SerialVersionUID(2093413866369130093L)
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
Can we move this to common instead of having two abstract classes?
Yes, that would be ideal. I was doing that before, until I found that the Connect client should have a different docstring (`@since 4.0.0`). Could you suggest how we could document this on the client side if this class is moved to Common?
...connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto
 * Returns this `Aggregator` as a `TypedColumn` that can be used in `Dataset`
 * operations.
 */
def toColumn: TypedColumn[IN, OUT] = {
How should this work on the connect side?
I think it should take a wildcard; we have done similar things before.
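Per the PR description, `Aggregator.toColumn` is left unimplemented on the Connect side for now because it depends on Spark Core. A hypothetical placeholder (exact wording is illustrative) could be:

```scala
// Hypothetical placeholder: the Connect client cannot construct a
// TypedAggregateExpression (a Spark Core dependency), so toColumn fails
// fast until the follow-up work lands.
def toColumn: TypedColumn[IN, OUT] =
  throw new UnsupportedOperationException(
    "Aggregator.toColumn is not yet supported in Spark Connect")
```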
What changes were proposed in this pull request?

This PR changes Spark Connect to support defining and registering `Aggregator[IN, BUF, OUT]` UDAFs.

The mechanism is similar to the existing support for scalar UDFs. On the client side, we serialize and send the `Aggregator` instance to the server, where the data is deserialized into an `Aggregator` instance recognized by Spark Core.

With this PR we now have two `Aggregator` interfaces defined, one in the Connect API and one in Core. They define exactly the same abstract methods and share the same `SerialVersionUID`, so the Java serialization engine can map one to the other. It is very important to keep these two definitions in sync.

A follow-up to this PR is to add the `Aggregator.toColumn` API (currently not implemented due to its dependencies on Spark Core).

Why are the changes needed?
Spark Connect does not have UDAF support. We need to fix that.
Does this PR introduce any user-facing change?
Yes, Connect users could now define an Aggregator and register it:
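The description stops short of a snippet; a hedged sketch of what client code could look like (the `remote` URL and the registered name `mysum` are illustrative; the API shape follows the standard `functions.udaf` path):

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Connect to a Spark Connect server (address is an example).
val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()

// Define the UDAF as an anonymous Aggregator subclass on the client.
val sum = new Aggregator[Long, Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, a: Long): Long = b + a
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// Register it; the Aggregator instance is serialized and shipped to the server.
spark.udf.register("mysum", udaf(sum))
spark.sql("SELECT mysum(id) FROM range(10)").show()
```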
How was this patch tested?
Added new tests.
Was this patch authored or co-authored using generative AI tooling?
Nope.