Skip to content

Commit ba437fc

Browse files
MaxGekk authored and gatorsmile committed
[SPARK-24805][SQL] Do not ignore avro files without extensions by default
## What changes were proposed in this pull request? In the PR, I propose to change default behaviour of AVRO datasource which currently ignores files without `.avro` extension in read by default. This PR sets the default value for `avro.mapred.ignore.inputs.without.extension` to `false` in the case if the parameter is not set by an user. ## How was this patch tested? Added a test file without extension in AVRO format, and new test for reading the file with and wihout specified schema. Author: Maxim Gekk <[email protected]> Author: Maxim Gekk <[email protected]> Closes #21769 from MaxGekk/avro-without-extension.
1 parent b0c95a1 commit ba437fc

File tree

2 files changed

+47
-12
lines changed

2 files changed

+47
-12
lines changed

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
6262
// Schema evolution is not supported yet. Here we only pick a single random sample file to
6363
// figure out the schema of the whole dataset.
6464
val sampleFile =
65-
if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
65+
if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
6666
files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
6767
throw new FileNotFoundException(
6868
"No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
@@ -170,10 +170,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
170170
// Doing input file filtering is improper because we may generate empty tasks that process no
171171
// input files but stress the scheduler. We should probably add a more general input file
172172
// filtering mechanism for `FileFormat` data sources. See SPARK-16317.
173-
if (
174-
conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
175-
!file.filePath.endsWith(".avro")
176-
) {
173+
if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
177174
Iterator.empty
178175
} else {
179176
val reader = {
@@ -278,4 +275,11 @@ private[avro] object AvroFileFormat {
278275
value.readFields(new DataInputStream(in))
279276
}
280277
}
278+
279+
def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
280+
// Files without .avro extensions are not ignored by default
281+
val defaultValue = false
282+
283+
conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, defaultValue)
284+
}
281285
}

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
package org.apache.spark.sql.avro
1919

2020
import java.io._
21-
import java.nio.file.Files
21+
import java.net.URL
22+
import java.nio.file.{Files, Path, Paths}
2223
import java.sql.{Date, Timestamp}
2324
import java.util.{TimeZone, UUID}
2425

@@ -622,7 +623,12 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
622623
intercept[FileNotFoundException] {
623624
withTempPath { dir =>
624625
FileUtils.touch(new File(dir, "test"))
625-
spark.read.avro(dir.toString)
626+
val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
627+
try {
628+
hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
629+
spark.read.avro(dir.toString)
630+
} finally {
631+
hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) }
626632
}
627633
}
628634

@@ -684,12 +690,18 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
684690

685691
Files.createFile(new File(tempSaveDir, "non-avro").toPath)
686692

687-
val newDf = spark
688-
.read
689-
.option(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
690-
.avro(tempSaveDir)
693+
val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
694+
val count = try {
695+
hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
696+
val newDf = spark
697+
.read
698+
.avro(tempSaveDir)
699+
newDf.count()
700+
} finally {
701+
hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
702+
}
691703

692-
assert(newDf.count == 8)
704+
assert(count == 8)
693705
}
694706
}
695707

@@ -805,4 +817,23 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
805817
assert(readDf.collect().sameElements(writeDf.collect()))
806818
}
807819
}
820+
821+
test("SPARK-24805: do not ignore files without .avro extension by default") {
822+
withTempDir { dir =>
823+
Files.copy(
824+
Paths.get(new URL(episodesAvro).toURI),
825+
Paths.get(dir.getCanonicalPath, "episodes"))
826+
827+
val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes"
828+
val df1 = spark.read.avro(fileWithoutExtension)
829+
assert(df1.count == 8)
830+
831+
val schema = new StructType()
832+
.add("title", StringType)
833+
.add("air_date", StringType)
834+
.add("doctor", IntegerType)
835+
val df2 = spark.read.schema(schema).avro(fileWithoutExtension)
836+
assert(df2.count == 8)
837+
}
838+
}
808839
}

0 commit comments

Comments (0)