@@ -836,14 +836,6 @@ class Analyzer(override val catalogManager: CatalogManager)
lookupAndResolveTempView(ident)
.map(view => i.copy(table = view))
.getOrElse(i)
case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) =>
lookupAndResolveTempView(ident)
.map(view => c.copy(table = view))
.getOrElse(c)
case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
lookupAndResolveTempView(ident)
.map(view => c.copy(table = view, isTempView = true))
.getOrElse(c)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
@@ -1022,16 +1014,6 @@ class Analyzer(override val catalogManager: CatalogManager)
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)

case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
lookupV2Relation(u.multipartIdentifier, u.options, false)
.map(v2Relation => c.copy(table = v2Relation))
.getOrElse(c)

case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
lookupV2Relation(u.multipartIdentifier, u.options, false)
.map(v2Relation => c.copy(table = v2Relation))
.getOrElse(c)

// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
@@ -1129,20 +1111,6 @@ class Analyzer(override val catalogManager: CatalogManager)
case other => i.copy(table = other)
}

case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(resolveViews)
.map(EliminateSubqueryAliases(_))
.map(relation => c.copy(table = relation))
.getOrElse(c)

case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(resolveViews)
.map(EliminateSubqueryAliases(_))
.map(relation => c.copy(table = relation))
.getOrElse(c)

// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
@@ -125,12 +125,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

case CacheTable(u: UnresolvedRelation, _, _, _) =>
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

case UncacheTable(u: UnresolvedRelation, _, _) =>
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
Comment on lines -128 to -132

Member:

Why don't we need to check them anymore?

Contributor Author:

CacheTable.table is now a child of the plan, so it will be handled here:

case u: UnresolvedRelation =>
  u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
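
For context, a minimal sketch of the AnalysisOnlyCommand contract that CacheTable/UncacheTable now rely on. The member names match the diff in this PR; the extends clause and the children override are assumptions about the trait's behaviour, not code taken from this change:

import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

trait AnalysisOnlyCommand extends Command {
  // Set to true once the analyzer has finished with this command.
  def isAnalyzed: Boolean

  // The plans the analyzer should resolve for this command (here, the table to cache/uncache).
  def childrenToAnalyze: Seq[LogicalPlan]

  // While unanalyzed, the table plan is an ordinary child, so generic rules such as the
  // UnresolvedRelation check above apply to it. Once analyzed, the command reports no
  // children, and later phases treat it as a leaf and leave the resolved relation alone.
  override def children: Seq[LogicalPlan] =
    if (isAnalyzed) Nil else childrenToAnalyze

  // Called by the analyzer after childrenToAnalyze have been resolved.
  def markAsAnalyzed(): LogicalPlan
}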


// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
@@ -1012,7 +1012,18 @@ case class CacheTable(
table: LogicalPlan,
multipartIdentifier: Seq[String],
isLazy: Boolean,
options: Map[String, String]) extends LeafCommand
options: Map[String, String],
isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): CacheTable = {
assert(!isAnalyzed)
copy(table = newChildren.head)
}

override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil

override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
}

/**
* The logical plan of the CACHE TABLE ... AS SELECT command.
@@ -1041,7 +1052,17 @@ case class CacheTableAsSelect(
case class UncacheTable(
table: LogicalPlan,
ifExists: Boolean,
isTempView: Boolean = false) extends LeafCommand
Contributor Author:

isTempView is not stored anymore; it is now derived in DataSourceV2Strategy.scala.
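
Concretely, after analysis a cached temporary view reaches the planner as a SubqueryAlias over a View whose isTempView flag is set, so the flag can be recovered from the resolved plan instead of being carried on UncacheTable. A minimal sketch of that derivation (it mirrors the DataSourceV2Strategy hunk further down; the helper name is illustrative):

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}

// Dropping the cache of a temp view should not cascade to dependent caches,
// so cascade is the negation of "is this a resolved temporary view?".
def shouldCascade(resolvedTable: LogicalPlan): Boolean = resolvedTable match {
  case SubqueryAlias(_, v: View) => !v.isTempView
  case _ => true
}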

isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): UncacheTable = {
assert(!isAnalyzed)
copy(table = newChildren.head)
}

override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil

override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
}

/**
* The logical plan of the ALTER TABLE ... SET LOCATION command.
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, UncacheTable}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -271,20 +271,6 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) =>
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
c.copy(table = readDataSourceTable(tableMeta, options))

case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _) =>
c.copy(table = DDLUtils.readHiveTable(tableMeta))

case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
u.copy(table = readDataSourceTable(tableMeta, options))

case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _) =>
u.copy(table = DDLUtils.readHiveTable(tableMeta))

case UnresolvedCatalogRelation(tableMeta, options, false)
if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta, options)
@@ -415,7 +415,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
CacheTableAsSelectExec(r.tempViewName, r.plan, r.originalText, r.isLazy, r.options) :: Nil

case r: UncacheTable =>
UncacheTableExec(r.table, cascade = !r.isTempView) :: Nil
def isTempView(table: LogicalPlan): Boolean = table match {
case SubqueryAlias(_, v: View) => v.isTempView
case _ => false
}
UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil

case SetTableLocation(table: ResolvedTable, partitionSpec, location) =>
if (partitionSpec.nonEmpty) {
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics, UncacheTable}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.execution._
@@ -231,16 +231,6 @@ case class RelationConversions(
assertNoNullTypeInSchema(query.schema)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)

// Cache table
case c @ CacheTable(relation: HiveTableRelation, _, _, _)
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
c.copy(table = metastoreCatalog.convert(relation))

// Uncache table
case u @ UncacheTable(relation: HiveTableRelation, _, _)
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
u.copy(table = metastoreCatalog.convert(relation))
}
}
}