
[SPARK-48037][CORE] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data #46273

Closed
wants to merge 4 commits

Conversation

cxzl25
Contributor

@cxzl25 cxzl25 commented Apr 29, 2024

What changes were proposed in this pull request?

This PR fixes SortShuffleWriter's missing shuffle write metrics, which can result in inaccurate data.

Why are the changes needed?

When the shuffle writer is SortShuffleWriter, it does not use SQLShuffleWriteMetricsReporter to update metrics, so the runtime statistics that AQE obtains report a rowCount of 0.

Some optimization rules rely on the rowCount statistic, such as EliminateLimits. Because rowCount is 0, the rule removes the limit operator, and we get results without the limit applied.

override def runtimeStatistics: Statistics = {
  val dataSize = metrics("dataSize").value
  val rowCount = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN).value
  Statistics(dataSize, Some(rowCount))
}

object EliminateLimits extends Rule[LogicalPlan] {
  private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
    limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
  }
  // ... (rest of the rule elided)
}
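
To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative names, not Spark's actual classes) of why a zero rowCount lets EliminateLimits fire: AQE surfaces the runtime rowCount as the stage's maxRows, and 0 <= limit always holds.

object EliminateLimitsSketch {
  // Mirrors the maxRows check in canEliminate above.
  def canEliminate(limit: Int, childMaxRows: Option[Long]): Boolean =
    childMaxRows.exists(_ <= limit)

  def main(args: Array[String]): Unit = {
    // Without the fix, SortShuffleWriter reports 0 records written,
    // so the runtime statistics carry rowCount = 0.
    println(canEliminate(limit = 1, childMaxRows = Some(0L))) // true  -> LIMIT dropped
    // With the fix, the real row count is reported.
    println(canEliminate(limit = 1, childMaxRows = Some(2L))) // false -> LIMIT kept
  }
}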

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Production environment verification.

master metrics: (screenshot)

PR metrics: (screenshot)

Was this patch authored or co-authored using generative AI tooling?

No

Member

@dongjoon-hyun dongjoon-hyun left a comment

Could you add a test case for this, @cxzl25 ?

@@ -85,8 +86,10 @@ class AdaptiveQueryExecSuite
    assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false"))
    val result = dfAdaptive.collect()
    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      val df = sql(query)
      checkAnswer(df, result.toImmutableArraySeq)
      if (!skipCheckAnswer) {
Contributor Author

The SQL results are not always in a deterministic order, so a parameter has been added to support skipping the answer check.
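
For reference, an illustrative reconstruction of the amended helper based on the diff above (simplified: the real runAdaptiveAndVerifyResult also returns the before/after plans, and sql, checkAnswer, and withSQLConf come from the surrounding suite):

private def runAdaptiveAndVerifyResult(
    query: String,
    skipCheckAnswer: Boolean = false): Unit = {
  val dfAdaptive = sql(query)
  val result = dfAdaptive.collect()
  withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
    val df = sql(query)
    if (!skipCheckAnswer) {
      // Only compare the AQE and non-AQE results when the query output
      // is deterministic.
      checkAnswer(df, result.toImmutableArraySeq)
    }
  }
}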

Member

Could you make a separate PR for this, @cxzl25 ?

Contributor Author

Sure.

Contributor

@mridulm mridulm left a comment

The changes to core look fine to me, but I am not very sure why we are skipping the tests, or whether the test is actually testing what we expect.
I will let @dongjoon-hyun review that (along with the rest of the PR).

@dongjoon-hyun
Member

Thank you for the review, @mridulm.

@cxzl25
Contributor Author

cxzl25 commented May 1, 2024

why we are skipping the tests

The rows produced by a limit after a group by are not guaranteed to come back in a deterministic order.

[info]   == Results ==
[info]   !== Correct Answer - 1 ==            == Spark Answer - 1 ==
[info]    struct<id:bigint,count(1):bigint>   struct<id:bigint,count(1):bigint>
[info]   ![1,1]                               [0,1] (QueryTest.scala:267)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.apache.spark.sql.QueryTest$.newAssertionFailedException(QueryTest.scala:257)
[info]   at org.scalatest.Assertions.fail(Assertions.scala:933)
[info]   at org.scalatest.Assertions.fail$(Assertions.scala:929)
[info]   at org.apache.spark.sql.QueryTest$.fail(QueryTest.scala:257)
[info]   at org.apache.spark.sql.QueryTest$.checkAnswer(QueryTest.scala:267)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:153)
[info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$runAdaptiveAndVerifyResult$1(AdaptiveQueryExecSuite.scala:91)

whether the test is actually testing what we expect

The added UT checks whether the final AQE execution plan contains the limit operator.

./bin/spark-sql --conf spark.driver.memory=6g
set spark.sql.shuffle.partitions=16777217;
create table foo as select id from range(2);
select id, count(*) from foo group by id limit 1;
== Physical Plan ==
AdaptiveSparkPlan (7)
+- == Final Plan ==
   LocalTableScan (1)
+- == Initial Plan ==
   CollectLimit (6)
   +- HashAggregate (5)
      +- Exchange (4)
         +- HashAggregate (3)
            +- Scan hive spark_catalog.default.foo (2)

dongjoon-hyun pushed a commit that referenced this pull request May 1, 2024
…AndVerifyResult` to skip check results

### What changes were proposed in this pull request?
This PR aims to support skipping result checks in AdaptiveQueryExecSuite.

### Why are the changes needed?
#46273 (comment)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46316 from cxzl25/SPARK-48070.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun
Member

I merged #46316. Could you rebase this PR onto the master branch, @cxzl25?

@dongjoon-hyun
Member

Also, cc @viirya.

|LIMIT 1
|""".stripMargin, skipCheckAnswer = true)
assert(findTopLevelLimit(plan).size == 1)
assert(findTopLevelLimit(adaptivePlan).size == 1)
Member

Hmm, this only verifies that a specific operator (i.e., Limit) is present in the query plan; how is that related to the metrics you want to fix?

Member

Oh, I see. It is due to AQE's use of runtime metrics.

@@ -710,7 +711,7 @@ private[spark] class ExternalSorter[K, V, C](
      serializerManager,
      serInstance,
      blockId,
      context.taskMetrics().shuffleWriteMetrics,
Member

Hmm, the metrics passed at getWriter also look like they come from context.taskMetrics().shuffleWriteMetrics; isn't that the same thing?

Contributor Author

The reporter passed to getWriter may be a SQLShuffleWriteMetricsReporter, which is not necessarily the same as context.taskMetrics().shuffleWriteMetrics.

writer = manager.getWriter[Any, Any](
  dep.shuffleHandle,
  mapId,
  context,
  createMetricsReporter(context))
writer.write(inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
  new ShuffleWriteProcessor {
    override protected def createMetricsReporter(
        context: TaskContext): ShuffleWriteMetricsReporter = {
      new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics)
    }
  }
}
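
To illustrate the distinction, here is a minimal sketch (illustrative names, not Spark's actual classes) of the decorator relationship: the SQL reporter wraps the task-level metrics, forwarding every update and additionally recording the SQL metric that AQE's runtime statistics read.

import java.util.concurrent.atomic.AtomicLong

trait WriteMetricsReporter {
  def incRecordsWritten(v: Long): Unit
}

// Stand-in for context.taskMetrics().shuffleWriteMetrics.
class TaskShuffleWriteMetrics extends WriteMetricsReporter {
  var recordsWritten = 0L
  override def incRecordsWritten(v: Long): Unit = recordsWritten += v
}

// Stand-in for SQLShuffleWriteMetricsReporter: forwards to the task-level
// metrics AND accumulates the SQL-side counter, so a metric like
// SHUFFLE_RECORDS_WRITTEN reflects the real row count.
class SqlWriteMetricsReporter(
    delegate: WriteMetricsReporter,
    sqlRecordsWritten: AtomicLong) extends WriteMetricsReporter {
  override def incRecordsWritten(v: Long): Unit = {
    delegate.incRecordsWritten(v)
    sqlRecordsWritten.addAndGet(v)
  }
}

object ReporterDemo {
  def main(args: Array[String]): Unit = {
    val task = new TaskShuffleWriteMetrics
    val sqlMetric = new AtomicLong
    val reporter = new SqlWriteMetricsReporter(task, sqlMetric)
    reporter.incRecordsWritten(2L)
    assert(task.recordsWritten == 2L && sqlMetric.get == 2L) // both sinks updated
  }
}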

@mridulm
Contributor

mridulm commented May 7, 2024

Looks like this missed the 3.4 release by a month, @dongjoon-hyun ... it might have been a nice addition to it!

@dongjoon-hyun
Member

Looks like this missed the 3.4 release by a month, @dongjoon-hyun ... it might have been a nice addition to it!

Ya, I agree and this is still a good addition to 4.0.0-preview.

Cc @cloud-fan as the release manager of 4.0.0-preview

|FROM t3
|GROUP BY id
|LIMIT 1
|""".stripMargin, skipCheckAnswer = true)
Contributor

Do we have to skip checking the answer? I think the query could use LIMIT 10 so that the result is deterministic.

Contributor Author

When the results are checked, AQE is disabled, so the query runs with many partitions and the output order is not guaranteed. When AQE is enabled, the partitions are coalesced into one.

set spark.sql.adaptive.enabled=false;
set spark.sql.shuffle.partitions=1000;
create table foo as select id from range(2);
select id, count(*) from foo group by id limit 1;

Output:

1	1

Contributor

AFAIK the checkAnswer util will sort the data before comparison.

Contributor Author

@cxzl25 cxzl25 May 7, 2024

It does sort, but the sort is applied to the final (already limited) result. Because the set of rows returned by the limit is itself nondeterministic, sorting afterwards has no effect.

def getErrorMessageInCheckAnswer(
    df: DataFrame,
    expectedAnswer: Seq[Row],
    checkToRDD: Boolean = true): Option[String] = {
  val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
  if (checkToRDD) {
    SQLExecution.withSQLConfPropagated(df.sparkSession) {
      df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
    }
  }
  val sparkAnswer = try df.collect().toSeq catch {
    // ...
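
A tiny standalone illustration of this point (plain Scala collections, no Spark): sorting after a nondeterministic limit cannot stabilize the comparison, because the set of surviving rows itself differs between runs.

object NondeterministicLimit {
  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(0L), Seq(1L)) // rows spread across two partitions

    // take(1) models LIMIT 1: which row survives depends on partition order.
    def limitOne(parts: Seq[Seq[Long]]): Seq[Long] = parts.flatten.take(1)

    val runA = limitOne(partitions).sorted         // Seq(0)
    val runB = limitOne(partitions.reverse).sorted // Seq(1)
    assert(runA != runB) // sorted, yet still different
  }
}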

Contributor

We can make it deterministic if the limit is larger than the number of result rows?

Contributor Author

In this test case, because spark.sql.adaptive.enabled=false, partitions are not coalesced.
The query then runs a large number of partitions and tasks, so it requires a large amount of driver memory to execute successfully.

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.

@dongjoon-hyun
Member

Merged to master for Apache Spark 4.0.0-preview.

Could you make backport PRs to the release branches, @cxzl25?

cxzl25 added a commit to cxzl25/spark that referenced this pull request May 8, 2024
… metrics resulting in potentially inaccurate data

Closes apache#46273 from cxzl25/SPARK-48037.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

(cherry picked from commit e24f896)
cxzl25 added a commit to cxzl25/spark that referenced this pull request May 8, 2024
…AndVerifyResult` to skip check results

Closes apache#46316 from cxzl25/SPARK-48070.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

(cherry picked from commit 35767bb)
cxzl25 added a commit to cxzl25/spark that referenced this pull request May 8, 2024
… metrics resulting in potentially inaccurate data

Closes apache#46273 from cxzl25/SPARK-48037.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

(cherry picked from commit e24f896)
cxzl25 added a commit to cxzl25/spark that referenced this pull request May 8, 2024
…AndVerifyResult` to skip check results

Closes apache#46316 from cxzl25/SPARK-48070.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

(cherry picked from commit 35767bb)
JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
…AndVerifyResult` to skip check results

Closes apache#46316 from cxzl25/SPARK-48070.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
… metrics resulting in potentially inaccurate data

Closes apache#46273 from cxzl25/SPARK-48037.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>