From 400525e0733e8faf2eb600c95cf38151368ba0c6 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 29 Oct 2019 20:43:44 -0700 Subject: [PATCH 1/3] initial commit --- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 4 ++-- .../sql/catalyst/analysis/ResolveCatalogs.scala | 6 +++++- .../spark/sql/catalyst/parser/AstBuilder.scala | 1 + .../sql/catalyst/plans/logical/statements.scala | 1 + .../sql/catalyst/parser/DDLParserSuite.scala | 14 ++++++++++---- .../analysis/ResolveSessionCatalog.scala | 8 ++++++-- .../spark/sql/execution/SparkSqlParser.scala | 16 ---------------- .../spark/sql/connector/AlterTableTests.scala | 13 +++++++++++++ .../sql/execution/command/DDLParserSuite.scala | 12 ------------ 9 files changed, 38 insertions(+), 37 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 7ad008ae5d90..33bf5b6fea10 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -168,8 +168,8 @@ statement DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions | ALTER VIEW tableIdentifier DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions - | ALTER TABLE multipartIdentifier SET locationSpec #setTableLocation - | ALTER TABLE tableIdentifier partitionSpec SET locationSpec #setPartitionLocation + | ALTER TABLE multipartIdentifier + (partitionSpec)? SET locationSpec #setTableLocation | ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable | DROP VIEW (IF EXISTS)? 
multipartIdentifier #dropView diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 7bf0e2515880..ddd60fa5ec11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -73,7 +73,11 @@ class ResolveCatalogs(val catalogManager: CatalogManager) createAlterTable(nameParts, catalog, tableName, changes) case AlterTableSetLocationStatement( - nameParts @ NonSessionCatalog(catalog, tableName), newLoc) => + nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, newLoc) => + if (partitionSpec.nonEmpty) { + throw new AnalysisException( + "ALTER TABLE SET LOCATION does not support partition for v2 tables.") + } val changes = Seq(TableChange.setProperty("location", newLoc)) createAlterTable(nameParts, catalog, tableName, changes) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 893003d21828..aa303928085d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2717,6 +2717,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) { AlterTableSetLocationStatement( visitMultipartIdentifier(ctx.multipartIdentifier), + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), visitLocationSpec(ctx.locationSpec)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala 
index ac09dec13fff..e864be388c9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -179,6 +179,7 @@ case class AlterTableUnsetPropertiesStatement( */ case class AlterTableSetLocationStatement( tableName: Seq[String], + partitionSpec: Option[TablePartitionSpec], location: String) extends ParsedStatement /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index a1d74c5099c4..618a7c1f46ce 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -540,10 +540,16 @@ class DDLParserSuite extends AnalysisTest { } test("alter table: set location") { - val sql1 = "ALTER TABLE table_name SET LOCATION 'new location'" - val parsed1 = parsePlan(sql1) - val expected1 = AlterTableSetLocationStatement(Seq("table_name"), "new location") - comparePlans(parsed1, expected1) + comparePlans( + parsePlan("ALTER TABLE a.b.c SET LOCATION 'new location'"), + AlterTableSetLocationStatement(Seq("a", "b", "c"), None, "new location")) + + comparePlans( + parsePlan("ALTER TABLE a.b.c PARTITION(ds='2017-06-10') SET LOCATION 'new location'"), + AlterTableSetLocationStatement( + Seq("a", "b", "c"), + Some(Map("ds" -> "2017-06-10")), + "new location")) } test("alter table: rename column") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index f60d7cdeae6e..39b0c2a3a5b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -118,11 +118,15 @@ class ResolveSessionCatalog( } case AlterTableSetLocationStatement( - nameParts @ SessionCatalog(catalog, tableName), newLoc) => + nameParts @ SessionCatalog(catalog, tableName), partitionSpec, newLoc) => loadTable(catalog, tableName.asIdentifier).collect { case v1Table: V1Table => - AlterTableSetLocationCommand(tableName.asTableIdentifier, None, newLoc) + AlterTableSetLocationCommand(tableName.asTableIdentifier, partitionSpec, newLoc) }.getOrElse { + if (partitionSpec.nonEmpty) { + throw new AnalysisException( + "ALTER TABLE SET LOCATION does not support partition for v2 tables.") + } val changes = Seq(TableChange.setProperty("location", newLoc)) createAlterTable(nameParts, catalog, tableName, changes) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 99375a15f523..8453d324addf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -515,22 +515,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { retainData = false) } - /** - * Create an [[AlterTableSetLocationCommand]] command for a partition. - * - * For example: - * {{{ - * ALTER TABLE table PARTITION spec SET LOCATION "loc"; - * }}} - */ - override def visitSetPartitionLocation( - ctx: SetPartitionLocationContext): LogicalPlan = withOrigin(ctx) { - AlterTableSetLocationCommand( - visitTableIdentifier(ctx.tableIdentifier), - Some(visitNonOptionalPartitionSpec(ctx.partitionSpec)), - visitLocationSpec(ctx.locationSpec)) - } - /** * Create a [[AlterTableChangeColumnCommand]] command. 
* diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index eed07aeff090..7392850f276c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -816,6 +816,19 @@ trait AlterTableTests extends SharedSparkSession { } } + test("AlterTable: set partition location") { + val t = s"${catalogAndNamespace}table_name" + withTable(t) { + sql(s"CREATE TABLE $t (id int) USING $v2Format") + + val exc = intercept[AnalysisException] { + sql(s"ALTER TABLE $t PARTITION(ds='2017-06-10') SET LOCATION 's3://bucket/path'") + } + assert(exc.getMessage.contains( + "ALTER TABLE SET LOCATION does not support partition for v2 tables")) + } + } + test("AlterTable: set table property") { val t = s"${catalogAndNamespace}table_name" withTable(t) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index beab219b17d8..7532d31eff9f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -635,18 +635,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { "SET FILEFORMAT PARQUET") } - test("alter table: set partition location") { - val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " + - "SET LOCATION 'new location'" - val parsed2 = parser.parsePlan(sql2) - val tableIdent = TableIdentifier("table_name", None) - val expected2 = AlterTableSetLocationCommand( - tableIdent, - Some(Map("dt" -> "2008-08-08", "country" -> "us")), - "new location") - comparePlans(parsed2, expected2) - } - test("alter table: change column name/type/comment") { val sql1 = "ALTER TABLE 
table_name CHANGE COLUMN col_old_name col_new_name INT" val sql2 = "ALTER TABLE table_name CHANGE COLUMN col_name col_name INT COMMENT 'new_comment'" From bc76fdf75b6da6f06d52201338c5817e1b8a90dd Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 30 Oct 2019 08:55:34 -0700 Subject: [PATCH 2/3] Fix test --- .../apache/spark/sql/execution/SQLViewSuite.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 8ff293146127..81d261416642 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -136,14 +136,21 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { assertNoSuchTable(s"ALTER TABLE $viewName SET SERDE 'whatever'") assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a=1, b=2) SET SERDE 'whatever'") assertNoSuchTable(s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')") - assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'") assertNoSuchTable(s"ALTER TABLE $viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')") assertNoSuchTable(s"ALTER TABLE $viewName DROP PARTITION (a='4', b='8')") assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')") assertNoSuchTable(s"ALTER TABLE $viewName RECOVER PARTITIONS") // For v2 ALTER TABLE statements, we have better error message saying view is not supported. - assertViewNotSupported(s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'") + assertAnalysisExceptionThrown( + s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'", + s"'$viewName' is a view not a table") + + // For the following v2 ALTER TABLE statements, unsupported operations are checked first + // before resolving the relations. 
+ assertAnalysisExceptionThrown( s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'", "ALTER TABLE SET LOCATION does not support partition for v2 tables") } } @@ -177,9 +184,9 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { } } - private def assertViewNotSupported(query: String): Unit = { + private def assertAnalysisExceptionThrown(query: String, message: String): Unit = { val e = intercept[AnalysisException](sql(query)) - assert(e.message.contains("'testView' is a view not a table")) + assert(e.message.contains(message)) } test("error handling: insert/load/truncate table commands against a view") { From 1f5907288ee76d0b67cc0ed4eb426b5607c898dc Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 30 Oct 2019 10:09:48 -0700 Subject: [PATCH 3/3] Address PR comments --- .../scala/org/apache/spark/sql/execution/SQLViewSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 81d261416642..918e1960dbd5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -142,13 +142,13 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { assertNoSuchTable(s"ALTER TABLE $viewName RECOVER PARTITIONS") // For v2 ALTER TABLE statements, we have better error message saying view is not supported. - assertAnalysisExceptionThrown( s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'", s"'$viewName' is a view not a table") // For the following v2 ALTER TABLE statements, unsupported operations are checked first // before resolving the relations. 
- assertAnalysisExceptionThrown( + assertAnalysisError( s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'", "ALTER TABLE SET LOCATION does not support partition for v2 tables") } @@ -184,7 +184,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { } } - private def assertAnalysisExceptionThrown(query: String, message: String): Unit = { + private def assertAnalysisError(query: String, message: String): Unit = { val e = intercept[AnalysisException](sql(query)) assert(e.message.contains(message)) }