add JobLogger to Spark #573

Closed
wants to merge 15 commits into from
228 changes: 228 additions & 0 deletions core/src/main/scala/spark/JobLogger.scala
@@ -0,0 +1,228 @@
package spark

import java.util.Date
import java.text.SimpleDateFormat
import java.io.PrintWriter
import java.io.File
import java.io.FileNotFoundException
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
import scala.collection.mutable.ListBuffer
import spark.scheduler.Stage
import scala.io.Source
Member:
For imports, sort them in the following order:

  1. java packages
  2. scala packages
  3. everything else, in alphabetical order.

Add a blank line between groups from different domains.

Do this for the other files too. Spark code didn't strictly follow this before, but we are enforcing it now; applied to this file, the ordering would look roughly like the sketch below.
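A sketch of the convention applied to this file's imports (just the suggested grouping, not the committed diff):

// Java packages first, then Scala packages, then everything else, alphabetically within each group.
import java.io.File
import java.io.FileNotFoundException
import java.io.PrintWriter
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable.HashMap
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.Map
import scala.io.Source

import spark.executor.TaskMetrics
import spark.scheduler.Stage
import spark.scheduler.cluster.TaskInfo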

import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo

// Records runtime information for each job, including the RDD graph, task start/stop and shuffle
// information, and query plan information if there is any.

sealed trait JobLogger extends Logging {
Member:
With my proposed change (extend SparkListener and make JobLogger a specific implementation of SparkListener), you only need a single JobLogger implementation (no need to have JobLoggerOff).
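A rough sketch of how that might look; SparkListener's real callbacks are not shown in this diff, so onJobStart and onJobEnd below are illustrative placeholders rather than the actual listener API:

// Sketch only: one concrete JobLogger registered as a listener when logging is wanted.
// The callback names onJobStart/onJobEnd are placeholders, not the real SparkListener methods.
class JobLogger(contextDirName: String) extends SparkListener with Logging {
  def onJobStart(jobID: Int, finalStage: Stage) {
    createLogWriter(jobID)
  }
  def onJobEnd(jobID: Int) {
    closeLogWriter(jobID)
  }
}

Turning logging off then just means not registering the listener, so JobLoggerOff disappears.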


  def createLogWriter(jobID: Int): Unit
Member:
For functions that don't return anything, don't declare the return type as Unit. E.g., just write

def createLogWriter(jobID: Int) {
...
}


  def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit
Member:
Try to combine addStageIDToJobID with addJobIDToStageIDs. They convey the same information and having two APIs make it more error-prone.
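One way to fold them into a single call (a sketch; the method name is made up):

// Sketch: record both directions of the job <-> stage mapping in one place.
def addJobAndStages(jobID: Int, stages: List[Stage]) {
  jobIDToStageIDs.getOrElseUpdate(jobID, new ListBuffer[Int]) ++= stages.map(_.id)
  for (stage <- stages) {
    stageIDToJobID += (stage.id -> jobID)
  }
}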


  def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit

  def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit

  def closeLogWriter(jobID: Int): Unit

  def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit

  def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int, Stage], jobID: Int): Unit

  def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
}

object JobLogger {
  private val logSwitch = System.getProperty("spark.joblogger.switch", "true").toBoolean

  def init() = {
    if (logSwitch) {
      new JobLoggerOn
    } else {
      new JobLoggerOff
    }
  }
}

class JobLoggerOff extends JobLogger {

  def createLogWriter(jobID: Int): Unit = { }

  def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit = { }

  def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit = { }

  def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit = { }

  def closeLogWriter(jobID: Int): Unit = { }

  def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit = { }

  def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int, Stage], jobID: Int): Unit = { }

  def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = { }
}

class JobLoggerOn(val contextDirName: String) extends JobLogger {
  // Get the log directory setting; the default is /tmp/spark.
  private val logDir =
    if (System.getenv("SPARK_LOG_DIR") != null) {
      System.getenv("SPARK_LOG_DIR")
    } else {
      "/tmp/spark"
    }

  private var jobIDToPrintWriter = new HashMap[Int, PrintWriter]
  private var stageIDToJobID = new HashMap[Int, Int]
Contributor:
This is broken. A stage can be associated with more than one active job. See discussion of #414

Contributor Author:
The Map here only records the jobID that really "creates" the stage; the addStageIDToJobID function is called when the Stage is created, which means the stage is executed in the active job that created it.
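If a stage did have to be associated with every active job that uses it, the mapping would need to hold a set of job IDs per stage, roughly like this (a sketch of the alternative the reviewer alludes to, not the code in this PR):

import scala.collection.mutable.HashSet

private val stageIDToJobIDs = new HashMap[Int, HashSet[Int]]

def addStageIDToJobID(stages: List[Stage], jobID: Int) {
  for (stage <- stages) {
    stageIDToJobIDs.getOrElseUpdate(stage.id, new HashSet[Int]) += jobID
  }
}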

  private var jobIDToStageIDs = new HashMap[Int, ListBuffer[Int]]

  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")

  createContextDir()

  def this() = this(String.valueOf(System.currentTimeMillis()))

  // Create a folder for each SparkContext; the folder's name is the creation time of the JobLogger.
  def createContextDir() {
    val dir = new File(logDir + "/" + contextDirName + "/")
    if (dir.exists()) {
      return
    }
    if (dir.mkdirs() == false) {
      logError("create context directory error: " + logDir + "/" + contextDirName + "/")
    }
  }

  // Create a log file for one job; the file name is the jobID (an Int starting from 0).
  def createLogWriter(jobID: Int): Unit = {
    try {
      val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID)
      jobIDToPrintWriter += (jobID -> fileWriter)
      jobIDToStageIDs += (jobID -> new ListBuffer[Int])
    } catch {
      case e: FileNotFoundException => e.printStackTrace()
    }
  }
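With the defaults, that means the log for job 0 of a context created at time T ends up at /tmp/spark/T/0 (an illustrative path derived from the two methods above, not output from an actual run).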

  // Close the log file for one job, and clean up the job's stages in stageIDToJobID.
  def closeLogWriter(jobID: Int): Unit = {
    jobIDToPrintWriter.get(jobID) match {
      case Some(fileWriter) =>
        fileWriter.close()
        jobIDToPrintWriter -= jobID
        cleanStageIDToJobID(jobID)
        jobIDToStageIDs -= jobID
      case None =>
    }
  }

  // Write log information to the log file by jobID; the withTime parameter controls whether to
  // record a timestamp with the information.
  def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit = {
    var writeInfo = info
    if (withTime) {
      val date = new Date(System.currentTimeMillis())
      writeInfo = DATE_FORMAT.format(date) + ": " + info
    }
    jobIDToPrintWriter.get(jobID) match {
      case Some(fileWriter) => fileWriter.println(writeInfo)
      case None =>
    }
  }

  // Write log information to the log file by stageID; the withTime parameter controls whether to
  // record a timestamp with the information.
  def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit = {
    stageIDToJobID.get(stageID) match {
      case Some(jobID) => writeJobLog(jobID, info, withTime)
      case None =>
    }
  }

  def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit = {
    jobIDToStageIDs.get(jobID) match {
      case Some(listBuffer) => for (stage <- stages) listBuffer.append(stage.id)
      case None =>
    }
  }

  // Add a list of stages to stageIDToJobID.
  def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit = {
    for (stage <- stages) {
      stageIDToJobID += (stage.id -> jobID)
    }
  }

  // Clean up the stages related to one job in stageIDToJobID.
  def cleanStageIDToJobID(jobID: Int): Unit = {
    jobIDToStageIDs.get(jobID) match {
      case Some(stageIDList) => for (stageid <- stageIDList) stageIDToJobID -= stageid
      case None =>
    }
  }

Contributor:
Avoid pattern matching on Option, especially when you are doing nothing with the None case:

jobIDToStageIDs.get(jobID).map(_.foreach(stageid => stageIDToJobID -= stageid))
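The same thing reads a bit more directly with foreach, since the result is discarded anyway (a sketch):

jobIDToStageIDs.get(jobID).foreach(_.foreach(stageid => stageIDToJobID -= stageid))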

  // Generate an indent of the given width and return it as a String.
  def indentString(indent: Int): String = {
    val sb = new StringBuilder()
    for (i <- 0 to indent) {
      sb.append(" ")
    }
    sb.toString()
  }

  // Record the RDD graph for a given RDD: print the RDDs recursively and represent the
  // parent-child relationship by indentation.
  def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int, Stage], jobID: Int): Unit = {
    def recordRDDGraphInternal(rdd: RDD[_], indent: Int): Unit = {
      val space = indentString(indent)
      for (dep <- rdd.dependencies) {
        var rddDesc: String = ""
        dep match {
          case shufDep: ShuffleDependency[_,_] =>
            // If the dependency is a shuffle, the parent RDD is in a new stage.
            var rddName = shufDep.rdd.getClass.getName
            if (shufDep.rdd.name != null) {
              rddName = shufDep.rdd.name
            }
            shuffleToMapStage.get(shufDep.shuffleId) match {
              case Some(stage) =>
                rddDesc = space + "RDD_ID:" + shufDep.rdd.id + " (" + rddName + " " + rdd.generator + ")" +
                  " SHUFFLE_ID:" + shufDep.shuffleId + " STAGE_ID:" + stage.id
              case None =>
                rddDesc = space + "RDD_ID:" + shufDep.rdd.id + " (" + rddName + " " + rdd.generator + ")" +
                  " SHUFFLE_ID:" + shufDep.shuffleId + " STAGE_ID:"
            }
          case _ =>
            var rddName = dep.rdd.getClass.getName
            if (dep.rdd.name != null) {
              rddName = dep.rdd.name
            }
            rddDesc = space + "RDD_ID:" + dep.rdd.id + " (" + rddName + " " + rdd.generator + ")"
        }
        writeJobLog(jobID, rddDesc, false)
        recordRDDGraphInternal(dep.rdd, indent + 2)
      }
    }

    var rddName = rdd.getClass.getName
    if (rdd.name != null) {
      rddName = rdd.name
    }
    writeJobLog(jobID, "RDD_ID:" + rdd.id + " (" + rddName + " " + rdd.generator + ")" +
      " RESULT_STAGE STAGE_ID:" + finalStage.id, false)
    recordRDDGraphInternal(rdd, 1)
  }

  def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = {
    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
      " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
      " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host

    val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime

    val readMetrics = taskMetrics.shuffleReadMetrics match {
      case Some(metrics) =>
        " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
          " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
          " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
          " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
          " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
          " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
          " SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis
      case None => ""
    }
    val writeMetrics = taskMetrics.shuffleWriteMetrics match {
      case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
      case None => ""
    }

    writeStageLog(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true)
  }
}
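The DAGScheduler changes that drive these hooks are not part of this excerpt; the intended wiring is roughly the following (a hypothetical sketch based only on the API above, with a made-up status string, not the scheduler code in this PR):

// Hypothetical call sites inside the scheduler, in the order a job would hit them.
jobLogger.createLogWriter(jobID)                                          // job submitted
jobLogger.addStageIDToJobID(stages, jobID)                                // stages created
jobLogger.addJobIDToStageIDs(jobID, stages)
jobLogger.recordRDDGraph(finalRDD, finalStage, shuffleToMapStage, jobID)
jobLogger.recordTaskMetrics(stageID, "TASK_FINISHED", taskInfo, taskMetrics)  // per finished task
jobLogger.closeLogWriter(jobID)                                           // job finished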
8 changes: 8 additions & 0 deletions core/src/main/scala/spark/RDD.scala
@@ -114,6 +114,14 @@ abstract class RDD[T: ClassManifest](
    name = _name
    this
  }

  /** generator of this RDD */
Contributor:
Change the comment to:

/** User-defined generator of this RDD. */

  var generator = Utils.getRddGenerator
Member:
Can we reuse "origin" for this? I noticed there are some minor differences between generator and origin, but it would be great if we could merge the two since they are so similar.


  /** reset generator */
Member:
I don't see this used anywhere else. What is the purpose of this?

Contributor Author:
Just like setName above, this is used to set the generator intentionally.

  def setGenerator(_generator: String) = {
    generator = _generator
  }
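For example, a higher layer such as Shark could label the RDDs it builds so the job log attributes them to the operator that produced them (a hypothetical usage sketch; the RDDs and the operator name are made up):

val joined = left.join(right)
joined.setName("join output")
joined.setGenerator("ShuffleJoinOperator")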

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
8 changes: 6 additions & 2 deletions core/src/main/scala/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
import spark.storage.BlockManagerUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}

import scala.util.DynamicVariable
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -65,6 +65,10 @@ class SparkContext(
  // Ensure logging is initialized before we spawn any threads
  initLogging()

  val jobLogger = JobLogger.init

  val addInfo = new DynamicVariable[String]("")
Member:
Don't see this set anywhere. What does it do?

Contributor Author:
This is used to pass some additional information in from outside, for example the query plan from Shark.
It could be replaced by the localProperties added to SparkContext by the fair scheduler.

Contributor:
It would be good to have a docstring for this, and also to maybe give it a different name. How about this:

/* Allows higher layer frameworks to describe the context of a job. */
val annotation = new DynamicVariable[String]("")
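As a sketch of how a caller might use it (hypothetical code; it assumes the addInfo/annotation variable above and relies on scala.util.DynamicVariable's withValue):

// Scope an annotation around one query so every job launched inside the block carries it.
sc.addInfo.withValue("query plan: <operator tree here>") {
  resultRdd.count()
}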


  // Set Spark driver host and port system properties
  if (System.getProperty("spark.driver.host") == null) {
    System.setProperty("spark.driver.host", Utils.localIpAddress)
@@ -578,7 +582,7 @@ class SparkContext(
    val callSite = Utils.getSparkCallSite
    logInfo("Starting job: " + callSite)
    val start = System.nanoTime
    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
    val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler)
Contributor:
This line is over 100 characters

logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
17 changes: 17 additions & 0 deletions core/src/main/scala/spark/Utils.scala
@@ -475,4 +475,21 @@ private object Utils extends Logging {
    }
    return false
  }

  // Return the first class name on the call stack that comes from outside Spark.
  def getRddGenerator = {
Member:
As mentioned earlier, try to use getSparkCallsite for this... If you need to modify getSparkCallsite, that is fine.

    var generator: String = ""
    var finished: Boolean = false
    // Get all stack trace elements that do not come from getStackTrace itself.
    val trace = Thread.currentThread.getStackTrace().filter(el =>
      !el.getMethodName.contains("getStackTrace"))

    for (el <- trace) {
      if (!finished) {
        if (!el.getClassName.startsWith("spark.")) {
          generator = el.getClassName
          finished = true
        }
      }
    }
    generator
  }
}
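An equivalent but somewhat more idiomatic formulation (a sketch, not part of the patch) avoids the finished flag by using find:

// Sketch: same behavior as getRddGenerator above, expressed with find.
def getRddGenerator: String = {
  Thread.currentThread.getStackTrace()
    .find(el => !el.getMethodName.contains("getStackTrace") &&
                !el.getClassName.startsWith("spark."))
    .map(_.getClassName)
    .getOrElse("")
}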