Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
ResolveAlterTableColumnCommands ::
ResolveAlterTableCommands ::
AddMetadataColumns ::
DeduplicateRelations ::
ResolveReferences ::
Expand Down Expand Up @@ -3607,15 +3607,15 @@ class Analyzer(override val catalogManager: CatalogManager)
* Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
* for alter table column commands.
*/
object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
object ResolveAlterTableCommands extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case a: AlterTableColumnCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
val table = a.table.asInstanceOf[ResolvedTable]
a.transformExpressions {
case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
}

case a @ AlterTableAddColumns(r: ResolvedTable, cols) if !a.resolved =>
case a @ AddColumns(r: ResolvedTable, cols) if !a.resolved =>
// 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
// normalized parent name of fields to field names that belong to the parent.
// For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
Expand Down Expand Up @@ -3668,7 +3668,7 @@ class Analyzer(override val catalogManager: CatalogManager)
resolved.copyTagsFrom(a)
resolved

case a @ AlterTableAlterColumn(
case a @ AlterColumn(
table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
val newDataType = dataType.flatMap { dt =>
// Hive style syntax provides the column type, even if it may not have changed.
Expand Down Expand Up @@ -3705,7 +3705,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context.origin))
}

private def hasUnresolvedFieldName(a: AlterTableColumnCommand): Boolean = {
private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case write: V2WriteCommand if write.resolved =>
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))

case alter: AlterTableColumnCommand if alter.table.resolved =>
checkAlterTableColumnCommand(alter)
case alter: AlterTableCommand =>
checkAlterTableCommand(alter)

case _ => // Falls back to the following checks
}
Expand Down Expand Up @@ -939,7 +939,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
/**
* Validates the options used for alter table commands after table and columns are resolved.
*/
private def checkAlterTableColumnCommand(alter: AlterTableColumnCommand): Unit = {
private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = {
if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) {
alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " +
Expand All @@ -948,7 +948,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}

alter match {
case AlterTableAddColumns(table: ResolvedTable, colsToAdd) =>
case AddColumns(table: ResolvedTable, colsToAdd) =>
colsToAdd.foreach { colToAdd =>
checkColumnNotExists("add", colToAdd.name, table.schema)
}
Expand All @@ -957,10 +957,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
"in the user specified columns",
alter.conf.resolver)

case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
checkColumnNotExists("rename", col.path :+ newName, table.schema)

case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
val fieldName = col.name.quoted
if (a.dataType.isDefined) {
val field = CharVarcharUtils.getRawType(col.field.metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3611,7 +3611,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val colToken = if (ctx.COLUMN() != null) "COLUMN" else "COLUMNS"
AlterTableAddColumns(
AddColumns(
createUnresolvedTable(ctx.multipartIdentifier, s"ALTER TABLE ... ADD $colToken"),
ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]).toSeq
)
Expand All @@ -3627,7 +3627,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitRenameTableColumn(
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
AlterTableRenameColumn(
RenameColumn(
createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
UnresolvedFieldName(typedVisit[Seq[String]](ctx.from)),
ctx.to.getText)
Expand Down Expand Up @@ -3681,7 +3681,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg

assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1)

AlterTableAlterColumn(
AlterColumn(
createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)),
dataType = dataType,
Expand Down Expand Up @@ -3715,7 +3715,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead"))
}

AlterTableAlterColumn(
AlterColumn(
createUnresolvedTable(ctx.table, s"ALTER TABLE ... CHANGE COLUMN"),
UnresolvedFieldName(columnNameParts),
dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
Expand All @@ -3730,7 +3730,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
if (ctx.partitionSpec != null) {
operationNotAllowed("ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS", ctx)
}
AlterTableReplaceColumns(
ReplaceColumns(
createUnresolvedTable(ctx.multipartIdentifier, "ALTER TABLE ... REPLACE COLUMNS"),
ctx.columns.qualifiedColTypeWithPosition.asScala.map { colType =>
if (colType.NULL != null) {
Expand Down Expand Up @@ -3763,7 +3763,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
override def visitDropTableColumns(
ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
AlterTableDropColumns(
DropColumns(
createUnresolvedTable(
ctx.multipartIdentifier,
"ALTER TABLE ... DROP COLUMNS"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.DataType

/**
* The base trait for commands that need to alter a v2 table with [[TableChange]]s.
*/
trait AlterTableCommand extends UnaryCommand {
def changes: Seq[TableChange]
def table: LogicalPlan
final override def child: LogicalPlan = table
}

/**
* The logical plan that defines or changes the comment of an TABLE for v2 catalogs.
*
* {{{
* COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
* }}}
*
* where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
*/
case class CommentOnTable(table: LogicalPlan, comment: String) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
Seq(TableChange.setProperty(TableCatalog.PROP_COMMENT, comment))
}
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... SET LOCATION command.
*/
case class SetTableLocation(
table: LogicalPlan,
partitionSpec: Option[TablePartitionSpec],
location: String) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
if (partitionSpec.nonEmpty) {
throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError()
}
Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
}
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command.
*/
case class SetTableProperties(
table: LogicalPlan,
properties: Map[String, String]) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
properties.map { case (key, value) =>
TableChange.setProperty(key, value)
}.toSeq
}
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command.
*/
case class UnsetTableProperties(
table: LogicalPlan,
propertyKeys: Seq[String],
ifExists: Boolean) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
propertyKeys.map(key => TableChange.removeProperty(key))
}
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... ADD COLUMNS command.
*/
case class AddColumns(
table: LogicalPlan,
columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
columnsToAdd.foreach { c =>
TypeUtils.failWithIntervalType(c.dataType)
}

override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)

override def changes: Seq[TableChange] = {
columnsToAdd.map { col =>
require(col.path.forall(_.resolved),
"FieldName should be resolved before it's converted to TableChange.")
require(col.position.forall(_.resolved),
"FieldPosition should be resolved before it's converted to TableChange.")
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.map(_.position).orNull)
}
}

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
*/
case class ReplaceColumns(
table: LogicalPlan,
columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
columnsToAdd.foreach { c =>
TypeUtils.failWithIntervalType(c.dataType)
}

override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)

override def changes: Seq[TableChange] = {
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
require(table.resolved)
val deleteChanges = table.schema.fieldNames.map { name =>
TableChange.deleteColumn(Array(name))
}
val addChanges = columnsToAdd.map { col =>
assert(col.path.isEmpty)
assert(col.position.isEmpty)
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
null)
}
deleteChanges ++ addChanges
}

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... DROP COLUMNS command.
*/
case class DropColumns(
table: LogicalPlan,
columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
columnsToDrop.map { col =>
require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
TableChange.deleteColumn(col.name.toArray)
}
}

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... RENAME COLUMN command.
*/
case class RenameColumn(
table: LogicalPlan,
column: FieldName,
newName: String) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
Seq(TableChange.renameColumn(column.name.toArray, newName))
}

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... ALTER COLUMN command.
*/
case class AlterColumn(
table: LogicalPlan,
column: FieldName,
dataType: Option[DataType],
nullable: Option[Boolean],
comment: Option[String],
position: Option[FieldPosition]) extends AlterTableCommand {
override def changes: Seq[TableChange] = {
require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
val colName = column.name.toArray
val typeChange = dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = position.map { newPosition =>
require(newPosition.resolved,
"FieldPosition should be resolved before it's converted to TableChange.")
TableChange.updateColumnPosition(colName, newPosition.position)
}
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
}

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}
Loading