add JobLogger to Spark #573
Changes from 14 commits
File: spark/RDD.scala
@@ -114,6 +114,14 @@ abstract class RDD[T: ClassManifest](
     name = _name
     this
   }
 
+  /**generator of this RDD*/
+  var generator = Utils.getSparkCallSite(true)
+
+  /**reset generator*/
Reviewer: I don't see this used anywhere else. What is the purpose of this?
Author: Just like setName above, this is used to set the generator intentionally.
(A usage sketch follows this hunk.)
+  def setGenerator(_generator: String) = {
+    generator = _generator
+  }
+
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
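A minimal sketch of the intended call pattern, mirroring the setName analogy in the reply above; the RDD and the operator name are illustrative, not taken from this PR:

```scala
// Assuming sc is an existing SparkContext.
val parsed = sc.textFile("hdfs://path/to/logs").map(_.split("\t"))
// Tag the RDD with the framework operator that produced it, so a JobLogger can
// group RDDs by origin; without this call, generator defaults to the first user
// class found on the call stack (Utils.getSparkCallSite(true)).
parsed.setGenerator("MyQueryOperator")
```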
@@ -749,7 +757,7 @@ abstract class RDD[T: ClassManifest](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** Record user function generating this RDD. */
-  private[spark] val origin = Utils.getSparkCallSite
+  private[spark] val origin = Utils.getSparkCallSite()
 
   private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
File: spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
 import spark.storage.BlockManagerUI
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
-
+import scala.util.DynamicVariable
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -65,6 +65,8 @@ class SparkContext(
   // Ensure logging is initialized before we spawn any threads
   initLogging()
 
+  val addInfo = new DynamicVariable[String]("")
Reviewer: Don't see this set anywhere. What does it do?
Author: This is used to pass some additional information in from outside, for example the query plan from Shark.
Reviewer: It would be good to have a docstring for this, and also to maybe give it a different name. How about this: /* Allows higher layer frameworks to describe the context of a job. */
(A usage sketch follows this hunk.)
+
   // Set Spark driver host and port system properties
   if (System.getProperty("spark.driver.host") == null) {
     System.setProperty("spark.driver.host", Utils.localIpAddress)
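A hedged sketch of how a higher-layer framework such as Shark might feed addInfo; the query-plan string and the logs RDD are illustrative, while withValue is the standard scala.util.DynamicVariable API:

```scala
// Assuming sc is an existing SparkContext and logs is an RDD built from it.
// withValue scopes the annotation to jobs launched inside the closure;
// SparkContext.runJob reads it back through addInfo.value (see the hunk below).
sc.addInfo.withValue("QueryPlan: SELECT page, COUNT(*) FROM logs GROUP BY page") {
  logs.count()   // this job carries the annotation down to the DAGScheduler
}
```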
@@ -465,6 +467,7 @@ class SparkContext(
     dagScheduler.sparkListeners += listener
   }
 
+  // SparkListener(this)
Reviewer: Remove this (?)
   /**
    * Return a map from the slave to the max memory available for caching and the remaining
    * memory available for caching.
@@ -575,10 +578,10 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val callSite = Utils.getSparkCallSite
+    val callSite = Utils.getSparkCallSite()
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
+    val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler)
Reviewer: This line is over 100 characters.
(A wrapped version is sketched after this hunk.)
     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
     result
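One way to bring the flagged call back under 100 characters, purely as a formatting sketch; note that addInfo.value is already a String, so the toString call can be dropped:

```scala
    val result = dagScheduler.runJob(
      rdd, func, partitions, callSite + "|" + addInfo.value, allowLocal, resultHandler)
```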
@@ -659,7 +662,7 @@ class SparkContext(
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long
     ): PartialResult[R] = {
-    val callSite = Utils.getSparkCallSite
+    val callSite = Utils.getSparkCallSite()
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout)
File: spark/Utils.scala
@@ -401,7 +401,7 @@ private object Utils extends Logging {
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
    */
-  def getSparkCallSite: String = {
+  def getSparkCallSite(getfirstUserClass: Boolean = false): String = {
Reviewer: A better way to refactor this would be to have two functions: getCallSiteInfo, which collects the call-site details, and formatSparkCallSite(), which calls getCallSiteInfo and returns the formatted string. You can replace the earlier references to getSparkCallSite with formatSparkCallSite; then, for the new cases you added here, you can just call getCallSiteInfo and use only the firstUserClass, ignoring the other fields.
(A sketch of this split follows the Utils.scala hunks below.)
     val trace = Thread.currentThread.getStackTrace().filter( el =>
       (!el.getMethodName.contains("getStackTrace")))
@@ -413,6 +413,7 @@ private object Utils extends Logging {
     var firstUserFile = "<unknown>"
     var firstUserLine = 0
     var finished = false
+    var firstUserClass = "<unknown>"
 
     for (el <- trace) {
       if (!finished) {
@@ -427,11 +428,15 @@ private object Utils extends Logging {
         else {
           firstUserLine = el.getLineNumber
           firstUserFile = el.getFileName
+          firstUserClass = el.getClassName
           finished = true
         }
       }
     }
-    "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
+    if(getfirstUserClass)
Reviewer: Put a space before the first paren: if (getfirstUserClass)
+      firstUserClass
+    else
+      "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
   }
 
   /**
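A sketch of the two-function split suggested in the review above. The CallSiteInfo holder and the simplified spark-package check are assumptions; the loop body mirrors the existing code in this hunk:

```scala
private[spark] case class CallSiteInfo(
    lastSparkMethod: String,
    firstUserFile: String,
    firstUserLine: Int,
    firstUserClass: String)

/** Walk the stack and record the last Spark frame plus the first user frame. */
def getCallSiteInfo: CallSiteInfo = {
  val trace = Thread.currentThread.getStackTrace().filter(el =>
    !el.getMethodName.contains("getStackTrace"))
  var lastSparkMethod = "<unknown>"
  var firstUserFile = "<unknown>"
  var firstUserLine = 0
  var firstUserClass = "<unknown>"
  var finished = false
  for (el <- trace) {
    if (!finished) {
      if (el.getClassName.startsWith("spark.")) {  // simplified version of the real check
        lastSparkMethod = el.getMethodName
      } else {
        firstUserLine = el.getLineNumber
        firstUserFile = el.getFileName
        firstUserClass = el.getClassName
        finished = true
      }
    }
  }
  CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
}

/** Drop-in replacement for the string the old getSparkCallSite returned. */
def formatSparkCallSite: String = {
  val info = getCallSiteInfo
  "%s at %s:%s".format(info.lastSparkMethod, info.firstUserFile, info.firstUserLine)
}
```

Callers that only need the class, such as the new generator field in RDD, would read getCallSiteInfo.firstUserClass and ignore the rest.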
File: spark/scheduler/DAGScheduler.scala
@@ -275,11 +275,17 @@ class DAGScheduler(
   private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
     event match {
       case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
Reviewer: It would be good to add an annotation field to JobSubmitted as well (of type Option[String]), rather than using string parsing for this.
(A sketch of that event shape follows this hunk.)
+        val callSites = callSite.split("\\|",2)
+        var jobAddInfo: String = ""
+        if (callSites.size == 2) {
+          jobAddInfo = callSites(1)
+        }
         val runId = nextRunId.getAndIncrement()
         val finalStage = newStage(finalRDD, None, runId)
-        val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
+        val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener)
         clearCacheLocs()
-        logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
+        sparkListeners.foreach{_.onJobStart(job, jobAddInfo)}
+        logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length +
           " output partitions (allowLocal=" + allowLocal + ")")
         logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
         logInfo("Parents of final stage: " + finalStage.parents)
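A sketch of the reviewer's suggestion above: carry the extra job description as an explicit Option[String] on JobSubmitted instead of packing it into callSite and splitting on "|". The field types are inferred from this call site and may not match the real event class exactly:

```scala
private[spark] case class JobSubmitted(
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Seq[Int],
    allowLocal: Boolean,
    callSite: String,
    listener: JobListener,
    annotation: Option[String] = None)  // reviewer-suggested field, defaulting to None
  extends DAGSchedulerEvent
```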
@@ -297,6 +303,7 @@ class DAGScheduler(
         handleExecutorLost(execId)
 
       case completion: CompletionEvent =>
+        sparkListeners.foreach{_.onTaskEnd(completion)}
         handleTaskCompletion(completion)
 
       case TaskSetFailed(taskSet, reason) =>
@@ -307,6 +314,8 @@ class DAGScheduler(
         for (job <- activeJobs) {
           val error = new SparkException("Job cancelled because SparkContext was shut down")
           job.listener.jobFailed(error)
+          val JobCancelEvent = new SparkListenerJobCancelled("SPARKCONTEXT_SHUTDOWN")
+          sparkListeners.foreach{_.onJobEnd(job, JobCancelEvent)}
         }
         return true
       }
@@ -454,6 +463,7 @@ class DAGScheduler(
       }
     }
     if (tasks.size > 0) {
+      sparkListeners.foreach{_.onStageSubmitted(stage, "TASKS_SIZE=" + tasks.size)}
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
@@ -507,6 +517,7 @@ class DAGScheduler(
                   activeJobs -= job
                   resultStageToJob -= stage
                   markStageAsFinished(stage)
+                  sparkListeners.foreach{_.onJobEnd(job, SparkListenerJobSuccess)}
                 }
                 job.listener.taskSucceeded(rt.outputId, event.result)
               }
@@ -642,6 +653,8 @@ class DAGScheduler(
         job.listener.jobFailed(new SparkException("Job failed: " + reason))
         activeJobs -= job
         resultStageToJob -= resultStage
+        val jobFailedEvent = new SparkListenerJobFailed(failedStage)
+        sparkListeners.foreach{_.onJobEnd(job, jobFailedEvent)}
       }
       if (dependentStages.isEmpty) {
         logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
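The call sites in these hunks imply a listener API roughly like the sketch below. The actual SparkListener trait and the JobLogger added by this PR live in files not shown here, so the signatures and the console listener are assumptions, not the PR's code:

```scala
sealed trait SparkListenerJobResult
case object SparkListenerJobSuccess extends SparkListenerJobResult
case class SparkListenerJobFailed(failedStage: Stage) extends SparkListenerJobResult
case class SparkListenerJobCancelled(reason: String) extends SparkListenerJobResult

trait SparkListener {
  def onJobStart(job: ActiveJob, addInfo: String) { }
  def onStageSubmitted(stage: Stage, info: String) { }
  def onTaskEnd(event: CompletionEvent) { }
  def onJobEnd(job: ActiveJob, result: SparkListenerJobResult) { }
}

// A minimal JobLogger-style listener that just prints lifecycle transitions.
class ConsoleJobLogger extends SparkListener {
  override def onJobStart(job: ActiveJob, addInfo: String) {
    println("Job " + job.runId + " started" + (if (addInfo != "") " [" + addInfo + "]" else ""))
  }
  override def onStageSubmitted(stage: Stage, info: String) {
    println("Stage " + stage + " submitted: " + info)
  }
  override def onJobEnd(job: ActiveJob, result: SparkListenerJobResult) {
    println("Job " + job.runId + " ended: " + result)
  }
}
```

A listener like this would be registered through the sparkListeners collection shown in the SparkContext hunk above (dagScheduler.sparkListeners += listener).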
Reviewer: Change the comment to: /** User-defined generator of this RDD. */