Skip to content

Commit b66b97c

Browse files
gatorsmile authored and Andrew Or committed
[SPARK-14124][SQL] Implement Database-related DDL Commands
#### What changes were proposed in this pull request? This PR is to implement the following four Database-related DDL commands: - `CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name` - `DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]` - `DESCRIBE DATABASE [EXTENDED] db_name` - `ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)` Another PR will be submitted to handle the unsupported commands. In the Database-related DDL commands, we will issue an error exception for `ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role`. cc yhuai andrewor14 rxin Could you review the changes? Is it in the right direction? Thanks! #### How was this patch tested? Added a few test cases in `command/DDLSuite.scala` for testing DDL command execution in `SQLContext`. Since `HiveContext` also shares the same implementation, the existing test cases in `\hive` also verifies the correctness of these commands. Author: gatorsmile <[email protected]> Author: xiaoli <[email protected]> Author: Xiao Li <[email protected]> Closes #12009 from gatorsmile/dbDDL.
1 parent e1f6845 commit b66b97c

File tree

9 files changed

+302
-61
lines changed

9 files changed

+302
-61
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.catalyst.catalog
1919

20+
import java.io.File
21+
2022
import scala.collection.mutable
2123

2224
import org.apache.spark.sql.AnalysisException
@@ -114,6 +116,10 @@ class SessionCatalog(
114116
currentDb = db
115117
}
116118

119+
def getDefaultDBPath(db: String): String = {
120+
System.getProperty("java.io.tmpdir") + File.separator + db + ".db"
121+
}
122+
117123
// ----------------------------------------------------------------------------
118124
// Tables
119125
// ----------------------------------------------------------------------------

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ abstract class ExternalCatalog {
3939

4040
protected def requireDbExists(db: String): Unit = {
4141
if (!databaseExists(db)) {
42-
throw new AnalysisException(s"Database $db does not exist")
42+
throw new AnalysisException(s"Database '$db' does not exist")
4343
}
4444
}
4545

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,8 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
9797

9898
// CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
9999
// [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
100-
case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
100+
case Token("TOK_CREATEDATABASE", Token(dbName, Nil) :: args) =>
101+
val databaseName = cleanIdentifier(dbName)
101102
val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
102103
"TOK_IFNOTEXISTS",
103104
"TOK_DATABASELOCATION",
@@ -126,7 +127,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
126127
extractProps(propList, "TOK_TABLEPROPERTY")
127128
case _ => parseFailed("Invalid CREATE DATABASE command", node)
128129
}.toMap
129-
CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
130+
CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)
130131

131132
// DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
132133
case Token("TOK_DROPDATABASE", Token(dbName, Nil) :: otherArgs) =>
@@ -136,15 +137,15 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
136137
// :- database_name
137138
// :- TOK_IFEXISTS
138139
// +- TOK_RESTRICT/TOK_CASCADE
139-
val databaseName = unquoteString(dbName)
140+
val databaseName = cleanIdentifier(dbName)
140141
// The default is RESTRICT
141142
val Seq(ifExists, _, cascade) = getClauses(Seq(
142143
"TOK_IFEXISTS", "TOK_RESTRICT", "TOK_CASCADE"), otherArgs)
143-
DropDatabase(databaseName, ifExists.isDefined, restrict = cascade.isEmpty)(node.source)
144+
DropDatabase(databaseName, ifExists.isDefined, cascade.isDefined)
144145

145146
// ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
146147
case Token("TOK_ALTERDATABASE_PROPERTIES", Token(dbName, Nil) :: args) =>
147-
val databaseName = unquoteString(dbName)
148+
val databaseName = cleanIdentifier(dbName)
148149
val dbprops = getClause("TOK_DATABASEPROPERTIES", args)
149150
val props = dbprops match {
150151
case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
@@ -161,13 +162,13 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
161162
extractProps(propList, "TOK_TABLEPROPERTY")
162163
case _ => parseFailed("Invalid ALTER DATABASE command", node)
163164
}
164-
AlterDatabaseProperties(databaseName, props.toMap)(node.source)
165+
AlterDatabaseProperties(databaseName, props.toMap)
165166

166167
// DESCRIBE DATABASE [EXTENDED] db_name
167168
case Token("TOK_DESCDATABASE", Token(dbName, Nil) :: describeArgs) =>
168-
val databaseName = unquoteString(dbName)
169+
val databaseName = cleanIdentifier(dbName)
169170
val extended = getClauseOption("EXTENDED", describeArgs)
170-
DescribeDatabase(databaseName, extended.isDefined)(node.source)
171+
DescribeDatabase(databaseName, extended.isDefined)
171172

172173
// CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
173174
// [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -232,8 +232,7 @@ class SparkSqlAstBuilder extends AstBuilder {
232232
ctx.EXISTS != null,
233233
Option(ctx.locationSpec).map(visitLocationSpec),
234234
Option(ctx.comment).map(string),
235-
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))(
236-
command(ctx))
235+
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))
237236
}
238237

239238
/**
@@ -248,8 +247,7 @@ class SparkSqlAstBuilder extends AstBuilder {
248247
ctx: SetDatabasePropertiesContext): LogicalPlan = withOrigin(ctx) {
249248
AlterDatabaseProperties(
250249
ctx.identifier.getText,
251-
visitTablePropertyList(ctx.tablePropertyList))(
252-
command(ctx))
250+
visitTablePropertyList(ctx.tablePropertyList))
253251
}
254252

255253
/**
@@ -261,7 +259,7 @@ class SparkSqlAstBuilder extends AstBuilder {
261259
* }}}
262260
*/
263261
override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) {
264-
DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE == null)(command(ctx))
262+
DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE != null)
265263
}
266264

267265
/**
@@ -273,7 +271,7 @@ class SparkSqlAstBuilder extends AstBuilder {
273271
* }}}
274272
*/
275273
override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
276-
DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)(command(ctx))
274+
DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)
277275
}
278276

279277
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 107 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.command
1919

2020
import org.apache.spark.internal.Logging
2121
import org.apache.spark.sql.{Row, SQLContext}
22-
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
23-
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
22+
import org.apache.spark.sql.catalyst.TableIdentifier
23+
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
2424
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
2525
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
2626
import org.apache.spark.sql.execution.datasources.BucketSpec
@@ -45,46 +45,135 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
4545

4646
}
4747

48+
/**
49+
* A command for users to create a new database.
50+
*
51+
* It will issue an error message when the database with the same name already exists,
52+
* unless 'ifNotExists' is true.
53+
* The syntax of using this command in SQL is:
54+
* {{{
55+
* CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
56+
* }}}
57+
*/
4858
case class CreateDatabase(
4959
databaseName: String,
5060
ifNotExists: Boolean,
5161
path: Option[String],
5262
comment: Option[String],
53-
props: Map[String, String])(sql: String)
54-
extends NativeDDLCommand(sql) with Logging
63+
props: Map[String, String])
64+
extends RunnableCommand {
65+
66+
override def run(sqlContext: SQLContext): Seq[Row] = {
67+
val catalog = sqlContext.sessionState.catalog
68+
catalog.createDatabase(
69+
CatalogDatabase(
70+
databaseName,
71+
comment.getOrElse(""),
72+
path.getOrElse(catalog.getDefaultDBPath(databaseName)),
73+
props),
74+
ifNotExists)
75+
Seq.empty[Row]
76+
}
77+
78+
override val output: Seq[Attribute] = Seq.empty
79+
}
80+
5581

5682
/**
57-
* Drop Database: Removes a database from the system.
83+
* A command for users to remove a database from the system.
5884
*
5985
* 'ifExists':
6086
* - true, if database_name does't exist, no action
6187
* - false (default), if database_name does't exist, a warning message will be issued
62-
* 'restric':
63-
* - true (default), the database cannot be dropped if it is not empty. The inclusive
64-
* tables must be dropped at first.
65-
* - false, it is in the Cascade mode. The dependent objects are automatically dropped
66-
* before dropping database.
88+
* 'cascade':
89+
* - true, the dependent objects are automatically dropped before dropping database.
90+
* - false (default), it is in the Restrict mode. The database cannot be dropped if
91+
* it is not empty. The inclusive tables must be dropped at first.
92+
*
93+
* The syntax of using this command in SQL is:
94+
* {{{
95+
* DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
96+
* }}}
6797
*/
6898
case class DropDatabase(
6999
databaseName: String,
70100
ifExists: Boolean,
71-
restrict: Boolean)(sql: String)
72-
extends NativeDDLCommand(sql) with Logging
101+
cascade: Boolean)
102+
extends RunnableCommand {
103+
104+
override def run(sqlContext: SQLContext): Seq[Row] = {
105+
sqlContext.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
106+
Seq.empty[Row]
107+
}
108+
109+
override val output: Seq[Attribute] = Seq.empty
110+
}
73111

74-
/** ALTER DATABASE: add new (key, value) pairs into DBPROPERTIES */
112+
/**
113+
* A command for users to add new (key, value) pairs into DBPROPERTIES
114+
* If the database does not exist, an error message will be issued to indicate the database
115+
* does not exist.
116+
* The syntax of using this command in SQL is:
117+
* {{{
118+
* ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
119+
* }}}
120+
*/
75121
case class AlterDatabaseProperties(
76122
databaseName: String,
77-
props: Map[String, String])(sql: String)
78-
extends NativeDDLCommand(sql) with Logging
123+
props: Map[String, String])
124+
extends RunnableCommand {
125+
126+
override def run(sqlContext: SQLContext): Seq[Row] = {
127+
val catalog = sqlContext.sessionState.catalog
128+
val db: CatalogDatabase = catalog.getDatabase(databaseName)
129+
catalog.alterDatabase(db.copy(properties = db.properties ++ props))
130+
131+
Seq.empty[Row]
132+
}
133+
134+
override val output: Seq[Attribute] = Seq.empty
135+
}
79136

80137
/**
81-
* DESCRIBE DATABASE: shows the name of the database, its comment (if one has been set), and its
138+
* A command for users to show the name of the database, its comment (if one has been set), and its
82139
* root location on the filesystem. When extended is true, it also shows the database's properties
140+
* If the database does not exist, an error message will be issued to indicate the database
141+
* does not exist.
142+
* The syntax of using this command in SQL is
143+
* {{{
144+
* DESCRIBE DATABASE [EXTENDED] db_name
145+
* }}}
83146
*/
84147
case class DescribeDatabase(
85148
databaseName: String,
86-
extended: Boolean)(sql: String)
87-
extends NativeDDLCommand(sql) with Logging
149+
extended: Boolean)
150+
extends RunnableCommand {
151+
152+
override def run(sqlContext: SQLContext): Seq[Row] = {
153+
val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
154+
val result =
155+
Row("Database Name", dbMetadata.name) ::
156+
Row("Description", dbMetadata.description) ::
157+
Row("Location", dbMetadata.locationUri) :: Nil
158+
159+
if (extended) {
160+
val properties =
161+
if (dbMetadata.properties.isEmpty) {
162+
""
163+
} else {
164+
dbMetadata.properties.toSeq.mkString("(", ", ", ")")
165+
}
166+
result :+ Row("Properties", properties)
167+
} else {
168+
result
169+
}
170+
}
171+
172+
override val output: Seq[Attribute] = {
173+
AttributeReference("database_description_item", StringType, nullable = false)() ::
174+
AttributeReference("database_description_value", StringType, nullable = false)() :: Nil
175+
}
176+
}
88177

89178
case class CreateFunction(
90179
databaseName: Option[String],

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 15 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ class DDLCommandSuite extends PlanTest {
3939
ifNotExists = true,
4040
Some("/home/user/db"),
4141
Some("database_comment"),
42-
Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql)
42+
Map("a" -> "a", "b" -> "b", "c" -> "c"))
4343
comparePlans(parsed, expected)
4444
}
4545

@@ -65,39 +65,27 @@ class DDLCommandSuite extends PlanTest {
6565
val expected1 = DropDatabase(
6666
"database_name",
6767
ifExists = true,
68-
restrict = true)(sql1)
68+
cascade = false)
6969
val expected2 = DropDatabase(
7070
"database_name",
7171
ifExists = true,
72-
restrict = false)(sql2)
72+
cascade = true)
7373
val expected3 = DropDatabase(
74-
"database_name",
75-
ifExists = true,
76-
restrict = true)(sql3)
77-
val expected4 = DropDatabase(
78-
"database_name",
79-
ifExists = true,
80-
restrict = false)(sql4)
81-
val expected5 = DropDatabase(
82-
"database_name",
83-
ifExists = true,
84-
restrict = true)(sql5)
85-
val expected6 = DropDatabase(
8674
"database_name",
8775
ifExists = false,
88-
restrict = true)(sql6)
89-
val expected7 = DropDatabase(
76+
cascade = false)
77+
val expected4 = DropDatabase(
9078
"database_name",
9179
ifExists = false,
92-
restrict = false)(sql7)
80+
cascade = true)
9381

9482
comparePlans(parsed1, expected1)
9583
comparePlans(parsed2, expected2)
96-
comparePlans(parsed3, expected3)
97-
comparePlans(parsed4, expected4)
98-
comparePlans(parsed5, expected5)
99-
comparePlans(parsed6, expected6)
100-
comparePlans(parsed7, expected7)
84+
comparePlans(parsed3, expected1)
85+
comparePlans(parsed4, expected2)
86+
comparePlans(parsed5, expected1)
87+
comparePlans(parsed6, expected3)
88+
comparePlans(parsed7, expected4)
10189
}
10290

10391
test("alter database set dbproperties") {
@@ -110,10 +98,10 @@ class DDLCommandSuite extends PlanTest {
11098

11199
val expected1 = AlterDatabaseProperties(
112100
"database_name",
113-
Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql1)
101+
Map("a" -> "a", "b" -> "b", "c" -> "c"))
114102
val expected2 = AlterDatabaseProperties(
115103
"database_name",
116-
Map("a" -> "a"))(sql2)
104+
Map("a" -> "a"))
117105

118106
comparePlans(parsed1, expected1)
119107
comparePlans(parsed2, expected2)
@@ -129,10 +117,10 @@ class DDLCommandSuite extends PlanTest {
129117

130118
val expected1 = DescribeDatabase(
131119
"db_name",
132-
extended = true)(sql1)
120+
extended = true)
133121
val expected2 = DescribeDatabase(
134122
"db_name",
135-
extended = false)(sql2)
123+
extended = false)
136124

137125
comparePlans(parsed1, expected1)
138126
comparePlans(parsed2, expected2)

0 commit comments

Comments
 (0)