Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Materialize the stream side first for BroadcastNestedLoopJoins [databricks] #12257

Open
wants to merge 4 commits into
base: branch-25.04
Choose a base branch
from

Conversation

sperlingxx
Copy link
Collaborator

@sperlingxx sperlingxx commented Mar 4, 2025

Fixes #12256

The PR is about to pick up the missing trick for the implementation of BroadcastNestedLoopJoin: materialize the stream side in front of the broadcast build batch, which has been applied on the BroadcastHashJoin.

In this PR, the private method of BCHJ getBroadcastBuiltBatchAndStreamIter is generalized as a common helper method. And it is applied onto BCNLJ as well.

In addition, this PR is a more lightweight alternative for the closed PR: #12243, trying to solve the same problem through implementing the idea from #7642: "spillable flavor of objects deep into the join code, only taking a ColumnarBatch from it when needed"

@sperlingxx
Copy link
Collaborator Author

build

buildDataSize.foreach(_.add(GpuColumnVector.getTotalDeviceMemoryUsed(batch)))
batch
} else {
GpuColumnVector.emptyBatch(buildSchema)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like getBroadcastBatch already handles the case of empty batch as

      case v if SparkShimImpl.isEmptyRelation(v) =>
        GpuColumnVector.emptyBatch(broadcastSchema)

so this seems to be duplicate.

Copy link
Collaborator Author

@sperlingxx sperlingxx Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little trick to make the call of GpuBroadcastHelper.getBroadcastBatchNumRows looking a little bit useful. I am okay to remove it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

buildDataSize: GpuMetric): (ColumnarBatch, Iterator[ColumnarBatch]) = {
GpuBroadcastHelper.getBroadcastBuiltBatchAndStreamIter(
relation.asInstanceOf[Broadcast[Any]],
buildPlan.schema,
Copy link
Collaborator

@firestarman firestarman Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better not to access the field member buildPlan directly, since makeBuiltBatchAndStreamIter is called on executors, then the entire object of this GpuBroadcastNestedLoopJoinExecBase will be serialized and sent to executors. Try to make a local variable for it and use the local variable.

  protected def makeBuiltBatchAndStreamIter(
      relation: Any,
      buildSchema: DataType,
      streamIter: Iterator[ColumnarBatch],
      buildTime: GpuMetric,
      buildDataSize: GpuMetric): (ColumnarBatch, Iterator[ColumnarBatch]) = {

val localBuilSchema = bulidPlan.schema
streamed.executeColumnar().mapPartitions { streamedIter =>
   makeBuiltBatchAndStreamIter(..., localBuilSchema, ...)
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

@sperlingxx sperlingxx Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. But since the original implement has been required the serialization of the entire class, this change will not introduce extra cost if I am not wrong.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solved.

@sperlingxx
Copy link
Collaborator Author

build

Signed-off-by: sperlingxx <[email protected]>
@sperlingxx
Copy link
Collaborator Author

build

@sperlingxx sperlingxx requested a review from firestarman March 4, 2025 10:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEA] BroadcastNestedLoopJoin does not support stream-side first adjustment
2 participants