[SPARK-19810][BUILD][CORE] Remove support for Scala 2.10
## What changes were proposed in this pull request?

- Remove Scala 2.10 build profiles and support
- Replace some 2.10 handling in scripts with commented-out placeholders for later 2.12 support
- Remove deprecated API calls from 2.10 support
- Remove usages of deprecated view bounds (`<%`) where possible (see the sketch after this list)
- Remove Scala 2.10 workarounds like ScalaReflectionLock
- Other minor Scala warning fixes
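
As context for the view-bound removals (e.g. `GrowableAccumulableParam`, `arrayToArrayWritable`, `JettyUtils` below): the deprecated `<%` bound is rewritten either as an explicit implicit-conversion parameter or, where a subtype suffices, as a plain `<:` upper bound. A minimal sketch with illustrative names, not code from this commit:

```scala
object ViewBoundSketch {
  // Before (deprecated view bound), kept as a comment:
  // def describe[A <% CharSequence](a: A): String = a.length.toString

  // After: the view bound becomes an explicit implicit-conversion parameter,
  // mirroring the GrowableAccumulableParam change.
  def describe[A](a: A)(implicit ev: A => CharSequence): String = ev(a).length.toString

  // Where callers only ever pass subtypes, a plain upper bound is enough,
  // mirroring arrayToArrayWritable (T <% Writable becomes T <: Writable).
  def describeBounded[A <: CharSequence](a: A): String = a.length.toString
}
```

Callers are unaffected as long as an implicit `A => CharSequence` conversion (or an actual subtype, in the bounded form) is available at the call site.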

## How was this patch tested?

Existing tests

Author: Sean Owen <[email protected]>

Closes apache#17150 from srowen/SPARK-19810.
srowen authored and cloud-fan committed Jul 13, 2017
1 parent e08d06b commit 425c4ad
Showing 101 changed files with 311 additions and 5,231 deletions.
4 changes: 2 additions & 2 deletions R/pkg/R/sparkR.R
@@ -113,7 +113,7 @@ sparkR.stop <- function() {
#' list(spark.executor.memory="4g"),
#' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#' c("one.jar", "two.jar", "three.jar"),
#' c("com.databricks:spark-avro_2.10:2.0.1"))
#' c("com.databricks:spark-avro_2.11:2.0.1"))
#'}
#' @note sparkR.init since 1.4.0
sparkR.init <- function(
@@ -357,7 +357,7 @@ sparkRHive.init <- function(jsc = NULL) {
#' sparkR.session("yarn-client", "SparkR", "/home/spark",
#' list(spark.executor.memory="4g"),
#' c("one.jar", "two.jar", "three.jar"),
#' c("com.databricks:spark-avro_2.10:2.0.1"))
#' c("com.databricks:spark-avro_2.11:2.0.1"))
#' sparkR.session(spark.master = "yarn-client", spark.executor.memory = "4g")
#'}
#' @note sparkR.session since 2.0.0
4 changes: 2 additions & 2 deletions R/pkg/tests/fulltests/test_client.R
@@ -37,7 +37,7 @@ test_that("multiple packages don't produce a warning", {

test_that("sparkJars sparkPackages as character vectors", {
args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
c("com.databricks:spark-avro_2.10:2.0.1"))
c("com.databricks:spark-avro_2.11:2.0.1"))
expect_match(args, "--jars one.jar,two.jar,three.jar")
expect_match(args, "--packages com.databricks:spark-avro_2.10:2.0.1")
expect_match(args, "--packages com.databricks:spark-avro_2.11:2.0.1")
})
22 changes: 11 additions & 11 deletions bin/load-spark-env.cmd
@@ -35,21 +35,21 @@ if [%SPARK_ENV_LOADED%] == [] (

rem Setting SPARK_SCALA_VERSION if not already set.

set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.10"
rem set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
rem set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"

if [%SPARK_SCALA_VERSION%] == [] (

if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
echo "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected."
echo "Either clean one of them or, set SPARK_SCALA_VERSION=2.11 in spark-env.cmd."
exit 1
)
if exist %ASSEMBLY_DIR2% (
rem if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
rem echo "Presence of build for multiple Scala versions detected."
rem echo "Either clean one of them or, set SPARK_SCALA_VERSION=2.11 in spark-env.cmd."
rem exit 1
rem )
rem if exist %ASSEMBLY_DIR2% (
set SPARK_SCALA_VERSION=2.11
) else (
set SPARK_SCALA_VERSION=2.10
)
rem ) else (
rem set SPARK_SCALA_VERSION=2.12
rem )
)
exit /b 0

22 changes: 11 additions & 11 deletions bin/load-spark-env.sh
@@ -46,18 +46,18 @@ fi

if [ -z "$SPARK_SCALA_VERSION" ]; then

ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.10"
#ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
#ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"

if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2
echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
exit 1
fi
#if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
# echo -e "Presence of build for multiple Scala versions detected." 1>&2
# echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
# exit 1
#fi

if [ -d "$ASSEMBLY_DIR2" ]; then
#if [ -d "$ASSEMBLY_DIR2" ]; then
export SPARK_SCALA_VERSION="2.11"
else
export SPARK_SCALA_VERSION="2.10"
fi
#else
# export SPARK_SCALA_VERSION="2.12"
#fi
fi
6 changes: 3 additions & 3 deletions build/mvn
@@ -91,13 +91,13 @@ install_mvn() {

# Install zinc under the build/ folder
install_zinc() {
local zinc_path="zinc-0.3.11/bin/zinc"
local zinc_path="zinc-0.3.15/bin/zinc"
[ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com}

install_app \
"${TYPESAFE_MIRROR}/zinc/0.3.11" \
"zinc-0.3.11.tgz" \
"${TYPESAFE_MIRROR}/zinc/0.3.15" \
"zinc-0.3.15.tgz" \
"${zinc_path}"
ZINC_BIN="${_DIR}/${zinc_path}"
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -201,7 +201,8 @@ trait AccumulableParam[R, T] extends Serializable {

@deprecated("use AccumulatorV2", "2.0.0")
private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
GrowableAccumulableParam[R : ClassTag, T]
(implicit rg: R => Growable[T] with TraversableOnce[T] with Serializable)
extends AccumulableParam[R, T] {

def addAccumulator(growable: R, elem: T): R = {
15 changes: 4 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -183,8 +183,6 @@ class SparkContext(config: SparkConf) extends Logging {
// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")

warnDeprecatedVersions()

/* ------------------------------------------------------------------------------------- *
| Private variables. These variables keep the internal state of the context, and are |
| not accessible by the outside world. They're mutable since we want to initialize all |
@@ -349,13 +347,6 @@ class SparkContext(config: SparkConf) extends Logging {
value
}

private def warnDeprecatedVersions(): Unit = {
val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3)
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) {
logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0")
}
}

/** Control our logLevel. This overrides any user-defined log settings.
* @param logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
@@ -1396,6 +1387,8 @@ class SparkContext(config: SparkConf) extends Logging {
@deprecated("use AccumulatorV2", "2.0.0")
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R): Accumulable[R, T] = {
// TODO the context bound (<%) above should be replaced with simple type bound and implicit
// conversion but is a breaking change. This should be fixed in Spark 3.x.
val param = new GrowableAccumulableParam[R, T]
val acc = new Accumulable(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
@@ -2605,9 +2598,9 @@ object SparkContext extends Logging {
*/
private[spark] val LEGACY_DRIVER_IDENTIFIER = "<driver>"

private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Traversable[T])
: ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
def anyToWritable[U <: Writable](u: U): Writable = u

new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
@@ -16,7 +16,7 @@
*/
package org.apache.spark.rdd

import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag

import org.apache.hadoop.io.Writable
import org.apache.hadoop.io.compress.CompressionCodec
@@ -39,40 +39,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
extends Logging
with Serializable {

private val keyWritableClass =
if (_keyWritableClass == null) {
// pre 1.3.0, we need to use Reflection to get the Writable class
getWritableClass[K]()
} else {
_keyWritableClass
}

private val valueWritableClass =
if (_valueWritableClass == null) {
// pre 1.3.0, we need to use Reflection to get the Writable class
getWritableClass[V]()
} else {
_valueWritableClass
}

private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
classTag[T].runtimeClass
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which
// is not of the form "java.lang.Object apply(java.lang.Object)"
implicitly[T => Writable].getClass.getDeclaredMethods().filter(
m => m.getReturnType().toString != "class java.lang.Object" &&
m.getName() == "apply")(0).getReturnType

}
// TODO: use something like WritableConverter to avoid reflection
}
c.asInstanceOf[Class[_ <: Writable]]
}

// TODO the context bound (<%) above should be replaced with simple type bound and implicit
// conversion but is a breaking change. This should be fixed in Spark 3.x.

/**
* Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
@@ -90,24 +58,24 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
// valueWritableClass at the compile time. To implement that, we need to add type parameters to
// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
// breaking change.
val convertKey = self.keyClass != keyWritableClass
val convertValue = self.valueClass != valueWritableClass
val convertKey = self.keyClass != _keyWritableClass
val convertValue = self.valueClass != _valueWritableClass

logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
valueWritableClass.getSimpleName + ")" )
logInfo("Saving as sequence file of type " +
s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -125,9 +125,9 @@ private[spark] object RpcTimeout {
var foundProp: Option[(String, String)] = None
while (itr.hasNext && foundProp.isEmpty) {
val propKey = itr.next()
conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) }
conf.getOption(propKey).foreach { prop => foundProp = Some((propKey, prop)) }
}
val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
val finalProp = foundProp.getOrElse((timeoutPropList.head, defaultValue))
val timeout = { Utils.timeStringAsSeconds(finalProp._2).seconds }
new RpcTimeout(timeout, finalProp._1)
}
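
The `Some(propKey, prop)` to `Some((propKey, prop))` and `getOrElse((...))` edits above reflect a recurring cleanup in this commit: newer scalac versions warn when an argument list has to be adapted (auto-tupled) into a single tuple-typed parameter, so the tuples are written explicitly. The same pattern recurs below in FileAppender, FileSuite, MasterSuite, and OutputCommitCoordinatorSuite. A minimal sketch, with illustrative property names and values:

```scala
object ExplicitTupleSketch {
  // Adapted form (warns, e.g. under -Xlint:adapted-args): Some("spark.rpc.askTimeout", "120s")
  // Explicit tuple, as in the change above:
  val foundProp: Option[(String, String)] = Some(("spark.rpc.askTimeout", "120s"))

  // A tuple default for getOrElse gets the same explicit parentheses:
  val finalProp: (String, String) = foundProp.getOrElse(("spark.network.timeout", "120s"))
}
```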
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -54,7 +54,7 @@ private[spark] object JettyUtils extends Logging {
// implicit conversion from many types of functions to jetty Handlers.
type Responder[T] = HttpServletRequest => T

class ServletParams[T <% AnyRef](val responder: Responder[T],
class ServletParams[T <: AnyRef](val responder: Responder[T],
val contentType: String,
val extractFn: T => String = (in: Any) => in.toString) {}

@@ -68,7 +68,7 @@ private[spark] object JettyUtils extends Logging {
implicit def textResponderToServlet(responder: Responder[String]): ServletParams[String] =
new ServletParams(responder, "text/plain")

def createServlet[T <% AnyRef](
def createServlet[T <: AnyRef](
servletParams: ServletParams[T],
securityMgr: SecurityManager,
conf: SparkConf): HttpServlet = {
@@ -113,7 +113,7 @@ private[spark] object JettyUtils extends Logging {
}

/** Create a context handler that responds to a request with the given path prefix */
def createServletHandler[T <% AnyRef](
def createServletHandler[T <: AnyRef](
path: String,
servletParams: ServletParams[T],
securityMgr: SecurityManager,
@@ -125,16 +125,16 @@ private[spark] object FileAppender extends Logging {
val validatedParams: Option[(Long, String)] = rollingInterval match {
case "daily" =>
logInfo(s"Rolling executor logs enabled for $file with daily rolling")
Some(24 * 60 * 60 * 1000L, "--yyyy-MM-dd")
Some((24 * 60 * 60 * 1000L, "--yyyy-MM-dd"))
case "hourly" =>
logInfo(s"Rolling executor logs enabled for $file with hourly rolling")
Some(60 * 60 * 1000L, "--yyyy-MM-dd--HH")
Some((60 * 60 * 1000L, "--yyyy-MM-dd--HH"))
case "minutely" =>
logInfo(s"Rolling executor logs enabled for $file with rolling every minute")
Some(60 * 1000L, "--yyyy-MM-dd--HH-mm")
Some((60 * 1000L, "--yyyy-MM-dd--HH-mm"))
case IntParam(seconds) =>
logInfo(s"Rolling executor logs enabled for $file with rolling $seconds seconds")
Some(seconds * 1000L, "--yyyy-MM-dd--HH-mm-ss")
Some((seconds * 1000L, "--yyyy-MM-dd--HH-mm-ss"))
case _ =>
logWarning(s"Illegal interval for rolling executor logs [$rollingInterval], " +
s"rolling logs not enabled")
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -113,11 +113,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

val normalFile = new File(normalDir, "part-00000")
val normalContent = sc.sequenceFile[String, String](normalDir).collect
assert(normalContent === Array.fill(100)("abc", "abc"))
assert(normalContent === Array.fill(100)(("abc", "abc")))

val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
assert(compressedContent === Array.fill(100)("abc", "abc"))
assert(compressedContent === Array.fill(100)(("abc", "abc")))

assert(compressedFile.length < normalFile.length)
}
@@ -93,8 +93,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

test("add dependencies works correctly") {
val md = SparkSubmitUtils.getModuleDescriptor
val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
"com.databricks:spark-avro_2.10:0.1")
val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.11:0.1," +
"com.databricks:spark-avro_2.11:0.1")

SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
assert(md.getDependencies.length === 2)
@@ -196,7 +196,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.11", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(
coordinates + "," + main.toString,
@@ -80,6 +80,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
case Some(appId) =>
apps.remove(appId)
master.send(UnregisterApplication(appId))
case None =>
}
driverIdToAppId.remove(driverId)
}
@@ -575,7 +576,7 @@ class MasterSuite extends SparkFunSuite
override val rpcEnv: RpcEnv = master.rpcEnv

override def receive: PartialFunction[Any, Unit] = {
case KillExecutor(_, appId, execId) => killedExecutors.add(appId, execId)
case KillExecutor(_, appId, execId) => killedExecutors.add((appId, execId))
case KillDriver(driverId) => killedDrivers.add(driverId)
}
})
@@ -25,6 +25,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.mutable.Map
import scala.concurrent.duration._
import scala.language.postfixOps

import org.mockito.ArgumentCaptor
import org.mockito.Matchers.{any, eq => meq}
@@ -18,6 +18,7 @@
package org.apache.spark.rdd

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
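
The `scala.language.postfixOps` imports added in this and the previous hunk presumably silence the postfix-operator feature warning that the 2.11-only build now surfaces for duration expressions such as `10 seconds` (an assumption consistent with the "Other minor Scala warning fixes" item in the description). A small, self-contained illustration, not taken from the Spark tests:

```scala
import scala.concurrent.duration._
import scala.language.postfixOps

object PostfixOpsSketch {
  // `10 seconds` is a postfix method call; without the language import it
  // triggers a feature warning (or an error if warnings are made fatal).
  val pollInterval: FiniteDuration = 10 seconds

  // Equivalent form that needs no postfixOps import:
  val samePollInterval: FiniteDuration = 10.seconds
}
```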

@@ -115,7 +115,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
if (!hasDequeuedSpeculatedTask) {
hasDequeuedSpeculatedTask = true
Some(0, TaskLocality.PROCESS_LOCAL)
Some((0, TaskLocality.PROCESS_LOCAL))
} else {
None
}
@@ -299,7 +299,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
w(k) -> (v1.size, v2.size)
(w(k), (v1.size, v2.size))
}
d4.setName("A Cogroup")
d4.collectAsMap()
(Diff listing truncated; the remaining changed files are not shown.)