From 46737b5c9fecbc68b1e4e830b2a1b189a2e72158 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 26 Jan 2016 13:33:13 +0800 Subject: [PATCH 1/7] Implement command to set current database. --- .../apache/spark/sql/catalyst/analysis/Catalog.scala | 4 ++++ .../scala/org/apache/spark/sql/execution/SparkQl.scala | 3 +++ .../org/apache/spark/sql/execution/commands.scala | 10 ++++++++++ .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 4 ++++ .../main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 -- .../apache/spark/sql/hive/client/ClientInterface.scala | 3 +++ .../apache/spark/sql/hive/client/ClientWrapper.scala | 4 ++++ .../spark/sql/hive/execution/HiveQuerySuite.scala | 8 ++++++++ 8 files changed, 36 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index a8f89ce6de45..f2f9ec59417e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -46,6 +46,10 @@ trait Catalog { def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan + def setCurrentDatabase(databaseName: String): Unit = { + throw new UnsupportedOperationException + } + /** * Returns tuples of (tableName, isTemporary) for all tables in the given database. * isTemporary is a Boolean value indicates if a table is a temporary or not. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala index f3e89ef4a71f..24f3332f2ddb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala @@ -42,6 +42,9 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs) ExplainCommand(nodeToPlan(query), extended = extended.isDefined) + case Token("TOK_SWITCHDATABASE", Token(database, Nil) :: Nil) => + SetDatabaseCommand(cleanIdentifier(database)) + case Token("TOK_DESCTABLE", describeArgs) => // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL val Some(tableType) :: formatted :: extended :: pretty :: Nil = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 3cfa3dfd9c7e..e29e81164ce8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -408,3 +408,13 @@ case class DescribeFunction( } } } + +case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.catalog.setCurrentDatabase(databaseName) + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0cfe03ba91ec..9ed8cc62a5c4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -705,6 +705,10 @@ private[hive] class HiveMetastoreCatalog(val 
client: ClientInterface, hive: Hive } override def unregisterAllTables(): Unit = {} + + override def setCurrentDatabase(databaseName: String): Unit = { + client.setCurrentDatabase(databaseName) + } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 46246f8191db..a145475088c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -156,8 +156,6 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging "TOK_SHOWLOCKS", "TOK_SHOWPARTITIONS", - "TOK_SWITCHDATABASE", - "TOK_UNLOCKTABLE" ) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala index 9d9a55edd731..4eec3fef7408 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala @@ -109,6 +109,9 @@ private[hive] trait ClientInterface { /** Returns the name of the active database. */ def currentDatabase: String + /** Sets the name of current database. 
*/ + def setCurrentDatabase(databaseName: String): Unit + /** Returns the metadata for specified database, throwing an exception if it doesn't exist */ def getDatabase(name: String): HiveDatabase = { getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index ce7a305d437a..b2595e0454a5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -229,6 +229,10 @@ private[hive] class ClientWrapper( state.getCurrentDatabase } + override def setCurrentDatabase(databaseName: String): Unit = withHiveState { + state.setCurrentDatabase(databaseName) + } + override def createDatabase(database: HiveDatabase): Unit = withHiveState { client.createDatabase( new Database( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 4659d745fe78..41f2216c75ec 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1262,6 +1262,14 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } + test("use database") { + val currentDatabase = sql("select current_database()").first().getString(0) + sql("USE test") + assert("test" == sql("select current_database()").first().getString(0)) + sql(s"USE $currentDatabase") + assert(currentDatabase == sql("select current_database()").first().getString(0)) + } + test("lookup hive UDF in another thread") { val e = intercept[AnalysisException] { range(1).selectExpr("not_a_udf()") From 43beb4ba499814c698df7537018ab6fafefa738e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 26 Jan 
2016 15:05:05 +0800 Subject: [PATCH 2/7] Add test. --- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 41f2216c75ec..22a3f7e23911 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1266,6 +1266,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { val currentDatabase = sql("select current_database()").first().getString(0) sql("USE test") assert("test" == sql("select current_database()").first().getString(0)) + + sql("CREATE DATABASE hive_test_db") + sql("USE hive_test_db") + assert("hive_test_db" == sql("select current_database()").first().getString(0)) + sql(s"USE $currentDatabase") assert(currentDatabase == sql("select current_database()").first().getString(0)) } From ee237e4f09a8711652ba96837293dd9b19d949a7 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 26 Jan 2016 16:54:57 +0800 Subject: [PATCH 3/7] Throw exception when using a non-existent database. 
--- .../scala/org/apache/spark/sql/execution/commands.scala | 9 +++++++-- .../org/apache/spark/sql/hive/client/ClientWrapper.scala | 7 ++++++- .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 7 +++++-- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index e29e81164ce8..793e6a7d1e84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -413,8 +413,13 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { sqlContext.catalog.setCurrentDatabase(databaseName) - Seq.empty[Row] + Seq(Row("OK")) } - override def output: Seq[Attribute] = Seq.empty + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("output", StringType, nullable = false) :: Nil) + + schema.toAttributes + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index b2595e0454a5..5307e924e7e5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader} import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{Logging, SparkConf, SparkException} +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.util.{CircularBuffer, Utils} @@ -230,7 +231,11 @@ private[hive] class ClientWrapper( } override def setCurrentDatabase(databaseName: String): Unit = 
withHiveState { - state.setCurrentDatabase(databaseName) + if (getDatabaseOption(databaseName).isDefined) { + state.setCurrentDatabase(databaseName) + } else { + throw new NoSuchDatabaseException + } } override def createDatabase(database: HiveDatabase): Unit = withHiveState { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 22a3f7e23911..f02f12638cd8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkException, SparkFiles} import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin import org.apache.spark.sql.hive._ @@ -1264,13 +1265,15 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("use database") { val currentDatabase = sql("select current_database()").first().getString(0) - sql("USE test") - assert("test" == sql("select current_database()").first().getString(0)) sql("CREATE DATABASE hive_test_db") sql("USE hive_test_db") assert("hive_test_db" == sql("select current_database()").first().getString(0)) + intercept[NoSuchDatabaseException] { + sql("USE not_existing_db") + } + sql(s"USE $currentDatabase") assert(currentDatabase == sql("select current_database()").first().getString(0)) } From bffae435ce05fe1c2790abf557e2db8bb96c37b0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 26 Jan 2016 17:08:45 +0800 Subject: [PATCH 4/7] Fix import order. 
--- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index f02f12638cd8..9632d27a2ffc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -28,8 +28,8 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkException, SparkFiles} import org.apache.spark.sql.{AnalysisException, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException +import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin import org.apache.spark.sql.hive._ From 9df380fa134079b93ab9b97e01a34ed4156137aa Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 26 Jan 2016 21:17:22 +0800 Subject: [PATCH 5/7] Set database command should not output a row. 
--- .../scala/org/apache/spark/sql/execution/commands.scala | 9 ++------- .../apache/spark/sql/hive/thriftserver/CliSuite.scala | 2 +- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 793e6a7d1e84..703e4643cbd2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -413,13 +413,8 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { sqlContext.catalog.setCurrentDatabase(databaseName) - Seq(Row("OK")) + Seq.empty[Row] } - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("output", StringType, nullable = false) :: Nil) - - schema.toAttributes - } + override val output: Seq[Attribute] = Seq.empty } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index ab31d45a79a2..72da266da4d0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -183,7 +183,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { "CREATE DATABASE hive_test_db;" -> "OK", "USE hive_test_db;" - -> "OK", + -> "", "CREATE TABLE hive_test(key INT, val STRING);" -> "OK", "SHOW TABLES;" From 87c27ef34802cb9ad6c18af5ad66531eeee02298 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 29 Jan 2016 04:24:45 +0000 Subject: [PATCH 6/7] Add method to read current database to catalog. 
--- .../org/apache/spark/sql/catalyst/analysis/Catalog.scala | 4 ++++ .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 ++ 2 files changed, 6 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index f2f9ec59417e..c9682933c49a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -46,6 +46,10 @@ trait Catalog { def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan + def currentDatabase: String = { + throw new UnsupportedOperationException + } + def setCurrentDatabase(databaseName: String): Unit = { throw new UnsupportedOperationException } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9ed8cc62a5c4..802518d92917 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -706,6 +706,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive override def unregisterAllTables(): Unit = {} + override def currentDatabase: String = client.currentDatabase + override def setCurrentDatabase(databaseName: String): Unit = { client.setCurrentDatabase(databaseName) } From ff5508b5c6776af5f38dc17f4c5af37eebd3097e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 29 Jan 2016 06:18:42 +0000 Subject: [PATCH 7/7] Revert last commit. 
--- .../org/apache/spark/sql/catalyst/analysis/Catalog.scala | 4 ---- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 -- 2 files changed, 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index c9682933c49a..f2f9ec59417e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -46,10 +46,6 @@ trait Catalog { def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan - def currentDatabase: String = { - throw new UnsupportedOperationException - } - def setCurrentDatabase(databaseName: String): Unit = { throw new UnsupportedOperationException } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 802518d92917..9ed8cc62a5c4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -706,8 +706,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive override def unregisterAllTables(): Unit = {} - override def currentDatabase: String = client.currentDatabase - override def setCurrentDatabase(databaseName: String): Unit = { client.setCurrentDatabase(databaseName) }