[SPARK-48037][CORE] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data #46273
Conversation
Could you add a test case for this, @cxzl25 ?
```scala
@@ -85,8 +86,10 @@ class AdaptiveQueryExecSuite
    assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false"))
    val result = dfAdaptive.collect()
    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      val df = sql(query)
      checkAnswer(df, result.toImmutableArraySeq)
      if (!skipCheckAnswer) {
```
Sometimes the SQL results are not ordered, so a parameter has been added to support skipping the answer check.
Could you make a separate PR for this, @cxzl25 ?
Sure.
The changes to core look fine to me, but I am not very sure why we are skipping the tests and whether the test is actually testing what we expect.
I will let @dongjoon-hyun review that (along with the rest of the PR).

Thank you for the review, @mridulm.
Limits after GROUP BY are not guaranteed to be in order.

The added UT checks whether the final AQE execution plan contains the limit operator. Reproduction:

```shell
./bin/spark-sql --conf spark.driver.memory=6g
```

```sql
set spark.sql.shuffle.partitions=16777217;
create table foo as select id from range(2);
select id, count(*) from foo group by id limit 1;
```
…AndVerifyResult` to skip check results

### What changes were proposed in this pull request?
This PR aims to support AdaptiveQueryExecSuite to skip check results.

### Why are the changes needed?
#46273 (comment)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46316 from cxzl25/SPARK-48070.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Also, cc @viirya.
```scala
        |LIMIT 1
        |""".stripMargin, skipCheckAnswer = true)
    assert(findTopLevelLimit(plan).size == 1)
    assert(findTopLevelLimit(adaptivePlan).size == 1)
```
Hmm, this only verifies that a specific operator (i.e., `Limit`) is in the query plan; how is that related to the metrics you want to fix?
Oh, I see. It is due to AQE usage of runtime metrics.
```scala
@@ -710,7 +711,7 @@ private[spark] class ExternalSorter[K, V, C](
        serializerManager,
        serInstance,
        blockId,
        context.taskMetrics().shuffleWriteMetrics,
```
Hmm, the `metrics` given at `getWriter` looks like it also comes from `context.taskMetrics().shuffleWriteMetrics`; isn't it the same?
It may be `SQLShuffleWriteMetricsReporter`, which may not be the same as `context.taskMetrics().shuffleWriteMetrics`.
spark/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala (lines 51 to 56 in 9e12bd7):

```scala
writer = manager.getWriter[Any, Any](
  dep.shuffleHandle,
  mapId,
  context,
  createMetricsReporter(context))
writer.write(inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
```
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala (lines 449 to 455 in 9e12bd7):

```scala
def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
  new ShuffleWriteProcessor {
    override protected def createMetricsReporter(
        context: TaskContext): ShuffleWriteMetricsReporter = {
      new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics)
    }
  }
}
```
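To see why writing to `context.taskMetrics().shuffleWriteMetrics` directly is not equivalent, the delegation can be sketched as follows. This is a minimal Python sketch with hypothetical class names, not Spark source: the SQL-level reporter wraps the task-level metrics and additionally updates the SQL metrics that AQE reads, so a writer that updates only the inner task metrics leaves the SQL-level row count at zero.

```python
# Hypothetical sketch (not Spark source): models why bypassing the
# SQL-level reporter leaves the SQL metrics (which AQE reads) at zero.

class TaskWriteMetrics:
    """Stands in for context.taskMetrics().shuffleWriteMetrics."""
    def __init__(self):
        self.records_written = 0

    def inc_records_written(self, n):
        self.records_written += n


class SQLWriteMetricsReporter:
    """Stands in for SQLShuffleWriteMetricsReporter: delegates to the
    task-level metrics AND updates the SQL-level metrics."""
    def __init__(self, task_metrics, sql_metrics):
        self._task_metrics = task_metrics
        self._sql_metrics = sql_metrics

    def inc_records_written(self, n):
        self._task_metrics.inc_records_written(n)
        self._sql_metrics["records_written"] += n


task_metrics = TaskWriteMetrics()
sql_metrics = {"records_written": 0}
reporter = SQLWriteMetricsReporter(task_metrics, sql_metrics)

# Buggy path: the writer updates the task-level metrics directly,
# bypassing the reporter it was handed.
task_metrics.inc_records_written(100)
print(sql_metrics["records_written"])   # 0 -> AQE would see rowCount = 0

# Fixed path: the writer goes through the SQL-level reporter.
reporter.inc_records_written(100)
print(sql_metrics["records_written"])   # 100
```

This mirrors the reported bug: SortShuffleWriter updated only the task-level metrics, so the SQL metrics that AQE consults for runtime statistics stayed at 0.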
Looks like this missed the 3.4 release by a month, @dongjoon-hyun ... might have been a nice addition to it!
Ya, I agree, and this is still a good addition to 4.0.0-preview. Cc @cloud-fan as the release manager of 4.0.0-preview.
```scala
        |FROM t3
        |GROUP BY id
        |LIMIT 1
        |""".stripMargin, skipCheckAnswer = true)
```
Do we have to skip checking the answer? I think the query can use `LIMIT 10` so that the result is deterministic.
Because AQE is disabled when checking the results, there are many partitions and the output order is not guaranteed. When AQE is enabled, the partitions are merged into one.

```sql
set spark.sql.adaptive.enabled=false;
set spark.sql.shuffle.partitions=1000;
create table foo as select id from range(2);
select id, count(*) from foo group by id limit 1;
```

Output:

```
1	1
```
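The nondeterminism can be illustrated with a small sketch (plain Python with hypothetical names, not Spark code): when LIMIT keeps rows from whichever partition happens to arrive first, two runs can keep different rows, and sorting the collected result afterwards cannot reconcile them.

```python
# Hypothetical sketch: LIMIT over unordered grouped output can return a
# different row on each run, so sorting the collected result before
# comparison cannot make the check deterministic.
import random

groups = [("a", 1), ("b", 1)]  # group-by output, no guaranteed order

def run_query_with_limit(seed, k=1):
    rows = groups[:]
    random.Random(seed).shuffle(rows)  # partition arrival order varies
    return sorted(rows[:k])            # take LIMIT k, then sort the result

print(run_query_with_limit(seed=0))  # may print [('a', 1)] or [('b', 1)]
print(run_query_with_limit(seed=1))  # may differ from the run above
```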
AFAIK the `checkAnswer` util will sort the data before comparison.
It does sort, but only the collected final result is sorted. Because which rows the limit returns is nondeterministic, sorting afterwards does not make the comparison deterministic.
spark/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala (lines 282 to 293 in 08c6bb9):

```scala
def getErrorMessageInCheckAnswer(
    df: DataFrame,
    expectedAnswer: Seq[Row],
    checkToRDD: Boolean = true): Option[String] = {
  val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
  if (checkToRDD) {
    SQLExecution.withSQLConfPropagated(df.sparkSession) {
      df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
    }
  }
  val sparkAnswer = try df.collect().toSeq catch {
```
We can make it deterministic if the limit is larger than the number of result rows?
In this test case, because `spark.sql.adaptive.enabled=false`, partitions will not be merged. It has a large number of partitions and tasks, so it requires a large amount of driver memory to execute successfully.
+1, LGTM.
Merged to master for Apache Spark 4.0.0-preview. Could you make backporting PRs to the release branches, @cxzl25?
… metrics resulting in potentially inaccurate data

Closes apache#46273 from cxzl25/SPARK-48037.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit e24f896)
What changes were proposed in this pull request?
This PR aims to fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data.
Why are the changes needed?
When the shuffle writer is SortShuffleWriter, it does not use SQLShuffleWriteMetricsReporter to update metrics, which causes AQE to obtain runtime statistics and the rowCount obtained is 0.
Some optimization rules rely on rowCount statistics, such as `EliminateLimits`. Because rowCount is 0, it removes the limit operator. At this time, we get data results without a limit.

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala (lines 168 to 172 in 59d5946)

spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (lines 2067 to 2070 in 59d5946)
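As a rough illustration of the failure mode (a hedged Python sketch of the rule's size check, not the actual Catalyst implementation): a rule like `EliminateLimits` drops a `Limit` when statistics say the child already produces no more rows than the limit, so a spurious rowCount of 0 makes any limit look redundant.

```python
# Hypothetical sketch of the EliminateLimits-style reasoning (not the
# actual Spark rule): drop the Limit operator when statistics say the
# child produces at most `row_count_stat` rows and that is <= the limit.

def eliminate_limit(limit, row_count_stat):
    """Return True if the optimizer would remove the Limit operator."""
    return row_count_stat <= limit

# Correct stats: 2 rows, LIMIT 1 -> the limit must stay.
print(eliminate_limit(limit=1, row_count_stat=2))  # False

# Buggy stats: SortShuffleWriter reported 0 written records, so the
# rule wrongly treats the limit as redundant and removes it.
print(eliminate_limit(limit=1, row_count_stat=0))  # True
```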
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
Production environment verification.
master metrics
PR metrics
Was this patch authored or co-authored using generative AI tooling?
No