Skip to content

Commit dfce966

Browse files
gatorsmileyhuai
authored and committed
[SPARK-14362][SPARK-14406][SQL] DDL Native Support: Drop View and Drop Table
#### What changes were proposed in this pull request?

This PR provides native support for the DDL commands `DROP VIEW` and `DROP TABLE`. It includes native parsing and native analysis.

Based on the Hive DDL document for [DROP_VIEW_WEB_LINK](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-DropView), `DROP VIEW` is defined as follows.

**Syntax:**
```SQL
DROP VIEW [IF EXISTS] [db_name.]view_name;
```
- It removes the metadata for the specified view.
- It is illegal to use DROP TABLE on a view.
- It is illegal to use DROP VIEW on a table.
- This command only works in `HiveContext`. In `SQLContext`, an exception is thrown.

This PR also handles `DROP TABLE`.

**Syntax:**
```SQL
DROP TABLE [IF EXISTS] table_name [PURGE];
```
- Previously, the `DROP TABLE` command could only drop Hive tables in `HiveContext`. After this PR, it can also drop temporary tables, external tables, and external data source tables in `SQLContext`.
- In `HiveContext`, no exception is thrown if the to-be-dropped table does not exist and the user did not specify `IF EXISTS`; an error message is logged instead. If `IF EXISTS` is specified, no error message or exception is issued.
- In `SQLContext`, an exception is thrown if the to-be-dropped table does not exist, unless `IF EXISTS` is specified.
- Data is not deleted when the table is `external`; it is deleted only when the table type is `managed_table`.

#### How was this patch tested?

For verifying command parsing, test cases were added in `spark/sql/hive/HiveDDLCommandSuite.scala`.
For verifying command analysis, test cases were added in `spark/sql/hive/execution/HiveDDLSuite.scala`.

Author: gatorsmile <[email protected]>
Author: xiaoli <[email protected]>
Author: Xiao Li <[email protected]>

Closes #12146 from gatorsmile/dropView.
1 parent 9be5558 commit dfce966

File tree

16 files changed

+376
-63
lines changed

16 files changed

+376
-63
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ statement
104104
REPLACE COLUMNS '(' colTypeList ')' (CASCADE | RESTRICT)? #replaceColumns
105105
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE?
106106
(FOR METADATA? REPLICATION '(' STRING ')')? #dropTable
107+
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
107108
| CREATE (OR REPLACE)? VIEW (IF NOT EXISTS)? tableIdentifier
108109
identifierCommentList? (COMMENT STRING)?
109110
(PARTITIONED ON identifierList)?
@@ -141,7 +142,6 @@ hiveNativeCommands
141142
| DELETE FROM tableIdentifier (WHERE booleanExpression)?
142143
| TRUNCATE TABLE tableIdentifier partitionSpec?
143144
(COLUMNS identifierList)?
144-
| DROP VIEW (IF EXISTS)? qualifiedName
145145
| SHOW COLUMNS (FROM | IN) tableIdentifier ((FROM|IN) identifier)?
146146
| START TRANSACTION (transactionMode (',' transactionMode)*)?
147147
| COMMIT WORK?

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ class InMemoryCatalog extends ExternalCatalog {
187187
catalog(db).tables(table).table
188188
}
189189

190+
// Looks up the metadata of `table` in `db`, yielding None when the table is not registered.
// The existence check goes through `tableExists`, so its database validation applies here too.
override def getTableOption(db: String, table: String): Option[CatalogTable] = synchronized {
  if (tableExists(db, table)) {
    Option(catalog(db).tables(table).table)
  } else {
    None
  }
}
193+
190194
override def tableExists(db: String, table: String): Boolean = synchronized {
191195
requireDbExists(db)
192196
catalog(db).tables.contains(table)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.File
2121

2222
import scala.collection.mutable
2323

24+
import org.apache.spark.internal.Logging
2425
import org.apache.spark.sql.AnalysisException
2526
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
2627
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -41,7 +42,7 @@ class SessionCatalog(
4142
externalCatalog: ExternalCatalog,
4243
functionResourceLoader: FunctionResourceLoader,
4344
functionRegistry: FunctionRegistry,
44-
conf: CatalystConf) {
45+
conf: CatalystConf) extends Logging {
4546
import ExternalCatalog._
4647

4748
def this(
@@ -175,6 +176,17 @@ class SessionCatalog(
175176
externalCatalog.getTable(db, table)
176177
}
177178

179+
/**
 * Look up the metadata of a metastore table without failing when it is absent.
 * If no database is specified, the table is looked up in the current database.
 * The lookup is delegated to the external catalog's `getTableOption`, so a table that
 * is not found is expected to come back as None.
 */
def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = {
  val dbName = name.database.getOrElse(currentDb)
  externalCatalog.getTableOption(dbName, formatTableName(name.table))
}
189+
178190
// -------------------------------------------------------------
179191
// | Methods that interact with temporary and metastore tables |
180192
// -------------------------------------------------------------
@@ -229,7 +241,13 @@ class SessionCatalog(
229241
val db = name.database.getOrElse(currentDb)
230242
val table = formatTableName(name.table)
231243
if (name.database.isDefined || !tempTables.contains(table)) {
232-
externalCatalog.dropTable(db, table, ignoreIfNotExists)
244+
// When ignoreIfNotExists is false, no exception is issued when the table does not exist.
245+
// Instead, log it as an error message. This is consistent with Hive.
246+
if (externalCatalog.tableExists(db, table)) {
247+
externalCatalog.dropTable(db, table, ignoreIfNotExists = true)
248+
} else if (!ignoreIfNotExists) {
249+
logError(s"Table '${name.quotedString}' does not exist")
250+
}
233251
} else {
234252
tempTables.remove(table)
235253
}
@@ -283,9 +301,14 @@ class SessionCatalog(
283301
* explicitly specified.
284302
*/
285303
def isTemporaryTable(name: TableIdentifier): Boolean = {
286-
!name.database.isDefined && tempTables.contains(formatTableName(name.table))
304+
name.database.isEmpty && tempTables.contains(formatTableName(name.table))
287305
}
288306

307+
/**
 * Whether this catalog supports views.
 * This implementation always answers false.
 */
def isViewSupported: Boolean = false
311+
289312
/**
290313
* List all tables in the specified database, including temporary tables.
291314
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ abstract class ExternalCatalog {
9191

9292
def getTable(db: String, table: String): CatalogTable
9393

94+
def getTableOption(db: String, table: String): Option[CatalogTable]
95+
9496
def tableExists(db: String, table: String): Boolean
9597

9698
def listTables(db: String): Seq[String]

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,9 @@ class SessionCatalogSuite extends SparkFunSuite {
233233
intercept[AnalysisException] {
234234
catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true)
235235
}
236-
// Table does not exist
237-
intercept[AnalysisException] {
238-
catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false)
239-
}
236+
// If the table does not exist, we do not issue an exception. Instead, we output an error log
237+
// message to console when ignoreIfNotExists is set to false.
238+
catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false)
240239
catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true)
241240
}
242241

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,22 @@ class SparkSqlAstBuilder extends AstBuilder {
363363
}
364364
}
365365

366+
/**
 * Build a [[DropTable]] command from a `DROP TABLE` or `DROP VIEW` statement.
 *
 * The PURGE option and the REPLICATION clause are rejected with a [[ParseException]].
 */
override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
  // PURGE is checked before REPLICATION, so it wins when both are present.
  val unsupported =
    if (ctx.PURGE != null) {
      Some("Unsupported operation: PURGE option")
    } else if (ctx.REPLICATION != null) {
      Some("Unsupported operation: REPLICATION clause")
    } else {
      None
    }
  unsupported.foreach(message => throw new ParseException(message, ctx))

  DropTable(
    tableName = visitTableIdentifier(ctx.tableIdentifier),
    ifExists = ctx.EXISTS != null,
    isView = ctx.VIEW != null)
}
381+
366382
/**
367383
* Create a [[AlterTableRename]] command.
368384
*

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
2020
import org.apache.spark.internal.Logging
2121
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
2222
import org.apache.spark.sql.catalyst.TableIdentifier
23-
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
23+
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
2424
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
2525
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
2626
import org.apache.spark.sql.types._
@@ -175,13 +175,61 @@ case class DescribeDatabase(
175175
}
176176
}
177177

178+
/**
 * Drops a table/view from the metastore and removes it if it is cached.
 *
 * The syntax of this command is:
 * {{{
 *   DROP TABLE [IF EXISTS] [db_name.]table_name;
 *   DROP VIEW [IF EXISTS] [db_name.]view_name;
 * }}}
 *
 * @param tableName identifier of the table or view to drop
 * @param ifExists whether `IF EXISTS` was specified; forwarded to the catalog's `dropTable`
 * @param isView true for `DROP VIEW`, false for `DROP TABLE`
 */
case class DropTable(
    tableName: TableIdentifier,
    ifExists: Boolean,
    isView: Boolean) extends RunnableCommand {

  /**
   * Validates the request, uncaches the relation on a best-effort basis, and drops it
   * through the session catalog.
   *
   * Throws an [[AnalysisException]] when views are not supported by the catalog, or when
   * the statement kind (TABLE vs. VIEW) does not match the type of the existing object.
   */
  override def run(sqlContext: SQLContext): Seq[Row] = {
    val catalog = sqlContext.sessionState.catalog
    if (isView && !catalog.isViewSupported) {
      throw new AnalysisException("Not supported object: views")
    }
    // DROP VIEW must not drop a table and DROP TABLE must not drop a view.
    // Nothing is checked when the catalog returns no metadata for the identifier.
    catalog.getTableMetadataOption(tableName).foreach { metadata =>
      val existingIsView = metadata.tableType == CatalogTableType.VIRTUAL_VIEW
      if (existingIsView && !isView) {
        throw new AnalysisException(
          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
      } else if (!existingIsView && isView) {
        throw new AnalysisException(
          "Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
      }
    }

    try {
      sqlContext.cacheManager.tryUncacheQuery(sqlContext.table(tableName.quotedString))
    } catch {
      // This table's metadata is not in Hive metastore (e.g. the table does not exist).
      // The Hive exception is matched by class name, not by type.
      case e if e.getClass.getName == "org.apache.hadoop.hive.ql.metadata.InvalidTableException" =>
      case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
      // Other failures can be caused by users providing wrong parameters in OPTIONS
      // (e.g. invalid paths). We catch them and log a warning message.
      // Users should be able to drop such kinds of tables regardless if there is an error.
      // Only non-fatal errors are swallowed; fatal ones (e.g. OutOfMemoryError) propagate.
      case scala.util.control.NonFatal(e) => log.warn(s"${e.getMessage}", e)
    }
    catalog.invalidateTable(tableName)
    catalog.dropTable(tableName, ifExists)
    Seq.empty[Row]
  }
}
225+
178226
/**
179227
* A command that renames a table/view.
180228
*
181229
* The syntax of this command is:
182230
* {{{
183-
* ALTER TABLE table1 RENAME TO table2;
184-
* ALTER VIEW view1 RENAME TO view2;
231+
* ALTER TABLE table1 RENAME TO table2;
232+
* ALTER VIEW view1 RENAME TO view2;
185233
* }}}
186234
*/
187235
case class AlterTableRename(

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ package org.apache.spark.sql.execution.command
2020
import org.apache.spark.sql.catalyst.TableIdentifier
2121
import org.apache.spark.sql.catalyst.parser.ParseException
2222
import org.apache.spark.sql.catalyst.plans.PlanTest
23-
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
23+
import org.apache.spark.sql.catalyst.plans.logical.Project
2424
import org.apache.spark.sql.execution.SparkSqlParser
25-
import org.apache.spark.sql.execution.datasources.BucketSpec
2625
import org.apache.spark.sql.types._
2726

2827
class DDLCommandSuite extends PlanTest {
@@ -667,7 +666,10 @@ class DDLCommandSuite extends PlanTest {
667666

668667
test("unsupported operations") {
669668
intercept[ParseException] {
670-
parser.parsePlan("DROP TABLE D1.T1")
669+
parser.parsePlan("DROP TABLE tab PURGE")
670+
}
671+
intercept[ParseException] {
672+
parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')")
671673
}
672674
intercept[ParseException] {
673675
parser.parsePlan("CREATE VIEW testView AS SELECT id FROM tab")
@@ -700,4 +702,52 @@ class DDLCommandSuite extends PlanTest {
700702
val parsed = parser.parsePlan(sql)
701703
assert(parsed.isInstanceOf[Project])
702704
}
705+
706+
// Checks that DROP TABLE, with and without IF EXISTS and a database qualifier,
// parses into the matching DropTable command.
test("drop table") {
  val statements = Seq(
    "DROP TABLE db.tab",
    "DROP TABLE IF EXISTS db.tab",
    "DROP TABLE tab",
    "DROP TABLE IF EXISTS tab")
  val parsedPlans = statements.map(statement => parser.parsePlan(statement))

  val qualified = TableIdentifier("tab", Option("db"))
  val unqualified = TableIdentifier("tab", None)
  val expectedPlans = Seq(
    DropTable(qualified, ifExists = false, isView = false),
    DropTable(qualified, ifExists = true, isView = false),
    DropTable(unqualified, ifExists = false, isView = false),
    DropTable(unqualified, ifExists = true, isView = false))

  // Statements and expectations are listed in the same order.
  parsedPlans.zip(expectedPlans).foreach { case (parsed, expected) =>
    comparePlans(parsed, expected)
  }
}
729+
730+
// Checks that DROP VIEW, with and without IF EXISTS and a database qualifier,
// parses into a DropTable command flagged as a view drop.
test("drop view") {
  val statements = Seq(
    "DROP VIEW db.view",
    "DROP VIEW IF EXISTS db.view",
    "DROP VIEW view",
    "DROP VIEW IF EXISTS view")
  val parsedPlans = statements.map(statement => parser.parsePlan(statement))

  val qualified = TableIdentifier("view", Option("db"))
  val unqualified = TableIdentifier("view", None)
  val expectedPlans = Seq(
    DropTable(qualified, ifExists = false, isView = true),
    DropTable(qualified, ifExists = true, isView = true),
    DropTable(unqualified, ifExists = false, isView = true),
    DropTable(unqualified, ifExists = true, isView = true))

  // Statements and expectations are listed in the same order.
  parsedPlans.zip(expectedPlans).foreach { case (parsed, expected) =>
    comparePlans(parsed, expected)
  }
}
703753
}

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
391391
Nil)
392392
}
393393

394+
// Creates a temporary table through the DDLScanSource data source, checks that it is
// listed under "default", drops it with DROP TABLE and checks that the listing is empty.
test("drop table - temporary table") {
395+
val catalog = sqlContext.sessionState.catalog
396+
sql(
397+
"""
398+
|CREATE TEMPORARY TABLE tab1
399+
|USING org.apache.spark.sql.sources.DDLScanSource
400+
|OPTIONS (
401+
| From '1',
402+
| To '10',
403+
| Table 'test1'
404+
|)
405+
""".stripMargin)
406+
assert(catalog.listTables("default") == Seq(TableIdentifier("tab1")))
407+
sql("DROP TABLE tab1")
408+
assert(catalog.listTables("default") == Nil)
409+
}
410+
411+
// Registers the two drop-table tests; they differ only in whether the table is first
// converted to a data source table.
Seq(
  "drop table" -> false,
  "drop table - data source table" -> true
).foreach { case (testName, asDatasourceTable) =>
  test(testName) {
    testDropTable(isDatasourceTable = asDatasourceTable)
  }
}
418+
419+
// Creates dbx.tab1 (optionally converted to a data source table), drops it, and then
// checks that dropping the now-missing table again does not throw, with or without IF EXISTS.
private def testDropTable(isDatasourceTable: Boolean): Unit = {
  val sessionCatalog = sqlContext.sessionState.catalog
  val ident = TableIdentifier("tab1", Some("dbx"))
  def tablesInDbx = sessionCatalog.listTables("dbx")

  createDatabase(sessionCatalog, "dbx")
  createTable(sessionCatalog, ident)
  if (isDatasourceTable) convertToDatasourceTable(sessionCatalog, ident)
  assert(tablesInDbx == Seq(ident))

  sql("DROP TABLE dbx.tab1")
  assert(tablesInDbx == Nil)

  // The table is already gone: neither statement is expected to throw.
  Seq("DROP TABLE IF EXISTS dbx.tab1", "DROP TABLE dbx.tab1").foreach(statement => sql(statement))
}
434+
435+
// DROP VIEW is expected to fail with an AnalysisException stating that views are not supported.
test("drop view") {
  val message = intercept[AnalysisException](sql("DROP VIEW dbx.tab1")).getMessage
  assert(message.contains("Not supported object: views"))
}
441+
394442
private def convertToDatasourceTable(
395443
catalog: SessionCatalog,
396444
tableIdent: TableIdentifier): Unit = {

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
172172
"SELECT COUNT(*) FROM hive_test;"
173173
-> "5",
174174
"DROP TABLE hive_test;"
175-
-> "OK"
175+
-> ""
176176
)
177177
}
178178

@@ -220,9 +220,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
220220
"SELECT count(key) FROM t1;"
221221
-> "5",
222222
"DROP TABLE t1;"
223-
-> "OK",
223+
-> "",
224224
"DROP TABLE sourceTable;"
225-
-> "OK"
225+
-> ""
226226
)
227227
}
228228

0 commit comments

Comments
 (0)