diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b9e9e59bff54..eafb5519117d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1518,7 +1518,7 @@ object SQLConf { " register class names for which data source V2 write paths are disabled. Writes from these" + " sources will fall back to the V1 sources.") .stringConf - .createWithDefault("csv,json,orc,text") + .createWithDefault("csv,json,orc,text,parquet") val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers") .doc("A comma-separated list of fully qualified data source register class names for which" + diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 12e9067f0503..c0b8b270bab1 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -3,7 +3,7 @@ org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2 org.apache.spark.sql.execution.datasources.noop.NoopDataSource org.apache.spark.sql.execution.datasources.orc.OrcFileFormat -org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2 org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider org.apache.spark.sql.execution.streaming.sources.RateStreamProvider diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 64f739fe3596..03aca89bc642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -43,10 +43,10 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2 import org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2 import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils -import org.apache.spark.util.Utils /** * A command to create a table with the same definition of the given existing table. @@ -241,7 +241,8 @@ case class AlterTableAddColumnsCommand( // Hive type is already considered as hive serde table, so the logic will not // come in here. 
case _: CSVFileFormat | _: JsonFileFormat | _: ParquetFileFormat => - case _: JsonDataSourceV2 | _: CSVDataSourceV2 | _: OrcDataSourceV2 => + case _: JsonDataSourceV2 | _: CSVDataSourceV2 | + _: OrcDataSourceV2 | _: ParquetDataSourceV2 => case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") => case s => throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala index 463ee9a88730..c2211cccb501 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable} import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} @@ -65,6 +66,7 @@ object SchemaPruning extends Rule[LogicalPlan] { prunedDataSchema => { val prunedFileTable = table match { case o: OrcTable => o.copy(userSpecifiedSchema = Some(prunedDataSchema)) + case p: ParquetTable => p.copy(userSpecifiedSchema = Some(prunedDataSchema)) case _ => val message = s"${table.formatName} data source doesn't support schema pruning." throw new AnalysisException(message) @@ -121,7 +123,7 @@ object SchemaPruning extends Rule[LogicalPlan] { * Checks to see if the given [[FileTable]] can be pruned. Currently we support ORC v2. */ private def canPruneTable(table: FileTable) = - table.isInstanceOf[OrcTable] + table.isInstanceOf[OrcTable] || table.isInstanceOf[ParquetTable] /** * Normalizes the names of the attribute references in the given projects and filters to reflect diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index f2da159c5c95..6c30aea836be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -161,105 +161,7 @@ class ParquetFileFormat sparkSession: SparkSession, parameters: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { - val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf) - - // Should we merge schemas from all Parquet part-files? - val shouldMergeSchemas = parquetOptions.mergeSchema - - val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries - - val filesByType = splitFiles(files) - - // Sees which file(s) we need to touch in order to figure out the schema. - // - // Always tries the summary files first if users don't require a merged schema. In this case, - // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row - // groups information, and could be much smaller for large Parquet files with lots of row - // groups. If no summary file is available, falls back to some random part-file. 
- // - // NOTE: Metadata stored in the summary files are merged from all part-files. However, for - // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know - // how to merge them correctly if some key is associated with different values in different - // part-files. When this happens, Parquet simply gives up generating the summary file. This - // implies that if a summary file presents, then: - // - // 1. Either all part-files have exactly the same Spark SQL schema, or - // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus - // their schemas may differ from each other). - // - // Here we tend to be pessimistic and take the second case into account. Basically this means - // we can't trust the summary files if users require a merged schema, and must touch all part- - // files to do the merge. - val filesToTouch = - if (shouldMergeSchemas) { - // Also includes summary files, 'cause there might be empty partition directories. - - // If mergeRespectSummaries config is true, we assume that all part-files are the same for - // their schema with summary files, so we ignore them when merging schema. - // If the config is disabled, which is the default setting, we merge all part-files. - // In this mode, we only need to merge schemas contained in all those summary files. - // You should enable this configuration only if you are very sure that for the parquet - // part-files to read there are corresponding summary files containing correct schema. - - // As filed in SPARK-11500, the order of files to touch is a matter, which might affect - // the ordering of the output columns. There are several things to mention here. - // - // 1. If mergeRespectSummaries config is false, then it merges schemas by reducing from - // the first part-file so that the columns of the lexicographically first file show - // first. - // - // 2. If mergeRespectSummaries config is true, then there should be, at least, - // "_metadata"s for all given files, so that we can ensure the columns of - // the lexicographically first file show first. - // - // 3. If shouldMergeSchemas is false, but when multiple files are given, there is - // no guarantee of the output order, since there might not be a summary file for the - // lexicographically first file, which ends up putting ahead the columns of - // the other files. However, this should be okay since not enabling - // shouldMergeSchemas means (assumes) all the files have the same schemas. - - val needMerged: Seq[FileStatus] = - if (mergeRespectSummaries) { - Seq.empty - } else { - filesByType.data - } - needMerged ++ filesByType.metadata ++ filesByType.commonMetadata - } else { - // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet - // don't have this. - filesByType.commonMetadata.headOption - // Falls back to "_metadata" - .orElse(filesByType.metadata.headOption) - // Summary file(s) not found, the Parquet file is either corrupted, or different part- - // files contain conflicting user defined metadata (two or more values are associated - // with a same key in different files). In either case, we fall back to any of the - // first part-file, and just assume all schemas are consistent. 
- .orElse(filesByType.data.headOption) - .toSeq - } - ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession) - } - - case class FileTypes( - data: Seq[FileStatus], - metadata: Seq[FileStatus], - commonMetadata: Seq[FileStatus]) - - private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = { - val leaves = allFiles.toArray.sortBy(_.getPath.toString) - - FileTypes( - data = leaves.filterNot(f => isSummaryFile(f.getPath)), - metadata = - leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE), - commonMetadata = - leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)) - } - - private def isSummaryFile(file: Path): Boolean = { - file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || - file.getName == ParquetFileWriter.PARQUET_METADATA_FILE + ParquetUtils.inferSchema(sparkSession, parameters, files) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 2578b0ba4305..8508322f54e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. */ -private[parquet] class ParquetFilters( +class ParquetFilters( schema: MessageType, pushDownDate: Boolean, pushDownTimestamp: Boolean, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala index 8361762b0970..e7753cec681c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.OutputWriter // NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext) +class ParquetOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { private val recordWriter: RecordWriter[Void, InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index df7766520290..2c7231d2c3e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. 
*/ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], +class ParquetReadSupport(val convertTz: Option[TimeZone], enableVectorizedReader: Boolean) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ @@ -130,7 +130,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], } } -private[parquet] object ParquetReadSupport { +object ParquetReadSupport { val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala new file mode 100644 index 000000000000..7e7dba92f37b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.parquet.hadoop.ParquetFileWriter + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType + +object ParquetUtils { + def inferSchema( + sparkSession: SparkSession, + parameters: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf) + + // Should we merge schemas from all Parquet part-files? + val shouldMergeSchemas = parquetOptions.mergeSchema + + val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries + + val filesByType = splitFiles(files) + + // Sees which file(s) we need to touch in order to figure out the schema. + // + // Always tries the summary files first if users don't require a merged schema. In this case, + // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row + // groups information, and could be much smaller for large Parquet files with lots of row + // groups. If no summary file is available, falls back to some random part-file. + // + // NOTE: Metadata stored in the summary files are merged from all part-files. However, for + // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know + // how to merge them correctly if some key is associated with different values in different + // part-files. When this happens, Parquet simply gives up generating the summary file. This + // implies that if a summary file presents, then: + // + // 1. Either all part-files have exactly the same Spark SQL schema, or + // 2. 
Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus + // their schemas may differ from each other). + // + // Here we tend to be pessimistic and take the second case into account. Basically this means + // we can't trust the summary files if users require a merged schema, and must touch all part- + // files to do the merge. + val filesToTouch = + if (shouldMergeSchemas) { + // Also includes summary files, 'cause there might be empty partition directories. + + // If mergeRespectSummaries config is true, we assume that all part-files are the same for + // their schema with summary files, so we ignore them when merging schema. + // If the config is disabled, which is the default setting, we merge all part-files. + // In this mode, we only need to merge schemas contained in all those summary files. + // You should enable this configuration only if you are very sure that for the parquet + // part-files to read there are corresponding summary files containing correct schema. + + // As filed in SPARK-11500, the order of files to touch is a matter, which might affect + // the ordering of the output columns. There are several things to mention here. + // + // 1. If mergeRespectSummaries config is false, then it merges schemas by reducing from + // the first part-file so that the columns of the lexicographically first file show + // first. + // + // 2. If mergeRespectSummaries config is true, then there should be, at least, + // "_metadata"s for all given files, so that we can ensure the columns of + // the lexicographically first file show first. + // + // 3. If shouldMergeSchemas is false, but when multiple files are given, there is + // no guarantee of the output order, since there might not be a summary file for the + // lexicographically first file, which ends up putting ahead the columns of + // the other files. However, this should be okay since not enabling + // shouldMergeSchemas means (assumes) all the files have the same schemas. + + val needMerged: Seq[FileStatus] = + if (mergeRespectSummaries) { + Seq.empty + } else { + filesByType.data + } + needMerged ++ filesByType.metadata ++ filesByType.commonMetadata + } else { + // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet + // don't have this. + filesByType.commonMetadata.headOption + // Falls back to "_metadata" + .orElse(filesByType.metadata.headOption) + // Summary file(s) not found, the Parquet file is either corrupted, or different part- + // files contain conflicting user defined metadata (two or more values are associated + // with a same key in different files). In either case, we fall back to any of the + // first part-file, and just assume all schemas are consistent. 
+ .orElse(filesByType.data.headOption) + .toSeq + } + ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession) + } + + case class FileTypes( + data: Seq[FileStatus], + metadata: Seq[FileStatus], + commonMetadata: Seq[FileStatus]) + + private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = { + val leaves = allFiles.toArray.sortBy(_.getPath.toString) + + FileTypes( + data = leaves.filterNot(f => isSummaryFile(f.getPath)), + metadata = + leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE), + commonMetadata = + leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)) + } + + private def isSummaryFile(file: Path): Boolean = { + file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || + file.getName == ParquetFileWriter.PARQUET_METADATA_FILE + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 8814e3c6ccf9..f6490614ab05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.types._ * of this option is propagated to this class by the `init()` method and its Hadoop configuration * argument. */ -private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { +class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer. // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access // data in `ArrayData` without the help of `SpecificMutableRow`. @@ -442,7 +442,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit } } -private[parquet] object ParquetWriteSupport { +object ParquetWriteSupport { val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" def setSchema(schema: StructType, configuration: Configuration): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala index d4bad291021c..bb033e875f60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala @@ -18,8 +18,12 @@ package org.apache.spark.sql.execution.datasources.v2 import java.io.{FileNotFoundException, IOException} +import org.apache.parquet.io.ParquetDecodingException + import org.apache.spark.internal.Logging import org.apache.spark.rdd.InputFileBlockHolder +import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.reader.PartitionReader @@ -66,6 +70,19 @@ class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]]) val hasNext = try { currentReader.next() } catch { + case e: SchemaColumnConvertNotSupportedException => + val message = "Parquet column cannot be converted in " + + s"file ${currentReader.file.filePath}. 
Column: ${e.getColumn}, " + + s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" + throw new QueryExecutionException(message, e) + case e: ParquetDecodingException => + if (e.getMessage.contains("Can not read value at")) { + val message = "Encounter error while reading parquet files. " + + "One possible cause: Parquet column cannot be converted in the " + + "corresponding files. Details: " + throw new QueryExecutionException(message, e) + } + throw e case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => logWarning( s"Skipped the rest of the content in the corrupted file: $currentReader", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala new file mode 100644 index 000000000000..0b6d5a960374 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.parquet + +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.sources.v2.Table +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ParquetDataSourceV2 extends FileDataSourceV2 { + + override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat] + + override def shortName(): String = "parquet" + + override def getTable(options: CaseInsensitiveStringMap): Table = { + val paths = getPaths(options) + val tableName = getTableName(paths) + ParquetTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + } + + override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { + val paths = getPaths(options) + val tableName = getTableName(paths) + ParquetTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + } +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala new file mode 100644 index 000000000000..4a281ba46eb5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.parquet + +import java.net.URI +import java.util.TimeZone + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetInputSplit, ParquetRecordReader} + +import org.apache.spark.TaskContext +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.parquet._ +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader} +import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration + +/** + * A factory used to create Parquet readers. + * + * @param sqlConf SQL configuration. + * @param broadcastedConf Broadcast serializable Hadoop Configuration. + * @param dataSchema Schema of Parquet files. + * @param readDataSchema Required schema of Parquet files. + * @param partitionSchema Schema of partitions. + * @param filters Filters to be pushed down in the batch scan. 
+ */ +case class ParquetPartitionReaderFactory( + sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration], + dataSchema: StructType, + readDataSchema: StructType, + partitionSchema: StructType, + filters: Array[Filter]) extends FilePartitionReaderFactory with Logging { + private val isCaseSensitive = sqlConf.caseSensitiveAnalysis + private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields) + private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled + private val enableVectorizedReader: Boolean = sqlConf.parquetVectorizedReaderEnabled && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + private val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled + private val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion + private val capacity = sqlConf.parquetVectorizedReaderBatchSize + private val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown + private val pushDownDate = sqlConf.parquetFilterPushDownDate + private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp + private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal + private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith + private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + + override def supportColumnarReads(partition: InputPartition): Boolean = { + sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled && + resultSchema.length <= sqlConf.wholeStageMaxNumFields && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + } + + override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { + val reader = if (enableVectorizedReader) { + createVectorizedReader(file) + } else { + createRowBaseReader(file) + } + + val fileReader = new PartitionReader[InternalRow] { + override def next(): Boolean = reader.nextKeyValue() + + override def get(): InternalRow = reader.getCurrentValue.asInstanceOf[InternalRow] + + override def close(): Unit = reader.close() + } + + new PartitionReaderWithPartitionValues(fileReader, readDataSchema, + partitionSchema, file.partitionValues) + } + + override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = { + val vectorizedReader = createVectorizedReader(file) + vectorizedReader.enableReturningBatches() + + new PartitionReader[ColumnarBatch] { + override def next(): Boolean = vectorizedReader.nextKeyValue() + + override def get(): ColumnarBatch = + vectorizedReader.getCurrentValue.asInstanceOf[ColumnarBatch] + + override def close(): Unit = vectorizedReader.close() + } + } + + private def buildReaderBase[T]( + file: PartitionedFile, + buildReaderFunc: ( + ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate], + Option[TimeZone]) => RecordReader[Void, T]): RecordReader[Void, T] = { + val conf = broadcastedConf.value.value + + val filePath = new Path(new URI(file.filePath)) + val split = + new org.apache.parquet.hadoop.ParquetInputSplit( + filePath, + file.start, + file.start + file.length, + file.length, + Array.empty, + null) + + lazy val footerFileMetaData = + ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData + // Try to push down filters when filter push-down is enabled. 
+ val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, + pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) + } else { + None + } + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getTimeZone(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val reader = + buildReaderFunc(split, file.partitionValues, hadoopAttemptContext, pushed, convertTz) + reader.initialize(split, hadoopAttemptContext) + reader + } + + private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, UnsafeRow] = { + buildReaderBase(file, createRowBaseParquetReader) + } + + private def createRowBaseParquetReader( + split: ParquetInputSplit, + partitionValues: InternalRow, + hadoopAttemptContext: TaskAttemptContextImpl, + pushed: Option[FilterPredicate], + convertTz: Option[TimeZone]): RecordReader[Void, UnsafeRow] = { + logDebug(s"Falling back to parquet-mr") + val taskContext = Option(TaskContext.get()) + // ParquetRecordReader returns UnsafeRow + val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[UnsafeRow](readSupport) + } + val iter = new RecordReaderIterator(reader) + // SPARK-23457 Register a task completion listener before `initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + reader + } + + private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = { + val vectorizedReader = buildReaderBase(file, createParquetVectorizedReader) + .asInstanceOf[VectorizedParquetRecordReader] + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + vectorizedReader + } + + private def createParquetVectorizedReader( + split: ParquetInputSplit, + partitionValues: InternalRow, + hadoopAttemptContext: TaskAttemptContextImpl, + pushed: Option[FilterPredicate], + convertTz: Option[TimeZone]): VectorizedParquetRecordReader = { + val taskContext = Option(TaskContext.get()) + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) + val iter = new RecordReaderIterator(vectorizedReader) + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + logDebug(s"Appending $partitionSchema $partitionValues") + vectorizedReader + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala new file mode 100644 index 000000000000..a67aa3b92ce8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.sql.execution.datasources.v2.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.hadoop.ParquetInputFormat + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport} +import org.apache.spark.sql.execution.datasources.v2.FileScan +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.SerializableConfiguration + +case class ParquetScan( + sparkSession: SparkSession, + hadoopConf: Configuration, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, + pushedFilters: Array[Filter], + options: CaseInsensitiveStringMap) + extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) { + override def isSplitable(path: Path): Boolean = true + + override def createReaderFactory(): PartitionReaderFactory = { + val readDataSchemaAsJson = readDataSchema.json + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set( + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + readDataSchemaAsJson) + hadoopConf.set( + ParquetWriteSupport.SPARK_ROW_SCHEMA, + readDataSchemaAsJson) + hadoopConf.set( + SQLConf.SESSION_LOCAL_TIMEZONE.key, + sparkSession.sessionState.conf.sessionLocalTimeZone) + hadoopConf.setBoolean( + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, + sparkSession.sessionState.conf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean( + SQLConf.CASE_SENSITIVE.key, + sparkSession.sessionState.conf.caseSensitiveAnalysis) + + ParquetWriteSupport.setSchema(readDataSchema, hadoopConf) + + // Sets flags for `ParquetToSparkSchemaConverter` + hadoopConf.setBoolean( + SQLConf.PARQUET_BINARY_AS_STRING.key, + sparkSession.sessionState.conf.isParquetBinaryAsString) + hadoopConf.setBoolean( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + + val broadcastedConf = sparkSession.sparkContext.broadcast( + new SerializableConfiguration(hadoopConf)) + ParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, pushedFilters) + } + + override def equals(obj: Any): Boolean = obj match { + case p: ParquetScan => + fileIndex == p.fileIndex && dataSchema == p.dataSchema && + readDataSchema == p.readDataSchema && readPartitionSchema == p.readPartitionSchema && + options == p.options && equivalentFilters(pushedFilters, p.pushedFilters) + case _ => false + } + + override def hashCode(): Int = getClass.hashCode() +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala new file mode 100644 index 000000000000..4b8b434af88e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2.parquet + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter} +import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.{Scan, SupportsPushDownFilters} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class ParquetScanBuilder( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + schema: StructType, + dataSchema: StructType, + options: CaseInsensitiveStringMap) + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters { + lazy val hadoopConf = { + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + // Hadoop Configurations are case sensitive. + sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + } + + lazy val pushedParquetFilters = { + val sqlConf = sparkSession.sessionState.conf + val pushDownDate = sqlConf.parquetFilterPushDownDate + val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp + val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal + val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith + val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + val isCaseSensitive = sqlConf.caseSensitiveAnalysis + val parquetSchema = + new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema) + val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, + pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + parquetFilters.convertibleFilters(this.filters).toArray + } + + private var filters: Array[Filter] = Array.empty + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + this.filters = filters + this.filters + } + + // Note: for Parquet, the actual filter push down happens in [[ParquetPartitionReaderFactory]]. + // It requires the Parquet physical schema to determine whether a filter is convertible. + // All filters that can be converted to Parquet are pushed down. 
+ override def pushedFilters(): Array[Filter] = pushedParquetFilters + + override def build(): Scan = { + ParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, readDataSchema(), + readPartitionSchema(), pushedParquetFilters, options) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala new file mode 100644 index 000000000000..dce851dbcd33 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.parquet + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils +import org.apache.spark.sql.execution.datasources.v2.FileTable +import org.apache.spark.sql.sources.v2.writer.WriteBuilder +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class ParquetTable( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat]) + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + + override def newScanBuilder(options: CaseInsensitiveStringMap): ParquetScanBuilder = + new ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) + + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = + ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files) + + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = + new ParquetWriteBuilder(options, paths, formatName, supportsDataType) + + override def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: AtomicType => true + + case st: StructType => st.forall { f => supportsDataType(f.dataType) } + + case ArrayType(elementType, _) => supportsDataType(elementType) + + case MapType(keyType, valueType, _) => + supportsDataType(keyType) && supportsDataType(valueType) + + case udt: UserDefinedType[_] => supportsDataType(udt.sqlType) + + case _ => false + } + + override def formatName: String = "Parquet" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala new file mode 100644 index 000000000000..bfe2084299df --- 
/dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.parquet + +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext} +import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel +import org.apache.parquet.hadoop.codec.CodecConfig +import org.apache.parquet.hadoop.util.ContextUtil + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.parquet._ +import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ParquetWriteBuilder( + options: CaseInsensitiveStringMap, + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean) + extends FileWriteBuilder(options, paths, formatName, supportsDataType) with Logging { + + override def prepareWrite( + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val parquetOptions = new ParquetOptions(options, sqlConf) + + val conf = ContextUtil.getConfiguration(job) + + val committerClass = + conf.getClass( + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, + classOf[ParquetOutputCommitter], + classOf[OutputCommitter]) + + if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) { + logInfo("Using default output committer for Parquet: " + + classOf[ParquetOutputCommitter].getCanonicalName) + } else { + logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName) + } + + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + committerClass, + classOf[OutputCommitter]) + + // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override + // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why + // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is + // bundled with `ParquetOutputFormat[Row]`. + job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) + + ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) + + // This metadata is useful for keeping UDTs like Vector/Matrix. 
+ ParquetWriteSupport.setSchema(dataSchema, conf) + + // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet + // schema and writes actual rows to Parquet files. + conf.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, sqlConf.writeLegacyParquetFormat.toString) + + conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, sqlConf.parquetOutputTimestampType.toString) + + // Sets compression scheme + conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) + + // SPARK-15719: Disables writing Parquet summary files by default. + if (conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null + && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) { + conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE) + } + + if (ParquetOutputFormat.getJobSummaryLevel(conf) == JobSummaryLevel.NONE + && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) { + // output summary is requested, but the class is not a Parquet Committer + logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" + + s" create job summaries. " + + s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.") + } + + new OutputWriterFactory { + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new ParquetOutputWriter(path, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + CodecConfig.from(context).getCodec.getExtension + ".parquet" + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index dd11b5c50398..ea4794e7c809 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -333,7 +333,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo // TODO: test file source V2 after write path is fixed. Seq(true).foreach { useV1 => val useV1List = if (useV1) { - "csv,json,orc" + "csv,json,orc,parquet" } else { "" } @@ -378,7 +378,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo // TODO: test file source V2 after write path is fixed. 
Seq(true).foreach { useV1 => val useV1List = if (useV1) { - "csv,orc" + "csv,orc,parquet" } else { "" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index da2645ccca96..68e4b122575d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -2980,62 +2981,82 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("SPARK-27699 Validate pushed down filters") { - def checkPushedFilters(df: DataFrame, filters: Array[sources.Filter]): Unit = { + def checkPushedFilters(format: String, df: DataFrame, filters: Array[sources.Filter]): Unit = { val scan = df.queryExecution.sparkPlan .find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec] .scan - assert(scan.isInstanceOf[OrcScan]) - assert(scan.asInstanceOf[OrcScan].pushedFilters === filters) - } - withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") { - withTempPath { dir => - spark.range(10).map(i => (i, i.toString)).toDF("id", "s").write.orc(dir.getCanonicalPath) - val df = spark.read.orc(dir.getCanonicalPath) - checkPushedFilters( - df.where(('id < 2 and 's.contains("foo")) or ('id > 10 and 's.contains("bar"))), - Array(sources.Or(sources.LessThan("id", 2), sources.GreaterThan("id", 10)))) - checkPushedFilters( - df.where('s.contains("foo") or ('id > 10 and 's.contains("bar"))), - Array.empty) - checkPushedFilters( - df.where('id < 2 and not('id > 10 and 's.contains("bar"))), - Array(sources.IsNotNull("id"), sources.LessThan("id", 2))) + format match { + case "orc" => + assert(scan.isInstanceOf[OrcScan]) + assert(scan.asInstanceOf[OrcScan].pushedFilters === filters) + case "parquet" => + assert(scan.isInstanceOf[ParquetScan]) + assert(scan.asInstanceOf[ParquetScan].pushedFilters === filters) + case _ => + fail(s"unknown format $format") + } + } + + Seq("orc", "parquet").foreach { format => + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") { + withTempPath { dir => + spark.range(10).map(i => (i, i.toString)).toDF("id", "s") + .write + .format(format) + .save(dir.getCanonicalPath) + val df = spark.read.format(format).load(dir.getCanonicalPath) + checkPushedFilters( + format, + df.where(('id < 2 and 's.contains("foo")) or ('id > 10 and 's.contains("bar"))), + Array(sources.Or(sources.LessThan("id", 2), sources.GreaterThan("id", 10)))) + checkPushedFilters( + format, + df.where('s.contains("foo") or ('id > 10 and 's.contains("bar"))), + Array.empty) + checkPushedFilters( + format, + df.where('id < 2 and not('id > 10 and 's.contains("bar"))), + Array(sources.IsNotNull("id"), sources.LessThan("id", 2))) + } } } } test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { - Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => - withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { - withTable("t")
{ - sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)") - sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)") - if (enableOptimizeMetadataOnlyQuery) { - // The result is wrong if we enable the configuration. - checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5)) - } else { - checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null)) + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> + enableOptimizeMetadataOnlyQuery.toString) { + withTable("t") { + sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)") + sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)") + if (enableOptimizeMetadataOnlyQuery) { + // The result is wrong if we enable the configuration. + checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5)) + } else { + checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null)) + } + checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null)) } - checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null)) - } - withTempPath { path => - val tabLocation = path.getCanonicalPath - val partLocation1 = tabLocation + "/p=3" - val partLocation2 = tabLocation + "/p=1" - // SPARK-23271 empty RDD when saved should write a metadata only file - val df = spark.emptyDataFrame.select(lit(1).as("col")) - df.write.parquet(partLocation1) - val df2 = spark.range(10).toDF("col") - df2.write.parquet(partLocation2) - val readDF = spark.read.parquet(tabLocation) - if (enableOptimizeMetadataOnlyQuery) { - // The result is wrong if we enable the configuration. - checkAnswer(readDF.selectExpr("max(p)"), Row(3)) - } else { - checkAnswer(readDF.selectExpr("max(p)"), Row(1)) + withTempPath { path => + val tabLocation = path.getCanonicalPath + val partLocation1 = tabLocation + "/p=3" + val partLocation2 = tabLocation + "/p=1" + // SPARK-23271 empty RDD when saved should write a metadata only file + val df = spark.emptyDataFrame.select(lit(1).as("col")) + df.write.parquet(partLocation1) + val df2 = spark.range(10).toDF("col") + df2.write.parquet(partLocation2) + val readDF = spark.read.parquet(tabLocation) + if (enableOptimizeMetadataOnlyQuery) { + // The result is wrong if we enable the configuration. 
+ checkAnswer(readDF.selectExpr("max(p)"), Row(3)) + } else { + checkAnswer(readDF.selectExpr("max(p)"), Row(1)) + } + checkAnswer(readDF.selectExpr("max(col)"), Row(9)) } - checkAnswer(readDF.selectExpr("max(col)"), Row(9)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 0671524a2ab6..faa7cbb36117 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -172,15 +172,17 @@ class PlannerSuite extends SharedSQLContext { } test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") { - withTempPath { file => - val path = file.getCanonicalPath - testData.write.parquet(path) - val df = spark.read.parquet(path) - df.createOrReplaceTempView("testPushed") - - withTempView("testPushed") { - val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan - assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]")) + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + withTempPath { file => + val path = file.getCanonicalPath + testData.write.parquet(path) + val df = spark.read.parquet(path) + df.createOrReplaceTempView("testPushed") + + withTempView("testPushed") { + val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan + assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]")) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala index 4731da47a19d..b252100d890e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala @@ -33,25 +33,27 @@ class SameResultSuite extends QueryTest with SharedSQLContext { import testImplicits._ test("FileSourceScanExec: different orders of data filters and partition filters") { - withTempPath { path => - val tmpDir = path.getCanonicalPath - spark.range(10) - .selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d") - .write - .partitionBy("a", "b") - .parquet(tmpDir) - val df = spark.read.parquet(tmpDir) - // partition filters: a > 1 AND b < 9 - // data filters: c > 1 AND d < 9 - val plan1 = getFileSourceScanExec(df.where("a > 1 AND b < 9 AND c > 1 AND d < 9")) - val plan2 = getFileSourceScanExec(df.where("b < 9 AND a > 1 AND d < 9 AND c > 1")) - assert(plan1.sameResult(plan2)) + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + withTempPath { path => + val tmpDir = path.getCanonicalPath + spark.range(10) + .selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d") + .write + .partitionBy("a", "b") + .parquet(tmpDir) + val df = spark.read.parquet(tmpDir) + // partition filters: a > 1 AND b < 9 + // data filters: c > 1 AND d < 9 + val plan1 = getFileSourceScanExec(df.where("a > 1 AND b < 9 AND c > 1 AND d < 9")) + val plan2 = getFileSourceScanExec(df.where("b < 9 AND a > 1 AND d < 9 AND c > 1")) + assert(plan1.sameResult(plan2)) + } } } test("FileScan: different orders of data filters and partition filters") { withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") { - Seq("orc", "json", "csv").foreach { format => + Seq("orc", "json", "csv", "parquet").foreach { format => withTempPath { path => val tmpDir = path.getCanonicalPath spark.range(10) 
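Because this patch makes Parquet default to the V2 reader, suites that assert on V1-specific plan shapes (FileSourceScanExec, "PushedFilters" in explain output) are pinned back to V1 with withSQLConf. A small sketch of the two scopes, assuming the withSQLConf helper from the shared test utilities these suites mix in, plus placeholder spark and path values:

import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf

withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
  // Parquet reads in this scope are planned as the V1 FileSourceScanExec.
  val plan = spark.read.parquet(path).queryExecution.sparkPlan
  assert(plan.find(_.isInstanceOf[FileSourceScanExec]).isDefined)
}

withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
  // With an empty fallback list the same read becomes a V2 BatchScanExec.
  val plan = spark.read.parquet(path).queryExecution.sparkPlan
  assert(plan.find(_.isInstanceOf[BatchScanExec]).isDefined)
}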
@@ -73,7 +75,7 @@ class SameResultSuite extends QueryTest with SharedSQLContext { val plan3 = df.where("b < 9 AND a > 1 AND d < 8 AND c > 1").queryExecution.sparkPlan assert(!plan1.sameResult(plan3)) // The [[FileScan]]s should have different results if they support filter pushdown. - if (format == "orc") { + if (format == "orc" || format == "parquet") { val scan3 = getBatchScanExec(plan3) assert(!scan1.sameResult(scan3)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 47ff372992b9..e9d0556ebb51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution import org.apache.spark.SparkEnv import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext class SparkPlanSuite extends QueryTest with SharedSQLContext { @@ -35,27 +37,50 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext { } test("SPARK-23731 plans should be canonicalizable after being (de)serialized") { - withTempPath { path => - spark.range(1).write.parquet(path.getAbsolutePath) - val df = spark.read.parquet(path.getAbsolutePath) - val fileSourceScanExec = - df.queryExecution.sparkPlan.collectFirst { case p: FileSourceScanExec => p }.get - val serializer = SparkEnv.get.serializer.newInstance() - val readback = - serializer.deserialize[FileSourceScanExec](serializer.serialize(fileSourceScanExec)) - try { - readback.canonicalized - } catch { - case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e) + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + withTempPath { path => + spark.range(1).write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val fileSourceScanExec = + df.queryExecution.sparkPlan.collectFirst { case p: FileSourceScanExec => p }.get + val serializer = SparkEnv.get.serializer.newInstance() + val readback = + serializer.deserialize[FileSourceScanExec](serializer.serialize(fileSourceScanExec)) + try { + readback.canonicalized + } catch { + case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e) + } + } + } + } + + test("SPARK-27418 BatchScanExec should be canonicalizable after being (de)serialized") { + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") { + withTempPath { path => + spark.range(1).write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val batchScanExec = + df.queryExecution.sparkPlan.collectFirst { case p: BatchScanExec => p }.get + val serializer = SparkEnv.get.serializer.newInstance() + val readback = + serializer.deserialize[BatchScanExec](serializer.serialize(batchScanExec)) + try { + readback.canonicalized + } catch { + case e: Throwable => fail("BatchScanExec was not canonicalizable", e) + } } } } test("SPARK-25357 SparkPlanInfo of FileScan contains nonEmpty metadata") { - withTempPath { path => - spark.range(5).write.parquet(path.getAbsolutePath) - val f = spark.read.parquet(path.getAbsolutePath) - assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty) + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + withTempPath { path => + spark.range(5).write.parquet(path.getAbsolutePath) + val f 
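The new SPARK-27418 test mirrors the existing FileSourceScanExec check for the V2 node: a BatchScanExec that has gone through the serializer must still be canonicalizable. A sketch of that round trip, assuming df is a DataFrame whose plan already contains a BatchScanExec:

import org.apache.spark.SparkEnv
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

val batchScan = df.queryExecution.sparkPlan
  .collectFirst { case b: BatchScanExec => b }.get

// Round-trip through the configured serializer, then force canonicalization,
// which is what surfaces non-serializable or lazily initialized state.
val serializer = SparkEnv.get.serializer.newInstance()
val readBack = serializer.deserialize[BatchScanExec](serializer.serialize(batchScan))
readBack.canonicalized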
= spark.read.parquet(path.getAbsolutePath) + assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 28cb7611f0eb..c5d12d618e05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -26,13 +26,16 @@ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate.Operators.{Column => _, _} import org.apache.parquet.schema.MessageType -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.orc.OrcFilters +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType @@ -58,9 +61,9 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2} * dependent on this configuration, don't forget you better explicitly set this configuration * within the test. */ -class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext { +abstract class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext { - private def createParquetFilters( + protected def createParquetFilters( schema: MessageType, caseSensitive: Option[Boolean] = None): ParquetFilters = new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, @@ -82,57 +85,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - private def checkFilterPredicate( + def checkFilterPredicate( df: DataFrame, predicate: Predicate, filterClass: Class[_ <: FilterPredicate], checker: (DataFrame, Seq[Row]) => Unit, - expected: Seq[Row]): Unit = { - val output = predicate.collect { case a: Attribute => a }.distinct - - withSQLConf( - SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", - SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true", - SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true", - SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED.key -> "true", - SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true", - // Disable adding filters from constraints because it adds, for instance, - // is-not-null to pushed filters, which makes it hard to test if the pushed - // filter is expected or not (this had to be fixed with SPARK-13495). 
- SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> InferFiltersFromConstraints.ruleName, - SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { - val query = df - .select(output.map(e => Column(e)): _*) - .where(Column(predicate)) - - var maybeRelation: Option[HadoopFsRelation] = None - val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, - LogicalRelation(relation: HadoopFsRelation, _, _, _)) => - maybeRelation = Some(relation) - filters - }.flatten.reduceLeftOption(_ && _) - assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") - - val (_, selectedFilters, _) = - DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) - assert(selectedFilters.nonEmpty, "No filter is pushed down") - val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) - val parquetFilters = createParquetFilters(schema) - // In this test suite, all the simple predicates are convertible here. - assert(parquetFilters.convertibleFilters(selectedFilters) == selectedFilters) - val pushedParquetFilters = selectedFilters.map { pred => - val maybeFilter = parquetFilters.createFilter(pred) - assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") - maybeFilter.get - } - // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) - assert(pushedParquetFilters.exists(_.getClass === filterClass), - s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") - - checker(stripSparkFilter(query), expected) - } - } + expected: Seq[Row]): Unit private def checkFilterPredicate (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row]) @@ -1427,6 +1385,127 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } +class ParquetV1FilterSuite extends ParquetFilterSuite { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "parquet") + .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "parquet") + + override def checkFilterPredicate( + df: DataFrame, + predicate: Predicate, + filterClass: Class[_ <: FilterPredicate], + checker: (DataFrame, Seq[Row]) => Unit, + expected: Seq[Row]): Unit = { + val output = predicate.collect { case a: Attribute => a }.distinct + + withSQLConf( + SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true", + // Disable adding filters from constraints because it adds, for instance, + // is-not-null to pushed filters, which makes it hard to test if the pushed + // filter is expected or not (this had to be fixed with SPARK-13495). 
+ SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> InferFiltersFromConstraints.ruleName, + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + val query = df + .select(output.map(e => Column(e)): _*) + .where(Column(predicate)) + + var maybeRelation: Option[HadoopFsRelation] = None + val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { + case PhysicalOperation(_, filters, + LogicalRelation(relation: HadoopFsRelation, _, _, _)) => + maybeRelation = Some(relation) + filters + }.flatten.reduceLeftOption(_ && _) + assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") + + val (_, selectedFilters, _) = + DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) + assert(selectedFilters.nonEmpty, "No filter is pushed down") + val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) + val parquetFilters = createParquetFilters(schema) + // In this test suite, all the simple predicates are convertible here. + assert(parquetFilters.convertibleFilters(selectedFilters) === selectedFilters) + val pushedParquetFilters = selectedFilters.map { pred => + val maybeFilter = parquetFilters.createFilter(pred) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") + maybeFilter.get + } + // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) + assert(pushedParquetFilters.exists(_.getClass === filterClass), + s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") + + checker(stripSparkFilter(query), expected) + } + } +} + +class ParquetV2FilterSuite extends ParquetFilterSuite { + // TODO: enable Parquet V2 write path after file source V2 writers are workable. + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "") + + override def checkFilterPredicate( + df: DataFrame, + predicate: Predicate, + filterClass: Class[_ <: FilterPredicate], + checker: (DataFrame, Seq[Row]) => Unit, + expected: Seq[Row]): Unit = { + val output = predicate.collect { case a: Attribute => a }.distinct + + withSQLConf( + SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true", + // Disable adding filters from constraints because it adds, for instance, + // is-not-null to pushed filters, which makes it hard to test if the pushed + // filter is expected or not (this had to be fixed with SPARK-13495). 
+ SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> InferFiltersFromConstraints.ruleName, + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + val query = df + .select(output.map(e => Column(e)): _*) + .where(Column(predicate)) + + query.queryExecution.optimizedPlan.collectFirst { + case PhysicalOperation(_, filters, + DataSourceV2Relation(parquetTable: ParquetTable, _, options)) => + assert(filters.nonEmpty, "No filter is analyzed from the given query") + val scanBuilder = parquetTable.newScanBuilder(options) + val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter).toArray + scanBuilder.pushFilters(sourceFilters) + val pushedFilters = scanBuilder.pushedFilters() + assert(pushedFilters.nonEmpty, "No filter is pushed down") + val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) + val parquetFilters = createParquetFilters(schema) + // In this test suite, all the simple predicates are convertible here. + assert(parquetFilters.convertibleFilters(sourceFilters) === pushedFilters) + val pushedParquetFilters = pushedFilters.map { pred => + val maybeFilter = parquetFilters.createFilter(pred) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") + maybeFilter.get + } + // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) + assert(pushedParquetFilters.exists(_.getClass === filterClass), + s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") + + checker(stripSparkFilter(query), expected) + + case _ => + throw new AnalysisException("Can not match ParquetTable in the query.") + } + } + } +} + class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] { private var _sum = 0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index febbe054641c..6f3ed3d85e93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -29,6 +29,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils @@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -48,7 +50,8 @@ case class ParquetData(intField: Int, stringField: String) // The data that also includes the partitioning key case class ParquetDataWithKey(intField: Int, pi: Int, stringField: String, ps: String) -class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with SharedSQLContext { +abstract class ParquetPartitionDiscoverySuite + extends QueryTest with ParquetTest with SharedSQLContext { import PartitioningUtils._ import testImplicits._ @@ 
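ParquetV2FilterSuite exercises pushdown through the V2 table API rather than DataSourceStrategy.selectFilters: it matches the DataSourceV2Relation, builds a scan, pushes the translated filters, and reads back pushedFilters(). A condensed sketch of that flow, assuming query is a DataFrame whose optimized plan contains a Parquet DataSourceV2Relation:

import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable

query.queryExecution.optimizedPlan.collectFirst {
  case PhysicalOperation(_, filters, DataSourceV2Relation(table: ParquetTable, _, options)) =>
    // Translate Catalyst predicates to data source filters and push them into a fresh scan.
    val builder = table.newScanBuilder(options)
    builder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
    builder.pushedFilters()   // the subset the Parquet source accepted
}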
-569,53 +572,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } - test("read partitioned table - partition key included in Parquet file") { - withTempDir { base => - for { - pi <- Seq(1, 2) - ps <- Seq("foo", "bar") - } { - makeParquetFile( - (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)), - makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) - } - - spark.read.parquet(base.getCanonicalPath).createOrReplaceTempView("t") - - withTempView("t") { - checkAnswer( - sql("SELECT * FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - ps <- Seq("foo", "bar") - } yield Row(i, pi, i.toString, ps)) - - checkAnswer( - sql("SELECT intField, pi FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - _ <- Seq("foo", "bar") - } yield Row(i, pi)) - - checkAnswer( - sql("SELECT * FROM t WHERE pi = 1"), - for { - i <- 1 to 10 - ps <- Seq("foo", "bar") - } yield Row(i, 1, i.toString, ps)) - - checkAnswer( - sql("SELECT * FROM t WHERE ps = 'foo'"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - } yield Row(i, pi, i.toString, "foo")) - } - } - } - test("read partitioned table - with nulls") { withTempDir { base => for { @@ -657,39 +613,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } - test("read partitioned table - with nulls and partition keys are included in Parquet file") { - withTempDir { base => - for { - pi <- Seq(1, 2) - ps <- Seq("foo", null.asInstanceOf[String]) - } { - makeParquetFile( - (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)), - makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) - } - - val parquetRelation = spark.read.format("parquet").load(base.getCanonicalPath) - parquetRelation.createOrReplaceTempView("t") - - withTempView("t") { - checkAnswer( - sql("SELECT * FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - ps <- Seq("foo", null.asInstanceOf[String]) - } yield Row(i, pi, i.toString, ps)) - - checkAnswer( - sql("SELECT * FROM t WHERE ps IS NULL"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - } yield Row(i, pi, i.toString, null)) - } - } - } - test("read partitioned table - merging compatible schemas") { withTempDir { base => makeParquetFile( @@ -715,20 +638,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } - test("SPARK-7749 Non-partitioned table should have empty partition spec") { - withTempPath { dir => - (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) - val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution - queryExecution.analyzed.collectFirst { - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _, _) => - assert(location.partitionSpec() === PartitionSpec.emptySpec) - }.getOrElse { - fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution") - } - } - } - test("SPARK-7847: Dynamic partition directory path escaping and unescaping") { withTempPath { dir => val df = Seq("/", "[]", "?").zipWithIndex.map(_.swap).toDF("i", "s") @@ -1083,6 +992,158 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } + test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") { + val df = Seq( + (1, "2015-01-01 00:00:00"), + (2, "2014-01-01 00:00:00"), + (3, "blah")).toDF("i", "str") + + withTempPath { path => + df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath) + checkAnswer(spark.read.load(path.getAbsolutePath), 
df) + } + } + + test("Resolve type conflicts - decimals, dates and timestamps in partition column") { + withTempPath { path => + val df = Seq((1, "2014-01-01"), (2, "2016-01-01"), (3, "2015-01-01 00:01:00")).toDF("i", "ts") + df.write.format("parquet").partitionBy("ts").save(path.getAbsolutePath) + checkAnswer( + spark.read.load(path.getAbsolutePath), + Row(1, Timestamp.valueOf("2014-01-01 00:00:00")) :: + Row(2, Timestamp.valueOf("2016-01-01 00:00:00")) :: + Row(3, Timestamp.valueOf("2015-01-01 00:01:00")) :: Nil) + } + + withTempPath { path => + val df = Seq((1, "1"), (2, "3"), (3, "2" * 30)).toDF("i", "decimal") + df.write.format("parquet").partitionBy("decimal").save(path.getAbsolutePath) + checkAnswer( + spark.read.load(path.getAbsolutePath), + Row(1, BigDecimal("1")) :: + Row(2, BigDecimal("3")) :: + Row(3, BigDecimal("2" * 30)) :: Nil) + } + } + + test("SPARK-23436: invalid Dates should be inferred as String in partition inference") { + withTempPath { path => + val data = Seq(("1", "2018-01", "2018-01-01-04", "test")) + .toDF("id", "date_month", "date_hour", "data") + + data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath) + val input = spark.read.parquet(path.getAbsolutePath).select("id", + "date_month", "date_hour", "data") + + assert(input.schema.sameType(input.schema)) + checkAnswer(input, data) + } + } +} + +class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite { + import testImplicits._ + + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "parquet") + .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "parquet") + + test("read partitioned table - partition key included in Parquet file") { + withTempDir { base => + for { + pi <- Seq(1, 2) + ps <- Seq("foo", "bar") + } { + makeParquetFile( + (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)), + makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) + } + + spark.read.parquet(base.getCanonicalPath).createOrReplaceTempView("t") + + withTempView("t") { + checkAnswer( + sql("SELECT * FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + ps <- Seq("foo", "bar") + } yield Row(i, pi, i.toString, ps)) + + checkAnswer( + sql("SELECT intField, pi FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + _ <- Seq("foo", "bar") + } yield Row(i, pi)) + + checkAnswer( + sql("SELECT * FROM t WHERE pi = 1"), + for { + i <- 1 to 10 + ps <- Seq("foo", "bar") + } yield Row(i, 1, i.toString, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE ps = 'foo'"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + } yield Row(i, pi, i.toString, "foo")) + } + } + } + + test("read partitioned table - with nulls and partition keys are included in Parquet file") { + withTempDir { base => + for { + pi <- Seq(1, 2) + ps <- Seq("foo", null.asInstanceOf[String]) + } { + makeParquetFile( + (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)), + makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) + } + + val parquetRelation = spark.read.format("parquet").load(base.getCanonicalPath) + parquetRelation.createOrReplaceTempView("t") + + withTempView("t") { + checkAnswer( + sql("SELECT * FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + ps <- Seq("foo", null.asInstanceOf[String]) + } yield Row(i, pi, i.toString, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE ps IS NULL"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + } yield Row(i, pi, i.toString, null)) + } + } + } + + test("SPARK-7749 Non-partitioned table should have empty 
partition spec") { + withTempPath { dir => + (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) + val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution + queryExecution.analyzed.collectFirst { + case LogicalRelation( + HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _, _) => + assert(location.partitionSpec() === PartitionSpec.emptySpec) + }.getOrElse { + fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution") + } + } + } + test("SPARK-18108 Parquet reader fails when data column types conflict with partition ones") { withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { withTempPath { dir => @@ -1125,52 +1186,150 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } } +} - test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") { - val df = Seq( - (1, "2015-01-01 00:00:00"), - (2, "2014-01-01 00:00:00"), - (3, "blah")).toDF("i", "str") +class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite { + import testImplicits._ - withTempPath { path => - df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath) - checkAnswer(spark.read.load(path.getAbsolutePath), df) + // TODO: enable Parquet V2 write path after file source V2 writers are workable. + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "") + + test("read partitioned table - partition key included in Parquet file") { + withTempDir { base => + for { + pi <- Seq(1, 2) + ps <- Seq("foo", "bar") + } { + makeParquetFile( + (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)), + makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) + } + + spark.read.parquet(base.getCanonicalPath).createOrReplaceTempView("t") + + withTempView("t") { + checkAnswer( + sql("SELECT * FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + ps <- Seq("foo", "bar") + } yield Row(i, i.toString, pi, ps)) + + checkAnswer( + sql("SELECT intField, pi FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + _ <- Seq("foo", "bar") + } yield Row(i, pi)) + + checkAnswer( + sql("SELECT * FROM t WHERE pi = 1"), + for { + i <- 1 to 10 + ps <- Seq("foo", "bar") + } yield Row(i, i.toString, 1, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE ps = 'foo'"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + } yield Row(i, i.toString, pi, "foo")) + } } } - test("Resolve type conflicts - decimals, dates and timestamps in partition column") { - withTempPath { path => - val df = Seq((1, "2014-01-01"), (2, "2016-01-01"), (3, "2015-01-01 00:01:00")).toDF("i", "ts") - df.write.format("parquet").partitionBy("ts").save(path.getAbsolutePath) - checkAnswer( - spark.read.load(path.getAbsolutePath), - Row(1, Timestamp.valueOf("2014-01-01 00:00:00")) :: - Row(2, Timestamp.valueOf("2016-01-01 00:00:00")) :: - Row(3, Timestamp.valueOf("2015-01-01 00:01:00")) :: Nil) + test("read partitioned table - with nulls and partition keys are included in Parquet file") { + withTempDir { base => + for { + pi <- Seq(1, 2) + ps <- Seq("foo", null.asInstanceOf[String]) + } { + makeParquetFile( + (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)), + makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) + } + + val parquetRelation = spark.read.format("parquet").load(base.getCanonicalPath) + parquetRelation.createOrReplaceTempView("t") + + withTempView("t") { + checkAnswer( + sql("SELECT * FROM 
t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + ps <- Seq("foo", null.asInstanceOf[String]) + } yield Row(i, i.toString, pi, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE ps IS NULL"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + } yield Row(i, i.toString, pi, null)) + } } + } - withTempPath { path => - val df = Seq((1, "1"), (2, "3"), (3, "2" * 30)).toDF("i", "decimal") - df.write.format("parquet").partitionBy("decimal").save(path.getAbsolutePath) - checkAnswer( - spark.read.load(path.getAbsolutePath), - Row(1, BigDecimal("1")) :: - Row(2, BigDecimal("3")) :: - Row(3, BigDecimal("2" * 30)) :: Nil) + test("SPARK-7749 Non-partitioned table should have empty partition spec") { + withTempPath { dir => + (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) + val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution + queryExecution.analyzed.collectFirst { + case DataSourceV2Relation(fileTable: FileTable, _, _) => + assert(fileTable.fileIndex.partitionSpec() === PartitionSpec.emptySpec) + }.getOrElse { + fail(s"Expecting a matching DataSourceV2Relation, but got:\n$queryExecution") + } } } - test("SPARK-23436: invalid Dates should be inferred as String in partition inference") { - withTempPath { path => - val data = Seq(("1", "2018-01", "2018-01-01-04", "test")) - .toDF("id", "date_month", "date_hour", "data") + test("SPARK-18108 Parquet reader fails when data column types conflict with partition ones") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = Seq((1L, 2.0)).toDF("a", "b") + df.write.parquet(s"$path/a=1") + checkAnswer(spark.read.parquet(s"$path"), Seq(Row(2.0, 1))) + } + } + } - data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath) - val input = spark.read.parquet(path.getAbsolutePath).select("id", - "date_month", "date_hour", "data") + test("SPARK-21463: MetadataLogFileIndex should respect userSpecifiedSchema for partition cols") { + withTempDir { tempDir => + val output = new File(tempDir, "output").toString + val checkpoint = new File(tempDir, "chkpoint").toString + try { + val stream = MemoryStream[(String, Int)] + val df = stream.toDS().toDF("time", "value") + val sq = df.writeStream + .option("checkpointLocation", checkpoint) + .format("parquet") + .partitionBy("time") + .start(output) - assert(input.schema.sameType(input.schema)) - checkAnswer(input, data) + stream.addData(("2017-01-01-00", 1), ("2017-01-01-01", 2)) + sq.processAllAvailable() + + val schema = new StructType() + .add("time", StringType) + .add("value", IntegerType) + val readBack = spark.read.schema(schema).parquet(output) + assert(readBack.schema.toSet === schema.toSet) + + checkAnswer( + readBack, + Seq(Row(1, "2017-01-01-00"), Row(2, "2017-01-01-01")) + ) + } finally { + spark.streams.active.foreach(_.stop()) + } } } -} +} \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 4959275a7312..7aa0ba7f4e0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -23,13 +23,15 @@ import java.util.concurrent.TimeUnit import org.apache.hadoop.fs.{FileSystem, Path} import 
org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.spark.{DebugFilesystem, SparkException} +import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -38,7 +40,7 @@ import org.apache.spark.util.Utils /** * A test suite that tests various Parquet queries. */ -class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext { +abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext { import testImplicits._ test("simple select queries") { @@ -767,29 +769,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext assert(ParquetReadSupport.expandUDT(schema) === expected) } - test("returning batch for wide table") { - withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10") { - withTempPath { dir => - val path = dir.getCanonicalPath - val df = spark.range(10).select(Seq.tabulate(11) {i => ('id + i).as(s"c$i")} : _*) - df.write.mode(SaveMode.Overwrite).parquet(path) - - // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) - val df2 = spark.read.parquet(path) - val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get - assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch) - checkAnswer(df2, df) - - // return batch - val columns = Seq.tabulate(9) {i => s"c$i"} - val df3 = df2.selectExpr(columns : _*) - val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get - assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsBatch) - checkAnswer(df3, df.selectExpr(columns : _*)) - } - } - } - test("SPARK-15719: disable writing summary files by default") { withTempPath { dir => val path = dir.getCanonicalPath @@ -926,6 +905,76 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } +class ParquetV1QuerySuite extends ParquetQuerySuite { + import testImplicits._ + + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "parquet") + .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "parquet") + + test("returning batch for wide table") { + withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = spark.range(10).select(Seq.tabulate(11) {i => ('id + i).as(s"c$i")} : _*) + df.write.mode(SaveMode.Overwrite).parquet(path) + + // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) + val df2 = spark.read.parquet(path) + val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch) + checkAnswer(df2, df) + + // return batch + val columns = Seq.tabulate(9) {i => s"c$i"} + val df3 = df2.selectExpr(columns : _*) + val fileScan3 = 
df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsBatch) + checkAnswer(df3, df.selectExpr(columns : _*)) + } + } + } +} + +class ParquetV2QuerySuite extends ParquetQuerySuite { + import testImplicits._ + + // TODO: enable Parquet V2 write path after file source V2 writers are workable. + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "") + + test("returning batch for wide table") { + withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = spark.range(10).select(Seq.tabulate(11) {i => ('id + i).as(s"c$i")} : _*) + df.write.mode(SaveMode.Overwrite).parquet(path) + + // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) + val df2 = spark.read.parquet(path) + val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get + val parquetScan2 = fileScan2.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan] + // The method `supportColumnarReads` in Parquet doesn't depends on the input partition. + // Here we can pass null input partition to the method for testing propose. + assert(!parquetScan2.createReaderFactory().supportColumnarReads(null)) + checkAnswer(df2, df) + + // return batch + val columns = Seq.tabulate(9) {i => s"c$i"} + val df3 = df2.selectExpr(columns : _*) + val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get + val parquetScan3 = fileScan3.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan] + assert(parquetScan3.createReaderFactory().supportColumnarReads(null)) + checkAnswer(df3, df.selectExpr(columns : _*)) + } + } + } +} + object TestingUDT { case class SingleElement(element: Long) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala index 3d97d64ba639..70184b609f2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala @@ -17,11 +17,49 @@ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.spark.SparkConf +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.execution.datasources.SchemaPruningSuite +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.internal.SQLConf -class ParquetSchemaPruningSuite extends SchemaPruningSuite { +abstract class ParquetSchemaPruningSuite extends SchemaPruningSuite { override protected val dataSourceName: String = "parquet" override protected val vectorizedReaderEnabledKey: String = SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key + +} + +class ParquetV1SchemaPruningSuite extends ParquetSchemaPruningSuite { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "parquet") + .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "parquet") +} + +class ParquetV2SchemaPruningSuite extends ParquetSchemaPruningSuite { + // TODO: enable Parquet V2 write path after file source V2 writers are workable. 
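The "returning batch for wide table" test is split between the V1 and V2 query suites because the batch capability surfaces differently on each path: V1 reports it as FileSourceScanExec.supportsBatch, while V2 asks the scan's reader factory. A sketch of the V2 probe, assuming df is a DataFrame whose plan contains a Parquet BatchScanExec:

import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

val parquetScan = df.queryExecution.sparkPlan
  .collectFirst { case b: BatchScanExec => b }.get
  .scan.asInstanceOf[ParquetScan]

// As in the test, the Parquet reader factory ignores the partition argument,
// so null is passed purely to query the columnar capability.
val columnar = parquetScan.createReaderFactory().supportColumnarReads(null)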
+ override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_READER_LIST, "") + + override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { + val fileSourceScanSchemata = + df.queryExecution.executedPlan.collect { + case scan: BatchScanExec => scan.scan.asInstanceOf[ParquetScan].readDataSchema + } + assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, + s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + + s"but expected $expectedSchemaCatalogStrings") + fileSourceScanSchemata.zip(expectedSchemaCatalogStrings).foreach { + case (scanSchema, expectedScanSchemaCatalogString) => + val expectedScanSchema = CatalystSqlParser.parseDataType(expectedScanSchemaCatalogString) + implicit val equality = schemaEquality + assert(scanSchema === expectedScanSchema) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 528a4d0ca800..f8e4822af11f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -23,7 +23,7 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.parquet.io.ParquetDecodingException import org.apache.parquet.schema.{MessageType, MessageTypeParser} -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index a665fe6f987a..a8d230870aeb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -437,28 +437,31 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared ) assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil) - withTempDir { tempDir => - val dir = new File(tempDir, "pqS").getCanonicalPath - - spark.range(10).write.parquet(dir) - spark.read.parquet(dir).createOrReplaceTempView("pqS") - - // The executed plan looks like: - // Exchange RoundRobinPartitioning(2) - // +- BroadcastNestedLoopJoin BuildLeft, Cross - // :- BroadcastExchange IdentityBroadcastMode - // : +- Exchange RoundRobinPartitioning(3) - // : +- *Range (0, 30, step=1, splits=2) - // +- *FileScan parquet [id#465L] Batched: true, Format: Parquet, Location: ...(ignored) - val res3 = InputOutputMetricsHelper.run( - spark.range(30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF() - ) - // The query above is executed in the following stages: - // 1. range(30) => (30, 0, 30) - // 2. sql("select * from pqS") => (0, 30, 0) - // 3. crossJoin(...) of 1. and 2. => (10, 0, 300) - // 4. shuffle & return results => (0, 300, 0) - assert(res3 === (30L, 0L, 30L) :: (0L, 30L, 0L) :: (10L, 0L, 300L) :: (0L, 300L, 0L) :: Nil) + // TODO: test file source V2 as well when its statistics is correctly computed. 
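ParquetV2SchemaPruningSuite verifies pruning by reading the pruned schema straight off the V2 scan instead of the V1 FileSourceScanExec. A sketch of pulling the pruned schemas out of an executed plan, assuming the same BatchScanExec/ParquetScan shape as above and a placeholder df:

import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

val prunedSchemas = df.queryExecution.executedPlan.collect {
  case scan: BatchScanExec => scan.scan.asInstanceOf[ParquetScan].readDataSchema
}
// Each entry should match the expected pruned schema, which checkScanSchemata above
// parses from its catalog string with CatalystSqlParser.parseDataType.
prunedSchemas.foreach(schema => println(schema.catalogString))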
+ withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + withTempDir { tempDir => + val dir = new File(tempDir, "pqS").getCanonicalPath + + spark.range(10).write.parquet(dir) + spark.read.parquet(dir).createOrReplaceTempView("pqS") + + // The executed plan looks like: + // Exchange RoundRobinPartitioning(2) + // +- BroadcastNestedLoopJoin BuildLeft, Cross + // :- BroadcastExchange IdentityBroadcastMode + // : +- Exchange RoundRobinPartitioning(3) + // : +- *Range (0, 30, step=1, splits=2) + // +- *FileScan parquet [id#465L] Batched: true, Format: Parquet, Location: ...(ignored) + val res3 = InputOutputMetricsHelper.run( + spark.range(30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF() + ) + // The query above is executed in the following stages: + // 1. range(30) => (30, 0, 30) + // 2. sql("select * from pqS") => (0, 30, 0) + // 3. crossJoin(...) of 1. and 2. => (10, 0, 300) + // 4. shuffle & return results => (0, 300, 0) + assert(res3 === (30L, 0L, 30L) :: (0L, 30L, 0L) :: (10L, 0L, 300L) :: (0L, 300L, 0L) :: Nil) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala index 60cb40f16c73..311a8ef3257d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala @@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSQLContext { @@ -88,39 +91,80 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSQLContext { assert(arrowEvalNodes.size == 2) } - test("Python UDF should not break column pruning/filter pushdown") { - withTempPath { f => - spark.range(10).select($"id".as("a"), $"id".as("b")) - .write.parquet(f.getCanonicalPath) - val df = spark.read.parquet(f.getCanonicalPath) + test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) + .write.parquet(f.getCanonicalPath) + val df = spark.read.parquet(f.getCanonicalPath) - withClue("column pruning") { - val query = df.filter(batchedPythonUDF($"a")).select($"a") + withClue("column pruning") { + val query = df.filter(batchedPythonUDF($"a")).select($"a") - val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan) - assert(pythonEvalNodes.length == 1) + val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan) + assert(pythonEvalNodes.length == 1) - val scanNodes = query.queryExecution.executedPlan.collect { - case scan: FileSourceScanExec => scan + val scanNodes = query.queryExecution.executedPlan.collect { + case scan: FileSourceScanExec => scan + } + assert(scanNodes.length == 1) + assert(scanNodes.head.output.map(_.name) == Seq("a")) + } + + withClue("filter pushdown") { + val query = df.filter($"a" > 1 && batchedPythonUDF($"a")) + val pythonEvalNodes = 
collectBatchExec(query.queryExecution.executedPlan) + assert(pythonEvalNodes.length == 1) + + val scanNodes = query.queryExecution.executedPlan.collect { + case scan: FileSourceScanExec => scan + } + assert(scanNodes.length == 1) + // 'a is not null and 'a > 1 + assert(scanNodes.head.dataFilters.length == 2) + assert(scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a")) } - assert(scanNodes.length == 1) - assert(scanNodes.head.output.map(_.name) == Seq("a")) } + } + } + + test("Python UDF should not break column pruning/filter pushdown -- Parquet V2") { + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) + .write.parquet(f.getCanonicalPath) + val df = spark.read.parquet(f.getCanonicalPath) - withClue("filter pushdown") { - val query = df.filter($"a" > 1 && batchedPythonUDF($"a")) - val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan) - assert(pythonEvalNodes.length == 1) + withClue("column pruning") { + val query = df.filter(batchedPythonUDF($"a")).select($"a") - val scanNodes = query.queryExecution.executedPlan.collect { - case scan: FileSourceScanExec => scan + val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan) + assert(pythonEvalNodes.length == 1) + + val scanNodes = query.queryExecution.executedPlan.collect { + case scan: BatchScanExec => scan + } + assert(scanNodes.length == 1) + assert(scanNodes.head.output.map(_.name) == Seq("a")) + } + + withClue("filter pushdown") { + val query = df.filter($"a" > 1 && batchedPythonUDF($"a")) + val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan) + assert(pythonEvalNodes.length == 1) + + val scanNodes = query.queryExecution.executedPlan.collect { + case scan: BatchScanExec => scan + } + assert(scanNodes.length == 1) + // 'a is not null and 'a > 1 + val filters = scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters + assert(filters.length == 2) + assert(filters.flatMap(_.references).distinct === Array("a")) } - assert(scanNodes.length == 1) - // 'a is not null and 'a > 1 - assert(scanNodes.head.dataFilters.length == 2) - assert(scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a")) } } } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala index 3ae305655e92..057450bc8f9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala @@ -131,10 +131,12 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSQLContext { withTempPath { file => val path = file.getCanonicalPath // Dummy File writer should fail as expected. 
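The Python UDF tests now come in a V1 and a V2 flavor because both the scan node and the place where the pushed data filters are visible change. A sketch of the per-path checks, assuming query is a DataFrame that scans a two-column Parquet file, filters on a, and keeps only column a; whichever collection is empty simply skips its checks:

import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

val plan = query.queryExecution.executedPlan

// V1: pruning and pushed data filters live on FileSourceScanExec.
plan.collect { case s: FileSourceScanExec => s }.foreach { s =>
  assert(s.output.map(_.name) == Seq("a"))
  assert(s.dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
}

// V2: pruning shows up on BatchScanExec.output; filters on the ParquetScan.
plan.collect { case s: BatchScanExec => s }.foreach { s =>
  assert(s.output.map(_.name) == Seq("a"))
  assert(s.scan.asInstanceOf[ParquetScan].pushedFilters
    .flatMap(_.references).distinct.toSeq == Seq("a"))
}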
- val exception = intercept[AnalysisException] { - df.write.format(dummyParquetWriterV2).save(path) + withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "") { + val exception = intercept[AnalysisException] { + df.write.format(dummyParquetWriterV2).save(path) + } + assert(exception.message.equals("Dummy file writer")) } - assert(exception.message.equals("Dummy file writer")) df.write.parquet(path) // Fallback reads to V1 checkAnswer(spark.read.format(dummyParquetWriterV2).load(path), df) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 020ab234f67a..5b50f1d2f0af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -121,75 +121,78 @@ class FileStreamSinkSuite extends StreamTest { var query: StreamingQuery = null - try { - query = - ds.map(i => (i, i * 1000)) - .toDF("id", "value") - .writeStream - .partitionBy("id") - .option("checkpointLocation", checkpointDir) - .format("parquet") - .start(outputDir) - - inputData.addData(1, 2, 3) - failAfter(streamingTimeout) { - query.processAllAvailable() - } + // TODO: test file source V2 as well. + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + try { + query = + ds.map(i => (i, i * 1000)) + .toDF("id", "value") + .writeStream + .partitionBy("id") + .option("checkpointLocation", checkpointDir) + .format("parquet") + .start(outputDir) + + inputData.addData(1, 2, 3) + failAfter(streamingTimeout) { + query.processAllAvailable() + } - val outputDf = spark.read.parquet(outputDir) - val expectedSchema = new StructType() - .add(StructField("value", IntegerType, nullable = false)) - .add(StructField("id", IntegerType)) - assert(outputDf.schema === expectedSchema) - - // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has - // been inferred - val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect { - case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation - } - assert(hadoopdFsRelations.size === 1) - assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex]) - assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id")) - assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value")) - - // Verify the data is correctly read - checkDatasetUnorderly( - outputDf.as[(Int, Int)], - (1000, 1), (2000, 2), (3000, 3)) - - /** Check some condition on the partitions of the FileScanRDD generated by a DF */ - def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = { - val getFileScanRDD = df.queryExecution.executedPlan.collect { - case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] => - scan.inputRDDs().head.asInstanceOf[FileScanRDD] - }.headOption.getOrElse { - fail(s"No FileScan in query\n${df.queryExecution}") + val outputDf = spark.read.parquet(outputDir) + val expectedSchema = new StructType() + .add(StructField("value", IntegerType, nullable = false)) + .add(StructField("id", IntegerType)) + assert(outputDf.schema === expectedSchema) + + // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has + // been inferred + val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect { + case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation + } + assert(hadoopdFsRelations.size 
=== 1) + assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex]) + assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id")) + assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value")) + + // Verify the data is correctly read + checkDatasetUnorderly( + outputDf.as[(Int, Int)], + (1000, 1), (2000, 2), (3000, 3)) + + /** Check some condition on the partitions of the FileScanRDD generated by a DF */ + def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = { + val getFileScanRDD = df.queryExecution.executedPlan.collect { + case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] => + scan.inputRDDs().head.asInstanceOf[FileScanRDD] + }.headOption.getOrElse { + fail(s"No FileScan in query\n${df.queryExecution}") + } + func(getFileScanRDD.filePartitions) } - func(getFileScanRDD.filePartitions) - } - // Read without pruning - checkFileScanPartitions(outputDf) { partitions => - // There should be as many distinct partition values as there are distinct ids - assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3) - } + // Read without pruning + checkFileScanPartitions(outputDf) { partitions => + // There should be as many distinct partition values as there are distinct ids + assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3) + } - // Read with pruning, should read only files in partition dir id=1 - checkFileScanPartitions(outputDf.filter("id = 1")) { partitions => - val filesToBeRead = partitions.flatMap(_.files) - assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/"))) - assert(filesToBeRead.map(_.partitionValues).distinct.size === 1) - } + // Read with pruning, should read only files in partition dir id=1 + checkFileScanPartitions(outputDf.filter("id = 1")) { partitions => + val filesToBeRead = partitions.flatMap(_.files) + assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/"))) + assert(filesToBeRead.map(_.partitionValues).distinct.size === 1) + } - // Read with pruning, should read only files in partition dir id=1 and id=2 - checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions => - val filesToBeRead = partitions.flatMap(_.files) - assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/"))) - assert(filesToBeRead.map(_.partitionValues).distinct.size === 2) - } - } finally { - if (query != null) { - query.stop() + // Read with pruning, should read only files in partition dir id=1 and id=2 + checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions => + val filesToBeRead = partitions.flatMap(_.files) + assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/"))) + assert(filesToBeRead.map(_.partitionValues).distinct.size === 2) + } + } finally { + if (query != null) { + query.stop() + } } } } @@ -274,6 +277,7 @@ class FileStreamSinkSuite extends StreamTest { "CAST(start as BIGINT) AS start", "CAST(end as BIGINT) AS end", "count") + .orderBy("start") // sort the DataFrame in order to compare with the expected one. 
checkDataset( outputDf.as[(Long, Long, Long)], expectedResult.map(x => (x._1._1, x._1._2, x._2)): _*) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index f229b08a20aa..293bacdf22f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -218,9 +218,12 @@ class StreamSuite extends StreamTest { } } - val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load() - assertDF(df) - assertDF(df) + // TODO: fix file source V2 as well. + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load() + assertDF(df) + assertDF(df) + } } test("Within the same streaming query, one StreamingRelation should only be transformed to one " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index cfd7204ea293..8176bcc87719 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -197,30 +197,32 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { } test("deduplicate with file sink") { - withTempDir { output => - withTempDir { checkpointDir => - val outputPath = output.getAbsolutePath - val inputData = MemoryStream[String] - val result = inputData.toDS().dropDuplicates() - val q = result.writeStream - .format("parquet") - .outputMode(Append) - .option("checkpointLocation", checkpointDir.getPath) - .start(outputPath) - try { - inputData.addData("a") - q.processAllAvailable() - checkDataset(spark.read.parquet(outputPath).as[String], "a") - - inputData.addData("a") // Dropped - q.processAllAvailable() - checkDataset(spark.read.parquet(outputPath).as[String], "a") - - inputData.addData("b") - q.processAllAvailable() - checkDataset(spark.read.parquet(outputPath).as[String], "a", "b") - } finally { - q.stop() + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + withTempDir { output => + withTempDir { checkpointDir => + val outputPath = output.getAbsolutePath + val inputData = MemoryStream[String] + val result = inputData.toDS().dropDuplicates() + val q = result.writeStream + .format("parquet") + .outputMode(Append) + .option("checkpointLocation", checkpointDir.getPath) + .start(outputPath) + try { + inputData.addData("a") + q.processAllAvailable() + checkDataset(spark.read.parquet(outputPath).as[String], "a") + + inputData.addData("a") // Dropped + q.processAllAvailable() + checkDataset(spark.read.parquet(outputPath).as[String], "a") + + inputData.addData("b") + q.processAllAvailable() + checkDataset(spark.read.parquet(outputPath).as[String], "a", "b") + } finally { + q.stop() + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 13976ee1e2da..dd4efb2955ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -995,55 +995,59 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi // Reading a 
file sink output in a batch query should detect the legacy _spark_metadata // directory and throw an error - val e = intercept[SparkException] { - spark.read.load(outputDir.getCanonicalPath).as[Int] - } - assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir) + // TODO: test file source V2 as well. + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { + val e = intercept[SparkException] { + spark.read.load(outputDir.getCanonicalPath).as[Int] + } + assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir) - // Restarting the streaming query should detect the legacy _spark_metadata directory and throw - // an error - val inputData = MemoryStream[Int] - val e2 = intercept[SparkException] { - inputData.toDF() - .writeStream - .format("parquet") - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .start(outputDir.getCanonicalPath) - } - assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir) + // Restarting the streaming query should detect the legacy _spark_metadata directory and + // throw an error + val inputData = MemoryStream[Int] + val e2 = intercept[SparkException] { + inputData.toDF() + .writeStream + .format("parquet") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + } + assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir) + + // Move "_spark_metadata" to fix the file sink and test the checkpoint path. + FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir) + + // Restarting the streaming query should detect the legacy + // checkpoint path and throw an error. + val e3 = intercept[SparkException] { + inputData.toDF() + .writeStream + .format("parquet") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + } + assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir) - // Move "_spark_metadata" to fix the file sink and test the checkpoint path. - FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir) + // Fix the checkpoint path and verify that the user can migrate the issue by moving files. + FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir) - // Restarting the streaming query should detect the legacy checkpoint path and throw an error - val e3 = intercept[SparkException] { - inputData.toDF() + val q = inputData.toDF() .writeStream .format("parquet") .option("checkpointLocation", checkpointDir.getCanonicalPath) .start(outputDir.getCanonicalPath) - } - assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir) - - // Fix the checkpoint path and verify that the user can migrate the issue by moving files. - FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir) - - val q = inputData.toDF() - .writeStream - .format("parquet") - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .start(outputDir.getCanonicalPath) - try { - q.processAllAvailable() - // Check the query id to make sure it did use checkpoint - assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898") + try { + q.processAllAvailable() + // Check the query id to make sure it did use checkpoint + assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898") - // Verify that the batch query can read "_spark_metadata" correctly after migration. 
- val df = spark.read.load(outputDir.getCanonicalPath) - assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex") - checkDatasetUnorderly(df.as[Int], 1, 2, 3) - } finally { - q.stop() + // Verify that the batch query can read "_spark_metadata" correctly after migration. + val df = spark.read.load(outputDir.getCanonicalPath) + assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex") + checkDatasetUnorderly(df.as[Int], 1, 2, 3) + } finally { + q.stop() + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 95a57aaf2a13..ba807fb58fe4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -907,8 +907,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet") } assert(e.getMessage.contains( - "The format of the existing table default.appendOrcToParquet is `ParquetFileFormat`. " + - "It doesn't match the specified format")) + "The format of the existing table default.appendOrcToParquet is `Parquet")) } withTable("appendParquetToJson") { @@ -917,9 +916,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDF(10, 19).write.mode(SaveMode.Append).format("parquet") .saveAsTable("appendParquetToJson") }.getMessage - // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat. - assert(msg.contains("The format of the existing table default.appendParquetToJson is `Json")) - assert(msg.contains("It doesn't match the specified format `ParquetFileFormat`")) + + assert(msg.contains( + "The format of the existing table default.appendParquetToJson is `Json")) } withTable("appendTextToJson") { @@ -930,7 +929,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv }.getMessage // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat. assert(msg.contains("The format of the existing table default.appendTextToJson is `Json")) - assert(msg.contains("It doesn't match the specified format")) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 56c16c8d91b9..9579419d285b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2337,7 +2337,10 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => - withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { + // This test case is only for file source V1. As the rule OptimizeMetadataOnlyQuery is + // disabled by default, we can skip testing file source v2 in current stage. 
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString, + SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") { withTable("t") { sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)") sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 0587cfebc8d3..4e9c2e7ee789 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -38,7 +38,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes val dataSourceName: String - protected val parquetDataSourceName: String = "parquet" + protected val parquetDataSourceName: String = + classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat].getCanonicalName private def isParquetDataSource: Boolean = dataSourceName == parquetDataSourceName
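
Editor's note: the test changes in this patch all follow one convention — suites whose assertions still depend on V1 plan shapes (HadoopFsRelation, FileScanRDD, the legacy _spark_metadata handling) pin parquet back to the V1 path through SQLConf.USE_V1_SOURCE_READER_LIST / USE_V1_SOURCE_WRITER_LIST, while suites that should exercise the new ParquetDataSourceV2 clear those lists. The snippet below is a minimal illustrative sketch of that convention, not part of the patch; the suite name is hypothetical, and it assumes the standard QueryTest/SharedSparkSession test harness.

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite, for illustration only: shows how tests in this patch toggle the
// parquet read path between V1 (ParquetFileFormat) and V2 (ParquetDataSourceV2).
class ParquetV1FallbackSketchSuite extends QueryTest with SharedSparkSession {

  test("parquet read path: V1 fallback vs. V2") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      spark.range(10).write.parquet(path)

      // Pin parquet to the V1 read path, as the streaming suites above do for assertions
      // that inspect HadoopFsRelation / FileScanRDD details.
      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
        checkAnswer(spark.read.parquet(path), spark.range(10).toDF())
      }

      // Clear the list to route the same read through the V2 ParquetDataSourceV2 path,
      // mirroring the FileDataSourceV2FallBackSuite change at the top of this section
      // (which clears USE_V1_SOURCE_WRITER_LIST to test the V2 write path).
      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
        checkAnswer(spark.read.parquet(path), spark.range(10).toDF())
      }
    }
  }
}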