Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ statement
tableIdentifier ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTempViewUsing
| ALTER VIEW tableIdentifier AS? query #alterViewQuery
| CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
qualifiedName AS className=STRING
(USING resource (',' resource)*)? #createFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName #dropFunction
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,15 @@ abstract class ExternalCatalog

protected def doDropFunction(db: String, funcName: String): Unit

/**
 * Alters the function described by `funcDefinition` in database `db`.
 *
 * An [[AlterFunctionPreEvent]] is posted first, then the work is delegated to
 * `doAlterFunction`, and finally an [[AlterFunctionEvent]] is posted. If the delegate
 * throws, the second event is never posted.
 */
final def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = {
  val funcName = funcDefinition.identifier.funcName
  postToAll(AlterFunctionPreEvent(db, funcName))
  doAlterFunction(db, funcDefinition)
  postToAll(AlterFunctionEvent(db, funcName))
}

/**
 * Catalog-specific implementation of altering a function; called by `alterFunction`
 * between posting the pre-alter and post-alter events.
 */
protected def doAlterFunction(db: String, funcDefinition: CatalogFunction): Unit

final def renameFunction(db: String, oldName: String, newName: String): Unit = {
postToAll(RenameFunctionPreEvent(db, oldName, newName))
doRenameFunction(db, oldName, newName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,12 @@ class InMemoryCatalog(
catalog(db).functions.remove(funcName)
}

/**
 * Replaces the stored definition of an existing function in `db` with `func`.
 * Both the database and the function must already exist.
 */
override protected def doAlterFunction(db: String, func: CatalogFunction): Unit = synchronized {
  requireDbExists(db)
  val funcName = func.identifier.funcName
  requireFunctionExists(db, funcName)
  // Overwrite the existing entry under the same name.
  catalog(db).functions.put(funcName, func)
}

override protected def doRenameFunction(
db: String,
oldName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,29 @@ class SessionCatalog(
}
}

/**
 * Overwrite a metastore function in the database specified in `funcDefinition`.
 * If no database is specified, assume the function is in the current database.
 *
 * Throws a [[NoSuchFunctionException]] when the function does not exist.
 */
def alterFunction(funcDefinition: CatalogFunction): Unit = {
  val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
  requireDbExists(db)
  val qualifiedName = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
  if (!functionExists(qualifiedName)) {
    throw new NoSuchFunctionException(db = db, func = qualifiedName.toString)
  }
  // A permanent function is loaded into the FunctionRegistry when it is first used,
  // so drop any loaded copy before changing the definition in the external catalog.
  if (functionRegistry.functionExists(qualifiedName)) {
    functionRegistry.dropFunction(qualifiedName)
  }
  externalCatalog.alterFunction(db, funcDefinition.copy(identifier = qualifiedName))
}

/**
* Retrieve the metadata of a metastore function.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ case class DropFunctionPreEvent(database: String, name: String) extends Function
*/
case class DropFunctionEvent(database: String, name: String) extends FunctionEvent

/**
 * Event fired before a function is altered.
 *
 * @param database name of the database that holds the function
 * @param name name of the function being altered
 */
case class AlterFunctionPreEvent(database: String, name: String) extends FunctionEvent

/**
 * Event fired after a function has been altered.
 *
 * @param database name of the database that holds the function
 * @param name name of the function that was altered
 */
case class AlterFunctionEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired before a function is renamed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
}
checkEvents(RenameFunctionPreEvent("db5", "fn7", "fn4") :: Nil)

// ALTER
val alteredFunctionDefinition = CatalogFunction(
identifier = FunctionIdentifier("fn4", Some("db5")),
className = "org.apache.spark.AlterFunction",
resources = Seq.empty)
catalog.alterFunction("db5", alteredFunctionDefinition)
checkEvents(
AlterFunctionPreEvent("db5", "fn4") :: AlterFunctionEvent("db5", "fn4") :: Nil)

// DROP
intercept[AnalysisException] {
catalog.dropFunction("db5", "fn7")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}

test("alter function") {
  val catalog = newBasicCatalog()
  // The basic catalog starts with func1 bound to the original class.
  val existing = catalog.getFunction("db2", "func1")
  assert(existing.className == funcClass)
  // Altering must replace the class name stored for the same function.
  catalog.alterFunction("db2", existing.copy(className = newFuncClass))
  assert(catalog.getFunction("db2", "func1").className == newFuncClass)
}

test("list functions") {
val catalog = newBasicCatalog()
catalog.createFunction("db2", newFunc("func2"))
Expand Down Expand Up @@ -916,6 +924,7 @@ abstract class CatalogTestUtils {
lazy val partWithEmptyValue =
CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
lazy val funcClass = "org.apache.spark.myFunc"
lazy val newFuncClass = "org.apache.spark.myNewFunc"

/**
* Creates a basic catalog, with the following structure:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
*
* For example:
* {{{
* CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
* [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
* CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
* AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
* }}}
*/
override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
Expand All @@ -709,7 +709,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
functionIdentifier.funcName,
string(ctx.className),
resources,
ctx.TEMPORARY != null)
ctx.TEMPORARY != null,
ctx.EXISTS != null,
ctx.REPLACE != null)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
* The DDL command that creates a function.
* To create a temporary function, the syntax of using this command in SQL is:
* {{{
* CREATE TEMPORARY FUNCTION functionName
* CREATE [OR REPLACE] TEMPORARY FUNCTION functionName
* AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
* }}}
*
* To create a permanent function, the syntax in SQL is:
* {{{
* CREATE FUNCTION [databaseName.]functionName
* CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] [databaseName.]functionName
* AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
* }}}
*/
Expand All @@ -46,26 +46,46 @@ case class CreateFunctionCommand(
functionName: String,
className: String,
resources: Seq[FunctionResource],
isTemp: Boolean)
isTemp: Boolean,
ifNotExists: Boolean,
replace: Boolean)
extends RunnableCommand {

if (ifNotExists && replace) {
throw new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE" +
" is not allowed.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test case?

}

// Defining a temporary function with `IF NOT EXISTS` is not allowed
if (ifNotExists && isTemp) {
throw new AnalysisException(
"It is not allowed to define a TEMPORARY function with IF NOT EXISTS.")
}

// Temporary function names should not contain database prefix like "database.function"
if (databaseName.isDefined && isTemp) {
throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
s"is not allowed: '${databaseName.get}'")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test case?

}

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
if (isTemp) {
if (databaseName.isDefined) {
throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
s"is not allowed: '${databaseName.get}'")
}
// We first load resources and then put the builder in the function registry.
catalog.loadFunctionResources(resources)
catalog.registerFunction(func, overrideIfExists = false)
catalog.registerFunction(func, overrideIfExists = replace)
} else {
// For a permanent, we will store the metadata into underlying external catalog.
// This function will be loaded into the FunctionRegistry when a query uses it.
// We do not load it into FunctionRegistry right now.
// TODO: should we also parse "IF NOT EXISTS"?
catalog.createFunction(func, ignoreIfExists = false)
// Handles `CREATE OR REPLACE FUNCTION AS ... USING ...`
if (replace && catalog.functionExists(func.identifier)) {
// alter the function in the metastore
catalog.alterFunction(CatalogFunction(func.identifier, className, resources))
} else {
// For a permanent function, we will store the metadata into underlying external catalog.
// This function will be loaded into the FunctionRegistry when a query uses it.
// We do not load it into FunctionRegistry right now.
catalog.createFunction(CatalogFunction(func.identifier, className, resources), ifNotExists)
}
}
Seq.empty[Row]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,26 +181,74 @@ class DDLCommandSuite extends PlanTest {
|'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
|FILE '/path/to/file'
""".stripMargin
val sql3 =
"""
|CREATE OR REPLACE TEMPORARY FUNCTION helloworld3 as
|'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
|JAR '/path/to/jar2'
""".stripMargin
val sql4 =
"""
|CREATE OR REPLACE FUNCTION hello.world1 as
|'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
|FILE '/path/to/file'
""".stripMargin
val sql5 =
"""
|CREATE FUNCTION IF NOT EXISTS hello.world2 as
|'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
|FILE '/path/to/file'
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val parsed4 = parser.parsePlan(sql4)
val parsed5 = parser.parsePlan(sql5)
val expected1 = CreateFunctionCommand(
None,
"helloworld",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
isTemp = true)
isTemp = true, ifNotExists = false, replace = false)
val expected2 = CreateFunctionCommand(
Some("hello"),
"world",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
isTemp = false)
isTemp = false, ifNotExists = false, replace = false)
val expected3 = CreateFunctionCommand(
None,
"helloworld3",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
isTemp = true, ifNotExists = false, replace = true)
val expected4 = CreateFunctionCommand(
Some("hello"),
"world1",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
isTemp = false, ifNotExists = false, replace = true)
val expected5 = CreateFunctionCommand(
Some("hello"),
"world2",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
isTemp = false, ifNotExists = true, replace = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
comparePlans(parsed5, expected5)
}

test("drop function") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,57 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}

// Verifies that CREATE TEMPORARY FUNCTION combined with IF NOT EXISTS is rejected
// with an AnalysisException carrying the expected message.
test("create temporary function with if not exists") {
withUserDefinedFunction("func1" -> true) {
val sql1 =
"""
|CREATE TEMPORARY FUNCTION IF NOT EXISTS func1 as
|'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
|JAR '/path/to/jar2'
""".stripMargin
// The statement is expected to fail; only the error message is inspected.
val e = intercept[AnalysisException] {
sql(sql1)
}.getMessage
assert(e.contains("It is not allowed to define a TEMPORARY function with IF NOT EXISTS"))
}
}

// Verifies that CREATE OR REPLACE FUNCTION combined with IF NOT EXISTS is rejected
// with an AnalysisException carrying the expected message.
test("create function with both if not exists and replace") {
withUserDefinedFunction("func1" -> false) {
val sql1 =
"""
|CREATE OR REPLACE FUNCTION IF NOT EXISTS func1 as
|'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
|JAR '/path/to/jar2'
""".stripMargin
// The statement is expected to fail; only the error message is inspected.
val e = intercept[AnalysisException] {
sql(sql1)
}.getMessage
assert(e.contains("CREATE FUNCTION with both IF NOT EXISTS and REPLACE is not allowed"))
}
}

// Verifies that a database-qualified name in CREATE TEMPORARY FUNCTION is rejected
// with an AnalysisException that names the offending database.
test("create temporary function by specifying a database") {
val dbName = "mydb"
withDatabase(dbName) {
// Create the database and make it current before issuing the statement under test.
sql(s"CREATE DATABASE $dbName")
sql(s"USE $dbName")
withUserDefinedFunction("func1" -> true) {
val sql1 =
s"""
|CREATE TEMPORARY FUNCTION $dbName.func1 as
|'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
|JAR '/path/to/jar2'
""".stripMargin
// The statement is expected to fail; only the error message is inspected.
val e = intercept[AnalysisException] {
sql(sql1)
}.getMessage
assert(e.contains(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
s"is not allowed: '$dbName'"))
}
}
}

Seq(true, false).foreach { caseSensitive =>
test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.dropFunction(db, name)
}

override protected def doAlterFunction(
db: String, funcDefinition: CatalogFunction): Unit = withClient {
requireDbExists(db)
val functionName = funcDefinition.identifier.funcName.toLowerCase(Locale.ROOT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should normalize the function name in SessionCatalog like table name, we can do this in follow-up

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

requireFunctionExists(db, functionName)
val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
client.alterFunction(db, funcDefinition.copy(identifier = functionIdentifier))
}

override protected def doRenameFunction(
db: String,
oldName: String,
Expand Down