Skip to content

Commit 2a6e68e

Browse files
imback82cloud-fan
authored andcommitted
[SPARK-34546][SQL] AlterViewAs.query should be analyzed during the analysis phase, and AlterViewAs should invalidate the cache
### What changes were proposed in this pull request? This PR proposes the following: * `AlterViewAs.query` is currently analyzed in the physical operator `AlterViewAsCommand`, but it should be analyzed during the analysis phase. * When `spark.sql.legacy.storeAnalyzedPlanForView` is set to true, store `TermporaryViewRelation` which wraps the analyzed plan, similar to #31273. * Try to uncache the view you are altering. ### Why are the changes needed? Analyzing a plan should be done in the analysis phase if possible. Not uncaching the view (existing behavior) seems like a bug since the cache may not be used again. ### Does this PR introduce _any_ user-facing change? Yes, now the view can be uncached if it's already cached. ### How was this patch tested? Added new tests around uncaching. The existing tests such as `SQLViewSuite` should cover the analysis changes. Closes #31652 from imback82/alter_view_child. Authored-by: Terry Kim <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5e120e4 commit 2a6e68e

File tree

10 files changed

+183
-88
lines changed

10 files changed

+183
-88
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ class Analyzer(override val catalogManager: CatalogManager)
879879

880880
private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
881881
EliminateSubqueryAliases(plan) match {
882-
case v: View if v.isDataFrameTempView => v.child
882+
case v: View if v.isTempViewStoringAnalyzedPlan => v.child
883883
case other => other
884884
}
885885
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ object UnsupportedOperationChecker extends Logging {
393393
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
394394
_: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
395395
_: TypedFilter) =>
396-
case v: View if v.isDataFrameTempView =>
396+
case v: View if v.isTempViewStoringAnalyzedPlan =>
397397
case node if node.nodeName == "StreamingRelationV2" =>
398398
case node =>
399399
throwError(s"Continuous processing does not support ${node.nodeName} operations.")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.json4s.jackson.JsonMethods._
3131
import org.apache.spark.internal.Logging
3232
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
3333
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
34-
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
34+
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
3535
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
3636
import org.apache.spark.sql.catalyst.plans.logical._
3737
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
@@ -468,7 +468,7 @@ object CatalogTable {
468468
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
469469
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"
470470

471-
val VIEW_CREATED_FROM_DATAFRAME = VIEW_PREFIX + "createdFromDataFrame"
471+
val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"
472472

473473
def splitLargeTableProp(
474474
key: String,
@@ -782,14 +782,14 @@ case class UnresolvedCatalogRelation(
782782

783783
/**
784784
* A wrapper to store the temporary view info, will be kept in `SessionCatalog`
785-
* and will be transformed to `View` during analysis. If the temporary view was
786-
* created from a dataframe, `plan` is set to the analyzed plan for the view.
785+
* and will be transformed to `View` during analysis. If the temporary view is
786+
* storing an analyzed plan, `plan` is set to the analyzed plan for the view.
787787
*/
788788
case class TemporaryViewRelation(
789789
tableMeta: CatalogTable,
790790
plan: Option[LogicalPlan] = None) extends LeafNode {
791791
require(plan.isEmpty ||
792-
(plan.get.resolved && tableMeta.properties.contains(VIEW_CREATED_FROM_DATAFRAME)))
792+
(plan.get.resolved && tableMeta.properties.contains(VIEW_STORING_ANALYZED_PLAN)))
793793

794794
override lazy val resolved: Boolean = false
795795
override def output: Seq[Attribute] = Nil

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
2020
import org.apache.spark.sql.catalyst.AliasIdentifier
2121
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2222
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
23-
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
23+
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
2424
import org.apache.spark.sql.catalyst.expressions._
2525
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
2626
import org.apache.spark.sql.catalyst.plans._
@@ -462,7 +462,7 @@ case class View(
462462
desc: CatalogTable,
463463
isTempView: Boolean,
464464
child: LogicalPlan) extends UnaryNode {
465-
require(!isDataFrameTempView || child.resolved)
465+
require(!isTempViewStoringAnalyzedPlan || child.resolved)
466466

467467
override def output: Seq[Attribute] = child.output
468468

@@ -475,8 +475,8 @@ case class View(
475475
case _ => child.canonicalized
476476
}
477477

478-
def isDataFrameTempView: Boolean =
479-
isTempView && desc.properties.contains(VIEW_CREATED_FROM_DATAFRAME)
478+
def isTempViewStoringAnalyzedPlan: Boolean =
479+
isTempView && desc.properties.contains(VIEW_STORING_ANALYZED_PLAN)
480480

481481
// When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view
482482
// output schema doesn't change even if the table referenced by the view is changed after view

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ case class AlterViewAs(
855855
child: LogicalPlan,
856856
originalText: String,
857857
query: LogicalPlan) extends Command {
858-
override def children: Seq[LogicalPlan] = child :: Nil
858+
override def children: Seq[LogicalPlan] = child :: query :: Nil
859859
}
860860

861861
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ trait AnalysisTest extends PlanTest {
6666
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
6767
val actualPlan = getAnalyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
6868
val transformed = actualPlan transformUp {
69-
case v: View if v.isDataFrameTempView => v.child
69+
case v: View if v.isTempViewStoringAnalyzedPlan => v.child
7070
}
7171
comparePlans(transformed, expectedPlan)
7272
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
8686
}
8787

8888
private def getTempViewRawPlan(plan: Option[LogicalPlan]): Option[LogicalPlan] = plan match {
89-
case Some(v: View) if v.isDataFrameTempView => Some(v.child)
89+
case Some(v: View) if v.isTempViewStoringAnalyzedPlan => Some(v.child)
9090
case other => other
9191
}
9292

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
474474
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
475475
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)
476476

477-
case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
477+
case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
478478
AlterViewAsCommand(
479479
ident.asTableIdentifier,
480480
originalText,

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 104 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ import scala.collection.mutable
2222
import org.json4s.JsonAST.{JArray, JString}
2323
import org.json4s.jackson.JsonMethods._
2424

25+
import org.apache.spark.internal.Logging
2526
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
26-
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
27+
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
2728
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType}
2829
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
2930
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
@@ -115,48 +116,27 @@ case class CreateViewCommand(
115116

116117
if (viewType == LocalTempView) {
117118
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
118-
if (replace && needsToUncache(catalog.getRawTempView(name.table), aliasedPlan)) {
119-
logInfo(s"Try to uncache ${name.quotedString} before replacing.")
120-
checkCyclicViewReference(analyzedPlan, Seq(name), name)
121-
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
122-
}
123-
// If there is no sql text (e.g. from Dataset API), we will always store the analyzed plan
124-
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
125-
TemporaryViewRelation(
126-
prepareTemporaryView(
127-
name,
128-
sparkSession,
129-
analyzedPlan,
130-
aliasedPlan.schema,
131-
originalText))
132-
} else {
133-
TemporaryViewRelation(
134-
prepareTemporaryViewFromDataFrame(name, aliasedPlan),
135-
Some(aliasedPlan))
136-
}
119+
val tableDefinition = createTemporaryViewRelation(
120+
name,
121+
sparkSession,
122+
replace,
123+
catalog.getRawTempView,
124+
originalText,
125+
analyzedPlan,
126+
aliasedPlan)
137127
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
138128
} else if (viewType == GlobalTempView) {
139129
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
140130
val viewIdent = TableIdentifier(name.table, Option(db))
141131
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
142-
if (replace && needsToUncache(catalog.getRawGlobalTempView(name.table), aliasedPlan)) {
143-
logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
144-
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
145-
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
146-
}
147-
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
148-
TemporaryViewRelation(
149-
prepareTemporaryView(
150-
viewIdent,
151-
sparkSession,
152-
analyzedPlan,
153-
aliasedPlan.schema,
154-
originalText))
155-
} else {
156-
TemporaryViewRelation(
157-
prepareTemporaryViewFromDataFrame(viewIdent, aliasedPlan),
158-
Some(aliasedPlan))
159-
}
132+
val tableDefinition = createTemporaryViewRelation(
133+
viewIdent,
134+
sparkSession,
135+
replace,
136+
catalog.getRawGlobalTempView,
137+
originalText,
138+
analyzedPlan,
139+
aliasedPlan)
160140
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
161141
} else if (catalog.tableExists(name)) {
162142
val tableMetadata = catalog.getTableMetadata(name)
@@ -192,20 +172,6 @@ case class CreateViewCommand(
192172
Seq.empty[Row]
193173
}
194174

195-
/**
196-
* Checks if need to uncache the temp view being replaced.
197-
*/
198-
private def needsToUncache(
199-
rawTempView: Option[LogicalPlan],
200-
aliasedPlan: LogicalPlan): Boolean = rawTempView match {
201-
// The temp view doesn't exist, no need to uncache.
202-
case None => false
203-
// Do not need to uncache if the to-be-replaced temp view plan and the new plan are the
204-
// same-result plans.
205-
case Some(TemporaryViewRelation(_, Some(p))) => !p.sameResult(aliasedPlan)
206-
case Some(p) => !p.sameResult(aliasedPlan)
207-
}
208-
209175
/**
210176
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
211177
* else return the analyzed plan directly.
@@ -274,28 +240,29 @@ case class AlterViewAsCommand(
274240
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
275241

276242
override def run(session: SparkSession): Seq[Row] = {
277-
// If the plan cannot be analyzed, throw an exception and don't proceed.
278-
val qe = session.sessionState.executePlan(query)
279-
qe.assertAnalyzed()
280-
val analyzedPlan = qe.analyzed
281-
282243
if (session.sessionState.catalog.isTempView(name)) {
283-
alterTemporaryView(session, analyzedPlan)
244+
alterTemporaryView(session, query)
284245
} else {
285-
alterPermanentView(session, analyzedPlan)
246+
alterPermanentView(session, query)
286247
}
287248
Seq.empty[Row]
288249
}
289250

290251
private def alterTemporaryView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = {
291-
val tableDefinition = if (conf.storeAnalyzedPlanForView) {
292-
analyzedPlan
252+
val catalog = session.sessionState.catalog
253+
val getRawTempView: String => Option[LogicalPlan] = if (name.database.isEmpty) {
254+
catalog.getRawTempView
293255
} else {
294-
checkCyclicViewReference(analyzedPlan, Seq(name), name)
295-
TemporaryViewRelation(
296-
prepareTemporaryView(
297-
name, session, analyzedPlan, analyzedPlan.schema, Some(originalText)))
256+
catalog.getRawGlobalTempView
298257
}
258+
val tableDefinition = createTemporaryViewRelation(
259+
name,
260+
session,
261+
replace = true,
262+
getRawTempView,
263+
Some(originalText),
264+
analyzedPlan,
265+
aliasedPlan = analyzedPlan)
299266
session.sessionState.catalog.alterTempViewDefinition(name, tableDefinition)
300267
}
301268

@@ -306,6 +273,9 @@ case class AlterViewAsCommand(
306273
val viewIdent = viewMeta.identifier
307274
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
308275

276+
logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
277+
CommandUtils.uncacheTableOrView(session, viewIdent.quotedString)
278+
309279
val newProperties = generateViewProperties(
310280
viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames)
311281

@@ -349,7 +319,7 @@ case class ShowViewsCommand(
349319
}
350320
}
351321

352-
object ViewHelper {
322+
object ViewHelper extends SQLConfHelper with Logging {
353323

354324
private val configPrefixDenyList = Seq(
355325
SQLConf.MAX_NESTED_VIEW_DEPTH.key,
@@ -596,19 +566,80 @@ object ViewHelper {
596566
(collectTempViews(child), collectTempFunctions(child))
597567
}
598568

569+
/**
570+
* Returns a [[TemporaryViewRelation]] that contains information about a temporary view
571+
* to create, given an analyzed plan of the view. If a temp view is to be replaced and it is
572+
* cached, it will be uncached before being replaced.
573+
*
574+
* @param name the name of the temporary view to create/replace.
575+
* @param session the spark session.
576+
* @param replace if true and the existing view is cached, it will be uncached.
577+
* @param getRawTempView the function that returns an optional raw plan of the local or
578+
* global temporary view.
579+
* @param originalText the original SQL text of this view, can be None if this view is created via
580+
* Dataset API or spark.sql.legacy.storeAnalyzedPlanForView is set to true.
581+
* @param analyzedPlan the logical plan that represents the view; this is used to generate the
582+
* logical plan for temporary view and the view schema.
583+
* @param aliasedPlan the aliased logical plan based on the user specified columns. If there are
584+
* no user specified plans, this should be same as `analyzedPlan`.
585+
*/
586+
def createTemporaryViewRelation(
587+
name: TableIdentifier,
588+
session: SparkSession,
589+
replace: Boolean,
590+
getRawTempView: String => Option[LogicalPlan],
591+
originalText: Option[String],
592+
analyzedPlan: LogicalPlan,
593+
aliasedPlan: LogicalPlan): TemporaryViewRelation = {
594+
val uncache = getRawTempView(name.table).map { r =>
595+
needsToUncache(r, aliasedPlan)
596+
}.getOrElse(false)
597+
if (replace && uncache) {
598+
logDebug(s"Try to uncache ${name.quotedString} before replacing.")
599+
checkCyclicViewReference(analyzedPlan, Seq(name), name)
600+
CommandUtils.uncacheTableOrView(session, name.quotedString)
601+
}
602+
if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
603+
TemporaryViewRelation(
604+
prepareTemporaryView(
605+
name,
606+
session,
607+
analyzedPlan,
608+
aliasedPlan.schema,
609+
originalText.get))
610+
} else {
611+
TemporaryViewRelation(
612+
prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan),
613+
Some(aliasedPlan))
614+
}
615+
}
616+
617+
/**
618+
* Checks if need to uncache the temp view being replaced.
619+
*/
620+
private def needsToUncache(
621+
rawTempView: LogicalPlan,
622+
aliasedPlan: LogicalPlan): Boolean = rawTempView match {
623+
// If TemporaryViewRelation doesn't store the analyzed view, always uncache.
624+
case TemporaryViewRelation(_, None) => true
625+
// Do not need to uncache if the to-be-replaced temp view plan and the new plan are the
626+
// same-result plans.
627+
case TemporaryViewRelation(_, Some(p)) => !p.sameResult(aliasedPlan)
628+
case p => !p.sameResult(aliasedPlan)
629+
}
599630

600631
/**
601632
* Returns a [[CatalogTable]] that contains information for temporary view.
602633
* Generate the view-specific properties(e.g. view default database, view query output
603634
* column names) and store them as properties in the CatalogTable, and also creates
604635
* the proper schema for the view.
605636
*/
606-
def prepareTemporaryView(
637+
private def prepareTemporaryView(
607638
viewName: TableIdentifier,
608639
session: SparkSession,
609640
analyzedPlan: LogicalPlan,
610641
viewSchema: StructType,
611-
originalText: Option[String]): CatalogTable = {
642+
originalText: String): CatalogTable = {
612643

613644
val catalog = session.sessionState.catalog
614645
val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, analyzedPlan)
@@ -622,22 +653,22 @@ object ViewHelper {
622653
tableType = CatalogTableType.VIEW,
623654
storage = CatalogStorageFormat.empty,
624655
schema = viewSchema,
625-
viewText = originalText,
656+
viewText = Some(originalText),
626657
properties = newProperties)
627658
}
628659

629660
/**
630-
* Returns a [[CatalogTable]] that contains information for the temporary view created
631-
* from a dataframe.
661+
* Returns a [[CatalogTable]] that contains information for the temporary view storing
662+
* an analyzed plan.
632663
*/
633-
def prepareTemporaryViewFromDataFrame(
664+
private def prepareTemporaryViewStoringAnalyzedPlan(
634665
viewName: TableIdentifier,
635666
analyzedPlan: LogicalPlan): CatalogTable = {
636667
CatalogTable(
637668
identifier = viewName,
638669
tableType = CatalogTableType.VIEW,
639670
storage = CatalogStorageFormat.empty,
640671
schema = analyzedPlan.schema,
641-
properties = Map((VIEW_CREATED_FROM_DATAFRAME, "true")))
672+
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")))
642673
}
643674
}

0 commit comments

Comments
 (0)