1818package org .apache .spark .streaming
1919
2020import java .io .File
21- import java .util .concurrent .{TimeUnit , Semaphore }
2221
2322import scala .collection .mutable .ArrayBuffer
2423import scala .reflect .ClassTag
@@ -277,44 +276,47 @@ class CheckpointSuite extends TestSuiteBase {
277276 // the master failure and uses them again to process a large window operation.
278277 // It also tests whether batches, whose processing was incomplete due to the
279278 // failure, are re-processed or not.
280- test(" recovery with file input stream " ) {
279+ test(" recovery with file input ztream " ) {
281280 // Set up the streaming context and input streams
281+ val batchDuration = Seconds (2 ) // Due to 1-second resolution of setLastModified() on some OS's.
282282 val testDir = Utils .createTempDir()
283- var ssc = new StreamingContext (conf, Seconds ( 1 ) )
283+ var ssc = new StreamingContext (conf, batchDuration )
284284 ssc.checkpoint(checkpointDir)
285- val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
285+ // This is a var because it's re-assigned when we restart from a checkpoint:
286+ var clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
287+ clock.setTime(1000000 ) // So that we don't have negative offsets due to windowing
286288 val waiter = new StreamingTestWaiter (ssc)
287289 val fileStream = ssc.textFileStream(testDir.toString)
288290 // Making value 3 take large time to process, to ensure that the master
289291 // shuts down in the middle of processing the 3rd batch
290292 val mappedStream = fileStream.map(s => {
291293 val i = s.toInt
292- if (i == 3 ) Thread .sleep(2000 )
294+ if (i == 3 ) Thread .sleep(4000 )
293295 i
294296 })
295297
296298 // Reducing over a large window to ensure that recovery from master failure
297299 // requires reprocessing of all the files seen before the failure
298- val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds ( 30 ), Seconds ( 1 ) )
300+ val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30 , batchDuration )
299301 val outputBuffer = new ArrayBuffer [Seq [Int ]]
300302 var outputStream = new TestOutputStream (reducedStream, outputBuffer)
301303 outputStream.register()
302304 ssc.start()
303305
306+ clock.addToTime(batchDuration.milliseconds)
304307 // Create files and advance manual clock to process them
305- clock.addToTime(1000 )
306308 for (i <- Seq (1 , 2 , 3 )) {
307309 val file = new File (testDir, i.toString)
308310 Files .write(i + " \n " , file, Charsets .UTF_8 )
309- assert(file.setLastModified(clock.currentTime()))
311+ assert(file.setLastModified(clock.currentTime()) && file.lastModified() === clock.currentTime() )
310312 clock.addToTime(batchDuration.milliseconds)
311313 if (i != 3 ) { // Since we want to shut down while the 3rd batch is processing
312314 waiter.waitForTotalBatchesCompleted(i, Seconds (10 ))
313315 }
314316 }
315317 clock.addToTime(batchDuration.milliseconds)
316318 waiter.waitForTotalBatchesStarted(3 , Seconds (10 ))
317- Thread .sleep(100 )
319+ Thread .sleep(1000 ) // To wait for execution to actually begin
318320 logInfo(" Output = " + outputStream.output.mkString(" ," ))
319321 assert(outputStream.output.size > 0 , " No files processed before restart" )
320322 ssc.stop()
@@ -330,14 +332,20 @@ class CheckpointSuite extends TestSuiteBase {
330332 for (i <- Seq (4 , 5 , 6 )) {
331333 val file = new File (testDir, i.toString)
332334 Files .write(i + " \n " , file, Charsets .UTF_8 )
333- assert(file.setLastModified(clock.currentTime()))
335+ assert(file.setLastModified(clock.currentTime()) && file.lastModified() === clock.currentTime() )
334336 clock.addToTime(1000 )
335337 }
336338
337339 // Recover context from checkpoint file and verify whether the files that were
338340 // recorded before failure were saved and successfully recovered
339341 logInfo(" *********** RESTARTING ************" )
340342 ssc = new StreamingContext (checkpointDir)
343+ // Copy over the time from the old clock so that we don't appear to have time-traveled:
344+ clock = {
345+ val newClock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
346+ newClock.setTime(clock.currentTime())
347+ newClock
348+ }
341349 fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf [FileInputDStream [_, _, _]]
342350 assert(recordedFiles.exists(_.endsWith(" 1" )))
343351 assert(recordedFiles.exists(_.endsWith(" 2" )))
@@ -346,11 +354,11 @@ class CheckpointSuite extends TestSuiteBase {
346354 // Restart stream computation
347355 val postRestartWaiter = new StreamingTestWaiter (ssc)
348356 ssc.start()
349- clock.addToTime(1000 )
357+ clock.addToTime(batchDuration.milliseconds )
350358 for ((i, index) <- Seq (7 , 8 , 9 ).zipWithIndex) {
351359 val file = new File (testDir, i.toString)
352360 Files .write(i + " \n " , file, Charsets .UTF_8 )
353- assert(file.setLastModified(clock.currentTime()))
361+ assert(file.setLastModified(clock.currentTime()) && file.lastModified() === clock.currentTime() )
354362 clock.addToTime(batchDuration.milliseconds)
355363 postRestartWaiter.waitForTotalBatchesCompleted(index + 1 , Seconds (10 ))
356364 }
0 commit comments