This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Merged
@@ -19,39 +19,41 @@ package com.microsoft.hyperspace.index.covering
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.functions.{col, input_file_name}
import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.util.ResolverUtils
import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn

/**
* CoveringIndex data is stored as bucketed & sorted by indexedColumns so that it can substitute
* for a shuffle node in a join query plan or a data scan node in a filter query plan.
*/
case class CoveringIndex(
override val indexedColumns: Seq[String],
includedColumns: Seq[String],
schema: StructType,
numBuckets: Int,
override val properties: Map[String, String])
extends Index {
extends CoveringIndexTrait {

override def kind: String = CoveringIndex.kind

override def kindAbbr: String = CoveringIndex.kindAbbr

override def referencedColumns: Seq[String] = indexedColumns ++ includedColumns

override def withNewProperties(newProperties: Map[String, String]): CoveringIndex = {
copy(properties = newProperties)
}

override def statistics(extended: Boolean = false): Map[String, String] = {
simpleStatistics ++ (if (extended) extendedStatistics else Map.empty)
protected def copyIndex(
Review comment: nit: override

indexedCols: Seq[String],
includedCols: Seq[String],
schema: StructType): CoveringIndex = {
copy(indexedColumns = indexedCols, includedColumns = includedCols, schema = schema)
}
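
For the nit above, the change would presumably be just adding the modifier, assuming copyIndex is declared abstract in CoveringIndexTrait (a sketch, not the committed code):

```scala
// Sketch of the reviewer's suggestion: mark the trait-method implementation explicitly.
override protected def copyIndex(
    indexedCols: Seq[String],
    includedCols: Seq[String],
    schema: StructType): CoveringIndex = {
  copy(indexedColumns = indexedCols, includedColumns = includedCols, schema = schema)
}
```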

override def canHandleDeletedFiles: Boolean = hasLineageColumn

override def write(ctx: IndexerContext, indexData: DataFrame): Unit = {
protected def write(ctx: IndexerContext, indexData: DataFrame, mode: SaveMode): Unit = {
Review comment: nit: override

// Run job
val repartitionedIndexData = {
// We are repartitioning with normalized columns (e.g., flattened nested column).
@@ -65,95 +67,7 @@ case class CoveringIndex(
ctx.indexDataPath.toString,
numBuckets,
indexedColumns,
SaveMode.Overwrite)
}

override def optimize(ctx: IndexerContext, indexDataFilesToOptimize: Seq[FileInfo]): Unit = {
// Rewrite index using the eligible files to optimize.
val indexData = ctx.spark.read.parquet(indexDataFilesToOptimize.map(_.name): _*)
val repartitionedIndexData =
indexData.repartition(numBuckets, indexedColumns.map(indexData(_)): _*)
repartitionedIndexData.write.saveWithBuckets(
repartitionedIndexData,
ctx.indexDataPath.toString,
numBuckets,
indexedColumns,
SaveMode.Overwrite)
}

override def refreshIncremental(
ctx: IndexerContext,
appendedSourceData: Option[DataFrame],
deletedSourceDataFiles: Seq[FileInfo],
indexContent: Content): (CoveringIndex, Index.UpdateMode) = {
val updatedIndex = if (appendedSourceData.nonEmpty) {
val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
CoveringIndex.createIndexData(
ctx,
appendedSourceData.get,
indexedColumns.map(ResolvedColumn(_).name),
includedColumns.map(ResolvedColumn(_).name),
hasLineageColumn)
write(ctx, indexData)
copy(
indexedColumns = resolvedIndexedColumns.map(_.normalizedName),
includedColumns = resolvedIncludedColumns.map(_.normalizedName),
schema = schema.merge(indexData.schema))
} else {
this
}
if (deletedSourceDataFiles.nonEmpty) {
// For an index with lineage, find all the source data files which have been deleted,
// and use index records' lineage to mark and remove index entries which belong to
// deleted source data files as those entries are no longer valid.
val refreshDF = ctx.spark.read
.parquet(indexContent.files.map(_.toString): _*)
.filter(!col(IndexConstants.DATA_FILE_NAME_ID).isin(deletedSourceDataFiles.map(_.id): _*))

// Write refreshed data using Append mode if there are index data files from appended files.
val writeMode = if (appendedSourceData.nonEmpty) {
SaveMode.Append
} else {
SaveMode.Overwrite
}
refreshDF.write.saveWithBuckets(
refreshDF,
ctx.indexDataPath.toString,
numBuckets,
indexedColumns,
writeMode)
}

// If there is no deleted files, there are index data files only for appended data in this
// version and we need to add the index data files of previous index version.
// Otherwise, as previous index data is rewritten in this version while excluding
// indexed rows from deleted files, all necessary index data files exist in this version.
val updatedMode = if (deletedSourceDataFiles.isEmpty) {
Index.UpdateMode.Merge
} else {
Index.UpdateMode.Overwrite
}
(updatedIndex, updatedMode)
}

override def refreshFull(
ctx: IndexerContext,
sourceData: DataFrame): (CoveringIndex, DataFrame) = {
val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
CoveringIndex.createIndexData(
ctx,
sourceData,
// As indexed & included columns in previousLogEntry are resolved & prefixed names,
// need to remove the prefix to resolve with the dataframe for refresh.
indexedColumns.map(ResolvedColumn(_).name),
includedColumns.map(ResolvedColumn(_).name),
hasLineageColumn)
(
copy(
indexedColumns = resolvedIndexedColumns.map(_.normalizedName),
includedColumns = resolvedIncludedColumns.map(_.normalizedName),
schema = indexData.schema),
indexData)
mode)
}

override def equals(o: Any): Boolean =
@@ -170,22 +84,21 @@
(indexedColumns, includedColumns, numBuckets)
}

def bucketSpec: BucketSpec =
BucketSpec(
numBuckets = numBuckets,
bucketColumnNames = indexedColumns,
sortColumnNames = indexedColumns)

def hasLineageColumn: Boolean = IndexUtils.hasLineageColumn(properties)
def bucketSpec: Option[BucketSpec] =
Some(
BucketSpec(
numBuckets = numBuckets,
bucketColumnNames = indexedColumns,
sortColumnNames = indexedColumns))

private def simpleStatistics: Map[String, String] = {
protected def simpleStatistics: Map[String, String] = {
Review comment: nit: override

Map(
"includedColumns" -> includedColumns.mkString(", "),
"numBuckets" -> numBuckets.toString,
"schema" -> schema.json)
}

private def extendedStatistics: Map[String, String] = {
protected def extendedStatistics: Map[String, String] = {
Review comment: nit: override

Map("hasLineage" -> hasLineageColumn.toString)
}
}
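
Note that CoveringIndexTrait itself is not part of this file's diff. Judging from the members implemented above, its shape is presumably along the following lines; the names and signatures below are inferred for illustration, not copied from the actual trait:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index._

// Inferred sketch only; the real CoveringIndexTrait lives outside this diff.
trait CoveringIndexTrait extends Index {
  def includedColumns: Seq[String]
  def numBuckets: Int
  def schema: StructType
  def bucketSpec: Option[BucketSpec]

  // Implemented above via the case class copy(); lets shared refresh/optimize code
  // produce an updated index with re-resolved columns and a merged schema.
  protected def copyIndex(
      indexedCols: Seq[String],
      includedCols: Seq[String],
      schema: StructType): CoveringIndexTrait

  // The shared code path chooses the SaveMode (e.g. Overwrite for builds and optimize,
  // Append when incremental refresh adds to existing index data).
  protected def write(ctx: IndexerContext, indexData: DataFrame, mode: SaveMode): Unit
}
```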
@@ -211,7 +124,7 @@ object CoveringIndex {
* case-insensitive and a column name "Foo.BaR" is given, and the schema is
* "root |-- foo |-- bar: integer", a resolved column name would be "foo.bar".
* This step is necessary to make the index work regardless of the current
* case-sensitivity setting. Normalization is to supporte nested columns.
* case-sensitivity setting. Normalization is to support nested columns.
* Nested columns in the source data is stored as a normal column in the
* index data.
*
@@ -16,8 +16,6 @@

package com.microsoft.hyperspace.index.covering

import java.util.Locale

import org.apache.spark.sql.DataFrame

import com.microsoft.hyperspace.Hyperspace
Expand All @@ -40,54 +38,7 @@ case class CoveringIndexConfig(
override val indexName: String,
indexedColumns: Seq[String],
includedColumns: Seq[String] = Seq())
extends IndexConfigTrait {
if (indexName.isEmpty || indexedColumns.isEmpty) {
throw new IllegalArgumentException("Empty index name or indexed columns are not allowed.")
}

val lowerCaseIndexedColumns = toLowerCase(indexedColumns)
val lowerCaseIncludedColumns = toLowerCase(includedColumns)
val lowerCaseIncludedColumnsSet = lowerCaseIncludedColumns.toSet

if (lowerCaseIndexedColumns.toSet.size < lowerCaseIndexedColumns.size) {
throw new IllegalArgumentException("Duplicate indexed column names are not allowed.")
}

if (lowerCaseIncludedColumnsSet.size < lowerCaseIncludedColumns.size) {
throw new IllegalArgumentException("Duplicate included column names are not allowed.")
}

for (indexedColumn <- lowerCaseIndexedColumns) {
if (lowerCaseIncludedColumns.contains(indexedColumn)) {
throw new IllegalArgumentException(
"Duplicate column names in indexed/included columns are not allowed.")
}
}

override def equals(that: Any): Boolean = {
that match {
case CoveringIndexConfig(thatIndexName, thatIndexedColumns, thatIncludedColumns) =>
indexName.equalsIgnoreCase(thatIndexName) &&
lowerCaseIndexedColumns.equals(toLowerCase(thatIndexedColumns)) &&
lowerCaseIncludedColumnsSet.equals(toLowerCase(thatIncludedColumns).toSet)
case _ => false
}
}

override def hashCode(): Int = {
lowerCaseIndexedColumns.hashCode + lowerCaseIncludedColumnsSet.hashCode
}

override def toString: String = {
val indexedColumnNames = lowerCaseIndexedColumns.mkString(", ")
val includedColumnNames = lowerCaseIncludedColumns.mkString(", ")
s"[indexName: $indexName; indexedColumns: $indexedColumnNames; " +
s"includedColumns: $includedColumnNames]"
}

private def toLowerCase(seq: Seq[String]): Seq[String] = seq.map(_.toLowerCase(Locale.ROOT))

override def referencedColumns: Seq[String] = indexedColumns ++ includedColumns
extends CoveringIndexConfigTrait {

override def createIndex(
ctx: IndexerContext,
@@ -0,0 +1,75 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.covering

import java.util.Locale

import com.microsoft.hyperspace.index._

trait CoveringIndexConfigTrait extends IndexConfigTrait {
val indexName: String
val indexedColumns: Seq[String]
val includedColumns: Seq[String]

if (indexName.isEmpty || indexedColumns.isEmpty) {
throw new IllegalArgumentException("Empty index name or indexed columns are not allowed.")
}

protected def toLowerCase(seq: Seq[String]): Seq[String] = seq.map(_.toLowerCase(Locale.ROOT))

val lowerCaseIndexedColumns = toLowerCase(indexedColumns)
val lowerCaseIncludedColumns = toLowerCase(includedColumns)
val lowerCaseIncludedColumnsSet = lowerCaseIncludedColumns.toSet

if (lowerCaseIndexedColumns.toSet.size < lowerCaseIndexedColumns.size) {
throw new IllegalArgumentException("Duplicate indexed column names are not allowed.")
}

if (lowerCaseIncludedColumnsSet.size < lowerCaseIncludedColumns.size) {
throw new IllegalArgumentException("Duplicate included column names are not allowed.")
}

for (indexedColumn <- lowerCaseIndexedColumns) {
if (lowerCaseIncludedColumns.contains(indexedColumn)) {
throw new IllegalArgumentException(
"Duplicate column names in indexed/included columns are not allowed.")
}
}

override def equals(that: Any): Boolean = {
that match {
case CoveringIndexConfig(thatIndexName, thatIndexedColumns, thatIncludedColumns) =>
indexName.equalsIgnoreCase(thatIndexName) &&
lowerCaseIndexedColumns.equals(toLowerCase(thatIndexedColumns)) &&
lowerCaseIncludedColumnsSet.equals(toLowerCase(thatIncludedColumns).toSet)
case _ => false
}
}

override def hashCode(): Int = {
lowerCaseIndexedColumns.hashCode + lowerCaseIncludedColumnsSet.hashCode
}

override def toString: String = {
val indexedColumnNames = lowerCaseIndexedColumns.mkString(", ")
val includedColumnNames = lowerCaseIncludedColumns.mkString(", ")
s"[indexName: $indexName; indexedColumns: $indexedColumnNames; " +
s"includedColumns: $includedColumnNames]"
}

override def referencedColumns: Seq[String] = indexedColumns ++ includedColumns
}
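
As a quick illustration of the validation and equality behavior this trait now centralizes (the index and column names below are made up for the example):

```scala
import com.microsoft.hyperspace.index.covering.CoveringIndexConfig

// Valid config; name and column comparisons are case-insensitive.
val config = CoveringIndexConfig("emailIndex", Seq("userId"), Seq("email", "city"))
assert(config == CoveringIndexConfig("EMAILINDEX", Seq("USERID"), Seq("City", "EMAIL")))

// Each of these would throw IllegalArgumentException at construction time:
// CoveringIndexConfig("", Seq("userId"))                          // empty index name
// CoveringIndexConfig("emailIndex", Seq("userId", "USERID"))      // duplicate indexed columns
// CoveringIndexConfig("emailIndex", Seq("userId"), Seq("UserId")) // indexed column repeated as included
```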
@@ -100,7 +100,7 @@ object CoveringIndexRuleUtils {
index: IndexLogEntry,
plan: LogicalPlan,
useBucketSpec: Boolean): LogicalPlan = {
val ci = index.derivedDataset.asInstanceOf[CoveringIndex]
val ci = index.derivedDataset.asInstanceOf[CoveringIndexTrait]
val provider = Hyperspace.getContext(spark).sourceProviderManager
// Note that we transform *only* the base relation and not other portions of the plan
// (e.g., filters). For instance, given the following input plan:
@@ -118,7 +118,7 @@ object CoveringIndexRuleUtils {
location,
new StructType(),
StructType(ci.schema.filter(relation.schema.contains(_))),
if (useBucketSpec) Some(ci.bucketSpec) else None,
if (useBucketSpec) ci.bucketSpec else None,
new ParquetFileFormat,
Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)

@@ -149,7 +149,7 @@ object CoveringIndexRuleUtils {
plan: LogicalPlan,
useBucketSpec: Boolean,
useBucketUnionForAppended: Boolean): LogicalPlan = {
val ci = index.derivedDataset.asInstanceOf[CoveringIndex]
val ci = index.derivedDataset.asInstanceOf[CoveringIndexTrait]
val provider = Hyperspace.getContext(spark).sourceProviderManager
var unhandledAppendedFiles: Seq[Path] = Nil
// Get transformed plan with index data and appended files if applicable.
@@ -233,7 +233,7 @@ object CoveringIndexRuleUtils {
newLocation,
new StructType(),
newSchema,
if (useBucketSpec) Some(ci.bucketSpec) else None,
if (useBucketSpec) ci.bucketSpec else None,
new ParquetFileFormat,
Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)

@@ -271,7 +271,7 @@ object CoveringIndexRuleUtils {
// Although only numBuckets of BucketSpec is used in BucketUnion*, bucketColumnNames
// and sortColumnNames are shown in plan string. So remove sortColumnNames to avoid
// misunderstanding.
val bucketSpec = ci.bucketSpec.copy(sortColumnNames = Nil)
val bucketSpec = ci.bucketSpec.get.copy(sortColumnNames = Nil)

// Merge index plan & newly shuffled plan by using bucket-aware union.
BucketUnion(
@@ -304,7 +304,7 @@ object CoveringIndexRuleUtils {
index: IndexLogEntry,
originalPlan: LogicalPlan,
filesAppended: Seq[Path]): LogicalPlan = {
val ci = index.derivedDataset.asInstanceOf[CoveringIndex]
val ci = index.derivedDataset.asInstanceOf[CoveringIndexTrait]
val provider = Hyperspace.getContext(spark).sourceProviderManager
// Transform the relation node to include appended files.
val planForAppended = originalPlan transformDown {