[SPARK-48037][CORE] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data #46273
Changes from 3 commits
@@ -2502,6 +2502,26 @@ class AdaptiveQueryExecSuite

```scala
    }
  }

  test("SPARK-48037: Fix SortShuffleWriter lacks shuffle write related metrics " +
    "resulting in potentially inaccurate data") {
    withTable("t3") {
      withSQLConf(
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
        SQLConf.SHUFFLE_PARTITIONS.key -> "16777217") {
```
```scala
        sql("CREATE TABLE t3 USING PARQUET AS SELECT id FROM range(2)")
        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
          """
            |SELECT id, count(*)
            |FROM t3
            |GROUP BY id
            |LIMIT 1
            |""".stripMargin, skipCheckAnswer = true)
```
**Reviewer:** Do we have to skip checking the answer? I think the query can be …

**Reply:** Because AQE is disabled when checking the results, there are many partitions and the row order is not guaranteed at that point; when AQE is enabled, they are merged into one partition.

```sql
set spark.sql.adaptive.enabled=false;
set spark.sql.shuffle.partitions=1000;
create table foo as select id from range(2);
select id, count(*) from foo group by id limit 1;
```

Output: …

**Reviewer:** AFAIK the …

**Reply:** It does sort, but the final result is sorted locally. Because the result of the limit is nondeterministic, the local sort has no effect. (See `spark/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala`, lines 282 to 293 in 08c6bb9.)

**Reviewer:** We can make it certain if the limit is larger than the number of result rows?

**Reply:** In this test case, because …
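The point about local sorting can be shown with a small standalone sketch (plain Scala, not Spark code): `GROUP BY id LIMIT 1` over the two ids has two equally valid answers depending on which shuffle partition is read first, and sorting each candidate answer locally, as the answer check does, cannot collapse them into a single expected result.

```scala
// Standalone illustration (not Spark code): "GROUP BY id LIMIT 1" over
// ids {0, 1} has two valid answers, depending on which shuffle partition
// is read first.
val validGroupRows = Seq((0L, 1L), (1L, 1L))          // both (id, count) rows are correct
val limitOneAnswers = validGroupRows.map(r => Seq(r)) // LIMIT 1 may surface either row

// Even after a local sort, the two candidate answers remain different,
// so a plain result comparison is flaky; hence skipCheckAnswer = true.
assert(limitOneAnswers.map(_.sorted).distinct.size == 2)
```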
```scala
        assert(findTopLevelLimit(plan).size == 1)
```
```scala
        assert(findTopLevelLimit(adaptivePlan).size == 1)
```
**Reviewer:** Hmm, this only verifies that there is a specific operator (i.e., …

**Reviewer:** Oh, I see. It is due to AQE's usage of runtime metrics.
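To see why missing write metrics change the adaptive plan, here is a toy sketch of size-based partition coalescing. This is an assumption-laden simplification, not Spark's actual `CoalesceShufflePartitions` rule: it greedily merges adjacent partitions until a target byte size would be exceeded. When per-partition byte sizes are reported as zero (the metrics lost by the bug), everything collapses into a single partition.

```scala
// Toy greedy coalescing (assumption: merge adjacent partitions until the
// target byte size would be exceeded; Spark's real logic is more involved).
def coalesce(bytesPerPartition: Seq[Long], targetBytes: Long): Int = {
  var groups = 1
  var acc = 0L
  for (size <- bytesPerPartition) {
    if (acc > 0 && acc + size > targetBytes) { groups += 1; acc = 0L }
    acc += size
  }
  groups
}

val reported = Seq(40L, 40L, 40L, 40L) // bytes actually written per map-side partition
val missing  = Seq(0L, 0L, 0L, 0L)     // what AQE sees if the writer never updates metrics

assert(coalesce(reported, 64L) == 4)   // sizes known: partitions stay split
assert(coalesce(missing, 64L) == 1)    // sizes zero: everything merges into one
```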
```scala
      }
    }
  }

  test("SPARK-37063: OptimizeSkewInRebalancePartitions support optimize non-root node") {
    withTempView("v") {
      withSQLConf(
```
**Reviewer:** Hmm, the `metrics` given at `getWriter` looks like it also comes from `context.taskMetrics().shuffleWriteMetrics`, isn't it the same?

**Reply:** It may be `SQLShuffleWriteMetricsReporter`, which may not be the same as `context.taskMetrics().shuffleWriteMetrics`. (See `spark/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala`, lines 51 to 56, and `spark/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala`, lines 449 to 455, both at 9e12bd7.)
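The distinction being made here can be modeled with a minimal, self-contained sketch. The class names below are simplified stand-ins, not Spark's real API: the SQL-side reporter wraps the task-level metrics and additionally records SQL-level metrics before delegating. A writer that updates `context.taskMetrics().shuffleWriteMetrics` directly, instead of the reporter passed to `getWriter`, keeps the task metrics correct but leaves the SQL metrics incomplete.

```scala
// Simplified model, not Spark's real classes: a reporter interface, the
// task-level implementation, and a SQL-level wrapper that delegates.
trait WriteReporter { def incRecordsWritten(n: Long): Unit }

class TaskMetricsModel extends WriteReporter {
  var recordsWritten = 0L
  def incRecordsWritten(n: Long): Unit = recordsWritten += n
}

// Analogue of SQLShuffleWriteMetricsReporter: records the SQL metric,
// then forwards to the underlying task-level metric.
class SqlReporterModel(delegate: WriteReporter) extends WriteReporter {
  var sqlRecordsWritten = 0L
  def incRecordsWritten(n: Long): Unit = {
    sqlRecordsWritten += n
    delegate.incRecordsWritten(n)
  }
}

val task = new TaskMetricsModel
val sqlReporter = new SqlReporterModel(task)

sqlReporter.incRecordsWritten(10) // writer uses the reporter passed to getWriter
task.incRecordsWritten(5)         // writer bypasses the reporter (the bug)

assert(task.recordsWritten == 15)       // task metrics see everything
assert(sqlReporter.sqlRecordsWritten == 10) // 5 records invisible to SQL metrics
```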