
Commit a8dbcca

revert savamode check
1 parent 2498dfd commit a8dbcca

2 files changed: 54 additions & 49 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 3 additions & 16 deletions
@@ -412,28 +412,15 @@ case class DataSource(
     // 2. Output path must be a legal HDFS style file system path;
     // 3. It's OK that the output path doesn't exist yet;
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
-    val (outputPath, pathExists) = if (allPaths.length == 1) {
+    val outputPath = if (allPaths.length == 1) {
       val path = new Path(allPaths.head)
       val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-      val qualifiedOutputPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      (qualifiedOutputPath, fs.exists(qualifiedOutputPath))
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     } else {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
         s"got: ${allPaths.mkString(", ")}")
     }

-    if (pathExists) {
-      if (mode == SaveMode.ErrorIfExists) {
-        throw new AnalysisException(s"path $outputPath already exists.")
-      }
-      if (mode == SaveMode.Ignore) {
-        // Since the path already exists and the save mode is Ignore, we will just return.
-        return
-      }
-    }
-
-    // if path does not exist, the ErrorIfExists and Ignore can be transformed to Append
-    val transformedMode = if (mode != SaveMode.Overwrite) SaveMode.Append else mode
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)

@@ -464,7 +451,7 @@ case class DataSource(
         fileFormat = format,
         options = options,
         query = data.logicalPlan,
-        mode = transformedMode,
+        mode = mode,
         catalogTable = catalogTable,
         fileIndex = fileIndex)
     sparkSession.sessionState.executePlan(plan).toRdd
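
The net effect on DataSource.write is that the save mode is handed to InsertIntoHadoopFsRelationCommand untouched, instead of being checked against the output path and rewritten to Append up front. The user-visible contract remains the standard DataFrameWriter save-mode behavior; below is a minimal, self-contained sketch of that contract, where the local session, sample data, and /tmp output path are illustrative only and not part of this commit.

import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveModeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("save-mode-example")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")
    val path = "/tmp/save-mode-example"   // illustrative output path

    // First write creates the output directory.
    df.write.mode(SaveMode.Overwrite).parquet(path)

    // With the path now existing, each mode behaves differently:
    df.write.mode(SaveMode.Append).parquet(path)      // adds new files
    df.write.mode(SaveMode.Ignore).parquet(path)      // no-op, existing data kept
    df.write.mode(SaveMode.Overwrite).parquet(path)   // replaces existing data
    // SaveMode.ErrorIfExists would throw an AnalysisException at this point.

    spark.stop()
  }
}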

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 51 additions & 33 deletions
@@ -97,45 +97,63 @@ case class InsertIntoHadoopFsRelationCommand(
         outputPath = outputPath.toString,
         isAppend = isAppend)

-    if (mode == SaveMode.Overwrite) {
-      deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
+    val doInsertion = (mode, pathExists) match {
+      case (SaveMode.ErrorIfExists, true) =>
+        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
+      case (SaveMode.Overwrite, true) =>
+        deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
+        true
+      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
+        true
+      case (SaveMode.Ignore, exists) =>
+        !exists
+      case (s, exists) =>
+        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
     }

-    // Callback for updating metastore partition metadata after the insertion job completes.
-    def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
-      if (partitionsTrackedByCatalog) {
-        val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
-        if (newPartitions.nonEmpty) {
-          AlterTableAddPartitionCommand(
-            catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
-            ifNotExists = true).run(sparkSession)
-        }
-        if (mode == SaveMode.Overwrite) {
-          val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
-          if (deletedPartitions.nonEmpty) {
-            AlterTableDropPartitionCommand(
-              catalogTable.get.identifier, deletedPartitions.toSeq,
-              ifExists = true, purge = false,
-              retainData = true /* already deleted */).run(sparkSession)
+    if (doInsertion) {
+
+      // Callback for updating metastore partition metadata after the insertion job completes.
+      def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
+        if (partitionsTrackedByCatalog) {
+          val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
+          if (newPartitions.nonEmpty) {
+            AlterTableAddPartitionCommand(
+              catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
+              ifNotExists = true).run(sparkSession)
+          }
+          if (mode == SaveMode.Overwrite) {
+            val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
+            if (deletedPartitions.nonEmpty) {
+              AlterTableDropPartitionCommand(
+                catalogTable.get.identifier, deletedPartitions.toSeq,
+                ifExists = true, purge = false,
+                retainData = true /* already deleted */).run(sparkSession)
+            }
           }
         }
       }
-    }

-    FileFormatWriter.write(
-      sparkSession = sparkSession,
-      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
-      fileFormat = fileFormat,
-      committer = committer,
-      outputSpec = FileFormatWriter.OutputSpec(
-        qualifiedOutputPath.toString, customPartitionLocations),
-      hadoopConf = hadoopConf,
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
-      refreshFunction = refreshPartitionsCallback,
-      options = options)
-
-    fileIndex.foreach(_.refresh())
+      FileFormatWriter.write(
+        sparkSession = sparkSession,
+        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+        fileFormat = fileFormat,
+        committer = committer,
+        outputSpec = FileFormatWriter.OutputSpec(
+          qualifiedOutputPath.toString, customPartitionLocations),
+        hadoopConf = hadoopConf,
+        partitionColumns = partitionColumns,
+        bucketSpec = bucketSpec,
+        refreshFunction = refreshPartitionsCallback,
+        options = options)
+
+      // refresh cached files in FileIndex
+      fileIndex.foreach(_.refresh())
+      // refresh data cache if table is cached
+      sparkSession.catalog.refreshByPath(outputPath.toString)
+    } else {
+      logInfo("Skipping insertion into a relation that already exists.")
+    }

     Seq.empty[Row]
   }
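
Taken together, the restored block is a small decision table over (SaveMode, pathExists), evaluated where the write actually runs rather than in DataSource. A self-contained sketch of that table follows; SaveModeDecision and shouldInsert are hypothetical names for illustration, and the real command throws AnalysisException and also deletes matching partitions on Overwrite before writing.

import org.apache.spark.sql.SaveMode

object SaveModeDecision {
  // Mirror of the (mode, pathExists) decision: returns true when the write
  // should proceed, false when it should be silently skipped, and throws
  // when the path must not already exist.
  def shouldInsert(mode: SaveMode, pathExists: Boolean): Boolean = (mode, pathExists) match {
    case (SaveMode.ErrorIfExists, true) =>
      throw new IllegalArgumentException("path already exists")
    case (SaveMode.Overwrite, true) =>
      true   // the command first deletes matching partitions, then writes
    case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
      true
    case (SaveMode.Ignore, exists) =>
      !exists
  }

  def main(args: Array[String]): Unit = {
    for {
      mode <- Seq(SaveMode.Append, SaveMode.Overwrite, SaveMode.Ignore)
      exists <- Seq(false, true)
    } println(s"mode=$mode, pathExists=$exists -> insert=${shouldInsert(mode, exists)}")
  }
}

With the check living here, ErrorIfExists and Ignore are no longer rewritten to Append before the command runs, and Ignore on an existing path produces a logged no-op instead of an early return in DataSource.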
