add JobLogger to Spark #573
@@ -0,0 +1,228 @@
package spark

import java.util.Date
import java.text.SimpleDateFormat
import java.io.PrintWriter
import java.io.File
import java.io.FileNotFoundException
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
import scala.collection.mutable.ListBuffer
import spark.scheduler.Stage
import scala.io.Source
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo

// Records runtime information for each job, including the RDD graph, task start/stop and
// shuffle information, and query plan information if there is any.
sealed trait JobLogger extends Logging {

Review comment: With my proposed change (extend SparkListener and make JobLogger a specific implementation of SparkListener), you only need a single JobLogger implementation (no need to have JobLoggerOff).
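A rough sketch of that shape, using a hypothetical minimal listener interface (the real SparkListener trait at this revision may expose different callbacks than shown here):

// Hypothetical listener interface, for illustration only.
trait JobEventListener {
  def onJobStart(jobID: Int) { }
  def onJobEnd(jobID: Int) { }
}

// With such an interface, JobLogger is just one concrete listener; turning
// logging off means not registering it, so no JobLoggerOff class is needed.
class JobLoggerListener extends JobEventListener {
  override def onJobStart(jobID: Int) { /* createLogWriter(jobID) */ }
  override def onJobEnd(jobID: Int) { /* closeLogWriter(jobID) */ }
}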

  def createLogWriter(jobID: Int): Unit

Review comment: For functions that don't return anything, don't define the type as Unit. E.g. just write def createLogWriter(jobID: Int) { ... }

  def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit

Review comment: Try to combine addStageIDToJobID with addJobIDToStageIDs. They convey the same information, and having two APIs makes it more error-prone.
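As an illustration of a single registration point (hypothetical class and method names, not part of this diff), both maps could be fed by one call:

import scala.collection.mutable.{HashMap, ListBuffer}

// Hypothetical combined API: record the stage/job association once and keep
// the two lookup directions in sync by construction.
class StageJobIndex {
  private val jobIDToStageIDs = new HashMap[Int, ListBuffer[Int]]
  private val stageIDToJobID = new HashMap[Int, Int]

  def registerStages(jobID: Int, stageIDs: Seq[Int]) {
    jobIDToStageIDs.getOrElseUpdate(jobID, new ListBuffer[Int]) ++= stageIDs
    stageIDs.foreach(id => stageIDToJobID(id) = jobID)
  }
}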

  def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit

  def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit

  def closeLogWriter(jobID: Int): Unit

  def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit

  def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int, Stage], jobID: Int): Unit

  def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
}

object JobLogger {
  private val logSwitch = System.getProperty("spark.joblogger.switch", "true").toBoolean

  def init(): JobLogger = {
    if (logSwitch) {
      new JobLoggerOn
    } else {
      new JobLoggerOff
    }
  }
}

class JobLoggerOff extends JobLogger {

  def createLogWriter(jobID: Int): Unit = { }

  def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit = { }

  def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit = { }

  def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit = { }

  def closeLogWriter(jobID: Int): Unit = { }

  def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit = { }

  def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int, Stage], jobID: Int): Unit = { }

  def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = { }
}

class JobLoggerOn(val contextDirName: String) extends JobLogger {
  // Log directory; defaults to /tmp/spark when SPARK_LOG_DIR is not set.
  private val logDir = {
    if (System.getenv("SPARK_LOG_DIR") != null) System.getenv("SPARK_LOG_DIR")
    else "/tmp/spark"
  }
  private var jobIDToPrintWriter = new HashMap[Int, PrintWriter]
  private var stageIDToJobID = new HashMap[Int, Int]

Review comment: This is broken. A stage can be associated with more than one active job. See the discussion of #414.
Author reply: The Map here only records the jobID that actually "creates" the stage; addStageIDToJobID is called when the Stage is created, which means the stage is executed in the active job that created it.
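If shared stages do need to be attributed to every active job, a self-contained sketch of a one-to-many mapping (illustrative only, not part of this diff):

import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical alternative: a stage maps to the set of jobs currently using
// it, so events for a shared stage can be logged under every active job.
class StageJobMap {
  private val stageIDToJobIDs = new HashMap[Int, HashSet[Int]]

  def associate(stageID: Int, jobID: Int) {
    stageIDToJobIDs.getOrElseUpdate(stageID, new HashSet[Int]) += jobID
  }

  def disassociate(stageID: Int, jobID: Int) {
    stageIDToJobIDs.get(stageID).foreach { jobs =>
      jobs -= jobID
      if (jobs.isEmpty) stageIDToJobIDs -= stageID
    }
  }

  def jobsFor(stageID: Int): Iterable[Int] = stageIDToJobIDs.getOrElse(stageID, Nil)
}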
  private var jobIDToStageIDs = new HashMap[Int, ListBuffer[Int]]

  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")

  createContextDir()

  def this() = this(String.valueOf(System.currentTimeMillis()))

  // Create a folder for each SparkContext; the folder's name is the creation time of the JobLogger.
  def createContextDir() {
    val dir = new File(logDir + "/" + contextDirName + "/")
    if (dir.exists()) {
      return
    }
    if (!dir.mkdirs()) {
      logError("create context directory error: " + logDir + "/" + contextDirName + "/")
    }
  }

  // Create a log file for one job; the file name is the jobID (an Int starting from 0).
  def createLogWriter(jobID: Int): Unit = {
    try {
      val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID)
      jobIDToPrintWriter += (jobID -> fileWriter)
      jobIDToStageIDs += (jobID -> new ListBuffer[Int])
    } catch {
      case e: FileNotFoundException => e.printStackTrace()
    }
  }

  // Close the log file for one job, and clean up the job's stages in stageIDToJobID.
  def closeLogWriter(jobID: Int): Unit = {
    jobIDToPrintWriter.get(jobID) match {
      case Some(fileWriter) =>
        fileWriter.close()
        jobIDToPrintWriter -= jobID
        cleanStageIDToJobID(jobID)
        jobIDToStageIDs -= jobID
      case None =>
    }
  }

  // Write log information to the log file of a job; withTime controls whether to record a timestamp.
  def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit = {
    var writeInfo = info
    if (withTime) {
      val date = new Date(System.currentTimeMillis())
      writeInfo = DATE_FORMAT.format(date) + ": " + info
    }
    jobIDToPrintWriter.get(jobID) match {
      case Some(fileWriter) => fileWriter.println(writeInfo)
      case None =>
    }
  }

  // Write log information to the log file of the job that owns the stage; withTime controls whether to record a timestamp.
  def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit = {
    stageIDToJobID.get(stageID) match {
      case Some(jobID) => writeJobLog(jobID, info, withTime)
      case None =>
    }
  }

  def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit = {
    jobIDToStageIDs.get(jobID) match {
      case Some(listBuffer) => for (stage <- stages) listBuffer.append(stage.id)
      case None =>
    }
  }

  // Record the owning job for a list of stages in stageIDToJobID.
  def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit = {
    for (stage <- stages) {
      stageIDToJobID += (stage.id -> jobID)
    }
  }

  // Remove the stages related to one job from stageIDToJobID.
  def cleanStageIDToJobID(jobID: Int): Unit = {
    jobIDToStageIDs.get(jobID) match {
      case Some(stageIDList) => for (stageid <- stageIDList) stageIDToJobID -= stageid
      case None =>
    }
  }

Review comment: Avoid pattern matching on Option, especially when you are doing nothing with the None case: jobIDToStageIDs.get(jobID).map(_.foreach(stageid => stageIDToJobID -= stageid))
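For comparison, a self-contained script-style sketch of both forms (foreach is used on the Option here since the result is discarded; this is an illustration, not part of the diff):

import scala.collection.mutable.{HashMap, ListBuffer}

val jobIDToStageIDs = new HashMap[Int, ListBuffer[Int]]
val stageIDToJobID = new HashMap[Int, Int]
val jobID = 0

// Pattern matching with an empty None branch:
jobIDToStageIDs.get(jobID) match {
  case Some(stageIDList) => for (stageid <- stageIDList) stageIDToJobID -= stageid
  case None =>
}

// Equivalent, without the dead branch:
jobIDToStageIDs.get(jobID).foreach(_.foreach(stageid => stageIDToJobID -= stageid))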

  // Build an indentation string of the requested depth.
  def indentString(indent: Int): String = {
    val sb = new StringBuilder()
    for (i <- 0 to indent) {
      sb.append(" ")
    }
    sb.toString()
  }

  // Record the RDD graph for a given RDD: print the RDDs recursively, representing the
  // parent-child relationship by indentation.
  def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int, Stage], jobID: Int): Unit = {
    def recordRDDGraphInternal(rdd: RDD[_], indent: Int): Unit = {
      val space = indentString(indent)
      for (dep <- rdd.dependencies) {
        var rddDesc: String = ""
        dep match {
          case shufDep: ShuffleDependency[_, _] => // if the dependency is a shuffle, the parent is in a new stage
            var rddName = shufDep.rdd.getClass.getName
            if (shufDep.rdd.name != null) {
              rddName = shufDep.rdd.name
            }
            shuffleToMapStage.get(shufDep.shuffleId) match {
              case Some(stage) => rddDesc = space + "RDD_ID:" + shufDep.rdd.id + " (" + rddName + " " + rdd.generator + ")" +
                " SHUFFLE_ID:" + shufDep.shuffleId + " STAGE_ID:" + stage.id
              case None => rddDesc = space + "RDD_ID:" + shufDep.rdd.id + " (" + rddName + " " + rdd.generator + ")" +
                " SHUFFLE_ID:" + shufDep.shuffleId + " STAGE_ID:"
            }
          case _ =>
            var rddName = dep.rdd.getClass.getName
            if (dep.rdd.name != null) {
              rddName = dep.rdd.name
            }
            rddDesc = space + "RDD_ID:" + dep.rdd.id + " (" + rddName + " " + rdd.generator + ")"
        }
        writeJobLog(jobID, rddDesc, false)
        recordRDDGraphInternal(dep.rdd, indent + 2)
      }
    }
    var rddName = rdd.getClass.getName
    if (rdd.name != null) {
      rddName = rdd.name
    }
    writeJobLog(jobID, "RDD_ID:" + rdd.id + " (" + rddName + " " + rdd.generator + ")" + " RESULT_STAGE STAGE_ID:" + finalStage.id, false)
    recordRDDGraphInternal(rdd, 1)
  }

  def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = {
    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
      " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host

    val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime

    val readMetrics = taskMetrics.shuffleReadMetrics match {
      case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
        " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
        " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
        " SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis
      case None => ""
    }
    val writeMetrics = taskMetrics.shuffleWriteMetrics match {
      case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
      case None => ""
    }

    writeStageLog(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true)
  }
}

@@ -114,6 +114,14 @@ abstract class RDD[T: ClassManifest](
    name = _name
    this
  }

  /** generator of this RDD */

Review comment: Change the comment to: /** User-defined generator of this RDD. */
  var generator = Utils.getRddGenerator

Review comment: Can we reuse "origin" for this? I noticed there are some minor differences between generator and origin, but it would be great if we could merge the two since they are so similar.

  /** reset generator */

Review comment: I don't see this used anywhere else. What is the purpose of this?
Author reply: Just like setName above, this is used to set the generator intentionally.
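A hypothetical usage example from a higher-level library (sc and the input path are placeholders, not from this PR):

// Label RDDs created by a library so the job log attributes them to that
// library rather than to whichever internal class happened to create them.
val lines = sc.textFile("hdfs://host/path/to/input")
lines.setGenerator("MyQueryEngine")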
  def setGenerator(_generator: String) = {
    generator = _generator
  }

  /**
   * Set this RDD's storage level to persist its values across operations after the first time

@@ -42,7 +42,7 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
import spark.storage.BlockManagerUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}

import scala.util.DynamicVariable
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

@@ -65,6 +65,10 @@ class SparkContext(
  // Ensure logging is initialized before we spawn any threads
  initLogging()

  val jobLogger = JobLogger.init

  val addInfo = new DynamicVariable[String]("")

Review comment: Don't see this set anywhere. What does it do?
Author reply: This is used to pass additional information in from outside, for example the query plan from Shark.
Review comment: It would be good to have a docstring for this and also to maybe give it a different name. How about: /* Allows higher layer frameworks to describe the context of a job. */
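A sketch of the intended use from a higher layer such as Shark (sc, rdd, and the plan text are placeholders; withValue is the standard scala.util.DynamicVariable API):

// Scope extra job description text around an action; runJob reads
// addInfo.value while the body executes and appends it to the call site string.
sc.addInfo.withValue("query plan: <plan text here>") {
  rdd.count()
}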

  // Set Spark driver host and port system properties
  if (System.getProperty("spark.driver.host") == null) {
    System.setProperty("spark.driver.host", Utils.localIpAddress)

@@ -578,7 +582,7 @@ class SparkContext(
    val callSite = Utils.getSparkCallSite
    logInfo("Starting job: " + callSite)
    val start = System.nanoTime
-   val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
+   val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler)

Review comment: This line is over 100 characters.
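One way to bring it under the limit, offered as a formatting suggestion only:

    val callSiteInfo = callSite + "|" + addInfo.value.toString
    val result = dagScheduler.runJob(rdd, func, partitions, callSiteInfo, allowLocal, resultHandler)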
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
    rdd.doCheckpoint()
    result

@@ -475,4 +475,21 @@ private object Utils extends Logging {
    }
    return false
  }

  // Return the first class name on the call stack that is outside of Spark.
  def getRddGenerator = {

Review comment: As mentioned earlier, try to use getSparkCallSite for this. If you need to modify getSparkCallSite, that is fine. (See the sketch after this function for one possible compaction.)
    var generator: String = ""
    var finished: Boolean = false
    // Keep all stack elements that are not the getStackTrace call itself.
    val trace = Thread.currentThread.getStackTrace().filter(el =>
      !el.getMethodName.contains("getStackTrace"))

    for (el <- trace) {
      if (!finished) {
        if (!el.getClassName.startsWith("spark.")) {
          generator = el.getClassName
          finished = true
        }
      }
    }
    generator
  }
}
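Following the review comment above, a more compact version of the same stack walk, shown as a standalone sketch (if it ends up folded into getSparkCallSite, the details would differ):

object GeneratorUtil {
  // Return the first class on the call stack that is not part of Spark itself,
  // skipping the getStackTrace frame; "" if every frame is inside Spark.
  def getRddGenerator: String = {
    Thread.currentThread.getStackTrace
      .filter(el => !el.getMethodName.contains("getStackTrace"))
      .find(el => !el.getClassName.startsWith("spark."))
      .map(_.getClassName)
      .getOrElse("")
  }
}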

Review comment: For imports, sort them in the following order, and add a blank line between classes from different domains. Do this for the other files as well; Spark code didn't strictly follow this before, but we are enforcing it now.
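Assuming the grouping conventionally used in Spark (java first, then scala, then spark, separated by blank lines; this ordering is an assumption, since the reviewer's exact list is not shown above), the JobLogger imports would look roughly like:

import java.io.File
import java.io.FileNotFoundException
import java.io.PrintWriter
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable.{HashMap, ListBuffer, Map}
import scala.io.Source

import spark.executor.TaskMetrics
import spark.scheduler.Stage
import spark.scheduler.cluster.TaskInfo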