[SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
## What changes were proposed in this pull request?

In this PR, we implement a complete process for GPU-aware resource scheduling
in Standalone mode. The whole process looks like this: a Worker sets up isolated resources
when it starts up and registers with the Master along with its resources. The Master
then picks suitable Workers according to the driver/executor's resource requirements and
launches the driver/executor on them. The Worker launches the driver/executor after
preparing a resources file, created under the driver/executor's working directory,
with the specific resource addresses assigned by the Master. When the driver/executor
finishes, its resources are returned to the Worker. Finally, when a Worker stops, it
always releases its resources first.
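
For illustration only (not part of this patch's diff), requesting GPUs under this scheme might look like the following; the config keys follow Spark's `spark.<component>.resource.<name>.*` naming, and the master URL, app name and discovery-script path are made-up placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Worker-side resources are configured on the Worker itself (e.g. in its
// spark-defaults.conf), for example:
//   spark.worker.resource.gpu.amount           2
//   spark.worker.resource.gpu.discoveryScript  /opt/spark/bin/getGpus.sh
//
// Application-side, the driver/executor/task requirements might look like:
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")           // hypothetical Standalone master
  .setAppName("gpu-aware-standalone-app")
  .set("spark.driver.resource.gpu.amount", "1")    // GPUs the driver needs
  .set("spark.executor.resource.gpu.amount", "1")  // GPUs each executor needs
  .set("spark.task.resource.gpu.amount", "1")      // GPUs each task needs

val sc = new SparkContext(conf)
```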

For the case where Workers and Drivers in **client** mode run on the same host, we introduce
a config option named `spark.resources.coordinate.enable` (default true) to indicate
whether Spark should coordinate resources for the user. If `spark.resources.coordinate.enable=false`, the user is responsible for configuring non-overlapping resources for Workers and Drivers when using a resourcesFile or discovery script. If true, Spark assigns different resources to Workers and Drivers on the user's behalf.
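
For example (illustrative only; apart from `spark.resources.coordinate.enable`, the config key and file path below are assumptions used to sketch the two modes):

```scala
import org.apache.spark.SparkConf

// Default: let Spark coordinate resources among co-located Workers and
// client-mode Drivers on the same host.
val coordinated = new SparkConf()
  .set("spark.resources.coordinate.enable", "true")

// Opt out: the user must then make sure the resources handed to each Worker
// and client-mode Driver on the same host do not overlap.
val manual = new SparkConf()
  .set("spark.resources.coordinate.enable", "false")
  .set("spark.driver.resourcesFile", "/etc/spark/driver-gpus.json") // assumed key and path
```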

The solution for Spark to coordinate resources among Workers and Drivers is:

Generally, we use a shared file named *____allocated_resources____.json* to sync allocated
resources info among Workers and Drivers on the same host.

After a Worker or Driver has discovered all of its resources (via the configured resourcesFile and/or
discovery script) during launch, it filters out the available resources by excluding those already allocated in *____allocated_resources____.json*, and then acquires resources from the available set according to its own requirements. After that, it writes its allocated resources, together with its process id (pid), into *____allocated_resources____.json*. The pid (proposed by tgravescs) is used to check whether an allocation is still valid, in case a Worker or Driver crashes without releasing its resources properly. When a Worker or Driver finishes normally, it always cleans up its own allocated resources in *____allocated_resources____.json*.
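
A minimal sketch of that pid-based staleness check (not this patch's code; it assumes a Java 9+ runtime for `ProcessHandle`):

```scala
import java.util.Optional

// Illustrative only: an allocation recorded in ____allocated_resources____.json
// is treated as stale once the process that wrote it is no longer alive, so its
// addresses can be handed out again.
def isOwnerAlive(pid: Long): Boolean = {
  val handle: Optional[ProcessHandle] = ProcessHandle.of(pid)
  handle.isPresent && handle.get.isAlive
}
```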

Note that we always take a file lock before any access to the file *____allocated_resources____.json*
and release the lock afterwards.
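
A rough sketch of that lock-then-update pattern (the helper name, lock-file name and directory layout are assumptions; the real helpers live in `StandaloneResourceUtils`):

```scala
import java.io.{File, RandomAccessFile}
import java.nio.channels.FileLock

// Illustrative only: hold an exclusive inter-process lock while reading and
// rewriting the shared allocation file, so concurrent Workers/Drivers on the
// same host never observe or write a half-updated view.
def withAllocationLock[T](coordinateDir: File)(body: => T): T = {
  val raf = new RandomAccessFile(new File(coordinateDir, "__allocation_lock"), "rw")
  val channel = raf.getChannel
  var lock: FileLock = null
  try {
    lock = channel.lock() // blocks until no other process holds the lock
    body                  // read/update ____allocated_resources____.json here
  } finally {
    if (lock != null) lock.release()
    channel.close()
    raf.close()
  }
}
```

Inside `body`, a Worker or Driver would read the JSON, drop entries whose owning pid is no longer alive, take the addresses it needs from what remains, and write the file back before the lock is released.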

Furthermore, we append resources info to `WorkerSchedulerStateResponse` to handle
the master-change (failover) behaviour in HA mode.

## How was this patch tested?

Added unit tests in WorkerSuite, MasterSuite and SparkContextSuite.

Manually tested client and cluster mode (e.g. with multiple workers) on a single-node Standalone deployment.

Closes apache#25047 from Ngone51/SPARK-27371.

Authored-by: wuyi <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
Ngone51 authored and tgravescs committed Aug 9, 2019
1 parent 5159876 commit cbad616
Showing 38 changed files with 1,217 additions and 228 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -71,6 +71,7 @@ scalastyle-on-compile.generated.xml
scalastyle-output.xml
scalastyle.txt
spark-*-bin-*.tgz
spark-resources/
spark-tests.log
src_managed/
streaming-tests.log
34 changes: 30 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.deploy.StandaloneResourceUtils._
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
@@ -245,6 +246,15 @@ class SparkContext(config: SparkConf) extends Logging {

  def isLocal: Boolean = Utils.isLocalMaster(_conf)

  private def isClientStandalone: Boolean = {
    val isSparkCluster = master match {
      case SparkMasterRegex.SPARK_REGEX(_) => true
      case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, _) => true
      case _ => false
    }
    deployMode == "client" && isSparkCluster
  }

  /**
   * @return true if context is stopped or in the midst of stopping.
   */
@@ -380,7 +390,18 @@
    _driverLogger = DriverLogger(_conf)

    val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE)
    _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt)
    val allResources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt)
    _resources = {
      // driver submitted in client mode under Standalone may have conflicting resources with
      // other drivers/workers on this host. We should sync driver's resources info into
      // SPARK_RESOURCES/SPARK_RESOURCES_COORDINATE_DIR/ to avoid collision.
      if (isClientStandalone) {
        acquireResources(_conf, SPARK_DRIVER_PREFIX, allResources, Utils.getProcessId)
      } else {
        allResources
      }
    }
    logResourceInfo(SPARK_DRIVER_PREFIX, _resources)

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")
@@ -1911,8 +1932,10 @@
      ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
    }

    Utils.tryLogNonFatalError {
      postApplicationEnd()
    if (listenerBus != null) {
      Utils.tryLogNonFatalError {
        postApplicationEnd()
      }
    }
    Utils.tryLogNonFatalError {
      _driverLogger.foreach(_.stop())
@@ -1960,6 +1983,9 @@
    Utils.tryLogNonFatalError {
      _progressBar.foreach(_.stop())
    }
    if (isClientStandalone) {
      releaseResources(_conf, SPARK_DRIVER_PREFIX, _resources, Utils.getProcessId)
    }
    _taskScheduler = null
    // TODO: Cache.stop()?
    if (_env != null) {
@@ -2726,7 +2752,7 @@ object SparkContext extends Logging {

    // Calculate the max slots each executor can provide based on resources available on each
    // executor and resources required by each task.
    val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
    val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)
    val executorResourcesAndAmounts =
      parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
        .map(request => (request.id.resourceName, request.amount)).toMap
@@ -19,6 +19,8 @@ package org.apache.spark.deploy

import java.net.URI

import org.apache.spark.resource.ResourceRequirement

private[spark] case class ApplicationDescription(
    name: String,
    maxCores: Option[Int],
@@ -32,7 +34,8 @@ private[spark] case class ApplicationDescription(
    // number of executors this application wants to start with,
    // only used if dynamic allocation is enabled
    initialExecutorLimit: Option[Int] = None,
    user: String = System.getProperty("user.name", "<unknown>")) {
    user: String = System.getProperty("user.name", "<unknown>"),
    resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {

  override def toString: String = "ApplicationDescription(" + name + ")"
}
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -29,6 +29,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}

@@ -92,13 +93,15 @@ private class ClientEndpoint(
        val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)

        val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,
          config.SPARK_DRIVER_PREFIX)
        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
          command,
          driverResourceReqs)
        asyncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))
28 changes: 22 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -24,14 +24,14 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
import org.apache.spark.util.Utils

private[deploy] sealed trait DeployMessage extends Serializable

/** Contains messages sent between Scheduler endpoint nodes. */
private[deploy] object DeployMessages {

  // Worker to Master

  /**
@@ -43,6 +43,7 @@ private[deploy] object DeployMessages {
   * @param memory the memory size of worker
   * @param workerWebUiUrl the worker Web UI address
   * @param masterAddress the master address used by the worker to connect
   * @param resources the resources of worker
   */
  case class RegisterWorker(
      id: String,
@@ -52,7 +53,8 @@
      cores: Int,
      memory: Int,
      workerWebUiUrl: String,
      masterAddress: RpcAddress)
      masterAddress: RpcAddress,
      resources: Map[String, ResourceInformation] = Map.empty)
    extends DeployMessage {
    Utils.checkHost(host)
    assert (port > 0)
@@ -72,8 +74,18 @@ private[deploy] object DeployMessages {
      exception: Option[Exception])
    extends DeployMessage

  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
    driverIds: Seq[String])
  case class WorkerExecutorStateResponse(
      desc: ExecutorDescription,
      resources: Map[String, ResourceInformation])

  case class WorkerDriverStateResponse(
      driverId: String,
      resources: Map[String, ResourceInformation])

  case class WorkerSchedulerStateResponse(
      id: String,
      execResponses: List[WorkerExecutorStateResponse],
      driverResponses: Seq[WorkerDriverStateResponse])

  /**
   * A worker will send this message to the master when it registers with the master. Then the
@@ -118,10 +130,14 @@ private[deploy] object DeployMessages {
      execId: Int,
      appDesc: ApplicationDescription,
      cores: Int,
      memory: Int)
      memory: Int,
      resources: Map[String, ResourceInformation] = Map.empty)
    extends DeployMessage

  case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
  case class LaunchDriver(
      driverId: String,
      driverDesc: DriverDescription,
      resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage

  case class KillDriver(driverId: String) extends DeployMessage

@@ -17,12 +17,15 @@

package org.apache.spark.deploy

import org.apache.spark.resource.ResourceRequirement

private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command) {
    command: Command,
    resourceReqs: Seq[ResourceRequirement] = Seq.empty) {

  override def toString: String = s"DriverDescription (${command.mainClass})"
}
@@ -64,7 +64,8 @@ class LocalSparkCluster(
    /* Start the Workers */
    for (workerNum <- 1 to numWorkers) {
      val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
        memoryPerWorker, masters, null, Some(workerNum), _conf)
        memoryPerWorker, masters, null, Some(workerNum), _conf,
        conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE))
      workerRpcEnvs += workerEnv
    }
