diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndex.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndex.scala
index 3bf7d155b..30eef1e8b 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndex.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndex.scala
@@ -19,7 +19,6 @@ package com.microsoft.hyperspace.index.covering
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.functions.{col, input_file_name}
-import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
 import org.apache.spark.sql.types.StructType
 
 import com.microsoft.hyperspace.index._
@@ -27,31 +26,34 @@ import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.util.ResolverUtils
 import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
 
+/**
+ * CoveringIndex data is stored as bucketed & sorted by indexedColumns so that it can substitute
+ * for a shuffle node in a join query plan or a data scan node in a filter query plan.
+ */
 case class CoveringIndex(
     override val indexedColumns: Seq[String],
     includedColumns: Seq[String],
     schema: StructType,
     numBuckets: Int,
     override val properties: Map[String, String])
-    extends Index {
+    extends CoveringIndexTrait {
 
   override def kind: String = CoveringIndex.kind
 
   override def kindAbbr: String = CoveringIndex.kindAbbr
 
-  override def referencedColumns: Seq[String] = indexedColumns ++ includedColumns
-
   override def withNewProperties(newProperties: Map[String, String]): CoveringIndex = {
     copy(properties = newProperties)
   }
 
-  override def statistics(extended: Boolean = false): Map[String, String] = {
-    simpleStatistics ++ (if (extended) extendedStatistics else Map.empty)
+  protected def copyIndex(
+      indexedCols: Seq[String],
+      includedCols: Seq[String],
+      schema: StructType): CoveringIndex = {
+    copy(indexedColumns = indexedCols, includedColumns = includedCols, schema = schema)
   }
 
-  override def canHandleDeletedFiles: Boolean = hasLineageColumn
-
-  override def write(ctx: IndexerContext, indexData: DataFrame): Unit = {
+  protected def write(ctx: IndexerContext, indexData: DataFrame, mode: SaveMode): Unit = {
     // Run job
     val repartitionedIndexData = {
       // We are repartitioning with normalized columns (e.g., flattened nested column).
@@ -65,95 +67,7 @@ case class CoveringIndex(
       ctx.indexDataPath.toString,
       numBuckets,
       indexedColumns,
-      SaveMode.Overwrite)
-  }
-
-  override def optimize(ctx: IndexerContext, indexDataFilesToOptimize: Seq[FileInfo]): Unit = {
-    // Rewrite index using the eligible files to optimize.
-    val indexData = ctx.spark.read.parquet(indexDataFilesToOptimize.map(_.name): _*)
-    val repartitionedIndexData =
-      indexData.repartition(numBuckets, indexedColumns.map(indexData(_)): _*)
-    repartitionedIndexData.write.saveWithBuckets(
-      repartitionedIndexData,
-      ctx.indexDataPath.toString,
-      numBuckets,
-      indexedColumns,
-      SaveMode.Overwrite)
-  }
-
-  override def refreshIncremental(
-      ctx: IndexerContext,
-      appendedSourceData: Option[DataFrame],
-      deletedSourceDataFiles: Seq[FileInfo],
-      indexContent: Content): (CoveringIndex, Index.UpdateMode) = {
-    val updatedIndex = if (appendedSourceData.nonEmpty) {
-      val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
-        CoveringIndex.createIndexData(
-          ctx,
-          appendedSourceData.get,
-          indexedColumns.map(ResolvedColumn(_).name),
-          includedColumns.map(ResolvedColumn(_).name),
-          hasLineageColumn)
-      write(ctx, indexData)
-      copy(
-        indexedColumns = resolvedIndexedColumns.map(_.normalizedName),
-        includedColumns = resolvedIncludedColumns.map(_.normalizedName),
-        schema = schema.merge(indexData.schema))
-    } else {
-      this
-    }
-    if (deletedSourceDataFiles.nonEmpty) {
-      // For an index with lineage, find all the source data files which have been deleted,
-      // and use index records' lineage to mark and remove index entries which belong to
-      // deleted source data files as those entries are no longer valid.
-      val refreshDF = ctx.spark.read
-        .parquet(indexContent.files.map(_.toString): _*)
-        .filter(!col(IndexConstants.DATA_FILE_NAME_ID).isin(deletedSourceDataFiles.map(_.id): _*))
-
-      // Write refreshed data using Append mode if there are index data files from appended files.
-      val writeMode = if (appendedSourceData.nonEmpty) {
-        SaveMode.Append
-      } else {
-        SaveMode.Overwrite
-      }
-      refreshDF.write.saveWithBuckets(
-        refreshDF,
-        ctx.indexDataPath.toString,
-        numBuckets,
-        indexedColumns,
-        writeMode)
-    }
-
-    // If there is no deleted files, there are index data files only for appended data in this
-    // version and we need to add the index data files of previous index version.
-    // Otherwise, as previous index data is rewritten in this version while excluding
-    // indexed rows from deleted files, all necessary index data files exist in this version.
-    val updatedMode = if (deletedSourceDataFiles.isEmpty) {
-      Index.UpdateMode.Merge
-    } else {
-      Index.UpdateMode.Overwrite
-    }
-    (updatedIndex, updatedMode)
-  }
-
-  override def refreshFull(
-      ctx: IndexerContext,
-      sourceData: DataFrame): (CoveringIndex, DataFrame) = {
-    val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
-      CoveringIndex.createIndexData(
-        ctx,
-        sourceData,
-        // As indexed & included columns in previousLogEntry are resolved & prefixed names,
-        // need to remove the prefix to resolve with the dataframe for refresh.
-        indexedColumns.map(ResolvedColumn(_).name),
-        includedColumns.map(ResolvedColumn(_).name),
-        hasLineageColumn)
-    (
-      copy(
-        indexedColumns = resolvedIndexedColumns.map(_.normalizedName),
-        includedColumns = resolvedIncludedColumns.map(_.normalizedName),
-        schema = indexData.schema),
-      indexData)
+      mode)
   }
 
   override def equals(o: Any): Boolean =
@@ -170,22 +84,21 @@ case class CoveringIndex(
     (indexedColumns, includedColumns, numBuckets)
   }
 
-  def bucketSpec: BucketSpec =
-    BucketSpec(
-      numBuckets = numBuckets,
-      bucketColumnNames = indexedColumns,
-      sortColumnNames = indexedColumns)
-
-  def hasLineageColumn: Boolean = IndexUtils.hasLineageColumn(properties)
+  def bucketSpec: Option[BucketSpec] =
+    Some(
+      BucketSpec(
+        numBuckets = numBuckets,
+        bucketColumnNames = indexedColumns,
+        sortColumnNames = indexedColumns))
 
-  private def simpleStatistics: Map[String, String] = {
+  protected def simpleStatistics: Map[String, String] = {
     Map(
       "includedColumns" -> includedColumns.mkString(", "),
       "numBuckets" -> numBuckets.toString,
       "schema" -> schema.json)
   }
 
-  private def extendedStatistics: Map[String, String] = {
+  protected def extendedStatistics: Map[String, String] = {
     Map("hasLineage" -> hasLineageColumn.toString)
   }
 }
@@ -211,7 +124,7 @@ object CoveringIndex {
    * case-insensitive and a column name "Foo.BaR" is given, and the schema is
    * "root |-- foo |-- bar: integer", a resolved column name would be "foo.bar".
    * This step is necessary to make the index work regardless of the current
-   * case-sensitivity setting. Normalization is to supporte nested columns.
+   * case-sensitivity setting. Normalization is to support nested columns.
    * Nested columns in the source data is stored as a normal column in the
    * index data.
    *
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexConfig.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexConfig.scala
index fabdfc7ff..ead57f720 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexConfig.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexConfig.scala
@@ -16,8 +16,6 @@
 
 package com.microsoft.hyperspace.index.covering
 
-import java.util.Locale
-
 import org.apache.spark.sql.DataFrame
 
 import com.microsoft.hyperspace.Hyperspace
@@ -40,54 +38,7 @@ case class CoveringIndexConfig(
     override val indexName: String,
     indexedColumns: Seq[String],
     includedColumns: Seq[String] = Seq())
-    extends IndexConfigTrait {
-  if (indexName.isEmpty || indexedColumns.isEmpty) {
-    throw new IllegalArgumentException("Empty index name or indexed columns are not allowed.")
-  }
-
-  val lowerCaseIndexedColumns = toLowerCase(indexedColumns)
-  val lowerCaseIncludedColumns = toLowerCase(includedColumns)
-  val lowerCaseIncludedColumnsSet = lowerCaseIncludedColumns.toSet
-
-  if (lowerCaseIndexedColumns.toSet.size < lowerCaseIndexedColumns.size) {
-    throw new IllegalArgumentException("Duplicate indexed column names are not allowed.")
-  }
-
-  if (lowerCaseIncludedColumnsSet.size < lowerCaseIncludedColumns.size) {
-    throw new IllegalArgumentException("Duplicate included column names are not allowed.")
-  }
-
-  for (indexedColumn <- lowerCaseIndexedColumns) {
-    if (lowerCaseIncludedColumns.contains(indexedColumn)) {
-      throw new IllegalArgumentException(
-        "Duplicate column names in indexed/included columns are not allowed.")
-    }
-  }
-
-  override def equals(that: Any): Boolean = {
-    that match {
-      case CoveringIndexConfig(thatIndexName, thatIndexedColumns, thatIncludedColumns) =>
-        indexName.equalsIgnoreCase(thatIndexName) &&
-          lowerCaseIndexedColumns.equals(toLowerCase(thatIndexedColumns)) &&
-          lowerCaseIncludedColumnsSet.equals(toLowerCase(thatIncludedColumns).toSet)
-      case _ => false
-    }
-  }
-
-  override def hashCode(): Int = {
-    lowerCaseIndexedColumns.hashCode + lowerCaseIncludedColumnsSet.hashCode
-  }
-
-  override def toString: String = {
-    val indexedColumnNames = lowerCaseIndexedColumns.mkString(", ")
-    val includedColumnNames = lowerCaseIncludedColumns.mkString(", ")
-    s"[indexName: $indexName; indexedColumns: $indexedColumnNames; " +
-      s"includedColumns: $includedColumnNames]"
-  }
-
-  private def toLowerCase(seq: Seq[String]): Seq[String] = seq.map(_.toLowerCase(Locale.ROOT))
-
-  override def referencedColumns: Seq[String] = indexedColumns ++ includedColumns
+    extends CoveringIndexConfigTrait {
 
   override def createIndex(
       ctx: IndexerContext,
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexConfigTrait.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexConfigTrait.scala
new file mode 100644
index 000000000..b95790314
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexConfigTrait.scala
@@ -0,0 +1,75 @@
+/*
+ * Copyright (2021) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.covering
+
+import java.util.Locale
+
+import com.microsoft.hyperspace.index._
+
+trait CoveringIndexConfigTrait extends IndexConfigTrait {
+  val indexName: String
+  val indexedColumns: Seq[String]
+  val includedColumns: Seq[String]
+
+  if (indexName.isEmpty || indexedColumns.isEmpty) {
+    throw new IllegalArgumentException("Empty index name or indexed columns are not allowed.")
+  }
+
+  protected def toLowerCase(seq: Seq[String]): Seq[String] = seq.map(_.toLowerCase(Locale.ROOT))
+
+  val lowerCaseIndexedColumns = toLowerCase(indexedColumns)
+  val lowerCaseIncludedColumns = toLowerCase(includedColumns)
+  val lowerCaseIncludedColumnsSet = lowerCaseIncludedColumns.toSet
+
+  if (lowerCaseIndexedColumns.toSet.size < lowerCaseIndexedColumns.size) {
+    throw new IllegalArgumentException("Duplicate indexed column names are not allowed.")
+  }
+
+  if (lowerCaseIncludedColumnsSet.size < lowerCaseIncludedColumns.size) {
+    throw new IllegalArgumentException("Duplicate included column names are not allowed.")
+  }
+
+  for (indexedColumn <- lowerCaseIndexedColumns) {
+    if (lowerCaseIncludedColumns.contains(indexedColumn)) {
+      throw new IllegalArgumentException(
+        "Duplicate column names in indexed/included columns are not allowed.")
+    }
+  }
+
+  override def equals(that: Any): Boolean = {
+    that match {
+      case CoveringIndexConfig(thatIndexName, thatIndexedColumns, thatIncludedColumns) =>
+        indexName.equalsIgnoreCase(thatIndexName) &&
+          lowerCaseIndexedColumns.equals(toLowerCase(thatIndexedColumns)) &&
+          lowerCaseIncludedColumnsSet.equals(toLowerCase(thatIncludedColumns).toSet)
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    lowerCaseIndexedColumns.hashCode + lowerCaseIncludedColumnsSet.hashCode
+  }
+
+  override def toString: String = {
+    val indexedColumnNames = lowerCaseIndexedColumns.mkString(", ")
+    val includedColumnNames = lowerCaseIncludedColumns.mkString(", ")
+    s"[indexName: $indexName; indexedColumns: $indexedColumnNames; " +
+      s"includedColumns: $includedColumnNames]"
+  }
+
+  override def referencedColumns: Seq[String] = indexedColumns ++ includedColumns
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtils.scala
index 5e66df825..da64b5d00 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexRuleUtils.scala
@@ -100,7 +100,7 @@ object CoveringIndexRuleUtils {
       index: IndexLogEntry,
       plan: LogicalPlan,
       useBucketSpec: Boolean): LogicalPlan = {
-    val ci = index.derivedDataset.asInstanceOf[CoveringIndex]
+    val ci = index.derivedDataset.asInstanceOf[CoveringIndexTrait]
     val provider = Hyperspace.getContext(spark).sourceProviderManager
     // Note that we transform *only* the base relation and not other portions of the plan
     // (e.g., filters). For instance, given the following input plan:
@@ -118,7 +118,7 @@ object CoveringIndexRuleUtils {
         location,
         new StructType(),
         StructType(ci.schema.filter(relation.schema.contains(_))),
-        if (useBucketSpec) Some(ci.bucketSpec) else None,
+        if (useBucketSpec) ci.bucketSpec else None,
         new ParquetFileFormat,
         Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
 
@@ -149,7 +149,7 @@ object CoveringIndexRuleUtils {
       plan: LogicalPlan,
       useBucketSpec: Boolean,
       useBucketUnionForAppended: Boolean): LogicalPlan = {
-    val ci = index.derivedDataset.asInstanceOf[CoveringIndex]
+    val ci = index.derivedDataset.asInstanceOf[CoveringIndexTrait]
     val provider = Hyperspace.getContext(spark).sourceProviderManager
     var unhandledAppendedFiles: Seq[Path] = Nil
     // Get transformed plan with index data and appended files if applicable.
@@ -233,7 +233,7 @@ object CoveringIndexRuleUtils {
             newLocation,
             new StructType(),
             newSchema,
-            if (useBucketSpec) Some(ci.bucketSpec) else None,
+            if (useBucketSpec) ci.bucketSpec else None,
             new ParquetFileFormat,
             Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
 
@@ -271,7 +271,7 @@ object CoveringIndexRuleUtils {
       // Although only numBuckets of BucketSpec is used in BucketUnion*, bucketColumnNames
       // and sortColumnNames are shown in plan string. So remove sortColumnNames to avoid
      // misunderstanding.
-      val bucketSpec = ci.bucketSpec.copy(sortColumnNames = Nil)
+      val bucketSpec = ci.bucketSpec.get.copy(sortColumnNames = Nil)
 
       // Merge index plan & newly shuffled plan by using bucket-aware union.
       BucketUnion(
@@ -304,7 +304,7 @@ object CoveringIndexRuleUtils {
       index: IndexLogEntry,
      originalPlan: LogicalPlan,
       filesAppended: Seq[Path]): LogicalPlan = {
-    val ci = index.derivedDataset.asInstanceOf[CoveringIndex]
+    val ci = index.derivedDataset.asInstanceOf[CoveringIndexTrait]
     val provider = Hyperspace.getContext(spark).sourceProviderManager
     // Transform the relation node to include appended files.
     val planForAppended = originalPlan transformDown {
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexTrait.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexTrait.scala
new file mode 100644
index 000000000..944f72c85
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/CoveringIndexTrait.scala
@@ -0,0 +1,135 @@
+/*
+ * Copyright (2021) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.covering
+
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
+import org.apache.spark.sql.types.StructType
+
+import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
+
+/**
+ * CoveringIndexTrait is a common interface for covering index types, which contain a vertical
+ * slice of source data consisting of indexedColumns and includedColumns.
+ */
+trait CoveringIndexTrait extends Index {
+  val schema: StructType
+  val includedColumns: Seq[String]
+  def bucketSpec: Option[BucketSpec]
+  protected def write(ctx: IndexerContext, indexData: DataFrame, mode: SaveMode): Unit
+  protected def copyIndex(
+      indexedCols: Seq[String],
+      includedCols: Seq[String],
+      schema: StructType): CoveringIndexTrait
+  protected def simpleStatistics: Map[String, String]
+  protected def extendedStatistics: Map[String, String]
+
+  // Overridden members of Index.
+  override def write(ctx: IndexerContext, indexData: DataFrame): Unit = {
+    write(ctx, indexData, SaveMode.Overwrite)
+  }
+
+  override def referencedColumns: Seq[String] = indexedColumns ++ includedColumns
+
+  override def statistics(extended: Boolean = false): Map[String, String] = {
+    simpleStatistics ++ (if (extended) extendedStatistics else Map.empty)
+  }
+
+  override def canHandleDeletedFiles: Boolean = hasLineageColumn
+
+  override def refreshIncremental(
+      ctx: IndexerContext,
+      appendedSourceData: Option[DataFrame],
+      deletedSourceDataFiles: Seq[FileInfo],
+      indexContent: Content): (CoveringIndexTrait, Index.UpdateMode) = {
+    val updatedIndex = if (appendedSourceData.nonEmpty) {
+      val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
+        CoveringIndex.createIndexData(
+          ctx,
+          appendedSourceData.get,
+          indexedColumns.map(ResolvedColumn(_).name),
+          includedColumns.map(ResolvedColumn(_).name),
+          hasLineageColumn)
+      write(ctx, indexData)
+      copyIndex(
+        indexedCols = resolvedIndexedColumns.map(_.normalizedName),
+        includedCols = resolvedIncludedColumns.map(_.normalizedName),
+        schema = schema.merge(indexData.schema))
+    } else {
+      this
+    }
+    if (deletedSourceDataFiles.nonEmpty) {
+      // For an index with lineage, find all the source data files which have been deleted,
+      // and use index records' lineage to mark and remove index entries which belong to
+      // deleted source data files as those entries are no longer valid.
+      val refreshDF = ctx.spark.read
+        .parquet(indexContent.files.map(_.toString): _*)
+        .filter(!col(IndexConstants.DATA_FILE_NAME_ID).isin(deletedSourceDataFiles.map(_.id): _*))
+
+      // Write refreshed data using Append mode if there are index data files from appended files.
+      val writeMode = if (appendedSourceData.nonEmpty) {
+        SaveMode.Append
+      } else {
+        SaveMode.Overwrite
+      }
+
+      write(ctx, refreshDF, writeMode)
+    }
+
+    // If there are no deleted files, this version has index data files only for appended data,
+    // so we need to add the index data files of the previous index version.
+    // Otherwise, as previous index data is rewritten in this version while excluding
+    // indexed rows from deleted files, all necessary index data files exist in this version.
+    val updatedMode = if (deletedSourceDataFiles.isEmpty) {
+      Index.UpdateMode.Merge
+    } else {
+      Index.UpdateMode.Overwrite
+    }
+    (updatedIndex, updatedMode)
+  }
+
+  override def refreshFull(
+      ctx: IndexerContext,
+      sourceData: DataFrame): (CoveringIndexTrait, DataFrame) = {
+    val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
+      CoveringIndex.createIndexData(
+        ctx,
+        sourceData,
+        // As indexed & included columns in previousLogEntry are resolved & prefixed names,
+        // need to remove the prefix to resolve with the dataframe for refresh.
+        indexedColumns.map(ResolvedColumn(_).name),
+        includedColumns.map(ResolvedColumn(_).name),
+        hasLineageColumn)
+    (
+      copyIndex(
+        indexedCols = resolvedIndexedColumns.map(_.normalizedName),
+        includedCols = resolvedIncludedColumns.map(_.normalizedName),
+        schema = indexData.schema),
+      indexData)
+  }
+
+  def hasLineageColumn: Boolean = IndexUtils.hasLineageColumn(properties)
+
+  override def optimize(ctx: IndexerContext, indexDataFilesToOptimize: Seq[FileInfo]): Unit = {
+    // Rewrite index using the eligible files to optimize.
+    val indexData = ctx.spark.read.parquet(indexDataFilesToOptimize.map(_.name): _*)
+    write(ctx, indexData)
+  }
+}
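
Usage sketch (not part of the diff): after this refactoring a covering index is still described by CoveringIndexConfig, which now inherits its validation, equality, and referencedColumns from CoveringIndexConfigTrait. The snippet below assumes the Hyperspace.createIndex(df, indexConfig) entry point accepts such a config; the data path, index name, and column names are illustrative only.

  import com.microsoft.hyperspace.Hyperspace
  import com.microsoft.hyperspace.index.covering.CoveringIndexConfig

  val df = spark.read.parquet("/data/sales")   // hypothetical source data
  val hs = new Hyperspace(spark)

  // "city" becomes the bucketed & sorted indexed column; "price" is carried as an included
  // column, so a filter or join on "city" that reads only "price" can be served by the index.
  hs.createIndex(df, CoveringIndexConfig("salesIdx", Seq("city"), Seq("price")))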