-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-6339][SQL] Supports CREATE TEMPORARY VIEW tableIdentifier AS query #12872
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ce139a6
8af667e
238db12
1e20bb0
85d121c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command | |
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.sql.{AnalysisException, Row, SparkSession} | ||
| import org.apache.spark.sql.catalyst.SQLBuilder | ||
| import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} | ||
| import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} | ||
| import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} | ||
|
|
@@ -37,13 +37,18 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} | |
| * already exists, throws analysis exception. | ||
| * @param replace if true, and if the view already exists, updates it; if false, and if the view | ||
| * already exists, throws analysis exception. | ||
| * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped | ||
| * at the end of current Spark session. Existing permanent relations with the same | ||
| * name are not visible to the current session while the temporary view exists, | ||
| * unless they are specified with a fully qualified table name with database prefix. | ||
| * @param sql the original sql | ||
| */ | ||
| case class CreateViewCommand( | ||
| tableDesc: CatalogTable, | ||
| child: LogicalPlan, | ||
| allowExisting: Boolean, | ||
| replace: Boolean, | ||
| isTemporary: Boolean, | ||
| sql: String) | ||
| extends RunnableCommand { | ||
|
|
||
|
|
@@ -55,13 +60,24 @@ case class CreateViewCommand( | |
| require(tableDesc.tableType == CatalogTableType.VIEW) | ||
| require(tableDesc.viewText.isDefined) | ||
|
|
||
| private val tableIdentifier = tableDesc.identifier | ||
|
|
||
| if (allowExisting && replace) { | ||
| throw new AnalysisException( | ||
| "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.") | ||
| } | ||
|
|
||
| // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE' | ||
| if (allowExisting && isTemporary) { | ||
| throw new AnalysisException( | ||
| "It is not allowed to define a TEMPORARY view with IF NOT EXISTS.") | ||
| } | ||
|
|
||
| // Temporary view names should NOT contain database prefix like "database.table" | ||
| if (isTemporary && tableDesc.identifier.database.isDefined) { | ||
| val database = tableDesc.identifier.database.get | ||
| throw new AnalysisException( | ||
| s"It is not allowed to add database prefix ${database} for the TEMPORARY view name.") | ||
| } | ||
|
|
||
| override def run(sparkSession: SparkSession): Seq[Row] = { | ||
| // If the plan cannot be analyzed, throw an exception and don't proceed. | ||
| val qe = sparkSession.executePlan(child) | ||
|
|
@@ -71,29 +87,59 @@ case class CreateViewCommand( | |
| require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length) | ||
| val sessionState = sparkSession.sessionState | ||
|
|
||
| if (sessionState.catalog.tableExists(tableIdentifier)) { | ||
| if (allowExisting) { | ||
| // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view | ||
| // already exists. | ||
| } else if (replace) { | ||
| // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` | ||
| sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan)) | ||
| if (isTemporary) { | ||
| createTemporaryView(tableDesc.identifier, sparkSession, analyzedPlan) | ||
| } else { | ||
| // Adds default database for permanent table if it doesn't exist, so that tableExists() | ||
| // only check permanent tables. | ||
| val database = tableDesc.identifier.database.getOrElse( | ||
| sessionState.catalog.getCurrentDatabase) | ||
| val tableIdentifier = tableDesc.identifier.copy(database = Option(database)) | ||
|
|
||
| if (sessionState.catalog.tableExists(tableIdentifier)) { | ||
| if (allowExisting) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You can simplify the code here by omitting this if clause and adding the explicit check
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed by disallowing "IF NOT EXISTS" syntax to be consistent with "CREATE TEMPORARY TABLE" behavior. |
||
| // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view | ||
| // already exists. | ||
| } else if (replace) { | ||
| // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` | ||
| sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan)) | ||
| } else { | ||
| // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already | ||
| // exists. | ||
| throw new AnalysisException( | ||
| s"View $tableIdentifier already exists. If you want to update the view definition, " + | ||
| "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS") | ||
| } | ||
| } else { | ||
| // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already | ||
| // exists. | ||
| throw new AnalysisException(s"View $tableIdentifier already exists. " + | ||
| "If you want to update the view definition, please use ALTER VIEW AS or " + | ||
| "CREATE OR REPLACE VIEW AS") | ||
| // Create the view if it doesn't exist. | ||
| sessionState.catalog.createTable( | ||
| prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false) | ||
| } | ||
| } else { | ||
| // Create the view if it doesn't exist. | ||
| sessionState.catalog.createTable( | ||
| prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false) | ||
| } | ||
|
|
||
| Seq.empty[Row] | ||
| } | ||
|
|
||
| private def createTemporaryView( | ||
| table: TableIdentifier, sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = { | ||
|
|
||
| val sessionState = sparkSession.sessionState | ||
| val catalog = sessionState.catalog | ||
|
|
||
| // Projects column names to alias names | ||
| val logicalPlan = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this possible? To have different columns and query output?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we can project the columns like this:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW, seems that this feature is not tested yet?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is covered by UT
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nvm, actually it is. |
||
| if (tableDesc.schema.isEmpty) { | ||
| analyzedPlan | ||
| } else { | ||
| val projectList = analyzedPlan.output.zip(tableDesc.schema).map { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems you want to check if
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nvm
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We already check the length. |
||
| case (attr, col) => Alias(attr, col.name)() | ||
| } | ||
| sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed | ||
| } | ||
| } | ||
|
|
||
| catalog.createTempTable(table.table, logicalPlan, replace) | ||
| } | ||
|
|
||
| /** | ||
| * Returns a [[CatalogTable]] that can be used to save in the catalog. This canonicalizes the | ||
| * SQL based on the analyzed plan, and also creates the proper schema for the view. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,11 +37,21 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { | |
| sqlContext.sql(s"DROP TABLE IF EXISTS jt") | ||
| } | ||
|
|
||
| test("nested views") { | ||
| withView("jtv1", "jtv2") { | ||
| sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect() | ||
| sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect() | ||
| test("nested views (interleaved with temporary views)") { | ||
| withView("jtv1", "jtv2", "jtv3", "temp_jtv1", "temp_jtv2", "temp_jtv3") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does withView("jtv1", "jtv2", "jtv3") {
withTempTable("temp_jtv1", "temp_jtv2", "temp_jtv3") {
// ...
}
}I'd suggest to make sure that
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (This applies to all test cases modified/introduced in this PR.)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @clockfly had helped verify that |
||
| sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") | ||
| sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6") | ||
| checkAnswer(sql("select count(*) FROM jtv2"), Row(2)) | ||
|
|
||
| // Checks temporary views | ||
| sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3") | ||
| sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6") | ||
| checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2)) | ||
|
|
||
| // Checks interleaved temporary view and normal view | ||
| sql("CREATE TEMPORARY VIEW temp_jtv3 AS SELECT * FROM jt WHERE id > 3") | ||
| sql("CREATE VIEW jtv3 AS SELECT * FROM temp_jtv3 WHERE id < 6") | ||
| checkAnswer(sql("select count(*) FROM jtv3"), Row(2)) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -57,6 +67,33 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { | |
| } | ||
| } | ||
|
|
||
| test("error handling: fail if the temp view name contains the database prefix") { | ||
| // Fully qualified table name like "database.table" is not allowed for temporary view | ||
| val e = intercept[AnalysisException] { | ||
| sql("CREATE OR REPLACE TEMPORARY VIEW default.myabcdview AS SELECT * FROM jt") | ||
| } | ||
| assert(e.message.contains("It is not allowed to add database prefix")) | ||
| } | ||
|
|
||
| test("error handling: disallow IF NOT EXISTS for CREATE TEMPORARY VIEW") { | ||
| val e = intercept[AnalysisException] { | ||
| sql("CREATE TEMPORARY VIEW IF NOT EXISTS myabcdview AS SELECT * FROM jt") | ||
| } | ||
| assert(e.message.contains("It is not allowed to define a TEMPORARY view with IF NOT EXISTS")) | ||
| } | ||
|
|
||
| test("error handling: fail if the temp view sql itself is invalid") { | ||
| // A table that does not exist for temporary view | ||
| intercept[AnalysisException] { | ||
| sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT * FROM table_not_exist1345") | ||
| } | ||
|
|
||
| // A column that does not exist, for temporary view | ||
| intercept[AnalysisException] { | ||
| sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT random1234 FROM jt") | ||
| } | ||
| } | ||
|
|
||
| test("correctly parse CREATE VIEW statement") { | ||
| withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { | ||
| sql( | ||
|
|
@@ -69,18 +106,70 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { | |
| } | ||
| } | ||
|
|
||
| test("correctly parse CREATE TEMPORARY VIEW statement") { | ||
| withView("testView") { | ||
| sql( | ||
| """CREATE TEMPORARY VIEW | ||
| |testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') | ||
| |TBLPROPERTIES ('a' = 'b') | ||
| |AS SELECT * FROM jt | ||
| |""".stripMargin) | ||
| checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) | ||
| } | ||
| } | ||
|
|
||
| test("should NOT allow CREATE TEMPORARY VIEW when TEMPORARY VIEW with same name exists") { | ||
| withView("testView") { | ||
| sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") | ||
|
|
||
| val e = intercept[AnalysisException] { | ||
| sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") | ||
| } | ||
|
|
||
| assert(e.message.contains("Temporary table") && e.message.contains("already exists")) | ||
| } | ||
| } | ||
|
|
||
| test("should allow CREATE TEMPORARY VIEW when a permanent VIEW with same name exists") { | ||
| withView("testView", "default.testView") { | ||
| sql("CREATE VIEW testView AS SELECT id FROM jt") | ||
| sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") | ||
| } | ||
| } | ||
|
|
||
| test("should allow CREATE permanent VIEW when a TEMPORARY VIEW with same name exists") { | ||
| withView("testView", "default.testView") { | ||
| sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") | ||
| sql("CREATE VIEW testView AS SELECT id FROM jt") | ||
| } | ||
| } | ||
|
|
||
| test("correctly handle CREATE VIEW IF NOT EXISTS") { | ||
| withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { | ||
| withTable("jt2") { | ||
| sql("CREATE VIEW testView AS SELECT id FROM jt") | ||
| withView("testView") { | ||
| sql("CREATE VIEW testView AS SELECT id FROM jt") | ||
|
|
||
| val df = (1 until 10).map(i => i -> i).toDF("i", "j") | ||
| df.write.format("json").saveAsTable("jt2") | ||
| sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2") | ||
| val df = (1 until 10).map(i => i -> i).toDF("i", "j") | ||
| df.write.format("json").saveAsTable("jt2") | ||
| sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not related to this PR, but it's a waste to write a new persisted table here to do the test. We can simply create a view with different column number here: sql("CREATE VIEW IF NOT EXISTS testView AS SELECT id AS a, id AS b FROM jt") |
||
|
|
||
| // make sure our view doesn't change. | ||
| // make sure our view doesn't change. | ||
| checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test(s"correctly handle CREATE OR REPLACE TEMPORARY VIEW") { | ||
| withTable("jt2") { | ||
| withView("testView") { | ||
| sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id FROM jt") | ||
| checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) | ||
| sql("DROP VIEW testView") | ||
|
|
||
| sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id AS i, id AS j FROM jt") | ||
| // make sure the view has been changed. | ||
| checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -214,5 +303,4 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { | |
| } | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does this semantic rule come from? Hive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to be consistent with the Dataset API: when registering a temp table, it removes the database prefix. Here is the code:
spark/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Line 533 in 6ba17cd