diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c43e1f2fe135..f9c54aa7df2a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -25,6 +25,7 @@ import scala.collection.{Map, mutable}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
+import scala.util.DynamicVariable
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -961,7 +962,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
- if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+ val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+ if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}
@@ -1039,7 +1041,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")
- if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+ val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+ if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(hadoopConf)
hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1118,4 +1121,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
private[spark] object PairRDDFunctions {
val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+ /**
+ * Used by Spark Streaming to bypass the `spark.hadoop.validateOutputSpecs` check for save
+ * actions that it launches, since this validation may break Spark Streaming's ability to
+ * recover from checkpoints. See SPARK-4835 for more details.
+ */
+ val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
}
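For context, `scala.util.DynamicVariable` provides a dynamically scoped, thread-local value (backed by an `InheritableThreadLocal`), so an override installed around `job.run()` in the streaming scheduler is visible to `saveAsHadoopFile` calls made on the same thread and is restored once the closure returns. A self-contained sketch of those semantics (the demo object and its names are illustrative, not part of this patch):

```scala
import scala.util.DynamicVariable

object DynamicVariableDemo {
  // Mirrors the flag added above: defaults to false, overridable per call stack.
  val disableChecks = new DynamicVariable[Boolean](false)

  def save(): Unit =
    if (disableChecks.value) println("validation skipped")
    else println("validating output specs")

  def main(args: Array[String]): Unit = {
    save()                                    // prints "validating output specs"
    disableChecks.withValue(true) { save() }  // prints "validation skipped"
    save()                                    // previous value restored: validates again
  }
}
```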
diff --git a/docs/configuration.md b/docs/configuration.md
index acee267883ed..8d8e45bf1d2d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -709,7 +709,9 @@ Apart from these, the following properties are also available, and may be useful
If set to true, validates the output specification (e.g. checking if the output directory already exists)
used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
output directories. We recommend that users do not disable this except if trying to achieve compatibility with
-    previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
+ previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.
+ This setting is ignored for jobs launched by the Spark Streaming scheduler, since data may need
+    to be written to a pre-existing output directory during checkpoint recovery.</td>
  <td><code>spark.hadoop.cloneConf</code></td>
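For jobs that are not launched by the streaming scheduler, the check can still be turned off explicitly. A minimal application-side sketch (the app name is illustrative; this code is not part of the patch):

```scala
import org.apache.spark.SparkConf

// Disables the output-spec check globally for this application's batch jobs.
// Jobs launched by the Spark Streaming scheduler ignore the setting either way.
val conf = new SparkConf()
  .setAppName("OverwriteExample")
  .set("spark.hadoop.validateOutputSpecs", "false")
```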
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 13943ed5442b..549ab8889d37 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume
import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
-import java.nio.charset.Charset
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
+import com.google.common.base.Charsets
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro
@@ -138,7 +138,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
status should be (avro.Status.OK)
}
- val decoder = Charset.forName("UTF-8").newDecoder()
+ val decoder = Charsets.UTF_8.newDecoder()
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val outputEvents = outputBuffer.flatten.map { _.event }
outputEvents.foreach {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index ad4f3fdd14ad..621da4f13bbe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -47,14 +47,3 @@ class Interval(val beginTime: Time, val endTime: Time) {
override def toString = "[" + beginTime + ", " + endTime + "]"
}
-
-private[streaming]
-object Interval {
- def currentInterval(duration: Duration): Interval = {
- val time = new Time(System.currentTimeMillis)
- val intervalBegin = time.floor(duration)
- new Interval(intervalBegin, intervalBegin + duration)
- }
-}
-
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 5f13fdc5579e..f44e561c7bf8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -74,12 +74,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
+ // This is a def so that it works during checkpoint recovery:
+ private def clock = ssc.scheduler.clock
+
// Data to be saved as part of the streaming checkpoints
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
- private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+ private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
/*
* Make sure that the information of files selected in the last few batches are remembered.
@@ -151,7 +154,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
- lastNewFileFindingTime = System.currentTimeMillis
+ lastNewFileFindingTime = clock.currentTime()
// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
@@ -164,7 +167,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
- val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+ val timeTaken = clock.currentTime() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index cfa3cd8925c8..0e0f5bd3b9db 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
import akka.actor.{ActorRef, Actor, Props}
import org.apache.spark.{SparkException, Logging, SparkEnv}
+import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._
@@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
- job.run()
+ // Disable checks for existing output directories in jobs launched by the streaming scheduler,
+ // since we may need to write output to an existing directory during checkpoint recovery;
+ // see SPARK-4835 for more details.
+ PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ job.run()
+ }
eventActor ! JobCompleted(job)
}
}
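To illustrate the failure mode this wrapper addresses: after a driver restart, the recovered scheduler re-runs the last incomplete batch with its original batch time, so any save action targets a directory that already exists. A hypothetical driver program sketching that scenario (host, port, and paths are illustrative, not part of this patch):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object RecoveryFriendlySave {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "hdfs:///checkpoints/recovery-friendly-save"

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf().setAppName("RecoveryFriendlySave"), Seconds(10))
      ssc.checkpoint(checkpointDir)
      val counts = ssc.socketTextStream("localhost", 9999)
        .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      // After recovery, the last incomplete batch is re-run with the same `time`, so this
      // directory may already exist; the JobHandler change above lets the save proceed.
      counts.foreachRDD((rdd, time) => rdd.saveAsTextFile(s"hdfs:///output/counts-${time.milliseconds}"))
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```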
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index 7cd867ce34b8..d6d96d7ba00f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {
- var time = 0L
+ private var time = 0L
- def currentTime() = time
+ def currentTime() = this.synchronized {
+ time
+ }
def setTime(timeToSet: Long) = {
this.synchronized {
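The tests below drive batch generation through this `ManualClock` instead of real time. A short fragment showing the pattern (it assumes an `ssc` whose `SparkConf` sets `spark.streaming.clock` to `org.apache.spark.streaming.util.ManualClock`, as these suites do, and a `batchDuration` in scope; the suites can reach the `private[streaming]` scheduler because they live in the same package):

```scala
import org.apache.spark.streaming.util.ManualClock

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
clock.setTime(10000)                          // fixed, reproducible starting time
clock.addToTime(batchDuration.milliseconds)   // causes exactly one batch to be generated
assert(clock.currentTime() == 10000 + batchDuration.milliseconds)
```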
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 86b96785d7b8..349630de840a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -639,7 +639,7 @@ class BasicOperationsSuite extends TestSuiteBase {
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- assert(clock.time === Seconds(10).milliseconds)
+ assert(clock.currentTime() === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)
operatedStream
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c97998add8ff..fa49a766ada6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,11 +18,11 @@
package org.apache.spark.streaming
import java.io.File
-import java.nio.charset.Charset
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
+import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -46,8 +46,6 @@ class CheckpointSuite extends TestSuiteBase {
override def batchDuration = Milliseconds(500)
- override def actuallyWait = true // to allow checkpoints to be written
-
override def beforeFunction() {
super.beforeFunction()
Utils.deleteRecursively(new File(checkpointDir))
@@ -61,9 +59,7 @@ class CheckpointSuite extends TestSuiteBase {
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
- assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
-
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 500 ms")
val stateStreamCheckpointInterval = Seconds(1)
val fs = FileSystem.getLocal(new Configuration())
@@ -88,7 +84,7 @@ class CheckpointSuite extends TestSuiteBase {
// Run till a time such that at least one RDD in the stream should have been checkpointed,
// then check whether some RDD has been checkpointed or not
ssc.start()
- advanceTimeWithRealDelay(ssc, firstNumBatches)
+ advanceTimeWithRealDelay(ssc, firstNumBatches.toInt)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
"No checkpointed RDDs in state stream before first failure")
@@ -102,7 +98,7 @@ class CheckpointSuite extends TestSuiteBase {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.currentCheckpointFiles.map(x => new File(x._2))
- advanceTimeWithRealDelay(ssc, secondNumBatches)
+ advanceTimeWithRealDelay(ssc, secondNumBatches.toInt)
checkpointFiles.foreach(file =>
assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -154,6 +150,7 @@ class CheckpointSuite extends TestSuiteBase {
val key = "spark.mykey"
val value = "myvalue"
System.setProperty(key, value)
+ // This purposely doesn't use the `conf` from this test suite:
ssc = new StreamingContext(master, framework, batchDuration)
val originalConf = ssc.conf
@@ -281,34 +278,45 @@ class CheckpointSuite extends TestSuiteBase {
// failure, are re-processed or not.
test("recovery with file input stream") {
// Set up the streaming context and input streams
+    val batchDuration = Seconds(2) // Use 2 seconds because setLastModified() has 1-second resolution on some OSes.
val testDir = Utils.createTempDir()
- var ssc = new StreamingContext(master, framework, Seconds(1))
+ var ssc = new StreamingContext(conf, batchDuration)
ssc.checkpoint(checkpointDir)
+ // This is a var because it's re-assigned when we restart from a checkpoint:
+ var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.setTime(1000000) // So that we don't have negative offsets due to windowing
+ val waiter = new StreamingTestWaiter(ssc)
val fileStream = ssc.textFileStream(testDir.toString)
// Making value 3 take large time to process, to ensure that the master
// shuts down in the middle of processing the 3rd batch
val mappedStream = fileStream.map(s => {
val i = s.toInt
- if (i == 3) Thread.sleep(2000)
+ if (i == 3) Thread.sleep(4000)
i
})
// Reducing over a large window to ensure that recovery from master failure
// requires reprocessing of all the files seen before the failure
- val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
+ val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
outputStream.register()
ssc.start()
+ clock.addToTime(batchDuration.milliseconds)
// Create files and advance manual clock to process them
- // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- // wait to make sure that the file is written such that it gets shown in the file listings
- Thread.sleep(1000)
+ val file = new File(testDir, i.toString)
+ Files.write(i + "\n", file, Charsets.UTF_8)
+ assert(file.setLastModified(clock.currentTime()) && file.lastModified() === clock.currentTime())
+ clock.addToTime(batchDuration.milliseconds)
+ if (i != 3) { // Since we want to shut down while the 3rd batch is processing
+ waiter.waitForTotalBatchesCompleted(i, Seconds(10))
+ }
}
+ clock.addToTime(batchDuration.milliseconds)
+ waiter.waitForTotalBatchesStarted(3, Seconds(10))
+ Thread.sleep(1000) // To wait for execution to actually begin
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()
@@ -316,45 +324,58 @@ class CheckpointSuite extends TestSuiteBase {
// Verify whether files created have been recorded correctly or not
var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
- assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+ assert(recordedFiles.exists(_.endsWith("1")))
+ assert(recordedFiles.exists(_.endsWith("2")))
+ assert(recordedFiles.exists(_.endsWith("3")))
// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- Thread.sleep(1000)
+ val file = new File(testDir, i.toString)
+ Files.write(i + "\n", file, Charsets.UTF_8)
+ assert(file.setLastModified(clock.currentTime()) && file.lastModified() === clock.currentTime())
+ clock.addToTime(1000)
}
// Recover context from checkpoint file and verify whether the files that were
// recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
+ // Copy over the time from the old clock so that we don't appear to have time-traveled:
+ clock = {
+ val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ newClock.setTime(clock.currentTime())
+ newClock
+ }
fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
- assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+ assert(recordedFiles.exists(_.endsWith("1")))
+ assert(recordedFiles.exists(_.endsWith("2")))
+ assert(recordedFiles.exists(_.endsWith("3")))
// Restart stream computation
+ val postRestartWaiter = new StreamingTestWaiter(ssc)
ssc.start()
- for (i <- Seq(7, 8, 9)) {
- Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
- Thread.sleep(1000)
+ clock.addToTime(batchDuration.milliseconds)
+ for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
+ val file = new File(testDir, i.toString)
+ Files.write(i + "\n", file, Charsets.UTF_8)
+ assert(file.setLastModified(clock.currentTime()) && file.lastModified() === clock.currentTime())
+ clock.addToTime(batchDuration.milliseconds)
+ postRestartWaiter.waitForTotalBatchesCompleted(index + 1, Seconds(10))
}
- Thread.sleep(1000)
+ clock.addToTime(batchDuration.milliseconds)
logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
// Verify whether files created while the driver was down have been recorded or not
- assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
+ assert(recordedFiles.exists(_.endsWith("4")))
+ assert(recordedFiles.exists(_.endsWith("5")))
+ assert(recordedFiles.exists(_.endsWith("6")))
// Verify whether new files created after recover have been recorded or not
- assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
- assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
+ assert(recordedFiles.exists(_.endsWith("7")))
+ assert(recordedFiles.exists(_.endsWith("8")))
+ assert(recordedFiles.exists(_.endsWith("9")))
// Append the new output to the old buffer
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
@@ -409,8 +430,7 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
ssc.stop()
- verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
- Thread.sleep(1000)
+ verifyOutput(output, expectedOutput.take(initialNumBatches), useSet = true)
// Restart and complete the computation from checkpoint file
logInfo(
@@ -419,10 +439,13 @@ class CheckpointSuite extends TestSuiteBase {
"\n-------------------------------------------\n"
)
ssc = new StreamingContext(checkpointDir)
+ val waiter = new StreamingTestWaiter(ssc)
ssc.start()
+ // Wait for the last batch before restart to be re-processed:
+ waiter.waitForTotalBatchesCompleted(1, timeout = Durations.seconds(10))
val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
// the first element will be re-processed data of the last batch before restart
- verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ verifyOutput(outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), useSet = true)
ssc.stop()
ssc = null
}
@@ -431,15 +454,15 @@ class CheckpointSuite extends TestSuiteBase {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
- def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
+ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Int): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.time)
- for (i <- 1 to numBatches.toInt) {
+ val waiter = new StreamingTestWaiter(ssc)
+ logInfo("Manual clock before advancing = " + clock.currentTime())
+ for (i <- 1 to numBatches) {
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(batchDuration.milliseconds)
+ waiter.waitForTotalBatchesCompleted(i, timeout = Durations.seconds(10))
}
- logInfo("Manual clock after advancing = " + clock.time)
- Thread.sleep(batchDuration.milliseconds)
+ logInfo("Manual clock after advancing = " + clock.currentTime())
val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 307052a4a9cb..39dcf9cf0897 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -17,23 +17,17 @@
package org.apache.spark.streaming
-import akka.actor.Actor
-import akka.actor.Props
-import akka.util.ByteString
-
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.net.{InetSocketAddress, SocketException, ServerSocket}
-import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
-import scala.concurrent.duration._
import scala.language.postfixOps
+import com.google.common.base.Charsets
import com.google.common.io.Files
import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
@@ -54,46 +48,30 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
+ val waiter = new StreamingTestWaiter(ssc)
val networkStream = ssc.socketTextStream(
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
- def output = outputBuffer.flatMap(x => x)
outputStream.register()
ssc.start()
// Feed data to the server to send to the network receiver
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
- val expectedOutput = input.map(_.toString)
- Thread.sleep(1000)
+ val expectedOutput: Seq[Seq[String]] = input.map(i => Seq(i.toString))
for (i <- 0 until input.size) {
testServer.send(input(i).toString + "\n")
- Thread.sleep(500)
+ Thread.sleep(500) // This call is to allow time for the testServer to send the data to Spark
clock.addToTime(batchDuration.milliseconds)
}
- Thread.sleep(1000)
+ waiter.waitForTotalBatchesCompleted(input.size, timeout = Durations.seconds(10))
logInfo("Stopping server")
testServer.stop()
logInfo("Stopping context")
ssc.stop()
- // Verify whether data received was as expected
- logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
- logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
- expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("--------------------------------")
-
- // Verify whether all the elements received are as expected
- // (whether the elements were received one in each interval is not verified)
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i) === expectedOutput(i))
- }
+ verifyOutput(outputBuffer, expectedOutput, useSet = false)
}
@@ -147,6 +125,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("queue input stream - oneAtATime = true") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
+ val waiter = new StreamingTestWaiter(ssc)
val queue = new SynchronizedQueue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = true)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -159,37 +138,24 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
- //Thread.sleep(1000)
val inputIterator = input.toIterator
for (i <- 0 until input.size) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
clock.addToTime(batchDuration.milliseconds)
}
- Thread.sleep(1000)
+ waiter.waitForTotalBatchesCompleted(input.size, timeout = Durations.seconds(10))
+
logInfo("Stopping context")
ssc.stop()
- // Verify whether data received was as expected
- logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
- logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
- expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("--------------------------------")
-
- // Verify whether all the elements received are as expected
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i) === expectedOutput(i))
- }
+ verifyOutput(outputBuffer, expectedOutput, useSet = false)
}
test("queue input stream - oneAtATime = false") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
+ val waiter = new StreamingTestWaiter(ssc)
val queue = new SynchronizedQueue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -207,30 +173,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val inputIterator = input.toIterator
inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(1000)
+ waiter.waitForTotalBatchesCompleted(1, timeout = Durations.seconds(10))
// Enqueue the remaining items (again one by one), merged in the final batch
inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(1000)
+ waiter.waitForTotalBatchesCompleted(2, timeout = Durations.seconds(10))
logInfo("Stopping context")
ssc.stop()
- // Verify whether data received was as expected
- logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
- logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
- expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("--------------------------------")
-
- // Verify whether all the elements received are as expected
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i) === expectedOutput(i))
- }
+ verifyOutput(outputBuffer, expectedOutput, useSet = false)
}
def testFileStream(newFilesOnly: Boolean) {
@@ -238,14 +190,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val testDir: File = null
try {
val testDir = Utils.createTempDir()
+ // Create a file that exists before the StreamingContext is created
val existingFile = new File(testDir, "0")
- Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+ Files.write("0\n", existingFile, Charsets.UTF_8)
+ assert(existingFile.setLastModified(10000))
- Thread.sleep(1000)
// Set up the streaming context and input streams
- val newConf = conf.clone.set(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- ssc = new StreamingContext(newConf, batchDuration)
+ ssc = new StreamingContext(conf, batchDuration)
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    // This `setTime` call ensures that the clock is past the modification time of `existingFile`
+ clock.setTime(10000 + 1000)
+ val waiter = new StreamingTestWaiter(ssc)
val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -253,13 +208,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
outputStream.register()
ssc.start()
- // Create files in the directory
+ // Over time, create files in the directory
val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
- Thread.sleep(batchDuration.milliseconds)
+ clock.addToTime(batchDuration.milliseconds)
val file = new File(testDir, i.toString)
- Files.write(i + "\n", file, Charset.forName("UTF-8"))
+ Files.write(i + "\n", file, Charsets.UTF_8)
+ assert(file.setLastModified(clock.currentTime()))
logInfo("Created file " + file)
+ waiter.waitForTotalBatchesCompleted(i, timeout = Durations.seconds(10))
}
// Verify that all the files have been read
@@ -268,9 +225,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
} else {
(Seq(0) ++ input).map(_.toString).toSet
}
- eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
- assert(outputBuffer.flatten.toSet === expectedOutput)
- }
+ assert(outputBuffer.flatten.toSet === expectedOutput)
} finally {
if (ssc != null) ssc.stop()
if (testDir != null) Utils.deleteRecursively(testDir)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 5dbb7232009e..5aa1c8252c3a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -27,17 +27,17 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import java.io.{File, IOException}
-import java.nio.charset.Charset
import java.util.UUID
+import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
-
+import org.scalatest.Assertions
private[streaming]
-object MasterFailureTest extends Logging {
+object MasterFailureTest extends Logging with Assertions {
@volatile var killed = false
@volatile var killCount = 0
@@ -81,7 +81,7 @@ object MasterFailureTest extends Logging {
// Verify whether all the values of the expected output are present
// in the output
- assert(output.distinct.toSet == expectedOutput.toSet)
+ assert(output.distinct.toSet === expectedOutput.toSet)
}
@@ -114,7 +114,7 @@ object MasterFailureTest extends Logging {
// Verify whether the last expected output value has been generated, thereby
// confirming that none of the inputs have been missed
- assert(output.last == expectedOutput.last)
+ assert(output.last === expectedOutput.last)
}
/**
@@ -130,7 +130,7 @@ object MasterFailureTest extends Logging {
): Seq[T] = {
// Just making sure that the expected output does not have duplicates
- assert(expectedOutput.distinct.toSet == expectedOutput.toSet)
+ assert(expectedOutput.distinct.toSet === expectedOutput.toSet)
// Reset all state
reset()
@@ -362,7 +362,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val localFile = new File(localTestDir, (i + 1).toString)
val hadoopFile = new Path(testDir, (i + 1).toString)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
- Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8"))
+ Files.write(input(i) + "\n", localFile, Charsets.UTF_8)
var tries = 0
var done = false
while (!done && tries < maxTries) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 52972f63c6c5..a40baac0bdb6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{ObjectInputStream, IOException}
+import java.util.concurrent.TimeoutException
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.SynchronizedBuffer
@@ -26,6 +27,7 @@ import scala.reflect.ClassTag
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
+import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.rdd.RDD
@@ -103,6 +105,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten))
}
+/**
+ * A utility that blocks until certain events occur, such as the start or completion of
+ * batches. This is much less brittle than waiting on wall-clock time. Internally, it is
+ * implemented with a StreamingListener; constructing an instance automatically registers
+ * that listener on the given StreamingContext.
+ */
+class StreamingTestWaiter(ssc: StreamingContext) {
+
+  // All access to this state should be guarded by `StreamingTestWaiter.this.synchronized`
+ private var numCompletedBatches = 0
+ private var numStartedBatches = 0
+
+ private val listener = new StreamingListener {
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
+ StreamingTestWaiter.this.synchronized {
+ numStartedBatches += 1
+ StreamingTestWaiter.this.notifyAll()
+ }
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
+ StreamingTestWaiter.this.synchronized {
+ numCompletedBatches += 1
+ StreamingTestWaiter.this.notifyAll()
+ }
+ }
+ ssc.addStreamingListener(listener)
+
+ def getNumCompletedBatches: Int = this.synchronized {
+ numCompletedBatches
+ }
+
+ def getNumStartedBatches: Int = this.synchronized {
+ numStartedBatches
+ }
+
+ /**
+ * Block until the number of completed batches reaches the given threshold.
+ */
+ def waitForTotalBatchesCompleted(
+ targetNumBatches: Int,
+ timeout: Duration): Unit = this.synchronized {
+ val startTime = System.currentTimeMillis()
+ def successful = getNumCompletedBatches >= targetNumBatches
+ def timedOut = (System.currentTimeMillis() - startTime) >= timeout.milliseconds
+ while (!timedOut && !successful) {
+ this.wait(timeout.milliseconds)
+ }
+ if (!successful && timedOut) {
+ throw new TimeoutException(s"Waited for $targetNumBatches completed batches, but only" +
+ s" $numCompletedBatches have completed after $timeout")
+ }
+ }
+
+ /**
+ * Block until the number of started batches reaches the given threshold.
+ */
+ def waitForTotalBatchesStarted(
+ targetNumBatches: Int,
+ timeout: Duration): Unit = this.synchronized {
+ val startTime = System.currentTimeMillis()
+ def successful = getNumStartedBatches >= targetNumBatches
+ def timedOut = (System.currentTimeMillis() - startTime) >= timeout.milliseconds
+ while (!timedOut && !successful) {
+ this.wait(timeout.milliseconds)
+ }
+ if (!successful && timedOut) {
+ throw new TimeoutException(s"Waited for $targetNumBatches started batches, but only" +
+ s" $numStartedBatches have started after $timeout")
+ }
+ }
+}
+
/**
* This is the base trait for Spark Streaming testsuites. This provides basic functionality
* to run user-defined set of input on user-defined stream operations, and verify the output.
@@ -286,22 +359,23 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val output = outputStream.output
try {
+ val waiter = new StreamingTestWaiter(ssc)
// Start computation
ssc.start()
// Advance manual clock
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.time)
+ logInfo("Manual clock before advancing = " + clock.currentTime())
if (actuallyWait) {
for (i <- 1 to numBatches) {
logInfo("Actually waiting for " + batchDuration)
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(batchDuration.milliseconds)
+ waiter.waitForTotalBatchesCompleted(i, timeout = batchDuration * 5)
}
} else {
clock.addToTime(numBatches * batchDuration.milliseconds)
}
- logInfo("Manual clock after advancing = " + clock.time)
+ logInfo("Manual clock after advancing = " + clock.currentTime())
// Wait until expected number of output items have been generated
val startTime = System.currentTimeMillis()