From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 Nov 2015 18:30:26 -0800
Subject: [PATCH 01/14] SPARK-11633
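
HiveContext throws "TreeNode Exception: Failed to Copy Node" when a self-join
is resolved and the attribute rewrite map built from the old and new relation
outputs contains attributes that map to themselves. The change filters those
identity pairs out of attributeRewrites. A minimal reproduction, mirroring the
regression test added below:

    val df = hiveContext.createDataFrame(
      sparkContext.parallelize(Seq(Individual(1, 3), Individual(2, 1))))
    df.registerTempTable("foo")
    val df2 = sql("select f1, F2 as F2 from foo")
    df2.registerTempTable("foo2")
    df2.registerTempTable("foo3")
    sql("SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2 = b.F2")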
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f4670b55bdb..5a5b71e52dd7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -425,7 +425,8 @@ class Analyzer(
          */
         j
       case Some((oldRelation, newRelation)) =>
-        val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+        val attributeRewrites =
+          AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
         val newRight = right transformUp {
           case r if r == oldRelation => newRelation
         } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3427152b2da0..5e00546a74c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,6 +51,8 @@ case class Order(
     state: String,
     month: Int)
 
+case class Individual(F1: Integer, F2: Integer)
+
 case class WindowData(
     month: Int,
     area: String,
@@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
+
+  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
+    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
+    val df = hiveContext.createDataFrame(rdd1)
+    df.registerTempTable("foo")
+    val df2 = sql("select f1, F2 as F2 from foo")
+    df2.registerTempTable("foo2")
+    df2.registerTempTable("foo3")
+
+    checkAnswer(sql(
+      """
+        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
+      """.stripMargin), Row(2) :: Row(1) :: Nil)
+  }
 }

From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:56:45 -0800
Subject: [PATCH 02/14] converge
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 +--
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 15 ---------------
 2 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7c9512fbd00a..47962ebe6ef8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -417,8 +417,7 @@ class Analyzer(
          */
         j
       case Some((oldRelation, newRelation)) =>
-        val attributeRewrites =
-          AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
+        val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
         val newRight = right transformUp {
           case r if r == oldRelation => newRelation
         } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e00546a74c0..61d9dcd37572 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,8 +51,6 @@ case class Order(
     state: String,
     month: Int)
 
-case class Individual(F1: Integer, F2: Integer)
-
 case class WindowData(
     month: Int,
     area: String,
@@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-
-  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
-    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
-    val df = hiveContext.createDataFrame(rdd1)
-    df.registerTempTable("foo")
-    val df2 = sql("select f1, F2 as F2 from foo")
-    df2.registerTempTable("foo2")
-    df2.registerTempTable("foo3")
-
-    checkAnswer(sql(
-      """
-        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
-      """.stripMargin), Row(2) :: Row(1) :: Nil)
   }
 }

From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:58:37 -0800
Subject: [PATCH 03/14] converge
---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 61d9dcd37572..3427152b2da0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-  }
 }

From 602cace43c7b2c2ae3c59b6b742a58cf986d03f8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 25 Mar 2016 10:14:31 -0700
Subject: [PATCH 04/14] native DDL support for create database
---
 .../sql/catalyst/catalog/SessionCatalog.scala    |  2 ++
 .../apache/spark/sql/execution/SparkQl.scala     |  2 +-
 .../sql/execution/command/commands.scala         | 24 +++++++++++++++++++
 .../spark/sql/execution/command/ddl.scala        |  8 -------
 .../execution/command/DDLCommandSuite.scala      |  2 +-
 .../spark/sql/hive/HiveSessionCatalog.scala      |  5 ++++
 6 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 34265faa7439..e10423109c6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -104,6 +104,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
     currentDb = db
   }
 
+  def getDefaultPath: String = ""
+
   // ----------------------------------------------------------------------------
   // Tables
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index ef30ba0cdbf5..dd4156c1f672 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -126,7 +126,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
             extractProps(propList, "TOK_TABLEPROPERTY")
           case _ => parseFailed("Invalid CREATE DATABASE command", node)
         }.toMap
-        CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
+        CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)
 
       // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
       // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 964f0a7a7b4e..b28988de2268 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -317,6 +318,29 @@ case class DescribeCommand(
   }
 }
 
+case class CreateDatabase(
+    databaseName: String,
+    ifNotExists: Boolean,
+    path: Option[String],
+    comment: Option[String],
+    props: Map[String, String])
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    catalog.createDatabase(
+      CatalogDatabase(
+        databaseName,
+        comment.getOrElse(""),
+        path.getOrElse(catalog.getDefaultPath),
+        props),
+      ifNotExists)
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
+
 /**
  * A command for users to get tables in the given database.
  * If a databaseName is not given, the current database will be used.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 07c89afafb6b..0d1602e32d0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -44,14 +44,6 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
 
 }
 
-case class CreateDatabase(
-    databaseName: String,
-    ifNotExists: Boolean,
-    path: Option[String],
-    comment: Option[String],
-    props: Map[String, String])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
 case class CreateFunction(
     functionName: String,
     alias: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 6f1eea273faf..36bd0a1c264b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -40,7 +40,7 @@ class DDLCommandSuite extends PlanTest {
       ifNotExists = true,
       Some("/home/user/db"),
       Some("database_comment"),
-      Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql)
+      Map("a" -> "a", "b" -> "b", "c" -> "c"))
     comparePlans(parsed, expected)
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index aa44cba4b564..318db1e615ad 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.hive.conf.HiveConf
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -57,6 +59,9 @@ class HiveSessionCatalog(
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
 
+  override def getDefaultPath: String = client.getConf(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+    HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal)
+
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
   // essentially a cache for metastore tables. However, it relies on a lot of session-specific
   // things so it would be a lot of work to split its functionality between HiveSessionCatalog

From 2794bfa2f3675c5703f98540f3663a4cfe3c9126 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 27 Mar 2016 11:28:58 -0700
Subject: [PATCH 05/14] set the directory for database creation
---
 .../scala/org/apache/spark/sql/execution/command/commands.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index b28988de2268..4da9d8443474 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -332,7 +332,7 @@ case class CreateDatabase(
       CatalogDatabase(
         databaseName,
         comment.getOrElse(""),
-        path.getOrElse(catalog.getDefaultPath),
+        path.getOrElse(catalog.getDefaultPath + s"/$databaseName.db"),
         props),
       ifNotExists)
     Seq.empty[Row]

From 65ce660dc68576e43c6c96ba8481643787bafde3 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 27 Mar 2016 19:02:34 -0700
Subject: [PATCH 06/14] add a default db extension.
---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2 ++
 .../org/apache/spark/sql/execution/command/commands.scala  | 5 ++++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e10423109c6b..7bbfe0a2ba8f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -106,6 +106,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
 
   def getDefaultPath: String = ""
 
+  def getDefaultDBExtension: String = ".db"
+
   // ----------------------------------------------------------------------------
   // Tables
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 4da9d8443474..30cb2836777c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
 import java.util.NoSuchElementException
 
 import org.apache.spark.internal.Logging
@@ -328,11 +329,13 @@ case class CreateDatabase(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
+    val defaultDataBasePath =
+      catalog.getDefaultPath + File.pathSeparator + databaseName + catalog.getDefaultDBExtension
     catalog.createDatabase(
       CatalogDatabase(
         databaseName,
         comment.getOrElse(""),
-        path.getOrElse(catalog.getDefaultPath + s"/$databaseName.db"),
+        path.getOrElse(defaultDataBasePath),
        props),
       ifNotExists)
     Seq.empty[Row]

From aba3a959a5f308958a9924873057ca25222d322e Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Mar 2016 09:52:12 -0700
Subject: [PATCH 07/14] native support for drop/alter/describe database
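
DROP DATABASE, ALTER DATABASE ... SET DBPROPERTIES and DESCRIBE DATABASE move
from NativeDDLCommand stubs to RunnableCommands backed by the session catalog.
The statements handled natively after this change, as documented in the
command classes below:

    DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES ('key'='value', ...);
    DESCRIBE DATABASE [EXTENDED] database_name;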
---
 .../sql/catalyst/catalog/SessionCatalog.scala |  3 +
 .../apache/spark/sql/execution/SparkQl.scala  | 15 ++--
 .../sql/execution/command/commands.scala      | 84 ++++++++++++++++++-
 .../spark/sql/execution/command/ddl.scala     | 36 +-------
 .../execution/command/DDLCommandSuite.scala   | 46 ++++------
 .../sql/execution/command/DDLSuite.scala      | 42 ++++++++++
 .../spark/sql/hive/HiveSessionCatalog.scala   |  6 ++
 7 files changed, 158 insertions(+), 74 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7bbfe0a2ba8f..d27d89a048f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -104,10 +104,13 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
     currentDb = db
   }
 
+  // todo: what is the default path in SessionCatalog?
   def getDefaultPath: String = ""
 
   def getDefaultDBExtension: String = ".db"
 
+  def getDefaultDBPath(db: String): String = db + getDefaultDBExtension
+
   // ----------------------------------------------------------------------------
   // Tables
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index 71e361f596f9..6fe04757ba2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -97,7 +97,8 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
 
       // CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
       // [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
-      case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
+      case Token("TOK_CREATEDATABASE", Token(dbName, Nil) :: args) =>
+        val databaseName = cleanIdentifier(dbName)
         val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
           "TOK_IFNOTEXISTS",
           "TOK_DATABASELOCATION",
@@ -136,15 +137,15 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
         //   :- database_name
         //   :- TOK_IFEXISTS
         //   +- TOK_RESTRICT/TOK_CASCADE
-        val databaseName = unquoteString(dbName)
+        val databaseName = cleanIdentifier(dbName)
         // The default is RESTRICT
         val Seq(ifExists, _, cascade) = getClauses(Seq(
           "TOK_IFEXISTS", "TOK_RESTRICT", "TOK_CASCADE"), otherArgs)
-        DropDatabase(databaseName, ifExists.isDefined, restrict = cascade.isEmpty)(node.source)
+        DropDatabase(databaseName, ifExists.isDefined, cascade.isDefined)
 
       // ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
       case Token("TOK_ALTERDATABASE_PROPERTIES", Token(dbName, Nil) :: args) =>
-        val databaseName = unquoteString(dbName)
+        val databaseName = cleanIdentifier(dbName)
        val dbprops = getClause("TOK_DATABASEPROPERTIES", args)
         val props = dbprops match {
           case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
@@ -161,13 +162,13 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
             extractProps(propList, "TOK_TABLEPROPERTY")
           case _ => parseFailed("Invalid ALTER DATABASE command", node)
         }
-        AlterDatabaseProperties(databaseName, props.toMap)(node.source)
+        AlterDatabaseProperties(databaseName, props.toMap)
 
       // DESCRIBE DATABASE [EXTENDED] db_name
       case Token("TOK_DESCDATABASE", Token(dbName, Nil) :: describeArgs) =>
-        val databaseName = unquoteString(dbName)
+        val databaseName = cleanIdentifier(dbName)
         val extended = getClauseOption("EXTENDED", describeArgs)
-        DescribeDatabase(databaseName, extended.isDefined)(node.source)
+        DescribeDatabase(databaseName, extended.isDefined)
 
       // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
       // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 30cb2836777c..a68a06fdf946 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -329,13 +329,11 @@ case class CreateDatabase(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val defaultDataBasePath =
-      catalog.getDefaultPath + File.pathSeparator + databaseName + catalog.getDefaultDBExtension
     catalog.createDatabase(
       CatalogDatabase(
         databaseName,
         comment.getOrElse(""),
-        path.getOrElse(defaultDataBasePath),
+        path.getOrElse(catalog.getDefaultDBPath(databaseName)),
         props),
       ifNotExists)
     Seq.empty[Row]
@@ -344,6 +342,86 @@ case class CreateDatabase(
   override val output: Seq[Attribute] = Seq.empty
 }
 
+/**
+ * DROP DATABASE: Removes a database from the system.
+ *
+ * 'ignoreIfNotExists':
+ * - true, if database_name doesn't exist, no action
+ * - false (default), if database_name doesn't exist, a warning message will be issued
+ * 'cascade':
+ * - true, the dependent objects are automatically dropped before dropping the database.
+ * - false (default), it is in Restrict mode. The database cannot be dropped if
+ * it is not empty. Its tables must be dropped first.
+ */
+case class DropDatabase(
+    databaseName: String,
+    ignoreIfNotExists: Boolean,
+    cascade: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.sessionState.catalog.dropDatabase(databaseName, ignoreIfNotExists, cascade)
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
+
+/** ALTER DATABASE: add new (key, value) pairs into DBPROPERTIES */
+case class AlterDatabaseProperties(
+    databaseName: String,
+    props: Map[String, String])
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val db: CatalogDatabase = catalog.getDatabase(databaseName)
+    catalog.alterDatabase(db.copy(properties = db.properties ++ props))
+
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * DESCRIBE DATABASE: shows the name of the database, its comment (if one has been set), and its
+ * root location on the filesystem. When extended is true, it also shows the database's properties
+ */
+case class DescribeDatabase(
+    databaseName: String,
+    extended: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
+    val result =
+      Row("Database", dbMetadata.name) ::
+        Row("Comment", dbMetadata.description) ::
+        Row("Location", dbMetadata.locationUri) :: Nil
+
+    if (extended) {
+      val properties =
+        if (dbMetadata.properties.isEmpty) {
+          ""
+        } else {
+          dbMetadata.properties.toSeq.mkString("(", ", ", ")")
+        }
+      result :+ Row("Properties", properties)
+    } else {
+      result
+    }
+  }
+
+  override val output: Seq[Attribute] = {
+    val schema = StructType(
+      StructField("database_description_item", StringType, nullable = false) ::
+        StructField("database_description_value", StringType, nullable = false) :: Nil)
+
+    schema.toAttributes
+  }
+}
+
 /**
  * A command for users to get tables in the given database.
  * If a databaseName is not given, the current database will be used.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index c7d24afe8bcd..6b48d3e1ada7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.BucketSpec
@@ -45,39 +44,6 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
 
 }
 
-/**
- * Drop Database: Removes a database from the system.
- *
- * 'ifExists':
- * - true, if database_name does't exist, no action
- * - false (default), if database_name does't exist, a warning message will be issued
- * 'restric':
- * - true (default), the database cannot be dropped if it is not empty. The inclusive
- * tables must be dropped at first.
- * - false, it is in the Cascade mode. The dependent objects are automatically dropped
- * before dropping database.
- */
-case class DropDatabase(
-    databaseName: String,
-    ifExists: Boolean,
-    restrict: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-/** ALTER DATABASE: add new (key, value) pairs into DBPROPERTIES */
-case class AlterDatabaseProperties(
-    databaseName: String,
-    props: Map[String, String])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-/**
- * DESCRIBE DATABASE: shows the name of the database, its comment (if one has been set), and its
- * root location on the filesystem. When extended is true, it also shows the database's properties
- */
-case class DescribeDatabase(
-    databaseName: String,
-    extended: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
 case class CreateFunction(
     databaseName: Option[String],
     functionName: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index bfa8a9a983f5..28bb89b257aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -65,40 +65,28 @@ class DDLCommandSuite extends PlanTest {
 
     val expected1 = DropDatabase(
       "database_name",
-      ifExists = true,
-      restrict = true)(sql1)
+      ignoreIfNotExists = true,
+      cascade = false)
     val expected2 = DropDatabase(
       "database_name",
-      ifExists = true,
-      restrict = false)(sql2)
+      ignoreIfNotExists = true,
+      cascade = true)
     val expected3 = DropDatabase(
       "database_name",
-      ifExists = true,
-      restrict = true)(sql3)
+      ignoreIfNotExists = false,
+      cascade = false)
     val expected4 = DropDatabase(
       "database_name",
-      ifExists = true,
-      restrict = false)(sql4)
-    val expected5 = DropDatabase(
-      "database_name",
-      ifExists = true,
-      restrict = true)(sql5)
-    val expected6 = DropDatabase(
-      "database_name",
-      ifExists = false,
-      restrict = true)(sql6)
-    val expected7 = DropDatabase(
-      "database_name",
-      ifExists = false,
-      restrict = false)(sql7)
+      ignoreIfNotExists = false,
+      cascade = true)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
-    comparePlans(parsed4, expected4)
-    comparePlans(parsed5, expected5)
-    comparePlans(parsed6, expected6)
-    comparePlans(parsed7, expected7)
+    comparePlans(parsed3, expected1)
+    comparePlans(parsed4, expected2)
+    comparePlans(parsed5, expected1)
+    comparePlans(parsed6, expected3)
+    comparePlans(parsed7, expected4)
   }
 
   test("alter database set dbproperties") {
@@ -111,10 +99,10 @@ class DDLCommandSuite extends PlanTest {
 
     val expected1 = AlterDatabaseProperties(
       "database_name",
-      Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql1)
+      Map("a" -> "a", "b" -> "b", "c" -> "c"))
     val expected2 = AlterDatabaseProperties(
       "database_name",
-      Map("a" -> "a"))(sql2)
+      Map("a" -> "a"))
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
@@ -130,10 +118,10 @@ class DDLCommandSuite extends PlanTest {
 
     val expected1 = DescribeDatabase(
       "db_name",
-      extended = true)(sql1)
+      extended = true)
     val expected2 = DescribeDatabase(
       "db_name",
-      extended = false)(sql2)
+      extended = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
new file mode 100644
index 000000000000..e4105d6e69ae
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DDLSuite extends QueryTest with SharedSQLContext {
+
+  test("Create/Drop/Alter/Describe Database") {
+    val catalog = sqlContext.sessionState.catalog
+
+    val databaseName = "db1"
+    sql(s"CREATE DATABASE $databaseName")
+    val db1 = catalog.getDatabase(databaseName)
+    assert(db1 == CatalogDatabase(databaseName, "", "db1.db", Map.empty))
+
+    sql(s"DESCRIBE DATABASE EXTENDED $databaseName").show(false)
+    sql(s"ALTER DATABASE $databaseName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
+    sql(s"DESCRIBE DATABASE EXTENDED $databaseName").show(false)
+    sql(s"ALTER DATABASE $databaseName SET DBPROPERTIES ('d'='d')")
+    sql(s"DESCRIBE DATABASE EXTENDED $databaseName").show(false)
+    sql(s"DROP DATABASE $databaseName CASCADE")
+    assert(!catalog.databaseExists(databaseName))
+  }
+}
\ No newline at end of file
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 318db1e615ad..09426991e6b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import org.apache.hadoop.hive.conf.HiveConf
 
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -62,6 +64,10 @@ class HiveSessionCatalog(
   override def getDefaultPath: String = client.getConf(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
     HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal)
 
+  override def getDefaultDBPath(db: String): String = {
+    getDefaultPath + File.pathSeparator + db + getDefaultDBExtension
+  }
+
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
   // essentially a cache for metastore tables. However, it relies on a lot of session-specific
   // things so it would be a lot of work to split its functionality between HiveSessionCatalog

From ffe0c7aca597c749c6ae755500a440b5466180da Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Mar 2016 11:09:24 -0700
Subject: [PATCH 08/14] add more test cases.
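
The new tests run every scenario twice, once with a plain name and once with a
back-quoted one, and rely on ParserUtils.cleanIdentifier to strip the
back-ticks before the name reaches the catalog (assumed behavior, matching how
the suite derives dbNameWithoutBackTicks):

    val dbNameWithoutBackTicks = cleanIdentifier("`database`")   // "database"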
---
 .../sql/catalyst/catalog/interface.scala      |   2 +-
 .../sql/execution/command/commands.scala      |   4 +-
 .../sql/execution/command/DDLSuite.scala      | 121 ++++++++++++++++--
 3 files changed, 110 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 34803133f6a6..ef03a8b53e20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -39,7 +39,7 @@ abstract class ExternalCatalog {
 
   protected def requireDbExists(db: String): Unit = {
     if (!databaseExists(db)) {
-      throw new AnalysisException(s"Database $db does not exist")
+      throw new AnalysisException(s"Database '$db' does not exist")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index a68a06fdf946..53e7cfbc00ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -396,8 +396,8 @@ case class DescribeDatabase(
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
     val result =
-      Row("Database", dbMetadata.name) ::
-        Row("Comment", dbMetadata.description) ::
+      Row("Database Name", dbMetadata.name) ::
+        Row("Description", dbMetadata.description) ::
         Row("Location", dbMetadata.locationUri) :: Nil
 
     if (extended) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e4105d6e69ae..d4e2d325a671 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -17,26 +17,119 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.test.SharedSQLContext
 
 class DDLSuite extends QueryTest with SharedSQLContext {
 
-  test("Create/Drop/Alter/Describe Database") {
+  /**
+   * Drops the databases in `dbNames` after calling `f`.
+   */
+  private def withDatabase(dbNames: String*)(f: => Unit): Unit = {
+    try f finally {
+      dbNames.foreach { name =>
+        sqlContext.sql(s"DROP DATABASE IF EXISTS $name CASCADE")
+      }
+    }
+  }
+
+  test("Create/Drop/Alter/Describe Database - basic") {
     val catalog = sqlContext.sessionState.catalog
 
-    val databaseName = "db1"
-    sql(s"CREATE DATABASE $databaseName")
-    val db1 = catalog.getDatabase(databaseName)
-    assert(db1 == CatalogDatabase(databaseName, "", "db1.db", Map.empty))
-
-    sql(s"DESCRIBE DATABASE EXTENDED $databaseName").show(false)
-    sql(s"ALTER DATABASE $databaseName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
-    sql(s"DESCRIBE DATABASE EXTENDED $databaseName").show(false)
-    sql(s"ALTER DATABASE $databaseName SET DBPROPERTIES ('d'='d')")
-    sql(s"DESCRIBE DATABASE EXTENDED $databaseName").show(false)
-    sql(s"DROP DATABASE $databaseName CASCADE")
-    assert(!catalog.databaseExists(databaseName))
+    val databaseNames = Seq("db1", "`database`")
+
+    databaseNames.foreach { dbName =>
+      withDatabase(dbName) {
+        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+        sql(s"CREATE DATABASE $dbName")
+        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        assert(db1 == CatalogDatabase(
+          dbNameWithoutBackTicks, "", s"$dbNameWithoutBackTicks.db", Map.empty))
+
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", s"$dbNameWithoutBackTicks.db") :: Nil)
+
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", s"$dbNameWithoutBackTicks.db") ::
+            Row("Properties", "") :: Nil)
+
+        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
+
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", s"$dbNameWithoutBackTicks.db") ::
+            Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
+
+        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", s"$dbNameWithoutBackTicks.db") ::
+            Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
+
+        sql(s"DROP DATABASE $dbName CASCADE")
+        assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+      }
+    }
   }
+
+  test("Create Database - database already exists") {
+    val catalog = sqlContext.sessionState.catalog
+    val databaseNames = Seq("db1", "`database`")
+
+    databaseNames.foreach { dbName =>
+      val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+      withDatabase(dbName) {
+        sql(s"CREATE DATABASE $dbName")
+        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        assert(db1 == CatalogDatabase(
+          dbNameWithoutBackTicks, "", s"$dbNameWithoutBackTicks.db", Map.empty))
+
+        val message = intercept[AnalysisException] {
+          sql(s"CREATE DATABASE $dbName")
+        }.getMessage
+        assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
+      }
+    }
+  }
+
+  test("Drop/Alter/Describe Database - database does not exist") {
+    val databaseNames = Seq("db1", "`database`")
+
+    databaseNames.foreach { dbName =>
+      val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+      assert(!sqlContext.sessionState.catalog.databaseExists(dbNameWithoutBackTicks))
+
+      var message = intercept[AnalysisException] {
+        sql(s"DROP DATABASE $dbName")
+      }.getMessage
+      assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+
+      message = intercept[AnalysisException] {
+        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+      }.getMessage
+      assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+
+      message = intercept[AnalysisException] {
+        sql(s"DESCRIBE DATABASE EXTENDED $dbName")
+      }.getMessage
+      assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+
+      sql(s"DROP DATABASE IF EXISTS $dbName")
+    }
+  }
+
+  // TODO: Add a test case for Drop Database in Restrict mode when we can create tables in SQLContext
 }
\ No newline at end of file

From d7c36488746fb9c414aaf2f28aceeefcd078ba95 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Mar 2016 11:15:26 -0700
Subject: [PATCH 09/14] added an extra line.
---
 .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d4e2d325a671..f21ef01985db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -132,4 +132,4 @@ class DDLSuite extends QueryTest with SharedSQLContext {
   }
 
   // TODO: Add a test case for Drop Database in Restrict mode when we can create tables in SQLContext
-}
\ No newline at end of file
+}

From 9420f08408810ee3bce027d1dcbdefa848ae8d13 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Mar 2016 12:25:16 -0700
Subject: [PATCH 10/14] added more comments.
---
 .../sql/execution/command/commands.scala      | 27 ++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 53e7cfbc00ef..f419501cf1b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -319,6 +319,16 @@ case class DescribeCommand(
   }
 }
 
+/**
+ * CREATE DATABASE: Create a new database.
+ *
+ * It will issue an error message when the database with the same name already exists,
+ * unless 'ifNotExists' is true.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
+ * }}}
+ */
 case class CreateDatabase(
@@ -352,6 +362,11 @@ case class CreateDatabase(
  * - true, the dependent objects are automatically dropped before dropping the database.
  * - false (default), it is in Restrict mode. The database cannot be dropped if
  * it is not empty. Its tables must be dropped first.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
+ * }}}
  */
 case class DropDatabase(
@@ -367,7 +382,15 @@ case class DropDatabase(
   override val output: Seq[Attribute] = Seq.empty
 }
 
-/** ALTER DATABASE: add new (key, value) pairs into DBPROPERTIES */
+/**
+ * ALTER DATABASE: add new (key, value) pairs into DBPROPERTIES
+ * If the database does not exist, an error message will be issued to indicate the database
+ * does not exist.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
+ * }}}
+ */
 case class AlterDatabaseProperties(
@@ -387,6 +410,8 @@ case class AlterDatabaseProperties(
 /**
  * DESCRIBE DATABASE: shows the name of the database, its comment (if one has been set), and its
  * root location on the filesystem. When extended is true, it also shows the database's properties
+ * If the database does not exist, an error message will be issued to indicate the database
+ * does not exist.
  */
 case class DescribeDatabase(

From 4dab82c833e84d0426b550b88945e8821933715f Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Mar 2016 12:30:09 -0700
Subject: [PATCH 11/14] update the comment.
---
 .../spark/sql/execution/command/commands.scala | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index f419501cf1b3..d6c32922a1c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -320,7 +320,7 @@ case class DescribeCommand(
 }
 
 /**
- * CREATE DATABASE: Create a new database.
+ * A command for users to create a new database.
  *
  * It will issue an error message when the database with the same name already exists,
@@ -353,7 +353,7 @@ case class CreateDatabase(
 }
 
 /**
- * DROP DATABASE: Removes a database from the system.
+ * A command for users to remove a database from the system.
 *
  * 'ignoreIfNotExists':
@@ -365,7 +365,7 @@ case class CreateDatabase(
  *
  * The syntax of using this command in SQL is:
  * {{{
- *    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
+ *    DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
  * }}}
  */
 case class DropDatabase(
@@ -383,7 +383,7 @@ case class DropDatabase(
 }
 
 /**
- * ALTER DATABASE: add new (key, value) pairs into DBPROPERTIES
+ * A command for users to add new (key, value) pairs into DBPROPERTIES
  * If the database does not exist, an error message will be issued to indicate the database
  * does not exist.
@@ -408,10 +408,14 @@ case class AlterDatabaseProperties(
 
 /**
- * DESCRIBE DATABASE: shows the name of the database, its comment (if one has been set), and its
+ * A command for users to show the name of the database, its comment (if one has been set), and its
  * root location on the filesystem. When extended is true, it also shows the database's properties
  * If the database does not exist, an error message will be issued to indicate the database
  * does not exist.
+ * The syntax of using this command in SQL is
+ * {{{
+ *    DESCRIBE DATABASE [EXTENDED] db_name
+ * }}}
  */
 case class DescribeDatabase(

From f4c33e236334057be545d25c3147e4ecbb412a99 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Mar 2016 12:36:26 -0700
Subject: [PATCH 12/14] update the comment.
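
The syntax block on AlterDatabaseProperties was still the DROP DATABASE form
left over from a copy-and-paste; it now documents the statement the command
actually runs:

    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)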
---
 .../scala/org/apache/spark/sql/execution/command/commands.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index d6c32922a1c4..411b9b7ec46c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -388,7 +388,7 @@ case class DropDatabase(
  * does not exist.
  * The syntax of using this command in SQL is:
  * {{{
- *    DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
+ *    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
  * }}}
  */
 case class AlterDatabaseProperties(

From 536cf36fa496c15a25878f6503caba448813d040 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 29 Mar 2016 07:55:14 -0700
Subject: [PATCH 13/14] address comments.
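
The database commands move from commands.scala to ddl.scala, and the default
database location becomes a concrete path instead of the placeholder
getDefaultPath. A sketch of the resulting behavior for the in-memory catalog:

    // SessionCatalog: <java.io.tmpdir>/<db>.db, e.g. /tmp/db1.db on Linux
    def getDefaultDBPath(db: String): String =
      System.getProperty("java.io.tmpdir") + File.separator + db + ".db"

HiveSessionCatalog overrides this with the metastore warehouse directory. Note
that File.separator replaces the File.pathSeparator used in earlier revisions;
the latter is the ':' list delimiter and would have produced paths like
"/tmp:db1.db".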
- * The syntax of using this command in SQL is: - * {{{ - * CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name - * }}} - */ -case class CreateDatabase( - databaseName: String, - ifNotExists: Boolean, - path: Option[String], - comment: Option[String], - props: Map[String, String]) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - val catalog = sqlContext.sessionState.catalog - catalog.createDatabase( - CatalogDatabase( - databaseName, - comment.getOrElse(""), - path.getOrElse(catalog.getDefaultDBPath(databaseName)), - props), - ifNotExists) - Seq.empty[Row] - } - - override val output: Seq[Attribute] = Seq.empty -} - -/** - * A command for users to remove a database from the system. - * - * 'ignoreIfNotExists': - * - true, if database_name does't exist, no action - * - false (default), if database_name does't exist, a warning message will be issued - * 'cascade': - * - true, the dependent objects are automatically dropped before dropping database. - * - false (default), it is in the Restrict mode. The database cannot be dropped if - * it is not empty. The inclusive tables must be dropped at first. - * - * The syntax of using this command in SQL is: - * {{{ - * DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]; - * }}} - */ -case class DropDatabase( - databaseName: String, - ignoreIfNotExists: Boolean, - cascade: Boolean) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.sessionState.catalog.dropDatabase(databaseName, ignoreIfNotExists, cascade) - Seq.empty[Row] - } - - override val output: Seq[Attribute] = Seq.empty -} - -/** - * A command for users to add new (key, value) pairs into DBPROPERTIES - * If the database does not exist, an error message will be issued to indicate the database - * does not exist. - * The syntax of using this command in SQL is: - * {{{ - * ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...) - * }}} - */ -case class AlterDatabaseProperties( - databaseName: String, - props: Map[String, String]) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - val catalog = sqlContext.sessionState.catalog - val db: CatalogDatabase = catalog.getDatabase(databaseName) - catalog.alterDatabase(db.copy(properties = db.properties ++ props)) - - Seq.empty[Row] - } - - override val output: Seq[Attribute] = Seq.empty -} - /** * A command for users to show the name of the database, its comment (if one has been set), and its * root location on the filesystem. 
When extended is true, it also shows the database's properties diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 6b48d3e1ada7..2c71aa99c974 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogDatabase import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.datasources.BucketSpec @@ -44,6 +45,95 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand { } +/** + * A command for users to create a new database. + * + * It will issue an error message when the database with the same name already exists, + * unless 'ifNotExists' is true. + * The syntax of using this command in SQL is: + * {{{ + * CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name + * }}} + */ +case class CreateDatabase( + databaseName: String, + ifNotExists: Boolean, + path: Option[String], + comment: Option[String], + props: Map[String, String]) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + catalog.createDatabase( + CatalogDatabase( + databaseName, + comment.getOrElse(""), + path.getOrElse(catalog.getDefaultDBPath(databaseName)), + props), + ifNotExists) + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} + + +/** + * A command for users to remove a database from the system. + * + * 'ignoreIfNotExists': + * - true, if database_name does't exist, no action + * - false (default), if database_name does't exist, a warning message will be issued + * 'cascade': + * - true, the dependent objects are automatically dropped before dropping database. + * - false (default), it is in the Restrict mode. The database cannot be dropped if + * it is not empty. The inclusive tables must be dropped at first. + * + * The syntax of using this command in SQL is: + * {{{ + * DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]; + * }}} + */ +case class DropDatabase( + databaseName: String, + ignoreIfNotExists: Boolean, + cascade: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sessionState.catalog.dropDatabase(databaseName, ignoreIfNotExists, cascade) + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} + +/** + * A command for users to add new (key, value) pairs into DBPROPERTIES + * If the database does not exist, an error message will be issued to indicate the database + * does not exist. + * The syntax of using this command in SQL is: + * {{{ + * ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...) 
+ * }}} + */ +case class AlterDatabaseProperties( + databaseName: String, + props: Map[String, String]) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val db: CatalogDatabase = catalog.getDatabase(databaseName) + catalog.alterDatabase(db.copy(properties = db.properties ++ props)) + + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} + case class CreateFunction( databaseName: Option[String], functionName: String, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 09426991e6b9..f4e54691c472 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -61,11 +61,10 @@ class HiveSessionCatalog( // | Methods and fields for interacting with HiveMetastoreCatalog | // ---------------------------------------------------------------- - override def getDefaultPath: String = client.getConf(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, - HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal) - override def getDefaultDBPath(db: String): String = { - getDefaultPath + File.pathSeparator + db + getDefaultDBExtension + val defaultPath = client.getConf(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal) + defaultPath + File.separator + db + ".db" } // Catalog for handling data source tables. TODO: This really doesn't belong here since it is From 16c829e6e14e177a1950693604a431cc8a2ed4fc Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 29 Mar 2016 14:30:58 -0700 Subject: [PATCH 14/14] address comments. --- .../sql/execution/command/commands.scala | 46 ----------- .../spark/sql/execution/command/ddl.scala | 47 ++++++++++- .../execution/command/DDLCommandSuite.scala | 8 +- .../sql/execution/command/DDLSuite.scala | 82 +++++++++++-------- .../sql/hive/thriftserver/CliSuite.scala | 2 +- .../spark/sql/hive/HiveSessionCatalog.scala | 8 +- 6 files changed, 101 insertions(+), 92 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index dc00098df77c..964f0a7a7b4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -17,14 +17,12 @@ package org.apache.spark.sql.execution.command -import java.io.File import java.util.NoSuchElementException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Row, SQLContext} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.CatalogDatabase import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical @@ -319,50 +317,6 @@ case class DescribeCommand( } } -/** - * A command for users to show the name of the database, its comment (if one has been set), and its - * root location on the filesystem. When extended is true, it also shows the database's properties - * If the database does not exist, an error message will be issued to indicate the database - * does not exist. 
- * The syntax of using this command in SQL is - * {{{ - * DESCRIBE DATABASE [EXTENDED] db_name - * }}} - */ -case class DescribeDatabase( - databaseName: String, - extended: Boolean) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName) - val result = - Row("Database Name", dbMetadata.name) :: - Row("Description", dbMetadata.description) :: - Row("Location", dbMetadata.locationUri) :: Nil - - if (extended) { - val properties = - if (dbMetadata.properties.isEmpty) { - "" - } else { - dbMetadata.properties.toSeq.mkString("(", ", ", ")") - } - result :+ Row("Properties", properties) - } else { - result - } - } - - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("database_description_item", StringType, nullable = false) :: - StructField("database_description_value", StringType, nullable = false) :: Nil) - - schema.toAttributes - } -} - /** * A command for users to get tables in the given database. * If a databaseName is not given, the current database will be used. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 2c71aa99c974..6c2a67f81c50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -82,7 +82,7 @@ case class CreateDatabase( /** * A command for users to remove a database from the system. * - * 'ignoreIfNotExists': + * 'ifExists': * - true, if database_name does't exist, no action * - false (default), if database_name does't exist, a warning message will be issued * 'cascade': @@ -97,12 +97,12 @@ case class CreateDatabase( */ case class DropDatabase( databaseName: String, - ignoreIfNotExists: Boolean, + ifExists: Boolean, cascade: Boolean) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.sessionState.catalog.dropDatabase(databaseName, ignoreIfNotExists, cascade) + sqlContext.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade) Seq.empty[Row] } @@ -134,6 +134,47 @@ case class AlterDatabaseProperties( override val output: Seq[Attribute] = Seq.empty } +/** + * A command for users to show the name of the database, its comment (if one has been set), and its + * root location on the filesystem. When extended is true, it also shows the database's properties + * If the database does not exist, an error message will be issued to indicate the database + * does not exist. 
+ * The syntax of using this command in SQL is
+ * {{{
+ *    DESCRIBE DATABASE [EXTENDED] db_name
+ * }}}
+ */
+case class DescribeDatabase(
+    databaseName: String,
+    extended: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
+    val result =
+      Row("Database Name", dbMetadata.name) ::
+        Row("Description", dbMetadata.description) ::
+        Row("Location", dbMetadata.locationUri) :: Nil
+
+    if (extended) {
+      val properties =
+        if (dbMetadata.properties.isEmpty) {
+          ""
+        } else {
+          dbMetadata.properties.toSeq.mkString("(", ", ", ")")
+        }
+      result :+ Row("Properties", properties)
+    } else {
+      result
+    }
+  }
+
+  override val output: Seq[Attribute] = {
+    AttributeReference("database_description_item", StringType, nullable = false)() ::
+      AttributeReference("database_description_value", StringType, nullable = false)() :: Nil
+  }
+}
+
 case class CreateFunction(
     databaseName: Option[String],
     functionName: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 28bb89b257aa..cc2ec5c7bfd5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -65,19 +65,19 @@ class DDLCommandSuite extends PlanTest {
     val expected1 = DropDatabase(
       "database_name",
-      ignoreIfNotExists = true,
+      ifExists = true,
       cascade = false)
     val expected2 = DropDatabase(
       "database_name",
-      ignoreIfNotExists = true,
+      ifExists = true,
       cascade = true)
     val expected3 = DropDatabase(
       "database_name",
-      ignoreIfNotExists = false,
+      ifExists = false,
       cascade = false)
     val expected4 = DropDatabase(
       "database_name",
-      ignoreIfNotExists = false,
+      ifExists = false,
       cascade = true)
 
     comparePlans(parsed1, expected1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index f21ef01985db..47c9a22acd44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
@@ -35,7 +37,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("Create/Drop/Alter/Describe Database - basic") {
+  test("Create/Drop Database") {
     val catalog = sqlContext.sessionState.catalog
     val databaseNames = Seq("db1", "`database`")
 
@@ -43,22 +45,59 @@
     databaseNames.foreach { dbName =>
       withDatabase(dbName) {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+
         sql(s"CREATE DATABASE $dbName")
         val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(
-          dbNameWithoutBackTicks, "", s"$dbNameWithoutBackTicks.db", Map.empty))
+          dbNameWithoutBackTicks,
+          "",
+          System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+          Map.empty))
+        sql(s"DROP DATABASE $dbName CASCADE")
+        assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+      }
+    }
+  }
-
-      checkAnswer(
sql(s"DESCRIBE DATABASE $dbName"), - Row("Database Name", dbNameWithoutBackTicks) :: - Row("Description", "") :: - Row("Location", s"$dbNameWithoutBackTicks.db") :: Nil) + test("Create Database - database already exists") { + val catalog = sqlContext.sessionState.catalog + val databaseNames = Seq("db1", "`database`") + + databaseNames.foreach { dbName => + val dbNameWithoutBackTicks = cleanIdentifier(dbName) + withDatabase(dbName) { + sql(s"CREATE DATABASE $dbName") + val db1 = catalog.getDatabase(dbNameWithoutBackTicks) + assert(db1 == CatalogDatabase( + dbNameWithoutBackTicks, + "", + System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db", + Map.empty)) + + val message = intercept[AnalysisException] { + sql(s"CREATE DATABASE $dbName") + }.getMessage + assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) + } + } + } + + test("Alter/Describe Database") { + val catalog = sqlContext.sessionState.catalog + val databaseNames = Seq("db1", "`database`") + + databaseNames.foreach { dbName => + withDatabase(dbName) { + val dbNameWithoutBackTicks = cleanIdentifier(dbName) + val location = + System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db" + sql(s"CREATE DATABASE $dbName") checkAnswer( sql(s"DESCRIBE DATABASE EXTENDED $dbName"), Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: - Row("Location", s"$dbNameWithoutBackTicks.db") :: + Row("Location", location) :: Row("Properties", "") :: Nil) sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") @@ -67,7 +106,7 @@ class DDLSuite extends QueryTest with SharedSQLContext { sql(s"DESCRIBE DATABASE EXTENDED $dbName"), Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: - Row("Location", s"$dbNameWithoutBackTicks.db") :: + Row("Location", location) :: Row("Properties", "((a,a), (b,b), (c,c))") :: Nil) sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") @@ -76,31 +115,8 @@ class DDLSuite extends QueryTest with SharedSQLContext { sql(s"DESCRIBE DATABASE EXTENDED $dbName"), Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: - Row("Location", s"$dbNameWithoutBackTicks.db") :: + Row("Location", location) :: Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil) - - sql(s"DROP DATABASE $dbName CASCADE") - assert(!catalog.databaseExists(dbNameWithoutBackTicks)) - } - } - } - - test("Create Database - database already exists") { - val catalog = sqlContext.sessionState.catalog - val databaseNames = Seq("db1", "`database`") - - databaseNames.foreach { dbName => - val dbNameWithoutBackTicks = cleanIdentifier(dbName) - withDatabase(dbName) { - sql(s"CREATE DATABASE $dbName") - val db1 = catalog.getDatabase(dbNameWithoutBackTicks) - assert(db1 == CatalogDatabase( - dbNameWithoutBackTicks, "", s"$dbNameWithoutBackTicks.db", Map.empty)) - - val message = intercept[AnalysisException] { - sql(s"CREATE DATABASE $dbName") - }.getMessage - assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) } } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 8e1ebe2937d2..7ad7f92bd25f 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -183,7 +183,7 @@ class CliSuite 
   test("Single command with --database") {
     runCliWithin(2.minute)(
       "CREATE DATABASE hive_test_db;"
-        -> "OK",
+        -> "",
       "USE hive_test_db;"
         -> "",
       "CREATE TABLE hive_test(key INT, val STRING);"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index f4e54691c472..e91a0b98d97a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,8 +17,7 @@
 package org.apache.spark.sql.hive
 
-import java.io.File
-
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -62,9 +61,8 @@ class HiveSessionCatalog(
   // ----------------------------------------------------------------
 
   override def getDefaultDBPath(db: String): String = {
-    val defaultPath = client.getConf(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
-      HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal)
-    defaultPath + File.separator + db + ".db"
+    val defaultPath = context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
+    new Path(new Path(defaultPath), db + ".db").toString
   }
 
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
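The final revision above resolves the default database location with Hadoop's Path rather than string concatenation. new Path(parent, child) normalizes separators and keeps URI-style warehouse locations (for example on HDFS) intact, which File.separator concatenation does not. A small sketch of that behavior, with the warehouse values assumed purely for illustration:

{{{
import org.apache.hadoop.fs.Path

// Local-style warehouse dir (assumed value):
new Path(new Path("/user/hive/warehouse"), "db1" + ".db").toString
// => "/user/hive/warehouse/db1.db"

// URI-style warehouse dir (assumed value); the trailing slash is normalized:
new Path(new Path("hdfs://namenode:8020/user/hive/warehouse/"), "db1.db").toString
// => "hdfs://namenode:8020/user/hive/warehouse/db1.db"
}}}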