-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-14124] [SQL] Implement Database-related DDL Commands #12009
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
01e4cdf
6835704
9180687
b38a21e
d2b84af
fda8025
ac0dccd
6e0018b
0546772
b37a64f
c2a872c
ab6dbd7
4276356
2dab708
0458770
1debdfa
763706d
4de6ec1
9422a4f
52bdf48
1e95df3
fab24cf
8b2e33b
2ee1876
b9f0090
ade6f7e
9fd63d2
5199d49
404214c
c001dd9
59daa48
41d5f64
472a6e3
0fba10a
cbf73b3
c08f561
474df88
3d9828d
602cace
72d2361
e26542e
2794bfa
65ce660
aba3a95
ffe0c7a
d7c3648
9420f08
4dab82c
f4c33e2
536cf36
16c829e
07afea5
f22ef90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -97,7 +97,8 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
|
|
||
| // CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] | ||
| // [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]; | ||
| case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) => | ||
| case Token("TOK_CREATEDATABASE", Token(dbName, Nil) :: args) => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changes in this file aren't really necessary since the file will be deleted anyway, but for now it's OK to keep |
||
| val databaseName = cleanIdentifier(dbName) | ||
| val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq( | ||
| "TOK_IFNOTEXISTS", | ||
| "TOK_DATABASELOCATION", | ||
|
|
@@ -126,7 +127,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
| extractProps(propList, "TOK_TABLEPROPERTY") | ||
| case _ => parseFailed("Invalid CREATE DATABASE command", node) | ||
| }.toMap | ||
| CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source) | ||
| CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props) | ||
|
|
||
| // DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]; | ||
| case Token("TOK_DROPDATABASE", Token(dbName, Nil) :: otherArgs) => | ||
|
|
@@ -136,15 +137,15 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
| // :- database_name | ||
| // :- TOK_IFEXISTS | ||
| // +- TOK_RESTRICT/TOK_CASCADE | ||
| val databaseName = unquoteString(dbName) | ||
| val databaseName = cleanIdentifier(dbName) | ||
| // The default is RESTRICT | ||
| val Seq(ifExists, _, cascade) = getClauses(Seq( | ||
| "TOK_IFEXISTS", "TOK_RESTRICT", "TOK_CASCADE"), otherArgs) | ||
| DropDatabase(databaseName, ifExists.isDefined, restrict = cascade.isEmpty)(node.source) | ||
| DropDatabase(databaseName, ifExists.isDefined, cascade.isDefined) | ||
|
|
||
| // ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...) | ||
| case Token("TOK_ALTERDATABASE_PROPERTIES", Token(dbName, Nil) :: args) => | ||
| val databaseName = unquoteString(dbName) | ||
| val databaseName = cleanIdentifier(dbName) | ||
| val dbprops = getClause("TOK_DATABASEPROPERTIES", args) | ||
| val props = dbprops match { | ||
| case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) => | ||
|
|
@@ -161,13 +162,13 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
| extractProps(propList, "TOK_TABLEPROPERTY") | ||
| case _ => parseFailed("Invalid ALTER DATABASE command", node) | ||
| } | ||
| AlterDatabaseProperties(databaseName, props.toMap)(node.source) | ||
| AlterDatabaseProperties(databaseName, props.toMap) | ||
|
|
||
| // DESCRIBE DATABASE [EXTENDED] db_name | ||
| case Token("TOK_DESCDATABASE", Token(dbName, Nil) :: describeArgs) => | ||
| val databaseName = unquoteString(dbName) | ||
| val databaseName = cleanIdentifier(dbName) | ||
| val extended = getClauseOption("EXTENDED", describeArgs) | ||
| DescribeDatabase(databaseName, extended.isDefined)(node.source) | ||
| DescribeDatabase(databaseName, extended.isDefined) | ||
|
|
||
| // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name | ||
| // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ]; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.command | |
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.{Row, SQLContext} | ||
| import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogFunction | ||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogDatabase | ||
| import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec | ||
| import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} | ||
| import org.apache.spark.sql.execution.datasources.BucketSpec | ||
|
|
@@ -45,46 +45,135 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand { | |
|
|
||
| } | ||
|
|
||
| /** | ||
| * A command for users to create a new database. | ||
| * | ||
| * It will issue an error message when the database with the same name already exists, | ||
| * unless 'ifNotExists' is true. | ||
| * The syntax of using this command in SQL is: | ||
| * {{{ | ||
| * CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name | ||
| * }}} | ||
| */ | ||
| case class CreateDatabase( | ||
| databaseName: String, | ||
| ifNotExists: Boolean, | ||
| path: Option[String], | ||
| comment: Option[String], | ||
| props: Map[String, String])(sql: String) | ||
| extends NativeDDLCommand(sql) with Logging | ||
| props: Map[String, String]) | ||
| extends RunnableCommand { | ||
|
|
||
| override def run(sqlContext: SQLContext): Seq[Row] = { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| catalog.createDatabase( | ||
| CatalogDatabase( | ||
| databaseName, | ||
| comment.getOrElse(""), | ||
| path.getOrElse(catalog.getDefaultDBPath(databaseName)), | ||
| props), | ||
| ifNotExists) | ||
| Seq.empty[Row] | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to create the underlying dir in this command?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True. Only did it in the default DB path. I forgot to do it in the regular case : ( Let me submit a follow-up PR for it. Thanks for catching it!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yhuai I tried it in However, this PR has an issue when users specify the location in the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea. Let's create the directory if it is not created.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. Will do it in #12081. Thanks!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yhuai I did try it. Actually, the code is done... However, if we create a directory before issuing Hive client API Just feel free to let me know what I should do next. Thanks! |
||
|
|
||
| override val output: Seq[Attribute] = Seq.empty | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Drop Database: Removes a database from the system. | ||
| * A command for users to remove a database from the system. | ||
| * | ||
| * 'ifExists': | ||
| * - true, if database_name does't exist, no action | ||
| * - false (default), if database_name does't exist, a warning message will be issued | ||
| * 'restric': | ||
| * - true (default), the database cannot be dropped if it is not empty. The inclusive | ||
| * tables must be dropped at first. | ||
| * - false, it is in the Cascade mode. The dependent objects are automatically dropped | ||
| * before dropping database. | ||
| * 'cascade': | ||
| * - true, the dependent objects are automatically dropped before dropping database. | ||
| * - false (default), it is in the Restrict mode. The database cannot be dropped if | ||
| * it is not empty. The inclusive tables must be dropped at first. | ||
| * | ||
| * The syntax of using this command in SQL is: | ||
| * {{{ | ||
| * DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]; | ||
| * }}} | ||
| */ | ||
| case class DropDatabase( | ||
| databaseName: String, | ||
| ifExists: Boolean, | ||
| restrict: Boolean)(sql: String) | ||
| extends NativeDDLCommand(sql) with Logging | ||
| cascade: Boolean) | ||
| extends RunnableCommand { | ||
|
|
||
| override def run(sqlContext: SQLContext): Seq[Row] = { | ||
| sqlContext.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade) | ||
| Seq.empty[Row] | ||
| } | ||
|
|
||
| override val output: Seq[Attribute] = Seq.empty | ||
| } | ||
|
|
||
| /** ALTER DATABASE: add new (key, value) pairs into DBPROPERTIES */ | ||
| /** | ||
| * A command for users to add new (key, value) pairs into DBPROPERTIES | ||
| * If the database does not exist, an error message will be issued to indicate the database | ||
| * does not exist. | ||
| * The syntax of using this command in SQL is: | ||
| * {{{ | ||
| * ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...) | ||
| * }}} | ||
| */ | ||
| case class AlterDatabaseProperties( | ||
| databaseName: String, | ||
| props: Map[String, String])(sql: String) | ||
| extends NativeDDLCommand(sql) with Logging | ||
| props: Map[String, String]) | ||
| extends RunnableCommand { | ||
|
|
||
| override def run(sqlContext: SQLContext): Seq[Row] = { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val db: CatalogDatabase = catalog.getDatabase(databaseName) | ||
| catalog.alterDatabase(db.copy(properties = db.properties ++ props)) | ||
|
|
||
| Seq.empty[Row] | ||
| } | ||
|
|
||
| override val output: Seq[Attribute] = Seq.empty | ||
| } | ||
|
|
||
| /** | ||
| * DESCRIBE DATABASE: shows the name of the database, its comment (if one has been set), and its | ||
| * A command for users to show the name of the database, its comment (if one has been set), and its | ||
| * root location on the filesystem. When extended is true, it also shows the database's properties | ||
| * If the database does not exist, an error message will be issued to indicate the database | ||
| * does not exist. | ||
| * The syntax of using this command in SQL is | ||
| * {{{ | ||
| * DESCRIBE DATABASE [EXTENDED] db_name | ||
| * }}} | ||
| */ | ||
| case class DescribeDatabase( | ||
| databaseName: String, | ||
| extended: Boolean)(sql: String) | ||
| extends NativeDDLCommand(sql) with Logging | ||
| extended: Boolean) | ||
| extends RunnableCommand { | ||
|
|
||
| override def run(sqlContext: SQLContext): Seq[Row] = { | ||
| val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName) | ||
| val result = | ||
| Row("Database Name", dbMetadata.name) :: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can probably just be |
||
| Row("Description", dbMetadata.description) :: | ||
| Row("Location", dbMetadata.locationUri) :: Nil | ||
|
|
||
| if (extended) { | ||
| val properties = | ||
| if (dbMetadata.properties.isEmpty) { | ||
| "" | ||
| } else { | ||
| dbMetadata.properties.toSeq.mkString("(", ", ", ")") | ||
| } | ||
| result :+ Row("Properties", properties) | ||
| } else { | ||
| result | ||
| } | ||
| } | ||
|
|
||
| override val output: Seq[Attribute] = { | ||
| AttributeReference("database_description_item", StringType, nullable = false)() :: | ||
| AttributeReference("database_description_value", StringType, nullable = false)() :: Nil | ||
| } | ||
| } | ||
|
|
||
| case class CreateFunction( | ||
| databaseName: Option[String], | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use backtick?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Sure. We need to change all the places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's change it in a separate patch then. There are a few other places like this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do it in a separate PR.