Skip to content

Commit afee902

Browse files
committed
Attempt to fix streaming test failures after yarn branch merge
1 parent 1f20ef2 commit afee902

File tree

10 files changed

+42
-10
lines changed

10 files changed

+42
-10
lines changed

bagel/src/test/scala/bagel/BagelSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
2323
}
2424
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
2525
System.clearProperty("spark.driver.port")
26+
System.clearProperty("spark.hostPort")
2627
}
2728

2829
test("halting by voting") {

core/src/test/scala/spark/LocalSparkContext.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ object LocalSparkContext {
2727
sc.stop()
2828
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
2929
System.clearProperty("spark.driver.port")
30+
System.clearProperty("spark.hostPort")
3031
}
3132

3233
/** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
@@ -38,4 +39,4 @@ object LocalSparkContext {
3839
}
3940
}
4041

41-
}
42+
}

repl/src/test/scala/spark/repl/ReplSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class ReplSuite extends FunSuite {
3232
interp.sparkContext.stop()
3333
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
3434
System.clearProperty("spark.driver.port")
35+
System.clearProperty("spark.hostPort")
3536
return out.toString
3637
}
3738

streaming/src/main/scala/spark/streaming/Checkpoint.scala

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,28 +38,43 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
3838
private[streaming]
3939
class CheckpointWriter(checkpointDir: String) extends Logging {
4040
val file = new Path(checkpointDir, "graph")
41+
// The file to which we actually write - and then "move" to file.
42+
private val writeFile = new Path(file.getParent, file.getName + ".next")
43+
private val bakFile = new Path(file.getParent, file.getName + ".bk")
44+
45+
@volatile private var stopped = false
46+
4147
val conf = new Configuration()
4248
var fs = file.getFileSystem(conf)
4349
val maxAttempts = 3
4450
val executor = Executors.newFixedThreadPool(1)
4551

52+
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
53+
// I did not notice any errors without it, but consider whether that validation should be reintroduced.
54+
4655
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
4756
def run() {
4857
var attempts = 0
4958
val startTime = System.currentTimeMillis()
5059
while (attempts < maxAttempts) {
60+
if (stopped) {
61+
logInfo("Already stopped, ignore checkpoint attempt for " + file)
62+
return
63+
}
5164
attempts += 1
5265
try {
5366
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
54-
if (fs.exists(file)) {
55-
val bkFile = new Path(file.getParent, file.getName + ".bk")
56-
FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
57-
logDebug("Moved existing checkpoint file to " + bkFile)
58-
}
59-
val fos = fs.create(file)
67+
// This is inherently thread-unsafe, so we alleviate it by writing to the '.next' file and then renaming, which should be fast.
68+
val fos = fs.create(writeFile)
6069
fos.write(bytes)
6170
fos.close()
62-
fos.close()
71+
if (fs.exists(file) && fs.rename(file, bakFile)) {
72+
logDebug("Moved existing checkpoint file to " + bakFile)
73+
}
74+
// Paranoia: ensure no stale destination file remains to block the rename below.
75+
fs.delete(file, false)
76+
fs.rename(writeFile, file)
77+
6378
val finishTime = System.currentTimeMillis();
6479
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
6580
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
@@ -84,6 +99,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
8499
}
85100

86101
def stop() {
102+
stopped = true
87103
executor.shutdown()
88104
}
89105
}

streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ object MasterFailureTest extends Logging {
7474

7575
val operation = (st: DStream[String]) => {
7676
val updateFunc = (values: Seq[Long], state: Option[Long]) => {
77+
logInfo("UpdateFunc .. state = " + state.getOrElse(0L) + ", values = " + values)
7778
Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
7879
}
7980
st.flatMap(_.split(" "))
@@ -159,6 +160,7 @@ object MasterFailureTest extends Logging {
159160

160161
// Setup the streaming computation with the given operation
161162
System.clearProperty("spark.driver.port")
163+
System.clearProperty("spark.hostPort")
162164
var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
163165
ssc.checkpoint(checkpointDir.toString)
164166
val inputStream = ssc.textFileStream(testDir.toString)
@@ -205,6 +207,7 @@ object MasterFailureTest extends Logging {
205207
// (iii) Its not timed out yet
206208
System.clearProperty("spark.streaming.clock")
207209
System.clearProperty("spark.driver.port")
210+
System.clearProperty("spark.hostPort")
208211
ssc.start()
209212
val startTime = System.currentTimeMillis()
210213
while (!killed && !isLastOutputGenerated && !isTimedOut) {
@@ -357,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
357360
// Write the data to a local file and then move it to the target test directory
358361
val localFile = new File(localTestDir, (i+1).toString)
359362
val hadoopFile = new Path(testDir, (i+1).toString)
363+
val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
360364
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
361365
var tries = 0
362366
var done = false
363367
while (!done && tries < maxTries) {
364368
tries += 1
365369
try {
366-
fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
370+
// fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
371+
fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
372+
fs.rename(tempHadoopFile, hadoopFile)
367373
done = true
368374
} catch {
369375
case ioe: IOException => {

streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class BasicOperationsSuite extends TestSuiteBase {
1515
after {
1616
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
1717
System.clearProperty("spark.driver.port")
18+
System.clearProperty("spark.hostPort")
1819
}
1920

2021
test("map") {

streaming/src/test/scala/spark/streaming/CheckpointSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
3131

3232
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
3333
System.clearProperty("spark.driver.port")
34+
System.clearProperty("spark.hostPort")
3435
}
3536

3637
var ssc: StreamingContext = null
@@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
325326
)
326327
ssc = new StreamingContext(checkpointDir)
327328
System.clearProperty("spark.driver.port")
329+
System.clearProperty("spark.hostPort")
328330
ssc.start()
329331
val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
330332
// the first element will be re-processed data of the last batch before restart
@@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
350352
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
351353
outputStream.output
352354
}
353-
}
355+
}

streaming/src/test/scala/spark/streaming/FailureSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
2222
val batchDuration = Milliseconds(1000)
2323

2424
before {
25+
logInfo("BEFORE ...")
2526
FileUtils.deleteDirectory(new File(directory))
2627
}
2728

2829
after {
30+
logInfo("AFTER ...")
2931
FileUtils.deleteDirectory(new File(directory))
3032
}
3133

streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
4141
after {
4242
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
4343
System.clearProperty("spark.driver.port")
44+
System.clearProperty("spark.hostPort")
4445
}
4546

4647

streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase {
1616
after {
1717
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
1818
System.clearProperty("spark.driver.port")
19+
System.clearProperty("spark.hostPort")
1920
}
2021

2122
val largerSlideInput = Seq(

0 commit comments

Comments
 (0)