Materialize the stream side first for BroadcastNestedLoopJoins [databricks] #12257
base: branch-25.04
Conversation
Signed-off-by: sperlingxx <[email protected]>
build
  buildDataSize.foreach(_.add(GpuColumnVector.getTotalDeviceMemoryUsed(batch)))
  batch
} else {
  GpuColumnVector.emptyBatch(buildSchema)
Looks like getBroadcastBatch already handles the empty batch case:

case v if SparkShimImpl.isEmptyRelation(v) =>
  GpuColumnVector.emptyBatch(broadcastSchema)

so this seems to be a duplicate.
It is a little trick to make the call to GpuBroadcastHelper.getBroadcastBatchNumRows look a bit more useful. I am okay with removing it.
Done.
    buildDataSize: GpuMetric): (ColumnarBatch, Iterator[ColumnarBatch]) = {
  GpuBroadcastHelper.getBroadcastBuiltBatchAndStreamIter(
    relation.asInstanceOf[Broadcast[Any]],
    buildPlan.schema,
Better not to access the field member buildPlan directly. Since makeBuiltBatchAndStreamIter is called on executors, referencing the field means the entire GpuBroadcastNestedLoopJoinExecBase object gets serialized and sent to the executors. Make a local variable for it and use the local variable instead:
protected def makeBuiltBatchAndStreamIter(
    relation: Any,
    buildSchema: DataType,
    streamIter: Iterator[ColumnarBatch],
    buildTime: GpuMetric,
    buildDataSize: GpuMetric): (ColumnarBatch, Iterator[ColumnarBatch]) = {
  ...
}

// at the call site, capture the schema in a local variable outside the closure
val localBuildSchema = buildPlan.schema
streamed.executeColumnar().mapPartitions { streamedIter =>
  makeBuiltBatchAndStreamIter(..., localBuildSchema, ...)
}
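For readers unfamiliar with the capture issue, here is a self-contained sketch in plain Scala (the class, field, and object names below are made up for illustration and are not the spark-rapids code) showing why reading a field from inside a closure serializes the whole enclosing object, while copying it into a local variable first does not:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for an exec node; the 1 MB array models heavyweight
// driver-side state that should never be shipped to executors.
class FakeExecNode(val schema: String) extends Serializable {
  val heavyState: Array[Byte] = new Array[Byte](1 << 20)

  // BAD: the closure reads the field `schema`, i.e. `this.schema`, so the
  // whole FakeExecNode (including heavyState) is captured and serialized.
  def badClosure: Int => String = i => s"$i:$schema"

  // GOOD: copy the field into a local val; only that small String is captured.
  def goodClosure: Int => String = {
    val localSchema = schema
    i => s"$i:$localSchema"
  }
}

object ClosureCaptureDemo {
  private def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val node = new FakeExecNode("id INT, name STRING")
    println(s"field capture:     ${serializedSize(node.badClosure)} bytes")  // roughly 1 MB
    println(s"local-val capture: ${serializedSize(node.goodClosure)} bytes") // a few hundred bytes
  }
}
```

The same pattern applies to Spark closures passed to mapPartitions: whatever the lambda captures is what gets sent to the executors.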
Yes, you are right. But since the original implementation already required serializing the entire class, this change will not introduce extra cost, if I am not wrong.
Solved.
build
Signed-off-by: sperlingxx <[email protected]>
build
Fixes #12256
This PR picks up a trick that was missing from the BroadcastNestedLoopJoin implementation: materialize the stream side before the broadcast build batch, as is already done for BroadcastHashJoin.
In this PR, the private BCHJ method getBroadcastBuiltBatchAndStreamIter is generalized into a common helper method and applied to BCNLJ as well. In addition, this PR is a more lightweight alternative to the closed PR #12243, which tried to solve the same problem by implementing the idea from #7642: a "spillable flavor of objects deep into the join code, only taking a ColumnarBatch from it when needed".
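For context, a minimal sketch of the ordering change described above. The names are illustrative and the element types are left generic; the real helper is getBroadcastBuiltBatchAndStreamIter, which works with GPU ColumnarBatches and decodes the broadcast relation in the build step. The idea is simply to pull the first stream-side batch before the broadcast build data is materialized, so the build batch does not sit allocated while the stream side is still being produced:

```scala
// Hypothetical sketch of "materialize the stream side first"; not the
// actual spark-rapids implementation.
object MaterializeStreamFirstSketch {
  def builtBatchAndStreamIter[B, S](
      materializeBuild: () => B,
      streamIter: Iterator[S]): (B, Iterator[S]) = {
    val buffered = streamIter.buffered
    // Force the first stream batch (if any) before allocating the build batch.
    if (buffered.hasNext) buffered.head
    val built = materializeBuild()
    (built, buffered)
  }

  def main(args: Array[String]): Unit = {
    val stream = Iterator.tabulate(3) { i =>
      println(s"produced stream batch $i"); i
    }
    val (built, rest) = builtBatchAndStreamIter(
      () => { println("materialized build batch"); "BUILD" },
      stream)
    println(s"built = $built, remaining stream = ${rest.toList}")
    // The printed order shows stream batch 0 is produced before the build batch.
  }
}
```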