Status: Closed
Changes from all commits (20 commits)
303e828
Add StreamingTestWaiter class
JoshRosen Dec 12, 2014
f8f6c93
Remove (all but one) sleep calls from “socket input stream” test
JoshRosen Dec 12, 2014
6ce0681
Refactor several tests to use verifyOutput()
JoshRosen Dec 12, 2014
5c31b8a
Revert "Remove (all but one) sleep calls from “socket input stream” t…
JoshRosen Dec 12, 2014
ad0056b
WIP towards improved CheckpointSuite sleep logic.
JoshRosen Dec 13, 2014
3db335f
[SPARK-4835] Disable OutputSpec validation for streaming jobs.
JoshRosen Dec 13, 2014
b245217
Remove several (but not all) sleep() calls in InputStreamSuite.
JoshRosen Dec 13, 2014
12635b4
Remove sleep() in runStreamsWithPartitions(); use streaming’s Duratio…
JoshRosen Dec 13, 2014
1a0fcb9
Fix indentation
JoshRosen Dec 16, 2014
c81a477
Refactor tests to use Charsets.UTF_8 (see #2781).
JoshRosen Dec 16, 2014
abf5050
Modify FileInputDStream to use Clock class.
JoshRosen Dec 16, 2014
bc0db94
First version of testFileStream() that passes w/o Thread.sleep
JoshRosen Dec 16, 2014
ee8c8f8
Remove System.currentTimeMillis call added in last commit.
JoshRosen Dec 16, 2014
27c8def
Remove unused Interval.currentInterval() method.
JoshRosen Dec 16, 2014
9c939d9
Use Scalatest Assertions === in MasterFailureTest.
JoshRosen Dec 16, 2014
590f006
Add ability to block on batch start events.
JoshRosen Dec 17, 2014
c9c477f
WIP towards SPARK-1600
JoshRosen Dec 17, 2014
63162b2
Synchronize `currentTime` in ManualClock
JoshRosen Dec 17, 2014
520bade
First passing version of the refactored SPARK-1600 test.
JoshRosen Dec 17, 2014
1304776
Remove debugging line
JoshRosen Dec 17, 2014
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -25,6 +25,7 @@ import scala.collection.{Map, mutable}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.DynamicVariable

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -961,7 +962,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}
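For context, the configuration key read above can also be turned off for an entire application by the user; the scoped flag added in this PR is a separate, streaming-only bypass. A minimal sketch of the user-facing setting (the app name and output path are illustrative, not from this PR):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: disables the pre-flight output-spec check for the whole
// application. What happens when the output directory already exists is then
// left to the underlying Hadoop OutputFormat/committer.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("output-spec-demo") // hypothetical app name
  .set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)

sc.parallelize(Seq(("a", 1), ("b", 2)))
  .saveAsTextFile("/tmp/existing-output-dir") // hypothetical path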
@@ -1039,7 +1041,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")

-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(hadoopConf)
hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1118,4 +1121,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

private[spark] object PairRDDFunctions {
val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+  /**
+   * Used by Spark Streaming in order to bypass the `spark.hadoop.validateOutputSpecs` checks
+   * for save actions launched by Spark Streaming, since the validation may break Spark Streaming's
+   * ability to recover from checkpoints. See SPARK-4835 for more details.
+   */
+  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
}
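The DynamicVariable added above provides dynamically scoped, per-thread state: withValue installs a value only for the duration of a block and restores the previous value when the block exits. A small standalone sketch of that behavior (toy names, not Spark code):

import scala.util.DynamicVariable

object DynamicVariableDemo {
  // Toy flag, analogous in shape to disableOutputSpecValidation above.
  val skipValidation = new DynamicVariable[Boolean](false)

  def save(): Unit = {
    // Reads whatever value is in scope on the current thread.
    if (skipValidation.value) println("validation skipped") else println("validation performed")
  }

  def main(args: Array[String]): Unit = {
    save()                                    // prints "validation performed"
    skipValidation.withValue(true) { save() } // prints "validation skipped"
    save()                                    // prints "validation performed" again
  }
}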
4 changes: 3 additions & 1 deletion docs/configuration.md
@@ -709,7 +709,9 @@ Apart from these, the following properties are also available, and may be useful
<td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
output directories. We recommend that users do not disable this except if trying to achieve compatibility with
-previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
+previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.
[Review comment from a Contributor]: I am a little uneasy with this approach because it's not clean. Ideally nothing in Spark should refer to the requirements of higher-level libraries like Spark Streaming.
+This setting is ignored for jobs launched by the Spark Streaming scheduler, since data may need
+to be written to a pre-existing output directory during checkpoint recovery.</td>
</tr>
<tr>
<td><code>spark.hadoop.cloneConf</code></td>
FlumeStreamSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume

import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
-import java.nio.charset.Charset

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps

+import com.google.common.base.Charsets
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro
@@ -138,7 +138,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
status should be (avro.Status.OK)
}

-    val decoder = Charset.forName("UTF-8").newDecoder()
+    val decoder = Charsets.UTF_8.newDecoder()
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val outputEvents = outputBuffer.flatten.map { _.event }
outputEvents.foreach {
11 changes: 0 additions & 11 deletions streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -47,14 +47,3 @@ class Interval(val beginTime: Time, val endTime: Time) {

override def toString = "[" + beginTime + ", " + endTime + "]"
}

-private[streaming]
-object Interval {
-  def currentInterval(duration: Duration): Interval = {
-    val time = new Time(System.currentTimeMillis)
-    val intervalBegin = time.floor(duration)
-    new Interval(intervalBegin, intervalBegin + duration)
-  }
-}

FileInputDStream.scala
@@ -74,12 +74,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {

+  // This is a def so that it works during checkpoint recovery:
+  private def clock = ssc.scheduler.clock

// Data to be saved as part of the streaming checkpoints
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

[Review comment from a Contributor]: I approve this change, but it should probably be a different PR that just touches this input stream and its tests.
// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
-  private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+  private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L

/*
* Make sure that the information of files selected in the last few batches are remembered.
@@ -151,7 +154,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
-      lastNewFileFindingTime = System.currentTimeMillis
+      lastNewFileFindingTime = clock.currentTime()

// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
@@ -164,7 +167,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
-      val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+      val timeTaken = clock.currentTime() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
JobScheduler.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
import akka.actor.{ActorRef, Actor, Props}
import org.apache.spark.{SparkException, Logging, SparkEnv}
+import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._


@@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
-      job.run()
+      // Disable checks for existing output directories in jobs launched by the streaming scheduler,
+      // since we may need to write output to an existing directory during checkpoint recovery;
+      // see SPARK-4835 for more details.
+      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+        job.run()
+      }
eventActor ! JobCompleted(job)
}
}
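One property of this wrapping worth noting (illustrated with a standalone toy example, not Spark code): withValue restores the previous value even if the wrapped block throws, so a failed job does not leave the flag set for later jobs run on the same thread.

import scala.util.DynamicVariable

object WithValueIsExceptionSafe {
  val flag = new DynamicVariable[Boolean](false)

  def main(args: Array[String]): Unit = {
    try {
      flag.withValue(true) {
        require(flag.value)                    // true inside the block
        throw new RuntimeException("simulated job failure")
      }
    } catch {
      case _: RuntimeException => ()           // swallowed for the demo
    }
    println(flag.value)                        // false: the previous value was restored
  }
}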
Clock.scala
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {

-  var time = 0L
+  private var time = 0L

-  def currentTime() = time
+  def currentTime() = this.synchronized {
+    time
+  }

[Review comment from a Contributor]: I approve this. Please put this in the same PR as the file stream fix.

def setTime(timeToSet: Long) = {
this.synchronized {
BasicOperationsSuite.scala
@@ -639,7 +639,7 @@ class BasicOperationsSuite extends TestSuiteBase {
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    assert(clock.time === Seconds(10).milliseconds)
+    assert(clock.currentTime() === Seconds(10).milliseconds)
[Review comment from a Contributor]: Please put this in the same PR as the file stream fix.
assert(output.size === numExpectedOutput)
operatedStream
}
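For context on how these clock assertions replace Thread.sleep, the pattern in the streaming suites is to select ManualClock via configuration and then advance virtual time explicitly. A rough sketch using only members visible in this diff plus the "spark.streaming.clock" key; treat the key and the local[2] setup as assumptions, and note that ManualClock and ssc.scheduler are private[streaming], so this only compiles inside that package:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.util.ManualClock

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("manual-clock-sketch")
  .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
val ssc = new StreamingContext(conf, Seconds(1))
// ... wire up input and output DStreams, then ssc.start() ...

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
// Jump virtual time forward instead of sleeping; each batch boundary crossed
// lets the scheduler generate a batch deterministically.
clock.setTime(clock.currentTime() + Seconds(10).milliseconds)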