-
Notifications
You must be signed in to change notification settings - Fork 28.8k
[SPARK-53401][SQL] Enable Direct Passthrough Partitioning in the DataFrame API #52153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
d1fe3da
7146dd8
b3f2a94
fad6256
7be523b
53ce88a
84bafd8
6a13e3b
228ca21
599a3d6
643b31a
695278a
e5f4c74
31d4c22
799549a
4ab3e4b
ef1fa45
97cc15c
fddd5da
e23ea46
09ec99b
5c9f681
c9b6df5
5d91e0e
8805934
b0add14
f678101
13210a1
8287ac5
1480f86
276a650
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.expressions | ||
|
||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, FalseLiteral} | ||
import org.apache.spark.sql.catalyst.expressions.codegen.Block._ | ||
import org.apache.spark.sql.types.{AbstractDataType, DataType, LongType} | ||
|
||
/**
 * Expression that takes a partition ID value and passes it through directly for use in
 * shuffle partitioning. This is used with RepartitionByExpression to allow users to
 * directly specify target partition IDs.
 *
 * The child expression must evaluate to an integral type and must not be null.
 * The resulting partition ID must be in the range [0, numPartitions).
 */
@ExpressionDescription(
  usage = "_FUNC_(expr) - Returns the partition ID specified by expr for direct shuffle " +
    "partitioning.",
  arguments = """
    Arguments:
      * expr - an integral expression that specifies the target partition ID
  """,
  examples = """
    Examples:
      > df.repartition(10, direct_shuffle_partition_id($"partition_id"))
      > df.repartition(10, expr("direct_shuffle_partition_id(id % 5)"))
  """,
  since = "4.1.0",
  group = "misc_funcs")
case class DirectShufflePartitionID(child: Expression)
  extends UnaryExpression
  with ExpectsInputTypes {

  // The partition ID keeps the child's (integral) type; the value itself is unchanged.
  override def dataType: DataType = child.dataType

  // Only LongType is accepted; ExpectsInputTypes turns a mismatch into an analysis-time
  // error instead of a confusing execution-time failure.
  override def inputTypes: Seq[AbstractDataType] = LongType :: Nil

  // A null partition ID is rejected at runtime (see eval/doGenCode), so the result is
  // never null from the caller's point of view.
  override def nullable: Boolean = false

  override val prettyName: String = "direct_shuffle_partition_id"

  override def eval(input: InternalRow): Any = {
    val result = child.eval(input)
    if (result == null) {
      throw new IllegalArgumentException(
        "The partition ID expression must not be null.")
    }
    // BUG FIX: the original called `nullSafeEval(result)` here, but this class does not
    // override `nullSafeEval`, so UnaryExpression's default implementation would throw
    // for every non-null input. The intended behavior — mirrored by doGenCode below,
    // which emits `childGen.value` unchanged — is to pass the child's value through.
    result
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val childGen = child.genCode(ctx)
    // Evaluate the child, fail fast on null, and pass the value through unchanged as
    // this expression's result.
    val resultCode =
      s"""
         |${childGen.code}
         |if (${childGen.isNull}) {
         |  throw new IllegalArgumentException(
         |    "The partition ID expression must not be null.");
         |}
         |""".stripMargin
    ev.copy(code = code"$resultCode", isNull = FalseLiteral, value = childGen.value)
  }

  override protected def withNewChildInternal(newChild: Expression): DirectShufflePartitionID =
    copy(child = newChild)
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -946,3 +946,24 @@ case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec { | |
specs.head.numPartitions | ||
} | ||
} | ||
|
||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/**
 * Represents a partitioning where partition IDs are passed through directly from the
 * DirectShufflePartitionID expression. This partitioning scheme is used when users
 * want to directly control partition placement rather than using hash-based partitioning.
 *
 * This partitioning maps directly to the PartitionIdPassthrough RDD partitioner.
 */
case class ShufflePartitionIdPassThrough(
    expr: DirectShufflePartitionID,
    numPartitions: Int) extends HashPartitioningLike {

  // TODO(SPARK-53401): Support Shuffle Spec in Direct Partition ID Pass Through
  // Pmod keeps the produced ID inside [0, numPartitions) even for out-of-range inputs.
  def partitionIdExpression: Expression = Pmod(expr, Literal(numPartitions))

  // The single partitioning expression is the pass-through expression itself.
  override def expressions: Seq[Expression] = expr :: Nil

  override protected def withNewChildrenInternal(
      newChildren: IndexedSeq[Expression]): ShufflePartitionIdPassThrough = {
    val replacement = newChildren.head.asInstanceOf[DirectShufflePartitionID]
    copy(expr = replacement)
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2785,6 +2785,51 @@ class DataFrameSuite extends QueryTest | |
val df1 = df.select("a").orderBy("b").orderBy("all") | ||
checkAnswer(df1, Seq(Row(1), Row(4))) | ||
} | ||
|
||
test("SPARK-53401: direct_shuffle_partition_id - should partition rows to the specified " +
  "partition ID") {
  val numPartitions = 10
  val input = spark.range(100).withColumn("p_id", col("id") % numPartitions)

  val partitioned = input.repartition(numPartitions, direct_shuffle_partition_id($"p_id"))
  val withActual = partitioned.withColumn("actual_p_id", spark_partition_id())

  // Every row must land exactly in the partition named by its p_id column.
  assert(withActual.filter(col("p_id") =!= col("actual_p_id")).count() == 0)

  // The shuffle must produce exactly the requested number of partitions.
  assert(withActual.rdd.getNumPartitions == numPartitions)
}
|
||
test("SPARK-53401: direct_shuffle_partition_id - should work with expr()") {
  val numPartitions = 5
  val input = spark.range(50).withColumn("p_id", col("id") % numPartitions)

  // Same behavior as the column-function form, but invoked through a SQL expression
  // string to exercise the function-registry path.
  val partitioned = input.repartition(numPartitions, expr("direct_shuffle_partition_id(p_id)"))
  val withActual = partitioned.withColumn("actual_p_id", spark_partition_id())

  assert(withActual.filter(col("p_id") =!= col("actual_p_id")).count() == 0)
}
|
||
test("SPARK-53401: direct_shuffle_partition_id - should fail when partition ID is null") {
  // Rows with id >= 5 carry a null partition ID, which the expression must reject.
  val input = spark.range(10).withColumn("p_id",
    when(col("id") < 5, col("id")).otherwise(lit(null).cast("long"))
  )
  val partitioned = input.repartition(5, direct_shuffle_partition_id($"p_id"))

  // The failure surfaces at execution time, wrapped in a SparkException.
  val e = intercept[SparkException] {
    partitioned.collect()
  }
  assert(e.getCause.isInstanceOf[IllegalArgumentException])
  assert(e.getCause.getMessage.contains("The partition ID expression must not be null."))
}
|
||
test("SPARK-53401: direct_shuffle_partition_id - should fail analysis for non-integral types") {
  val input = spark.range(5).withColumn("s", lit("a"))
  // A string partition ID must be rejected at analysis time, not execution time.
  val e = intercept[AnalysisException] {
    input.repartition(5, direct_shuffle_partition_id($"s")).collect()
  }
  // Should fail with type error from DirectShufflePartitionID expression
  assert(e.getMessage.contains("requires an integral type"))
}
} | ||
|
||
case class GroupByKey(a: Int, b: Int) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this partition id be changed by AQE?