ExternalCatalog.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 
 
 /**
@@ -38,6 +38,18 @@ abstract class ExternalCatalog {
     }
   }
 
+  protected def requireFunctionExists(db: String, funcName: String): Unit = {
+    if (!functionExists(db, funcName)) {
+      throw new NoSuchFunctionException(db = db, func = funcName)
+    }
+  }
+
+  protected def requireFunctionNotExists(db: String, funcName: String): Unit = {
+    if (functionExists(db, funcName)) {
+      throw new FunctionAlreadyExistsException(db = db, func = funcName)
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
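Note: hoisting requireFunctionExists / requireFunctionNotExists into the abstract ExternalCatalog as protected helpers gives every implementation (InMemoryCatalog and HiveExternalCatalog below) a single definition of these precondition checks and of the specific exceptions they throw. A minimal, self-contained sketch of the guard-helper pattern, using hypothetical names rather than Spark's:

```scala
import scala.collection.mutable

// Hypothetical registry illustrating the shared guard-helper pattern:
// check a precondition up front and throw a *specific* exception type.
class FunctionRegistry {
  private val functions = mutable.Map[String, String]() // name -> class name

  private def requireExists(name: String): Unit =
    if (!functions.contains(name)) {
      throw new NoSuchElementException(s"function '$name' does not exist")
    }

  private def requireNotExists(name: String): Unit =
    if (functions.contains(name)) {
      throw new IllegalStateException(s"function '$name' already exists")
    }

  def create(name: String, className: String): Unit = {
    requireNotExists(name)
    functions(name) = className
  }

  def rename(oldName: String, newName: String): Unit = {
    requireExists(oldName)
    requireNotExists(newName)
    functions(newName) = functions.remove(oldName).get
  }
}
```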
InMemoryCatalog.scala
@@ -63,18 +63,6 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions.contains(spec)
   }
 
-  private def requireFunctionExists(db: String, funcName: String): Unit = {
-    if (!functionExists(db, funcName)) {
-      throw new NoSuchFunctionException(db = db, func = funcName)
-    }
-  }
-
-  private def requireFunctionNotExists(db: String, funcName: String): Unit = {
-    if (functionExists(db, funcName)) {
-      throw new FunctionAlreadyExistsException(db = db, func = funcName)
-    }
-  }
-
   private def requireTableExists(db: String, table: String): Unit = {
     if (!tableExists(db, table)) {
       throw new NoSuchTableException(db = db, table = table)
@@ -474,11 +462,8 @@ class InMemoryCatalog(
 
   override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
     requireDbExists(db)
-    if (functionExists(db, func.identifier.funcName)) {
-      throw new FunctionAlreadyExistsException(db = db, func = func.identifier.funcName)
-    } else {
-      catalog(db).functions.put(func.identifier.funcName, func)
-    }
+    requireFunctionNotExists(db, func.identifier.funcName)
+    catalog(db).functions.put(func.identifier.funcName, func)
   }
 
   override def dropFunction(db: String, funcName: String): Unit = synchronized {
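The createFunction rewrite is behavior-preserving: requireFunctionNotExists throws on the "already exists" branch, so the put below it runs only when the function is absent and the else branch is no longer needed. A self-contained before/after sketch (hypothetical, not Spark code):

```scala
import scala.collection.mutable

object GuardClauseDemo {
  private val registry = mutable.Set[String]()

  // Before: explicit branching.
  def createWithElse(name: String): Unit = {
    if (registry.contains(name)) {
      throw new IllegalStateException(s"'$name' already exists")
    } else {
      registry += name
    }
  }

  // After: the guard throws, so the straight-line code below it is reached
  // only when the precondition holds. Same observable behavior.
  def createWithGuard(name: String): Unit = {
    if (registry.contains(name)) {
      throw new IllegalStateException(s"'$name' already exists")
    }
    registry += name
  }
}
```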
ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -450,14 +451,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
 
   test("create function when database does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchDatabaseException] {
       catalog.createFunction("does_not_exist", newFunc())
     }
   }
 
   test("create function that already exists") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[FunctionAlreadyExistsException] {
       catalog.createFunction("db2", newFunc("func1"))
     }
   }
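ScalaTest's intercept[T] fails the test unless the block throws a T (or a subtype of T), and it returns the caught exception for further inspection; tightening the type from AnalysisException to the specific subclass therefore strengthens each assertion. A self-contained example, assuming the ScalaTest 3.1+ package layout:

```scala
import org.scalatest.funsuite.AnyFunSuite

class InterceptExampleSuite extends AnyFunSuite {
  test("intercept returns the caught exception for inspection") {
    val e = intercept[NoSuchElementException] {
      Map.empty[String, Int]("missing") // throws NoSuchElementException
    }
    assert(e.getMessage.contains("missing"))
  }
}
```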
@@ -471,14 +472,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
 
   test("drop function when database does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchDatabaseException] {
       catalog.dropFunction("does_not_exist", "something")
     }
   }
 
   test("drop function that does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchFunctionException] {
       catalog.dropFunction("db2", "does_not_exist")
     }
   }
@@ -488,14 +489,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     assert(catalog.getFunction("db2", "func1") ==
       CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
         Seq.empty[FunctionResource]))
-    intercept[AnalysisException] {
+    intercept[NoSuchFunctionException] {
       catalog.getFunction("db2", "does_not_exist")
     }
   }
 
   test("get function when database does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchDatabaseException] {
       catalog.getFunction("does_not_exist", "func1")
     }
   }
@@ -505,23 +506,23 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     val newName = "funcky"
     assert(catalog.getFunction("db2", "func1").className == funcClass)
     catalog.renameFunction("db2", "func1", newName)
-    intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+    intercept[NoSuchFunctionException] { catalog.getFunction("db2", "func1") }
     assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
     assert(catalog.getFunction("db2", newName).className == funcClass)
-    intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+    intercept[NoSuchFunctionException] { catalog.renameFunction("db2", "does_not_exist", "me") }
   }
 
   test("rename function when database does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchDatabaseException] {
       catalog.renameFunction("does_not_exist", "func1", "func5")
     }
   }
 
   test("rename function when new function already exists") {
     val catalog = newBasicCatalog()
     catalog.createFunction("db2", newFunc("func2", Some("db2")))
-    intercept[AnalysisException] {
+    intercept[FunctionAlreadyExistsException] {
       catalog.renameFunction("db2", "func1", "func2")
     }
   }
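Because intercept[T] also accepts subtypes of T, the old intercept[AnalysisException] form would pass for any analysis-time failure (the specific exception types here extend AnalysisException, which is why the broad form compiled and passed), potentially masking wrong-exception bugs. A standalone sketch with hypothetical exception classes standing in for Spark's:

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical hierarchy standing in for AnalysisException and its subclasses;
// it demonstrates why the narrower intercept is the stronger assertion.
class CatalogError(msg: String) extends RuntimeException(msg)
class MissingFunctionError(msg: String) extends CatalogError(msg)

class SubtypeInterceptSuite extends AnyFunSuite {
  test("a broad intercept also passes for unrelated subtypes") {
    intercept[CatalogError] { throw new MissingFunctionError("f") } // passes
    intercept[CatalogError] { throw new CatalogError("other bug") } // also passes!
  }

  test("a narrow intercept pins down the exact failure") {
    intercept[MissingFunctionError] { throw new MissingFunctionError("f") }
  }
}
```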
HiveExternalCatalog.scala
@@ -570,31 +570,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
 
   override def createFunction(
       db: String,
       funcDefinition: CatalogFunction): Unit = withClient {
+    requireDbExists(db)
     // Hive's metastore is case insensitive. However, Hive's createFunction does
     // not normalize the function name (unlike the getFunction part). So,
     // we are normalizing the function name.
     val functionName = funcDefinition.identifier.funcName.toLowerCase
+    requireFunctionNotExists(db, functionName)
     val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
     client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
   }
 
   override def dropFunction(db: String, name: String): Unit = withClient {
+    requireFunctionExists(db, name)
     client.dropFunction(db, name)
   }
 
   override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+    requireFunctionExists(db, oldName)
+    requireFunctionNotExists(db, newName)
     client.renameFunction(db, oldName, newName)
   }
 
   override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+    requireFunctionExists(db, funcName)
     client.getFunction(db, funcName)
   }
 
   override def functionExists(db: String, funcName: String): Boolean = withClient {
+    requireDbExists(db)
     client.functionExists(db, funcName)
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+    requireDbExists(db)
     client.listFunctions(db, pattern)
   }
 
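These overrides now share a "validate, then delegate" shape: the precondition checks raise the uniform catalyst-level exceptions before the Hive client is ever called, so all ExternalCatalog implementations fail identically, and createFunction lower-cases the name before the existence check so the check agrees with Hive's case-insensitive metastore. A hedged sketch of that shape (hypothetical helper, not Spark code):

```scala
object GuardedCall {
  // Run precondition checks that throw uniform catalog-level exceptions,
  // and only then delegate to the underlying metastore client.
  def guarded[A](preconditions: => Unit)(delegate: => A): A = {
    preconditions // e.g. requireDbExists / requireFunctionExists; throws on violation
    delegate      // reached only when every precondition held
  }
}

// Usage shape (names assumed for illustration):
//   GuardedCall.guarded { requireFunctionExists(db, name) } { client.dropFunction(db, name) }
```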