[SPARK-23361][YARN] Allow AM to restart after initial tokens expire.
Currently, the Spark AM relies on the initial set of tokens created by
the submission client to be able to talk to HDFS and other services that
require delegation tokens. This means that after those tokens expire, a
new AM will fail to start (e.g. when there is an application failure and
re-attempts are enabled).

This PR makes it so that, when the user provides a principal and keytab, the first
thing the AM does is create a fresh set of delegation tokens for its own use. This
makes sure that the AM can be started irrespective of how old the original
token set is. It also allows all of the token management to be done by the
AM; there is no need for the submission client to set configuration values
to tell the AM when to renew tokens.

Note that even though in this case the AM will not be using the delegation
tokens created by the submission client, those tokens still need to be provided
to YARN, since they are used for log aggregation.

To be able to reuse the code in AMCredentialRenewer for the above
purposes, I refactored that class a bit so that it can fetch tokens into
a pre-defined UGI, instead of always logging in.
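
For illustration only, here is a minimal sketch of that idea using plain Hadoop APIs: log in from the keytab, then fetch fresh HDFS tokens inside the resulting UGI. The object and method names are invented for this example and the renewer string is a placeholder; the real logic lives in AMCredentialRenewer and goes through Spark's delegation token providers.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Illustrative sketch: obtain tokens from a keytab-based login, independent of
// whatever tokens the submission client originally shipped with the application.
object KeytabTokenSketch {
  def obtainFreshTokens(
      principal: String,
      keytab: String,
      hadoopConf: Configuration): Credentials = {
    // Creates a logged-in UGI without replacing the process-wide current user.
    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    ugi.doAs(new PrivilegedExceptionAction[Credentials] {
      override def run(): Credentials = {
        val creds = new Credentials()
        // "yarn" stands in for the token renewer principal; Spark derives the
        // real renewer from the cluster configuration.
        FileSystem.get(hadoopConf).addDelegationTokens("yarn", creds)
        creds
      }
    })
  }
}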

Another issue with re-attempts is that, after the fix that allows the AM
to restart correctly, new executors would get confused about when to
update credentials, because the credential updater used the update time
initially set up by the submission code. This could make the executor
fail to update credentials in time, since that value would be very out
of date in the situation described in the bug.

To fix that, I changed the YARN code to use the new RPC-based mechanism
for distributing tokens to executors. This allowed the old credential
updater code to be removed, and a lot of code in the renewer to be
simplified.
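
As a rough sketch of that RPC path, assuming the pieces visible in the diff below (the UpdateDelegationTokens message and SparkHadoopUtil.addDelegationTokens): these are private[spark] APIs, so the sketch only compiles inside Spark's own source tree, and the wiring through the driver's scheduler backend is omitted.

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens

object TokenDistributionSketch {
  // AM/renewer side: after new tokens are obtained and serialized, hand them
  // to the driver endpoint, which relays them to the executors.
  def pushToDriver(driver: RpcEndpointRef, serializedTokens: Array[Byte]): Unit = {
    driver.send(UpdateDelegationTokens(serializedTokens))
  }

  // Executor side: deserialize the tokens and add them to the executor's
  // current Hadoop user, which is what CoarseGrainedExecutorBackend now does.
  def applyOnExecutor(serializedTokens: Array[Byte], conf: SparkConf): Unit = {
    SparkHadoopUtil.get.addDelegationTokens(serializedTokens, conf)
  }
}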

I also made two currently hardcoded values (the renewal time ratio and
the retry wait) configurable; while these probably never need to be set
by anyone in a production environment, they help with testing. That's also
why they're not documented.
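
For reference, the two new settings added in config/package.scala below can be passed like any other Spark configuration; the principal, keytab path and values here are placeholders:

spark-submit \
  --master yarn --deploy-mode cluster \
  --principal [email protected] \
  --keytab /path/to/user.keytab \
  --conf spark.security.credentials.renewalRatio=0.5 \
  --conf spark.security.credentials.retryWait=2m \
  ...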

Tested on real cluster with a specially crafted application to test this
functionality: checked proper access to HDFS, Hive and HBase in cluster
mode with token renewal on and AM restarts. Tested things still work in
client mode too.

Author: Marcelo Vanzin <[email protected]>

Closes apache#20657 from vanzin/SPARK-23361.
Marcelo Vanzin authored and jerryshao committed Mar 23, 2018
1 parent b2edc30 commit 5fa4384
Showing 16 changed files with 238 additions and 565 deletions.
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -603,13 +603,15 @@ private[spark] object SparkConf extends Logging {
"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
DeprecatedConfig("spark.rpc", "2.0", "Not used anymore."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
"Please use the new blacklisting options, spark.blacklist.*"),
DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"),
DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used anymore"),
DeprecatedConfig("spark.executor.port", "2.0.0", "Not used anymore"),
DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
"Not used any more. Please use spark.shuffle.service.index.cache.size")
"Not used anymore. Please use spark.shuffle.service.index.cache.size"),
DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."),
DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore.")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
@@ -748,7 +750,7 @@ private[spark] object SparkConf extends Logging {
}
if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
logWarning(
s"The configuration key $key is not supported any more " +
s"The configuration key $key is not supported anymore " +
s"because Spark doesn't use Akka since 2.0")
}
}
32 changes: 11 additions & 21 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

/**
@@ -146,7 +147,8 @@ class SparkHadoopUtil extends Logging {
private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
val creds = deserialize(tokens)
logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
logInfo("Updating delegation tokens for current user.")
logDebug(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
addCurrentUserCredentials(creds)
}

@@ -321,19 +323,6 @@ class SparkHadoopUtil extends Logging {
}
}

/**
* Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
* This is to prevent the DFSClient from using an old cached token to connect to the NameNode.
*/
private[spark] def getConfBypassingFSCache(
hadoopConf: Configuration,
scheme: String): Configuration = {
val newConf = new Configuration(hadoopConf)
val confKey = s"fs.${scheme}.impl.disable.cache"
newConf.setBoolean(confKey, true)
newConf
}

/**
* Dump the credentials' tokens to string values.
*
@@ -447,16 +436,17 @@ object SparkHadoopUtil {
def get: SparkHadoopUtil = instance

/**
* Given an expiration date (e.g. for Hadoop Delegation Tokens) return a the date
* when a given fraction of the duration until the expiration date has passed.
* Formula: current time + (fraction * (time until expiration))
* Given an expiration date for the current set of credentials, calculate the time when new
* credentials should be created.
*
* @param expirationDate Drop-dead expiration date
* @param fraction fraction of the time until expiration return
* @return Date when the fraction of the time until expiration has passed
* @param conf Spark configuration
* @return Timestamp when new credentials should be created.
*/
private[spark] def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
private[spark] def nextCredentialRenewalTime(expirationDate: Long, conf: SparkConf): Long = {
val ct = System.currentTimeMillis
(ct + (fraction * (expirationDate - ct))).toLong
val ratio = conf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
(ct + (ratio * (expirationDate - ct))).toLong
}

/**
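As a quick illustration of the renewal schedule computed above (numbers chosen arbitrarily): with the default spark.security.credentials.renewalRatio of 0.75 and tokens that expire 24 hours from now,

nextCredentialRenewalTime = now + 0.75 * (expiration - now) = now + 18 hours

so fresh credentials are fetched well before the current set expires.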
@@ -213,13 +213,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(key, value)
}
}
if (driverConf.contains("spark.yarn.credentials.file")) {
logInfo("Will periodically update credentials from: " +
driverConf.get("spark.yarn.credentials.file"))
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.getMethod("startCredentialUpdater", classOf[SparkConf])
.invoke(null, driverConf)
}

cfg.hadoopDelegationCreds.foreach { tokens =>
SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
@@ -234,11 +227,6 @@
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
if (driverConf.contains("spark.yarn.credentials.file")) {
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.getMethod("stopCredentialUpdater")
.invoke(null)
}
}
}

12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -525,4 +525,16 @@ package object config {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1g")

private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
ConfigBuilder("spark.security.credentials.renewalRatio")
.doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
.doubleConf
.createWithDefault(0.75d)

private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
ConfigBuilder("spark.security.credentials.retryWait")
.doc("How long to wait before retrying to fetch new credentials after a failure.")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("1h")

}
@@ -29,6 +29,7 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.ThreadUtils


@@ -63,7 +64,7 @@ private[spark] class MesosHadoopDelegationTokenManager(
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
(SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.getDateOfNextUpdate(rt, 0.75))
(SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.nextCredentialRenewalTime(rt, conf))
} catch {
case e: Exception =>
logError(s"Failed to fetch Hadoop delegation tokens $e")
@@ -104,8 +105,10 @@
} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
logWarning("Couldn't broadcast tokens, trying again in an hour", e)
credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
val delay = TimeUnit.SECONDS.toMillis(conf.get(config.CREDENTIALS_RENEWAL_RETRY_WAIT))
logWarning(
s"Couldn't broadcast tokens, trying again in ${UIUtils.formatDuration(delay)}", e)
credentialRenewerThread.schedule(this, delay, TimeUnit.MILLISECONDS)
return
}
scheduleRenewal(this)
@@ -135,7 +138,7 @@
"related configurations in the target services.")
currTime
} else {
SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75)
SparkHadoopUtil.nextCredentialRenewalTime(nextRenewalTime, conf)
}
logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms")

@@ -29,7 +29,6 @@ import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
@@ -41,7 +40,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, YARNHadoopDelegationTokenManager}
import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc._
@@ -79,42 +78,43 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

private val ugi = {
val original = UserGroupInformation.getCurrentUser()

// If a principal and keytab were provided, log in to kerberos, and set up a thread to
// renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
// of the TGT, use a configuration to define how often to check that a relogin is necessary.
// checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
val principal = sparkConf.get(PRINCIPAL).orNull
val keytab = sparkConf.get(KEYTAB).orNull
if (principal != null && keytab != null) {
UserGroupInformation.loginUserFromKeytab(principal, keytab)

val renewer = new Thread() {
override def run(): Unit = Utils.tryLogNonFatalError {
while (true) {
TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
}
}
private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}

if (isClusterMode) {
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
renewer.setName("am-kerberos-renewer")
renewer.setDaemon(true)
renewer.start()

// Transfer the original user's tokens to the new user, since that's needed to connect to
// YARN. It also copies over any delegation tokens that might have been created by the
// client, which will then be transferred over when starting executors (until new ones
// are created by the periodic task).
val newUser = UserGroupInformation.getCurrentUser()
SparkHadoopUtil.get.transferCredentials(original, newUser)
newUser
} else {
SparkHadoopUtil.get.createSparkUser()
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
}

private val credentialRenewer: Option[AMCredentialRenewer] = sparkConf.get(KEYTAB).map { _ =>
new AMCredentialRenewer(sparkConf, yarnConf)
}

private val ugi = credentialRenewer match {
case Some(cr) =>
// Set the context class loader so that the token renewer has access to jars distributed
// by the user.
val currentLoader = Thread.currentThread().getContextClassLoader()
Thread.currentThread().setContextClassLoader(userClassLoader)
try {
cr.start()
} finally {
Thread.currentThread().setContextClassLoader(currentLoader)
}

case _ =>
SparkHadoopUtil.get.createSparkUser()
}

private val client = doAsUser { new YarnRMClient() }

// Default to twice the number of executors (twice the maximum number of executors if dynamic
@@ -148,23 +148,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// A flag to check whether user has initialized spark context
@volatile private var registered = false

private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}

if (isClusterMode) {
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
}

// Lock for controlling the allocator (heartbeat) thread.
private val allocatorLock = new Object()

Expand All @@ -189,8 +172,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
private val sparkContextPromise = Promise[SparkContext]()

private var credentialRenewer: AMCredentialRenewer = _

// Load the list of localized files set by the client. This is used when launching executors,
// and is loaded here so that these configs don't pollute the Web UI's environment page in
// cluster mode.
@@ -316,31 +297,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}

// If the credentials file config is present, we must periodically renew tokens. So create
// a new AMDelegationTokenRenewer
if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
// Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
// classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
val credentialRenewerThread = new Thread {
setName("AMCredentialRenewerStarter")
setContextClassLoader(userClassLoader)

override def run(): Unit = {
val credentialManager = new YARNHadoopDelegationTokenManager(
sparkConf,
yarnConf,
conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))

val credentialRenewer =
new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
credentialRenewer.scheduleLoginFromKeytab()
}
}

credentialRenewerThread.start()
credentialRenewerThread.join()
}

if (isClusterMode) {
runDriver()
} else {
@@ -409,9 +365,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
logDebug("shutting down user thread")
userClassThread.interrupt()
}
if (!inShutdown && credentialRenewer != null) {
credentialRenewer.stop()
credentialRenewer = null
if (!inShutdown) {
credentialRenewer.foreach(_.stop())
}
}
}
@@ -468,6 +423,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
securityMgr,
localResources)

credentialRenewer.foreach(_.setDriverRef(driverRef))

// Initialize the AM endpoint *after* the allocator has been initialized. This ensures
// that when the driver sends an initial executor request (e.g. after an AM restart),
// the allocator is ready to service requests.
(Diffs for the remaining changed files are not shown.)
