Skip to content

Commit 8fabbab

Browse files
maryannxue authored and gatorsmile committed
[SPARK-29350] Fix BroadcastExchange reuse in Dynamic Partition Pruning
### What changes were proposed in this pull request?

Dynamic partition pruning filters are added as an in-subquery containing a `BroadcastExchangeExec` in case of a broadcast hash join. This PR makes the `ReuseExchange` rule visit in-subquery nodes, to ensure the new `BroadcastExchangeExec` added by dynamic partition pruning can be reused.

### Why are the changes needed?

The initial dynamic partition pruning PR did not enable this reuse, which means a broadcast exchange would be executed twice: once in the main query and once in the DPP filter.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a broadcast exchange reuse check in `DynamicPartitionPruningSuite`.

Closes apache#26015 from maryannxue/exchange-reuse.

Authored-by: maryannxue <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
1 parent aedf090 commit 8fabbab

File tree

2 files changed

+35
-8
lines changed

2 files changed

+35
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
2626
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder}
2727
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
2828
import org.apache.spark.sql.catalyst.rules.Rule
29-
import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SparkPlan, UnaryExecNode}
29+
import org.apache.spark.sql.execution._
3030
import org.apache.spark.sql.internal.SQLConf
3131
import org.apache.spark.sql.types.StructType
3232
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -109,9 +109,10 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
109109
}
110110
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
111111
val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
112-
plan.transformUp {
112+
113+
// Replace an Exchange duplicate with a ReusedExchange
114+
def reuse: PartialFunction[Exchange, SparkPlan] = {
113115
case exchange: Exchange =>
114-
// the exchanges that have same results usually also have same schemas (same column names).
115116
val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
116117
val samePlan = sameSchema.find { e =>
117118
exchange.sameResult(e)
@@ -125,5 +126,16 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
125126
exchange
126127
}
127128
}
129+
130+
plan transformUp {
131+
case exchange: Exchange => reuse(exchange)
132+
} transformAllExpressions {
133+
// Lookup inside subqueries for duplicate exchanges
134+
case in: InSubqueryExec =>
135+
val newIn = in.plan.transformUp {
136+
case exchange: Exchange => reuse(exchange)
137+
}
138+
in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
139+
}
128140
}
129141
}

sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
2222
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
2323
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
2424
import org.apache.spark.sql.execution._
25+
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
2526
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
2627
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
2728
import org.apache.spark.sql.functions._
@@ -161,22 +162,36 @@ class DynamicPartitionPruningSuite
161162
df: DataFrame,
162163
withSubquery: Boolean,
163164
withBroadcast: Boolean): Unit = {
164-
val dpExprs = collectDynamicPruningExpressions(df.queryExecution.executedPlan)
165+
val plan = df.queryExecution.executedPlan
166+
val dpExprs = collectDynamicPruningExpressions(plan)
165167
val hasSubquery = dpExprs.exists {
166168
case InSubqueryExec(_, _: SubqueryExec, _, _) => true
167169
case _ => false
168170
}
169-
val hasSubqueryBroadcast = dpExprs.exists {
170-
case InSubqueryExec(_, _: SubqueryBroadcastExec, _, _) => true
171-
case _ => false
171+
val subqueryBroadcast = dpExprs.collect {
172+
case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _) => b
172173
}
173174

174175
val hasFilter = if (withSubquery) "Should" else "Shouldn't"
175176
assert(hasSubquery == withSubquery,
176177
s"$hasFilter trigger DPP with a subquery duplicate:\n${df.queryExecution}")
177178
val hasBroadcast = if (withBroadcast) "Should" else "Shouldn't"
178-
assert(hasSubqueryBroadcast == withBroadcast,
179+
assert(subqueryBroadcast.nonEmpty == withBroadcast,
179180
s"$hasBroadcast trigger DPP with a reused broadcast exchange:\n${df.queryExecution}")
181+
182+
subqueryBroadcast.foreach { s =>
183+
s.child match {
184+
case _: ReusedExchangeExec => // reuse check ok.
185+
case b: BroadcastExchangeExec =>
186+
val hasReuse = plan.find {
187+
case ReusedExchangeExec(_, e) => e eq b
188+
case _ => false
189+
}.isDefined
190+
assert(hasReuse, s"$s\nshould have been reused in\n$plan")
191+
case _ =>
192+
fail(s"Invalid child node found in\n$s")
193+
}
194+
}
180195
}
181196

182197
/**

0 commit comments

Comments
 (0)