@@ -163,7 +163,4 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
       tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
     }
   }
-
-  /** Whether we are using a direct output committer */
-  def isDirectOutput(): Boolean = committer.getClass.getSimpleName.contains("Direct")
 }
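For context: the deleted `isDirectOutput` helper was a name-based heuristic that flagged any committer whose class name contains "Direct". A minimal standalone sketch of the same check, assuming a Hadoop `OutputCommitter` in scope (the `looksLikeDirectCommitter` name is illustrative, not Spark API):

```scala
import org.apache.hadoop.mapreduce.OutputCommitter

// Hypothetical mirror of the removed check: "direct" committers
// (e.g. the old DirectParquetOutputCommitter) conventionally carry
// "Direct" in their class name and write straight to the final location.
def looksLikeDirectCommitter(committer: OutputCommitter): Boolean =
  committer.getClass.getSimpleName.contains("Direct")
```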
@@ -83,17 +83,6 @@ object SparkHadoopMapReduceWriter extends Logging {
       isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
     committer.setupJob(jobContext)
 
-    // When speculation is on and output committer class name contains "Direct", we should warn
-    // users that they may loss data if they are using a direct output committer.
-    if (SparkHadoopWriterUtils.isSpeculationEnabled(sparkConf) && committer.isDirectOutput) {
-      val warningMessage =
-        s"$committer may be an output committer that writes data directly to " +
-          "the final location. Because speculation is enabled, this output committer may " +
-          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
-          "committer that does not have this behavior (e.g. FileOutputCommitter)."
-      logWarning(warningMessage)
-    }
-
     // Try to write all RDD partitions as a Hadoop OutputFormat.
     try {
       val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
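The block removed above warned that combining speculative execution with a direct output committer risks data loss (SPARK-10063), because speculative task attempts can race on the final output location. A standalone sketch of the deleted check under those assumptions (`directCommitterWarning` is illustrative, not Spark API):

```scala
import org.apache.spark.SparkConf

// Sketch of the removed warning logic: returns the warning text, if any applies.
def directCommitterWarning(conf: SparkConf, committerName: String): Option[String] = {
  val speculationEnabled = conf.getBoolean("spark.speculation", false)
  if (speculationEnabled && committerName.contains("Direct")) {
    Some(s"$committerName may write data directly to the final location. " +
      "Because speculation is enabled, this committer may cause data loss " +
      "(see SPARK-10063). If possible, use a committer without this " +
      "behavior, e.g. FileOutputCommitter.")
  } else {
    None
  }
}
```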
@@ -230,10 +219,6 @@ object SparkHadoopWriterUtils {
     enabledInConf && !validationDisabled
   }
 
-  def isSpeculationEnabled(conf: SparkConf): Boolean = {
-    conf.getBoolean("spark.speculation", false)
-  }
-
   // TODO: these don't seem like the right abstractions.
   // We should abstract the duplicate code in a less awkward way.
 
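The removed `isSpeculationEnabled` was a thin wrapper over a single configuration lookup, and its caller in the warning deleted above went with it; any future caller can read the flag inline. A minimal usage sketch:

```scala
import org.apache.spark.SparkConf

// Equivalent inline check; speculation is off by default.
val conf = new SparkConf().set("spark.speculation", "true")
val speculationEnabled = conf.getBoolean("spark.speculation", false)
```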