
Commit e1ea806

srowen authored and dongjoon-hyun committed
[SPARK-29291][CORE][SQL][STREAMING][MLLIB] Change procedure-like declaration to function + Unit for 2.13
### What changes were proposed in this pull request?

Scala 2.13 emits a deprecation warning for procedure-like declarations:

```
def foo() { ...
```

This is equivalent to the following, so it should be changed to avoid the warning:

```
def foo(): Unit = { ...
```

### Why are the changes needed?

It avoids about a thousand compiler warnings when we start to support Scala 2.13. I wanted to make the change in 3.0 because back-ports from 3.0 to 2.4 are less likely than from 3.1 to 3.0, which minimizes the downside of touching so many files. Unfortunately, that makes this quite a big change.

### Does this PR introduce any user-facing change?

No behavior change at all.

### How was this patch tested?

Existing tests.

Closes apache#25968 from srowen/SPARK-29291.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 76791b8 commit e1ea806
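To make the rewrite concrete, here is a minimal, self-contained Scala sketch (hypothetical names, not Spark code). The commented-out form is the procedure-like declaration that Scala 2.13 flags as deprecated; the second form is the equivalent declaration with an explicit `Unit` result type and `=`:

```scala
object ProcedureSyntaxExample {
  // Before: procedure-like declaration; Scala 2.13 warns that procedure
  // syntax is deprecated and asks for an explicit `: Unit =`.
  // def greet(name: String) {
  //   println(s"hello, $name")
  // }

  // After: the same method with an explicit Unit result type and '='.
  def greet(name: String): Unit = {
    println(s"hello, $name")
  }

  def main(args: Array[String]): Unit = greet("spark")
}
```

The two forms produce the same result; the change is purely syntactic, which is why the commit notes no behavior change.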

File tree

538 files changed: +1453 -1409 lines changed


core/src/main/scala/org/apache/spark/ContextCleaner.scala

+1 -1

@@ -71,7 +71,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

  private val listeners = new ConcurrentLinkedQueue[CleanerListener]()

-  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
+  private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }

  private val periodicGCService: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
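For readers skimming the diff, the pattern above can be sketched outside Spark roughly as follows; `keepCleaning` and the stop flag are simplified stand-ins (the real ContextCleaner drains a reference queue), but the rewritten `run(): Unit` line has the same shape:

```scala
object CleanerThreadSketch {
  @volatile private var stopped = false

  // Stand-in for the real cleaning loop, which polls a reference queue.
  private def keepCleaning(): Unit = {
    while (!stopped) {
      Thread.sleep(10)
    }
  }

  // Same shape as the rewritten line above: run() gets an explicit Unit result.
  private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }

  def main(args: Array[String]): Unit = {
    cleaningThread.setDaemon(true)
    cleaningThread.start()
    stopped = true
    cleaningThread.join()
  }
}
```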

core/src/main/scala/org/apache/spark/FutureAction.scala

+2 -2

@@ -115,7 +115,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
  @volatile private var _cancelled: Boolean = false

-  override def cancel() {
+  override def cancel(): Unit = {
    _cancelled = true
    jobWaiter.cancel()
  }

@@ -132,7 +132,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
    value.get.get
  }

-  override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
+  override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
    jobWaiter.completionFuture onComplete {_ => func(value.get)}
  }

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

+10 -10

@@ -22,7 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolE
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.JavaConverters._
-import scala.collection.mutable.{HashMap, HashSet, ListBuffer, Map}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

@@ -272,7 +272,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
  }

  /** Send a one-way message to the trackerEndpoint, to which we expect it to reply with true. */
-  protected def sendTracker(message: Any) {
+  protected def sendTracker(message: Any): Unit = {
    val response = askTracker[Boolean](message)
    if (response != true) {
      throw new SparkException(

@@ -307,7 +307,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   */
  def unregisterShuffle(shuffleId: Int): Unit

-  def stop() {}
+  def stop(): Unit = {}
}

/**

@@ -416,18 +416,18 @@ private[spark] class MapOutputTrackerMaster(
    shuffleStatuses.valuesIterator.count(_.hasCachedSerializedBroadcast)
  }

-  def registerShuffle(shuffleId: Int, numMaps: Int) {
+  def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
    if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps)).isDefined) {
      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
    }
  }

-  def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus) {
+  def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
    shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
  }

  /** Unregister map output information of the given shuffle, mapper and block manager */
-  def unregisterMapOutput(shuffleId: Int, mapIndex: Int, bmAddress: BlockManagerId) {
+  def unregisterMapOutput(shuffleId: Int, mapIndex: Int, bmAddress: BlockManagerId): Unit = {
    shuffleStatuses.get(shuffleId) match {
      case Some(shuffleStatus) =>
        shuffleStatus.removeMapOutput(mapIndex, bmAddress)

@@ -438,7 +438,7 @@ private[spark] class MapOutputTrackerMaster(
  }

  /** Unregister all map output information of the given shuffle. */
-  def unregisterAllMapOutput(shuffleId: Int) {
+  def unregisterAllMapOutput(shuffleId: Int): Unit = {
    shuffleStatuses.get(shuffleId) match {
      case Some(shuffleStatus) =>
        shuffleStatus.removeOutputsByFilter(x => true)

@@ -450,7 +450,7 @@ private[spark] class MapOutputTrackerMaster(
  }

  /** Unregister shuffle data */
-  def unregisterShuffle(shuffleId: Int) {
+  def unregisterShuffle(shuffleId: Int): Unit = {
    shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
      shuffleStatus.invalidateSerializedMapOutputStatusCache()
    }

@@ -633,7 +633,7 @@ private[spark] class MapOutputTrackerMaster(
    None
  }

-  def incrementEpoch() {
+  def incrementEpoch(): Unit = {
    epochLock.synchronized {
      epoch += 1
      logDebug("Increasing epoch to " + epoch)

@@ -667,7 +667,7 @@ private[spark] class MapOutputTrackerMaster(
    }
  }

-  override def stop() {
+  override def stop(): Unit = {
    mapOutputRequests.offer(PoisonPill)
    threadpool.shutdown()
    sendTracker(StopMapOutputTracker)
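One detail in this file: the abstract member `def unregisterShuffle(shuffleId: Int): Unit` already carried an explicit result type and needed no change; only concrete bodies such as `def stop() {}` used procedure syntax. A minimal sketch of that distinction, with hypothetical names (not the Spark classes):

```scala
import scala.collection.mutable

trait Tracker {
  // Abstract members already have an explicit result type; nothing to rewrite.
  def unregisterShuffle(shuffleId: Int): Unit

  // Before (procedure syntax): def stop() {}
  def stop(): Unit = {}
}

class TrackerMaster extends Tracker {
  private val shuffleStatuses = mutable.HashMap[Int, String]()

  def registerShuffle(shuffleId: Int, status: String): Unit = {
    shuffleStatuses(shuffleId) = status
  }

  override def unregisterShuffle(shuffleId: Int): Unit = {
    shuffleStatuses.remove(shuffleId)
  }

  override def stop(): Unit = {
    shuffleStatuses.clear()
  }
}

object TrackerSketch {
  def main(args: Array[String]): Unit = {
    val tracker = new TrackerMaster
    tracker.registerShuffle(0, "registered")
    tracker.unregisterShuffle(0)
    tracker.stop()
  }
}
```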

core/src/main/scala/org/apache/spark/SecurityManager.scala

+8 -8

@@ -108,20 +108,20 @@ private[spark] class SecurityManager(
   * Admin acls should be set before the view or modify acls. If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   */
-  def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
+  def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
    viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
    logInfo("Changing view acls to: " + viewAcls.mkString(","))
  }

-  def setViewAcls(defaultUser: String, allowedUsers: Seq[String]) {
+  def setViewAcls(defaultUser: String, allowedUsers: Seq[String]): Unit = {
    setViewAcls(Set[String](defaultUser), allowedUsers)
  }

  /**
   * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
   * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
-  def setViewAclsGroups(allowedUserGroups: Seq[String]) {
+  def setViewAclsGroups(allowedUserGroups: Seq[String]): Unit = {
    viewAclsGroups = adminAclsGroups ++ allowedUserGroups
    logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
  }

@@ -149,7 +149,7 @@ private[spark] class SecurityManager(
   * Admin acls should be set before the view or modify acls. If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   */
-  def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
+  def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
    modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
    logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
  }

@@ -158,7 +158,7 @@ private[spark] class SecurityManager(
   * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
   * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
-  def setModifyAclsGroups(allowedUserGroups: Seq[String]) {
+  def setModifyAclsGroups(allowedUserGroups: Seq[String]): Unit = {
    modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
    logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
  }

@@ -186,7 +186,7 @@ private[spark] class SecurityManager(
   * Admin acls should be set before the view or modify acls. If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   */
-  def setAdminAcls(adminUsers: Seq[String]) {
+  def setAdminAcls(adminUsers: Seq[String]): Unit = {
    adminAcls = adminUsers.toSet
    logInfo("Changing admin acls to: " + adminAcls.mkString(","))
  }

@@ -195,12 +195,12 @@ private[spark] class SecurityManager(
   * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
   * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
-  def setAdminAclsGroups(adminUserGroups: Seq[String]) {
+  def setAdminAclsGroups(adminUserGroups: Seq[String]): Unit = {
    adminAclsGroups = adminUserGroups.toSet
    logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
  }

-  def setAcls(aclSetting: Boolean) {
+  def setAcls(aclSetting: Boolean): Unit = {
    aclsOn = aclSetting
    logInfo("Changing acls enabled to: " + aclsOn)
  }

core/src/main/scala/org/apache/spark/SparkConf.scala

+1 -1

@@ -504,7 +504,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   * Checks for illegal or deprecated config settings. Throws an exception for the former. Not
   * idempotent - may mutate this conf object to convert deprecated settings to supported ones.
   */
-  private[spark] def validateSettings() {
+  private[spark] def validateSettings(): Unit = {
    if (contains("spark.local.dir")) {
      val msg = "Note that spark.local.dir will be overridden by the value set by " +
        "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS" +

core/src/main/scala/org/apache/spark/SparkContext.scala

+22 -23

@@ -366,7 +366,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * @param logLevel The desired log level as a string.
   * Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
   */
-  def setLogLevel(logLevel: String) {
+  def setLogLevel(logLevel: String): Unit = {
    // let's allow lowercase or mixed case too
    val upperCased = logLevel.toUpperCase(Locale.ROOT)
    require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),

@@ -661,7 +661,7 @@ class SparkContext(config: SparkConf) extends Logging {

  private[spark] def getLocalProperties: Properties = localProperties.get()

-  private[spark] def setLocalProperties(props: Properties) {
+  private[spark] def setLocalProperties(props: Properties): Unit = {
    localProperties.set(props)
  }

@@ -676,7 +676,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * implementation of thread pools have worker threads spawn other worker threads.
   * As a result, local properties may propagate unpredictably.
   */
-  def setLocalProperty(key: String, value: String) {
+  def setLocalProperty(key: String, value: String): Unit = {
    if (value == null) {
      localProperties.get.remove(key)
    } else {

@@ -692,7 +692,7 @@ class SparkContext(config: SparkConf) extends Logging {
    Option(localProperties.get).map(_.getProperty(key)).orNull

  /** Set a human readable description of the current job. */
-  def setJobDescription(value: String) {
+  def setJobDescription(value: String): Unit = {
    setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
  }

@@ -720,7 +720,8 @@ class SparkContext(config: SparkConf) extends Logging {
   * are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
   * may respond to Thread.interrupt() by marking nodes as dead.
   */
-  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
+  def setJobGroup(groupId: String,
+      description: String, interruptOnCancel: Boolean = false): Unit = {
    setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
    setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
    // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids

@@ -731,7 +732,7 @@ class SparkContext(config: SparkConf) extends Logging {
  }

  /** Clear the current thread's job group ID and its description. */
-  def clearJobGroup() {
+  def clearJobGroup(): Unit = {
    setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
    setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)

@@ -1559,7 +1560,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * Register a listener to receive up-calls from events that happen during execution.
   */
  @DeveloperApi
-  def addSparkListener(listener: SparkListenerInterface) {
+  def addSparkListener(listener: SparkListenerInterface): Unit = {
    listenerBus.addToSharedQueue(listener)
  }

@@ -1788,14 +1789,14 @@ class SparkContext(config: SparkConf) extends Logging {
  /**
   * Register an RDD to be persisted in memory and/or disk storage
   */
-  private[spark] def persistRDD(rdd: RDD[_]) {
+  private[spark] def persistRDD(rdd: RDD[_]): Unit = {
    persistentRdds(rdd.id) = rdd
  }

  /**
   * Unpersist an RDD from memory and/or disk storage
   */
-  private[spark] def unpersistRDD(rddId: Int, blocking: Boolean) {
+  private[spark] def unpersistRDD(rddId: Int, blocking: Boolean): Unit = {
    env.blockManager.master.removeRdd(rddId, blocking)
    persistentRdds.remove(rddId)
    listenerBus.post(SparkListenerUnpersistRDD(rddId))

@@ -1811,7 +1812,7 @@ class SparkContext(config: SparkConf) extends Logging {
   *
   * @note A path can be added only once. Subsequent additions of the same path are ignored.
   */
-  def addJar(path: String) {
+  def addJar(path: String): Unit = {
    def addLocalJarFile(file: File): String = {
      try {
        if (!file.exists()) {

@@ -2018,15 +2019,15 @@ class SparkContext(config: SparkConf) extends Logging {
   * Set the thread-local property for overriding the call sites
   * of actions and RDDs.
   */
-  def setCallSite(shortCallSite: String) {
+  def setCallSite(shortCallSite: String): Unit = {
    setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
  }

  /**
   * Set the thread-local property for overriding the call sites
   * of actions and RDDs.
   */
-  private[spark] def setCallSite(callSite: CallSite) {
+  private[spark] def setCallSite(callSite: CallSite): Unit = {
    setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
    setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
  }

@@ -2035,7 +2036,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * Clear the thread-local property for overriding the call sites
   * of actions and RDDs.
   */
-  def clearCallSite() {
+  def clearCallSite(): Unit = {
    setLocalProperty(CallSite.SHORT_FORM, null)
    setLocalProperty(CallSite.LONG_FORM, null)
  }

@@ -2155,8 +2156,7 @@ class SparkContext(config: SparkConf) extends Logging {
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      processPartition: (TaskContext, Iterator[T]) => U,
-      resultHandler: (Int, U) => Unit)
-  {
+      resultHandler: (Int, U) => Unit): Unit = {
    runJob[T, U](rdd, processPartition, 0 until rdd.partitions.length, resultHandler)
  }

@@ -2170,8 +2170,7 @@ class SparkContext(config: SparkConf) extends Logging {
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      processPartition: Iterator[T] => U,
-      resultHandler: (Int, U) => Unit)
-  {
+      resultHandler: (Int, U) => Unit): Unit = {
    val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
    runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
  }

@@ -2256,13 +2255,13 @@ class SparkContext(config: SparkConf) extends Logging {
   * Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`
   * for more information.
   */
-  def cancelJobGroup(groupId: String) {
+  def cancelJobGroup(groupId: String): Unit = {
    assertNotStopped()
    dagScheduler.cancelJobGroup(groupId)
  }

  /** Cancel all jobs that have been scheduled or are running. */
-  def cancelAllJobs() {
+  def cancelAllJobs(): Unit = {
    assertNotStopped()
    dagScheduler.cancelAllJobs()
  }

@@ -2350,7 +2349,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * @param directory path to the directory where checkpoint files will be stored
   * (must be HDFS path if running in cluster)
   */
-  def setCheckpointDir(directory: String) {
+  def setCheckpointDir(directory: String): Unit = {

    // If we are running on a cluster, log a warning if the directory is local.
    // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from

@@ -2422,7 +2421,7 @@ class SparkContext(config: SparkConf) extends Logging {
  }

  /** Post the application start event */
-  private def postApplicationStart() {
+  private def postApplicationStart(): Unit = {
    // Note: this code assumes that the task scheduler has been initialized and has contacted
    // the cluster manager to get an application ID (in case the cluster manager provides one).
    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),

@@ -2432,12 +2431,12 @@ class SparkContext(config: SparkConf) extends Logging {
  }

  /** Post the application end event */
-  private def postApplicationEnd() {
+  private def postApplicationEnd(): Unit = {
    listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
  }

  /** Post the environment update event once the task scheduler is ready */
-  private def postEnvironmentUpdate() {
+  private def postEnvironmentUpdate(): Unit = {
    if (taskScheduler != null) {
      val schedulingMode = getSchedulingMode.toString
      val addedJarPaths = addedJars.keys.toSeq
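The two `runJob` overloads above show the multi-line variant of the rewrite: the opening brace that used to sit on its own line after the parameter list is folded into `: Unit = {` at the end of the signature. A standalone sketch of that shape, with hypothetical names (not the Spark API):

```scala
object RunJobSketch {
  // Before (procedure syntax with the brace on its own line):
  //   def runJob[T](
  //       data: Seq[T],
  //       resultHandler: (Int, T) => Unit)
  //   {
  //     data.zipWithIndex.foreach { case (v, i) => resultHandler(i, v) }
  //   }

  // After: the result type and '=' are written out on the signature itself.
  def runJob[T](
      data: Seq[T],
      resultHandler: (Int, T) => Unit): Unit = {
    data.zipWithIndex.foreach { case (v, i) => resultHandler(i, v) }
  }

  def main(args: Array[String]): Unit =
    runJob(Seq("a", "b"), (i: Int, v: String) => println(s"partition $i -> $v"))
}
```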
