add JobLogger to Spark #573

Closed
Wants to merge 15 commits.
1 change: 1 addition & 0 deletions core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -53,6 +53,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
itr.setDelegate(blockFetcherItr)
CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.shuffleReadMillis = itr.getNetMillis
shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
10 changes: 9 additions & 1 deletion core/src/main/scala/spark/RDD.scala
@@ -114,6 +114,14 @@ abstract class RDD[T: ClassManifest](
name = _name
this
}

/**generator of this RDD*/
Contributor

Change the comment to:

/** User-defined generator of this RDD. */

var generator = Utils.getSparkCallSite(true)

/**reset generator*/
Member

I don't see this used anywhere else. What is the purpose of this?

Contributor Author

Just like setName above, this is used to set the generator intentionally.

def setGenerator(_generator: String) = {
generator = _generator
}
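A minimal usage sketch of what the comment above describes; the RDD, the path, and the generator label are made up for illustration, and an existing SparkContext named sc is assumed:

// Hypothetical usage: tag an RDD with the framework layer that produced it,
// in the same spirit as rdd.setName above.
val lines = sc.textFile("hdfs://example/logs")
lines.setGenerator("SharkQueryOperator")  // overrides the call-site-derived default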

/**
* Set this RDD's storage level to persist its values across operations after the first time
@@ -749,7 +757,7 @@ abstract class RDD[T: ClassManifest](
private var storageLevel: StorageLevel = StorageLevel.NONE

/** Record user function generating this RDD. */
private[spark] val origin = Utils.getSparkCallSite
private[spark] val origin = Utils.getSparkCallSite()

private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]

11 changes: 7 additions & 4 deletions core/src/main/scala/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
import spark.storage.BlockManagerUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}

import scala.util.DynamicVariable
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -65,6 +65,8 @@ class SparkContext(
// Ensure logging is initialized before we spawn any threads
initLogging()

val addInfo = new DynamicVariable[String]("")
Member

Don't see this set anywhere. What does it do?

Contributor Author

This is used to pass some additional information in from outside, for example the query plan from Shark.
It can be replaced by the localProperties being added to SparkContext by the FairScheduler.

Contributor

It would be good to have a docstring for this and also to maybe give it a different name. How about this:

/* Allows higher layer frameworks to describe the context of a job. */
val annotation = new DynamicVariable[String]("")
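For illustration, a self-contained sketch of how a higher-layer framework could set such a DynamicVariable-based annotation around its job submission. Only scala.util.DynamicVariable is assumed; the object name and query text are invented:

import scala.util.DynamicVariable

object AnnotationSketch {
  // Mirrors the proposed SparkContext field: empty string by default.
  val annotation = new DynamicVariable[String]("")

  def main(args: Array[String]): Unit = {
    // A framework such as Shark would wrap its job submission so that every
    // job launched inside the block carries the query description.
    annotation.withValue("SELECT page, count(*) FROM logs GROUP BY page") {
      println("annotation inside block: " + annotation.value)
    }
    // Outside the block the value falls back to the default.
    println("annotation outside block: " + annotation.value)
  }
}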


// Set Spark driver host and port system properties
if (System.getProperty("spark.driver.host") == null) {
System.setProperty("spark.driver.host", Utils.localIpAddress)
@@ -465,6 +467,7 @@ class SparkContext(
dagScheduler.sparkListeners += listener
}

// SparkListener(this)
Contributor

Remove this (?)

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
@@ -575,10 +578,10 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
val callSite = Utils.getSparkCallSite
val callSite = Utils.getSparkCallSite()
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler)
Contributor

This line is over 100 characters

logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
@@ -659,7 +662,7 @@ class SparkContext(
evaluator: ApproximateEvaluator[U, R],
timeout: Long
): PartialResult[R] = {
val callSite = Utils.getSparkCallSite
val callSite = Utils.getSparkCallSite()
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout)
9 changes: 7 additions & 2 deletions core/src/main/scala/spark/Utils.scala
@@ -401,7 +401,7 @@ private object Utils extends Logging {
* (outside the spark package) that called into Spark, as well as which Spark method they called.
* This is used, for example, to tell users where in their code each RDD got created.
*/
def getSparkCallSite: String = {
def getSparkCallSite(getfirstUserClass: Boolean = false): String = {
Contributor

A better way to refactor this would be to have two functions:
getCallSiteInfo() => returns (lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)

formatSparkCallSite() => calls getCallSiteInfo then returns the formatted string

You can replace the earlier references to getSparkCallSite with formatSparkCallSite; for the new cases added here, just call getCallSiteInfo and use only firstUserClass, ignoring the rest.
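A rough sketch of that split, written as two methods that could live in Utils; the spark-package test and the special-casing of constructors in the real getSparkCallSite are simplified here:

// Sketch only: getCallSiteInfo returns the raw pieces, formatSparkCallSite keeps
// the old string output, and new callers that need just the class take it from
// the tuple.
def getCallSiteInfo: (String, String, Int, String) = {
  val trace = Thread.currentThread.getStackTrace.filter(el =>
    !el.getMethodName.contains("getStackTrace"))
  var lastSparkMethod = "<unknown>"
  var firstUserFile = "<unknown>"
  var firstUserLine = 0
  var firstUserClass = "<unknown>"
  var finished = false
  for (el <- trace) {
    if (!finished) {
      if (el.getClassName.startsWith("spark.")) {  // simplified package check
        lastSparkMethod = el.getMethodName
      } else {
        firstUserLine = el.getLineNumber
        firstUserFile = el.getFileName
        firstUserClass = el.getClassName
        finished = true
      }
    }
  }
  (lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
}

def formatSparkCallSite: String = {
  val (lastSparkMethod, firstUserFile, firstUserLine, _) = getCallSiteInfo
  "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
}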

val trace = Thread.currentThread.getStackTrace().filter( el =>
(!el.getMethodName.contains("getStackTrace")))

@@ -413,6 +413,7 @@
var firstUserFile = "<unknown>"
var firstUserLine = 0
var finished = false
var firstUserClass = "<unknown>"

for (el <- trace) {
if (!finished) {
@@ -427,11 +428,15 @@
else {
firstUserLine = el.getLineNumber
firstUserFile = el.getFileName
firstUserClass = el.getClassName
finished = true
}
}
}
"%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
if(getfirstUserClass)
Contributor

Put a space before the first paren:
if (getFirstUserClass)

firstUserClass
else
"%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
}

/**
4 changes: 3 additions & 1 deletion core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -33,10 +33,12 @@ object TaskMetrics {


class ShuffleReadMetrics extends Serializable {

var shuffleFinishTime: Long = _
/**
* Total number of blocks fetched in a shuffle (remote or local)
*/
var totalBlocksFetched : Int = _
var totalBlocksFetched: Int = _

/**
* Number of remote blocks fetched in a shuffle
17 changes: 15 additions & 2 deletions core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -275,11 +275,17 @@ class DAGScheduler(
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
Contributor

It would be good to add an annotation field (of type Option[String]) to JobSubmitted as well, rather than using string parsing for this.
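A small self-contained illustration of that suggestion; the case class is trimmed down to two fields (the real event also carries the RDD, the function, the partitions, the allowLocal flag and the listener):

// Sketch: carry the extra job description as a typed field instead of packing
// it into the callSite string and splitting on "|".
case class JobSubmittedSketch(callSite: String, annotation: Option[String] = None)

val event = JobSubmittedSketch("count at Example.scala:12", Some("query plan from Shark"))
val jobAddInfo = event.annotation.getOrElse("")  // replaces callSite.split("\\|", 2)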

val callSites = callSite.split("\\|",2)
var jobAddInfo: String = ""
if (callSites.size == 2) {
jobAddInfo = callSites(1)
}
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId)
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener)
clearCacheLocs()
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
sparkListeners.foreach{_.onJobStart(job, jobAddInfo)}
logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
logInfo("Parents of final stage: " + finalStage.parents)
@@ -297,6 +303,7 @@ class DAGScheduler(
handleExecutorLost(execId)

case completion: CompletionEvent =>
sparkListeners.foreach{_.onTaskEnd(completion)}
handleTaskCompletion(completion)

case TaskSetFailed(taskSet, reason) =>
@@ -307,6 +314,8 @@
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
val JobCancelEvent = new SparkListenerJobCancelled("SPARKCONTEXT_SHUTDOWN")
sparkListeners.foreach{_.onJobEnd(job, JobCancelEvent)}
}
return true
}
@@ -454,6 +463,7 @@
}
}
if (tasks.size > 0) {
sparkListeners.foreach{_.onStageSubmitted(stage, "TASKS_SIZE=" + tasks.size)}
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
@@ -507,6 +517,7 @@
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
sparkListeners.foreach{_.onJobEnd(job, SparkListenerJobSuccess)}
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -642,6 +653,8 @@
job.listener.jobFailed(new SparkException("Job failed: " + reason))
activeJobs -= job
resultStageToJob -= resultStage
val jobFailedEvent = new SparkListenerJobFailed(failedStage)
sparkListeners.foreach{_.onJobEnd(job, jobFailedEvent)}
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")