
Commit bdac5cd

Authored by pan3793 and committed by dongjoon-hyun
[SPARK-54115][CORE][UI] Escalate display ordering priority of connect server operation threads in thread dump page
### What changes were proposed in this pull request?

Escalate the display ordering priority of Connect server execution threads in the thread dump page by defining a custom `threadInfoOrdering`. When the Connect server runs in local deploy mode, tasks also run in the driver, so the driver thread dump page displays task threads first, followed by Connect operation threads. When the Connect server runs in other deploy modes (YARN, K8s, Standalone), the driver thread dump page displays Connect operation threads first.

### Why are the changes needed?

Currently, Spark executors display task threads first on the thread dump page, which improves the user experience when troubleshooting "task stuck" issues. Many similar stuck issues occur on the driver side too, e.g. the driver may be stuck on an HMS/HDFS RPC call during the query planning phase. Displaying the Connect operation threads at the top of the driver thread dump page makes it easy for users to diagnose driver stuck issues on the Connect server.

### Does this PR introduce _any_ user-facing change?

Yes, it affects the live UI thread dump page.

### How was this patch tested?

Added a UT. Also manually tested: start a Connect server in local mode and use a client to run some queries. Note that since it runs in local mode, tasks also execute on the driver side, so the page displays task threads first, followed by Connect operation threads.

![Driver thread dump page showing task threads first, then Connect operation threads](https://github.com/user-attachments/assets/0c48412a-c6cd-4422-8126-226888ce2c56)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52816 from pan3793/SPARK-54115.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
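For illustration, here is a minimal self-contained Scala sketch (a simplified model, not code from this patch) of the display order described above, using plain thread-name strings in place of `java.lang.management.ThreadInfo`:

```scala
// Task threads (priority 100) sort before Connect operation threads
// (priority 80), which sort before everything else (priority 0).
val displayOrder: Ordering[String] = Ordering.fromLessThan { (a, b) =>
  def priority(name: String): Int =
    if (name.startsWith("Executor task launch worker")) 100
    else if (name.startsWith("SparkConnectExecuteThread")) 80
    else 0
  val (p1, p2) = (priority(a), priority(b))
  // Higher priority sorts first; ties fall back to case-insensitive name order.
  if (p1 == p2) a.compareToIgnoreCase(b) < 0 else p1 > p2
}

val names = Seq(
  "main",
  "SparkConnectExecuteThread_opId=abc", // hypothetical operation id
  "Executor task launch worker for task 1.0 in stage 0.0 (TID 1)")
println(names.sorted(displayOrder))
```

In local mode all three kinds of threads coexist in the driver, so the sorted view lists the task thread first, then the Connect operation thread, then `main`, matching the screenshot above.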
Parent: 7df66f9

File tree: 4 files changed (+78, -21 lines)


core/src/main/scala/org/apache/spark/executor/Executor.scala (5 additions, 2 deletions)

```diff
@@ -40,6 +40,7 @@ import org.slf4j.{MDC => SLF4JMDC}
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.Executor.TASK_THREAD_NAME_PREFIX
 import org.apache.spark.internal.{Logging, LogKeys}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config._
@@ -132,7 +133,7 @@ private[spark] class Executor(
   private[executor] val threadPool = {
     val threadFactory = new ThreadFactoryBuilder()
       .setDaemon(true)
-      .setNameFormat("Executor task launch worker-%d")
+      .setNameFormat(s"$TASK_THREAD_NAME_PREFIX-%d")
       .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
       .build()
     Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
@@ -478,7 +479,7 @@
 
     val taskId = taskDescription.taskId
     val taskName = taskDescription.name
-    val threadName = s"Executor task launch worker for $taskName"
+    val threadName = s"$TASK_THREAD_NAME_PREFIX for $taskName"
     val mdcProperties = taskDescription.properties.asScala
       .filter(_._1.startsWith("mdc.")).toSeq
 
@@ -1316,6 +1317,8 @@ private[spark] class Executor(
 }
 
 private[spark] object Executor extends Logging {
+  val TASK_THREAD_NAME_PREFIX = "Executor task launch worker"
+
   // This is reserved for internal use by components that need to read task properties before a
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
```
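As background, a standalone sketch (assuming Guava's `ThreadFactoryBuilder`, which the executor code above already uses) of how the `%d` in `setNameFormat` produces numbered worker threads; the `s"$TASK_THREAD_NAME_PREFIX for $taskName"` form is applied later, when the task-running path renames the thread:

```scala
import java.util.concurrent.{Executors, TimeUnit}

import com.google.common.util.concurrent.ThreadFactoryBuilder

object NameFormatDemo {
  def main(args: Array[String]): Unit = {
    // "%d" is replaced with a per-factory counter, producing
    // "Executor task launch worker-0", "-1", and so on.
    val factory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("Executor task launch worker-%d")
      .build()
    val pool = Executors.newCachedThreadPool(factory)
    pool.submit(new Runnable {
      override def run(): Unit = println(Thread.currentThread().getName)
    })
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```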

core/src/main/scala/org/apache/spark/util/Utils.scala (30 additions, 17 deletions)

```diff
@@ -65,6 +65,7 @@ import org.slf4j.Logger
 
 import org.apache.spark.{SPARK_VERSION, _}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.Executor.TASK_THREAD_NAME_PREFIX
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys
 import org.apache.spark.internal.LogKeys._
@@ -2086,27 +2087,39 @@ private[spark] object Utils
     }
   }
 
+  val CONNECT_EXECUTE_THREAD_PREFIX = "SparkConnectExecuteThread"
+
+  private val threadInfoOrdering = Ordering.fromLessThan {
+    (threadTrace1: ThreadInfo, threadTrace2: ThreadInfo) => {
+      def priority(ti: ThreadInfo): Int = ti.getThreadName match {
+        case name if name.startsWith(TASK_THREAD_NAME_PREFIX) => 100
+        case name if name.startsWith(CONNECT_EXECUTE_THREAD_PREFIX) => 80
+        case _ => 0
+      }
+
+      val v1 = priority(threadTrace1)
+      val v2 = priority(threadTrace2)
+      if (v1 == v2) {
+        val name1 = threadTrace1.getThreadName.toLowerCase(Locale.ROOT)
+        val name2 = threadTrace2.getThreadName.toLowerCase(Locale.ROOT)
+        val nameCmpRes = name1.compareTo(name2)
+        if (nameCmpRes == 0) {
+          threadTrace1.getThreadId < threadTrace2.getThreadId
+        } else {
+          nameCmpRes < 0
+        }
+      } else {
+        v1 > v2
+      }
+    }
+  }
+
   /** Return a thread dump of all threads' stacktraces. Used to capture dumps for the web UI */
   def getThreadDump(): Array[ThreadStackTrace] = {
     // We need to filter out null values here because dumpAllThreads() may return null array
     // elements for threads that are dead / don't exist.
-    val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
-    threadInfos.sortWith { case (threadTrace1, threadTrace2) =>
-      val v1 = if (threadTrace1.getThreadName.contains("Executor task launch")) 1 else 0
-      val v2 = if (threadTrace2.getThreadName.contains("Executor task launch")) 1 else 0
-      if (v1 == v2) {
-        val name1 = threadTrace1.getThreadName().toLowerCase(Locale.ROOT)
-        val name2 = threadTrace2.getThreadName().toLowerCase(Locale.ROOT)
-        val nameCmpRes = name1.compareTo(name2)
-        if (nameCmpRes == 0) {
-          threadTrace1.getThreadId < threadTrace2.getThreadId
-        } else {
-          nameCmpRes < 0
-        }
-      } else {
-        v1 > v2
-      }
-    }.map(threadInfoToThreadStackTrace)
+    ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
+      .sorted(threadInfoOrdering).map(threadInfoToThreadStackTrace)
   }
 
   /** Return a heap dump. Used to capture dumps for the web UI */
```
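To illustrate the tie-break rules above (with a hypothetical `T` stand-in for `ThreadInfo`, not from this patch): equal priorities fall back to a case-insensitive name comparison, and identical names fall back to the thread id:

```scala
import java.util.Locale

// Hypothetical stand-in carrying just the fields the ordering consults.
case class T(name: String, id: Long)

val tieBreak: Ordering[T] = Ordering.fromLessThan { (a, b) =>
  val cmp = a.name.toLowerCase(Locale.ROOT).compareTo(b.name.toLowerCase(Locale.ROOT))
  // Equal names (ignoring case) are ordered by thread id.
  if (cmp == 0) a.id < b.id else cmp < 0
}

// Same name modulo case, so the lower thread id displays first.
assert(Seq(T("dispatcher", 9), T("Dispatcher", 3)).sorted(tieBreak).map(_.id) == Seq(3, 9))
```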

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala (41 additions, 1 deletion)

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import java.io._
+import java.lang.management.ThreadInfo
 import java.lang.reflect.Field
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
@@ -37,6 +38,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
 import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
 import org.apache.logging.log4j.Level
+import org.mockito.Mockito.doReturn
+import org.scalatest.PrivateMethodTester
+import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.internal.config._
@@ -47,7 +51,7 @@ import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.util.collection.Utils.createArray
 import org.apache.spark.util.io.ChunkedByteBufferInputStream
 
-class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
+class UtilsSuite extends SparkFunSuite with ResetSystemProperties with PrivateMethodTester {
 
   test("timeConversion") {
     // Test -1
@@ -1126,6 +1130,42 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
     assert(pValue > threshold)
   }
 
+  test("ThreadInfoOrdering") {
+    val task1T = mock[ThreadInfo]
+    doReturn(11L).when(task1T).getThreadId
+    doReturn("Executor task launch worker for task 1.0 in stage 1.0 (TID 11)")
+      .when(task1T).getThreadName
+    doReturn("Executor task launch worker for task 1.0 in stage 1.0 (TID 11)")
+      .when(task1T).toString
+
+    val task2T = mock[ThreadInfo]
+    doReturn(12L).when(task2T).getThreadId
+    doReturn("Executor task launch worker for task 2.0 in stage 1.0 (TID 22)")
+      .when(task2T).getThreadName
+    doReturn("Executor task launch worker for task 2.0 in stage 1.0 (TID 22)")
+      .when(task2T).toString
+
+    val connectExecuteOp1T = mock[ThreadInfo]
+    doReturn(21L).when(connectExecuteOp1T).getThreadId
+    doReturn("SparkConnectExecuteThread_opId=16148fb4-4189-43c3-b8d4-8b3b6ddd41c7")
+      .when(connectExecuteOp1T).getThreadName
+    doReturn("SparkConnectExecuteThread_opId=16148fb4-4189-43c3-b8d4-8b3b6ddd41c7")
+      .when(connectExecuteOp1T).toString
+
+    val connectExecuteOp2T = mock[ThreadInfo]
+    doReturn(22L).when(connectExecuteOp2T).getThreadId
+    doReturn("SparkConnectExecuteThread_opId=4e4d1cac-ffde-46c1-b7c2-808b726cb47e")
+      .when(connectExecuteOp2T).getThreadName
+    doReturn("SparkConnectExecuteThread_opId=4e4d1cac-ffde-46c1-b7c2-808b726cb47e")
+      .when(connectExecuteOp2T).toString
+
+    val threadInfoOrderingMethod =
+      PrivateMethod[Ordering[ThreadInfo]](Symbol("threadInfoOrdering"))
+    val sorted = Seq(connectExecuteOp1T, connectExecuteOp2T, task1T, task2T)
+      .sorted(Utils.invokePrivate(threadInfoOrderingMethod()))
+    assert(sorted === Seq(task1T, task2T, connectExecuteOp1T, connectExecuteOp2T))
+  }
+
   test("redact sensitive information") {
     val sparkConf = new SparkConf
```
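The suite reaches the private `threadInfoOrdering` through ScalaTest's `PrivateMethodTester`. A minimal sketch of that pattern, with a hypothetical `Box` class:

```scala
import org.scalatest.PrivateMethodTester._

class Box {
  private def secret(x: Int): Int = x * 2
}

// PrivateMethod captures the method name as a Symbol; invokePrivate then
// calls it reflectively on the target instance (or singleton object).
val secret = PrivateMethod[Int](Symbol("secret"))
assert((new Box).invokePrivate(secret(21)) == 42)
```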

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala (2 additions, 1 deletion)

```diff
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag, S
 import org.apache.spark.sql.connect.utils.ErrorUtils
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils
+import org.apache.spark.util.Utils.CONNECT_EXECUTE_THREAD_PREFIX
 
 /**
  * This class launches the actual execution in an execution thread. The execution pushes the
@@ -329,7 +330,7 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
   }
 
   private class ExecutionThread()
-      extends Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") {
+      extends Thread(s"${CONNECT_EXECUTE_THREAD_PREFIX}_opId=${executeHolder.operationId}") {
     override def run(): Unit = execute()
   }
 }
```
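One Scala detail the new interpolation relies on (shown with a hypothetical operation id): the braces in `${CONNECT_EXECUTE_THREAD_PREFIX}_opId` are required because `_` is a valid identifier character, so without them the interpolator would look up a nonexistent variable named `CONNECT_EXECUTE_THREAD_PREFIX_opId`:

```scala
val CONNECT_EXECUTE_THREAD_PREFIX = "SparkConnectExecuteThread"
val operationId = "16148fb4-4189-43c3-b8d4-8b3b6ddd41c7" // hypothetical id

// Braces delimit the interpolated identifier so the trailing "_opId=" is
// treated as literal text.
val threadName = s"${CONNECT_EXECUTE_THREAD_PREFIX}_opId=$operationId"
assert(threadName.startsWith(CONNECT_EXECUTE_THREAD_PREFIX))
```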
