diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2954d4ce7d2d..fea0272951ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing}
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
@@ -307,6 +308,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   case object CommandStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case r: RunnableCommand => ExecutedCommand(r) :: Nil
+
+      case CreateTableUsing(tableName, provider, true, options) =>
+        ExecutedCommand(
+          CreateTempTableUsing(tableName, provider, options)) :: Nil
+
+      case CreateTableUsing(tableName, provider, false, options) =>
+        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+
       case logical.SetCommand(kv) =>
         Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
       case logical.ExplainCommand(logicalPlan, extended) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 8a66ac31f2df..40bef5c28533 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -72,9 +72,9 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
    */
  protected lazy val createTable: Parser[LogicalPlan] =
-    CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
-      case tableName ~ provider ~ opts =>
-        CreateTableUsing(tableName, provider, opts)
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
+      case temp ~ tableName ~ provider ~ opts =>
+        CreateTableUsing(tableName, provider, temp.isDefined, opts)
     }
 
   protected lazy val options: Parser[Map[String, String]] =
@@ -85,12 +85,11 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   protected lazy val pair: Parser[(String, String)] =
     ident ~ stringLit ^^ { case k ~ v => (k,v) }
 }
 
-private[sql] case class CreateTableUsing(
-    tableName: String,
-    provider: String,
-    options: Map[String, String]) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext) = {
+object ResolvedDataSource {
+  def apply(
+      sqlContext: SQLContext,
+      provider: String,
+      options: Map[String, String]): ResolvedDataSource = {
     val loader = Utils.getContextOrSparkClassLoader
     val clazz: Class[_] = try loader.loadClass(provider) catch {
       case cnf: java.lang.ClassNotFoundException =>
@@ -102,7 +101,27 @@ private[sql] case class CreateTableUsing(
     val dataSource =
       clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
     val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
 
-    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+    new ResolvedDataSource(clazz, relation)
+  }
+}
+
+private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+
+private[sql] case class CreateTableUsing(
+    tableName: String,
+    provider: String,
+    temporary: Boolean,
+    options: Map[String, String]) extends Command
+
+private[sql] case class CreateTempTableUsing(
+    tableName: String,
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    val resolved = ResolvedDataSource(sqlContext, provider, options)
+
+    sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
     Seq.empty
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 56fe27a77b83..ab7c0a3b1256 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -116,6 +116,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
   }
 
+  def refreshTable(tableName: String): Unit = {
+    // TODO: Database support...
+    catalog.refreshTable("default", tableName)
+  }
+
+  protected[hive] def invalidateTable(tableName: String): Unit = {
+    // TODO: Database support...
+    catalog.invalidateTable("default", tableName)
+  }
+
   /**
    * Analyzes the given table in the current database to generate statistics, which will be
    * used in query optimizations.
@@ -340,8 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
       DataSourceStrategy,
-      CommandStrategy,
       HiveCommandStrategy(self),
+      CommandStrategy,
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b31a3ec25096..697170e00a8e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,15 +20,16 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.cache.{CacheBuilder, CacheLoader}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.sources.{BaseRelation, LogicalRelation, ResolvedDataSource}
 
 import scala.util.parsing.combinator.RegexParsers
 
 import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, SerDeInfo, FieldSchema}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
@@ -55,8 +56,61 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   /** Connection to hive metastore.  Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
 
+  // TODO: Use this everywhere instead of (databaseName, tableName) tuples.
+  /** A fully qualified identifier for a table (i.e., database.tableName) */
+  case class TableIdent(database: String, name: String) {
+    def toLowerCase = TableIdent(database.toLowerCase, name.toLowerCase)
+  }
+
+  /** A cache of Spark SQL data source tables that have been accessed. */
+  protected[hive] val cachedDataSourceTables = CacheBuilder.newBuilder()
+    .maximumSize(1000)
+    .build(
+      new CacheLoader[TableIdent, LogicalPlan]() {
+        override def load(in: TableIdent): LogicalPlan = {
+          logDebug(s"Creating new cached data source for $in")
+          val table = client.getTable(in.database, in.name)
+
+          // It does not appear that the ql client for the metastore has a way to enumerate all the
+          // SerDe properties directly...
+          val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
+
+          val resolvedRelation =
+            ResolvedDataSource(
+              hive,
+              table.getProperty("spark.sql.sources.provider"),
+              options)
+
+          LogicalRelation(resolvedRelation.relation)
+        }
+      })
+
+  def refreshTable(databaseName: String, tableName: String): Unit = {
+    cachedDataSourceTables.refresh(TableIdent(databaseName, tableName).toLowerCase)
+  }
+
+  def invalidateTable(databaseName: String, tableName: String): Unit = {
+    cachedDataSourceTables.invalidate(TableIdent(databaseName, tableName).toLowerCase)
+  }
+
   val caseSensitive: Boolean = false
 
+  def createDataSourceTable(tableName: String, provider: String, options: Map[String, String]) = {
+    val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
+    val tbl = new Table(dbName, tblName)
+
+    tbl.setProperty("spark.sql.sources.provider", provider)
+    options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
+
+    tbl.setProperty("EXTERNAL", "TRUE")
+    tbl.setTableType(TableType.EXTERNAL_TABLE)
+
+    // create the table
+    synchronized {
+      client.createTable(tbl, false)
+    }
+  }
+
   def tableExists(db: Option[String], tableName: String): Boolean = {
     val (databaseName, tblName) = processDatabaseAndTableName(
       db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
@@ -70,7 +124,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     val (databaseName, tblName) =
       processDatabaseAndTableName(db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
     val table = client.getTable(databaseName, tblName)
-    if (table.isView) {
+
+    if (table.getProperty("spark.sql.sources.provider") != null) {
+      cachedDataSourceTables(TableIdent(databaseName, tblName).toLowerCase)
+    } else if (table.isView) {
       // if the unresolved relation is from hive view
       // parse the text into logic node.
       HiveQl.createPlanForView(table, alias)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d3f6381b69a4..6c906ad66dac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.sources.CreateTableUsing
 import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
 
 import scala.collection.JavaConversions._
@@ -219,6 +220,10 @@ private[hive] trait HiveStrategies {
           ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil
       }
 
+    case CreateTableUsing(tableName, provider, false, options) =>
+      ExecutedCommand(
+        CreateMetastoreDataSource(tableName, provider, options)) :: Nil
+
     case _ => Nil
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index b2149bd95a33..1f6a5441b643 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -394,6 +394,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
 
       clearCache()
       loadedTables.clear()
+      catalog.cachedDataSourceTables.invalidateAll()
      catalog.client.getAllTables("default").foreach { t =>
         logDebug(s"Deleting table $t")
         val table = catalog.client.getTable("default", t)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6fc4153f6a5d..f38b44688256 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -52,6 +52,10 @@ case class DropTable(
   override def run(sqlContext: SQLContext) = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
+    try hiveContext.tryUncacheQuery(hiveContext.table(tableName)) catch {
+      case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+    }
+    hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
     hiveContext.catalog.unregisterTable(None, tableName)
     Seq.empty[Row]
@@ -85,3 +89,16 @@ case class AddFile(path: String) extends RunnableCommand {
     Seq.empty[Row]
   }
 }
+
+case class CreateMetastoreDataSource(
+    tableName: String,
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    hiveContext.catalog.createDataSourceTable(tableName, provider, options)
+
+    Seq.empty[Row]
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
new file mode 100644
index 000000000000..ee9933593d73
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.BeforeAndAfterEach
+
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+/**
+ * Tests for persisting tables created through the data sources API into the metastore.
+ */
+class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
+  override def afterEach(): Unit = {
+    reset()
+  }
+
+  test("persistent JSON table") {
+    sql(
+      """
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path 'src/test/resources/data/files/sample.json'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile("src/test/resources/data/files/sample.json").collect().toSeq)
+
+  }
+
+  test("resolve shortened provider names") {
+    sql(
+      """
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path 'src/test/resources/data/files/sample.json'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile("src/test/resources/data/files/sample.json").collect().toSeq)
+  }
+
+  test("drop table") {
+    sql(
+      """
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path 'src/test/resources/data/files/sample.json'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile("src/test/resources/data/files/sample.json").collect().toSeq)
+
+    sql("DROP TABLE jsonTable")
+
+    intercept[Exception] {
+      sql("SELECT * FROM jsonTable").collect()
+    }
+  }
+
+  test("check change without refresh") {
+    val tempDir = File.createTempFile("sparksql", "json")
+    tempDir.delete()
+    sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b") :: Nil)
+
+    FileUtils.deleteDirectory(tempDir)
+    sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    // Schema is cached, so the answer does not change.
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b") :: Nil)
+
+    refreshTable("jsonTable")
+
+    // Check that the refresh worked
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b", "c") :: Nil)
+    FileUtils.deleteDirectory(tempDir)
+  }
+
+  test("drop, change, recreate") {
+    val tempDir = File.createTempFile("sparksql", "json")
+    tempDir.delete()
+    sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b") :: Nil)
+
+    FileUtils.deleteDirectory(tempDir)
+    sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql("DROP TABLE jsonTable")
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    // New table should reflect new schema.
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b", "c") :: Nil)
+    FileUtils.deleteDirectory(tempDir)
+  }
+}
\ No newline at end of file
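
A minimal usage sketch of what this patch enables (editor-added, not part of the diff). It assumes a HiveContext bound to `hiveContext` and a JSON file at the hypothetical path /tmp/people.json; the DDL syntax, shortened provider name, and refreshTable come directly from the patch and its tests.

// Persistent table: with a HiveContext, CREATE TABLE ... USING is planned as
// CreateMetastoreDataSource, so the definition is written to the metastore
// and survives into new sessions.
hiveContext.sql(
  """
    |CREATE TABLE people
    |USING org.apache.spark.sql.json
    |OPTIONS (
    |  path '/tmp/people.json'
    |)
  """.stripMargin)

// Temporary table: the only form a plain SQLContext accepts; planned as
// CreateTempTableUsing and registered only for the current session.
hiveContext.sql(
  """
    |CREATE TEMPORARY TABLE tempPeople
    |USING org.apache.spark.sql.json
    |OPTIONS (
    |  path '/tmp/people.json'
    |)
  """.stripMargin)

// If the underlying file changes, reload the cached relation; without this,
// the "check change without refresh" test shows the stale answer is returned.
hiveContext.refreshTable("people")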