[SPARK-23550][CORE] Cleanup Utils.
A few different things going on:
- Remove unused methods.
- Move JSON methods to the only class that uses them.
- Move test-only methods to TestUtils.
- Make getMaxResultSize() a config constant (see the sketch below).
- Reuse functionality from existing libraries (JRE or JavaUtils) where possible.

The change also updates a few tests to call `Utils.createTempFile` correctly,
so that temp dirs are created under the designated top-level temp dir instead of
potentially polluting the git index.
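
To make the `getMaxResultSize()` bullet concrete, here is a minimal sketch of the pattern; the file wrapper below is only a stand-in, while the entry itself matches the `config` package-object hunk further down:

package org.apache.spark.internal

import org.apache.spark.network.util.ByteUnit

package object config {
  // One declaration owns the key, its documentation, the unit, and the default.
  private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
    .doc("Size limit for results.")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("1g")
}

Call sites in Executor and TaskSetManager then read `conf.get(MAX_RESULT_SIZE)` instead of going through a `Utils` helper, as the later hunks show.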

Author: Marcelo Vanzin <[email protected]>

Closes apache#20706 from vanzin/SPARK-23550.
Marcelo Vanzin authored and gatorsmile committed Mar 7, 2018
1 parent 53561d2 commit c99fc9a
Showing 21 changed files with 152 additions and 236 deletions.
26 changes: 25 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -22,7 +22,7 @@ import java.net.{HttpURLConnection, URI, URL}
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.security.cert.X509Certificate
import java.util.Arrays
import java.util.{Arrays, Properties}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import java.util.jar.{JarEntry, JarOutputStream}
import javax.net.ssl._
@@ -35,6 +35,7 @@ import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

import com.google.common.io.{ByteStreams, Files}
import org.apache.log4j.PropertyConfigurator

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
@@ -256,6 +257,29 @@ private[spark] object TestUtils {
s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
}

  /**
   * Configures log4j properties for use in the test suites.
   */
  def configTestLog4j(level: String): Unit = {
    val pro = new Properties()
    pro.put("log4j.rootLogger", s"$level, console")
    pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
    pro.put("log4j.appender.console.target", "System.err")
    pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
    pro.put("log4j.appender.console.layout.ConversionPattern",
      "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
    PropertyConfigurator.configure(pro)
  }

  /**
   * Lists files recursively.
   */
  def recursiveList(f: File): Array[File] = {
    require(f.isDirectory)
    val current = f.listFiles
    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
  }

}
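
A hypothetical snippet showing how a suite would call the two relocated helpers; the scratch directory is illustrative and not taken from the diff:

import java.io.File
import org.apache.spark.TestUtils

// Route test logging to stderr at the requested level.
TestUtils.configTestLog4j("INFO")

// Walk a scratch directory, returning its contents (files and subdirectories) recursively.
val files: Array[File] = TestUtils.recursiveList(new File("/tmp/spark-test-scratch"))
files.foreach(f => println(f.getAbsolutePath))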


core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.{ByteArrayOutputStream, PrintStream}
import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.lang.reflect.InvocationTargetException
import java.net.URI
import java.nio.charset.StandardCharsets
@@ -233,7 +233,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
    // Set name from main class if not given
    name = Option(name).orElse(Option(mainClass)).orNull
    if (name == null && primaryResource != null) {
      name = Utils.stripDirectory(primaryResource)
      name = new File(primaryResource).getName()
    }

    // Action should be SUBMIT unless otherwise specified
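
The replacement above leans on the JRE rather than a bespoke `Utils.stripDirectory`; a small illustration with a made-up path:

import java.io.File

val primaryResource = "/opt/jobs/my-app.jar"        // illustrative value
val appName = new File(primaryResource).getName()   // "my-app.jar"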
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,6 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
@@ -141,8 +142,7 @@ private[spark] class Executor(
    conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
    RpcUtils.maxMessageSizeBytes(conf))

  // Limit of bytes for total size of results (default is 1GB)
  private val maxResultSize = Utils.getMaxResultSize(conf)
  private val maxResultSize = conf.get(MAX_RESULT_SIZE)

  // Maintains the list of running tasks.
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
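
A sketch of the new read from inside Spark's own code (the override value is illustrative): the entry returns the parsed size in bytes, so the old `Utils.getMaxResultSize(conf)` helper is no longer needed.

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf().set("spark.driver.maxResultSize", "2g")
val maxResultSize: Long = conf.get(MAX_RESULT_SIZE)   // 2147483648 bytes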
core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -23,6 +23,7 @@ import java.util.regex.PatternSyntaxException
import scala.util.matching.Regex

import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.util.Utils

private object ConfigHelpers {

@@ -45,7 +46,7 @@ private object ConfigHelpers {
  }

  def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
    str.split(",").map(_.trim()).filter(_.nonEmpty).map(converter)
    Utils.stringToSeq(str).map(converter)
  }

  def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
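
The inline split-trim-filter above moves behind `Utils.stringToSeq`; a minimal sketch of the expected behavior, mirroring the replaced line:

// Assumed shape of Utils.stringToSeq, based on the inline code it replaces.
def stringToSeq(str: String): Seq[String] =
  str.split(",").map(_.trim()).filter(_.nonEmpty)

stringToSeq("a, b,, c")             // Seq("a", "b", "c")
stringToSeq("1,2,3").map(_.toInt)   // Seq(1, 2, 3), matching the converter step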
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -520,4 +520,9 @@ package object config {
    .checkValue(v => v > 0, "The threshold should be positive.")
    .createWithDefault(10000000)

  private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
    .doc("Size limit for results.")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("1g")

}
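
For context, `bytesConf(ByteUnit.BYTE)` accepts the usual Spark size strings; a quick check through `JavaUtils`, which (as an assumption here) applies the same parsing rules the builder relies on:

import org.apache.spark.network.util.JavaUtils

JavaUtils.byteStringAsBytes("1g")     // 1073741824L, the default above
JavaUtils.byteStringAsBytes("512m")   // 536870912L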
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -64,8 +64,7 @@ private[spark] class TaskSetManager(
  val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
  val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)

  // Limit of bytes for total size of results (default is 1GB)
  val maxResultSize = Utils.getMaxResultSize(conf)
  val maxResultSize = conf.get(config.MAX_RESULT_SIZE)

  val speculationEnabled = conf.getBoolean("spark.speculation", false)