From 040cb73522c9a69b1ad4e265f1f56b470292a128 Mon Sep 17 00:00:00 2001 From: ouyangxiaochen Date: Wed, 14 Jun 2017 10:21:01 +0800 Subject: [PATCH 1/3] Support creating [temporary] functions with the keywords 'OR REPLACE' and 'IF NOT EXISTS' --- .../spark/sql/catalyst/parser/SqlBase.g4 | 3 +- .../catalyst/catalog/ExternalCatalog.scala | 9 ++++ .../catalyst/catalog/InMemoryCatalog.scala | 8 +++ .../sql/catalyst/catalog/SessionCatalog.scala | 25 ++++++++- .../spark/sql/catalyst/catalog/events.scala | 5 ++ .../catalog/ExternalCatalogSuite.scala | 1 + .../catalog/SessionCatalogSuite.scala | 29 +++++++--- .../spark/sql/execution/SparkSqlParser.scala | 8 +-- .../sql/execution/command/functions.scala | 45 +++++++++++----- .../execution/command/DDLCommandSuite.scala | 53 ++++++++++++++++++- .../spark/sql/internal/CatalogSuite.scala | 2 +- .../spark/sql/hive/HiveExternalCatalog.scala | 9 ++++ .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- 13 files changed, 169 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 29f554451ed4..ef9f88a9026c 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -126,7 +126,8 @@ statement tableIdentifier ('(' colTypeList ')')? tableProvider (OPTIONS tablePropertyList)? #createTempViewUsing | ALTER VIEW tableIdentifier AS? query #alterViewQuery - | CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING + | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)? + qualifiedName AS className=STRING (USING resource (',' resource)*)? #createFunction | DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName #dropFunction | EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
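For illustration, the statement shapes the extended createFunction rule accepts can be exercised end to end; a minimal sketch, assuming a Hive-enabled SparkSession and a placeholder UDF class and jar path (neither is part of this patch):

    import org.apache.spark.sql.SparkSession

    object CreateFunctionSyntaxDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("demo").enableHiveSupport().getOrCreate()
        spark.sql("CREATE DATABASE IF NOT EXISTS mydb")
        // Plain CREATE: a second identical call fails with FunctionAlreadyExistsException.
        spark.sql("CREATE FUNCTION mydb.simple_udf AS 'com.example.SimpleUDF' " +
          "USING JAR '/tmp/simple-udf.jar'")
        // OR REPLACE: overwrites the existing catalog entry instead of failing.
        spark.sql("CREATE OR REPLACE FUNCTION mydb.simple_udf AS 'com.example.SimpleUDF'")
        // IF NOT EXISTS: a duplicate CREATE becomes a no-op.
        spark.sql("CREATE FUNCTION IF NOT EXISTS mydb.simple_udf AS 'com.example.SimpleUDF'")
        spark.stop()
      }
    }

The grammar alone parses OR REPLACE and IF NOT EXISTS together; combining them, or using IF NOT EXISTS with TEMPORARY, is rejected later by CreateFunctionCommand, as the checks added to functions.scala below show.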
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 0254b6bb6d13..2d2c9ff9fe26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -332,6 +332,15 @@ abstract class ExternalCatalog protected def doDropFunction(db: String, funcName: String): Unit + final def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = { + val name = funcDefinition.identifier.funcName + postToAll(AlterFunctionPreEvent(db, name)) + doAlterFunction(db, funcDefinition) + postToAll(DropFunctionEvent(db, name)) + } + + protected def doAlterFunction(db: String, funcDefinition: CatalogFunction): Unit + final def renameFunction(db: String, oldName: String, newName: String): Unit = { postToAll(RenameFunctionPreEvent(db, oldName, newName)) doRenameFunction(db, oldName, newName) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 747190faa3c8..6d2e39a730e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -590,6 +590,14 @@ class InMemoryCatalog( catalog(db).functions.remove(funcName) } + override protected def doAlterFunction(db: String, func: CatalogFunction): Unit = synchronized { + requireDbExists(db) + requireFunctionExists(db, func.identifier.funcName) + catalog(db).functions.remove(func.identifier.funcName) + requireFunctionNotExists(db, func.identifier.funcName) + catalog(db).functions.put(func.identifier.funcName, func) + } + override protected def doRenameFunction( db: String, oldName: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index a86604e4353a..e0cac913df76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1021,14 +1021,14 @@ class SessionCatalog( * Create a metastore function in the database specified in `funcDefinition`. * If no such database is specified, create it in the current database. */ - def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = { + def createFunction(funcDefinition: CatalogFunction, ifNotExists: Boolean): Unit = { val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase)) requireDbExists(db) val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)) val newFuncDefinition = funcDefinition.copy(identifier = identifier) if (!functionExists(identifier)) { externalCatalog.createFunction(db, newFuncDefinition) - } else if (!ignoreIfExists) { + } else if (!ifNotExists) { throw new FunctionAlreadyExistsException(db = db, func = identifier.toString) } } @@ -1055,6 +1055,27 @@ class SessionCatalog( } } + /** + * Overwrite a metastore function in the database specified in `funcDefinition`. + * If no database is specified, assume the function is in the current database.
+ */ + def alterFunction(funcDefinition: CatalogFunction): Unit = { + val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase)) + requireDbExists(db) + val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)) + val newFuncDefinition = funcDefinition.copy(identifier = identifier) + if (functionExists(identifier)) { + if (functionRegistry.functionExists(identifier)) { + // A permanent function is loaded into the FunctionRegistry the first time it is used, + // so drop it from the registry as well; the altered definition is reloaded on next use. + functionRegistry.dropFunction(identifier) + } + externalCatalog.alterFunction(db, newFuncDefinition) + } + } + /** * Retrieve the metadata of a metastore function. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala index 459973a13bb1..0c9a23e1c50e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala @@ -134,6 +134,11 @@ case class CreateFunctionEvent(database: String, name: String) extends FunctionE */ case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent +/** + * Event fired before a function is dropped. + */ +case class AlterFunctionPreEvent(database: String, name: String) extends FunctionEvent + /** * Event fired after a function has been dropped. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index c22d55fc96a6..9d0598bb8ea4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -916,6 +916,7 @@ abstract class CatalogTestUtils { lazy val partWithEmptyValue = CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat) lazy val funcClass = "org.apache.spark.myFunc" + lazy val newFuncClass = "org.apache.spark.myNewFunc" /** * Creates a basic catalog, with the following structure: diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 8f856a0daad1..e8503ce1121c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -1143,11 +1143,11 @@ abstract class SessionCatalogSuite extends AnalysisTest { test("basic create and list functions") { withEmptyCatalog { catalog => catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) - catalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false) + catalog.createFunction(newFunc("myfunc", Some("mydb")), ifNotExists = false) assert(catalog.externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc")) // Create function without explicitly specifying database catalog.setCurrentDatabase("mydb") - catalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false) + catalog.createFunction(newFunc("myfunc2"), ifNotExists = false) assert(catalog.externalCatalog.listFunctions("mydb",
"*").toSet == Set("myfunc", "myfunc2")) } } @@ -1156,7 +1156,7 @@ abstract class SessionCatalogSuite extends AnalysisTest { withBasicCatalog { catalog => intercept[NoSuchDatabaseException] { catalog.createFunction( - newFunc("func5", Some("does_not_exist")), ignoreIfExists = false) + newFunc("func5", Some("does_not_exist")), ifNotExists = false) } } } @@ -1164,9 +1164,9 @@ abstract class SessionCatalogSuite extends AnalysisTest { test("create function that already exists") { withBasicCatalog { catalog => intercept[FunctionAlreadyExistsException] { - catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false) + catalog.createFunction(newFunc("func1", Some("db2")), ifNotExists = false) } - catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true) + catalog.createFunction(newFunc("func1", Some("db2")), ifNotExists = true) } } @@ -1227,6 +1227,17 @@ abstract class SessionCatalogSuite extends AnalysisTest { } } + test("alter function") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) + val myNewFunc = CatalogFunction(FunctionIdentifier("func1", Some("db2")), + newFuncClass, Seq.empty[FunctionResource]) + sessionCatalog.alterFunction(myNewFunc) + val newFunc = sessionCatalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) + assert(newFunc.className == newFuncClass) + } + test("drop function") { withBasicCatalog { catalog => assert(catalog.externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) @@ -1235,7 +1246,7 @@ abstract class SessionCatalogSuite extends AnalysisTest { assert(catalog.externalCatalog.listFunctions("db2", "*").isEmpty) // Drop function without explicitly specifying database catalog.setCurrentDatabase("db2") - catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) + catalog.createFunction(newFunc("func2", Some("db2")), ifNotExists = false) assert(catalog.externalCatalog.listFunctions("db2", "*").toSet == Set("func2")) catalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false) assert(catalog.externalCatalog.listFunctions("db2", "*").isEmpty) @@ -1316,8 +1327,10 @@ abstract class SessionCatalogSuite extends AnalysisTest { val funcMeta2 = newFunc("yes_me", None) val tempFunc1 = (e: Seq[Expression]) => e.head val tempFunc2 = (e: Seq[Expression]) => e.last - catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) - catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false) + catalog.registerFunction( + funcMeta1, overrideIfExists = false, functionBuilder = Some(tempFunc1)) + catalog.registerFunction( + funcMeta2, overrideIfExists = false, functionBuilder = Some(tempFunc2)) catalog.registerFunction( funcMeta1, overrideIfExists = false, functionBuilder = Some(tempFunc1)) catalog.registerFunction( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 2b79eb5eac0f..2f8e416e7df1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -687,8 +687,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * * For example: * {{{ - * CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name - * [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']]; + * CREATE 
[OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name + * AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']]; * }}} */ override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) { @@ -709,7 +709,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { functionIdentifier.funcName, string(ctx.className), resources, - ctx.TEMPORARY != null) + ctx.TEMPORARY != null, + ctx.EXISTS != null, + ctx.REPLACE != null) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index a91ad413f4d1..60c2a545cbdb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -31,13 +31,13 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} * The DDL command that creates a function. * To create a temporary function, the syntax of using this command in SQL is: * {{{ - * CREATE TEMPORARY FUNCTION functionName + * CREATE [OR REPLACE] TEMPORARY FUNCTION functionName * AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']] * }}} * * To create a permanent function, the syntax in SQL is: * {{{ - * CREATE FUNCTION [databaseName.]functionName + * CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] [databaseName.]functionName * AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']] * }}} */ @@ -46,26 +46,47 @@ case class CreateFunctionCommand( functionName: String, className: String, resources: Seq[FunctionResource], - isTemp: Boolean) + isTemp: Boolean, + ifNotExists: Boolean, + replace: Boolean) extends RunnableCommand { + if (ifNotExists && replace) { + throw new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE" + + " is not allowed.") + } + + // Disallows 'CREATE TEMPORARY FUNCTION IF NOT EXISTS' to be consistent + // with 'CREATE TEMPORARY FUNCTION' + if (ifNotExists && isTemp) { + throw new AnalysisException( + "It is not allowed to define a TEMPORARY function with IF NOT EXISTS.") + } + + // Temporary function names should not contain database prefix like "database.function" + if (databaseName.isDefined && isTemp) { + throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " + + s"is not allowed: '${databaseName.get}'") + } + override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources) if (isTemp) { - if (databaseName.isDefined) { - throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " + - s"is not allowed: '${databaseName.get}'") - } // We first load resources and then put the builder in the function registry. catalog.loadFunctionResources(resources) catalog.registerFunction(func, overrideIfExists = false) } else { - // For a permanent, we will store the metadata into underlying external catalog. - // This function will be loaded into the FunctionRegistry when a query uses it. - // We do not load it into FunctionRegistry right now. - // TODO: should we also parse "IF NOT EXISTS"? - catalog.createFunction(func, ignoreIfExists = false) + // Handles `CREATE OR REPLACE FUNCTION AS ...
USING ...` + if (replace && catalog.functionExists(func.identifier)) { + // Alter the function in the metastore. + catalog.alterFunction(CatalogFunction(func.identifier, className, resources)) + } else { + // For a permanent function, we will store the metadata into the underlying external catalog. + // This function will be loaded into the FunctionRegistry when a query uses it. + // We do not load it into FunctionRegistry right now. + catalog.createFunction(CatalogFunction(func.identifier, className, resources), ifNotExists) + } } Seq.empty[Row] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 8a6bc62fec96..33c00549f3ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -181,8 +181,29 @@ class DDLCommandSuite extends PlanTest { |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive', |FILE '/path/to/file' """.stripMargin + val sql3 = + """ + |CREATE OR REPLACE TEMPORARY FUNCTION helloworld3 as + |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1', + |JAR '/path/to/jar2' + """.stripMargin + val sql4 = + """ + |CREATE OR REPLACE FUNCTION hello.world1 as + |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive', + |FILE '/path/to/file' + """.stripMargin + val sql5 = + """ + |CREATE FUNCTION IF NOT EXISTS hello.world2 as + |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive', + |FILE '/path/to/file' + """.stripMargin val parsed1 = parser.parsePlan(sql1) val parsed2 = parser.parsePlan(sql2) + val parsed3 = parser.parsePlan(sql3) + val parsed4 = parser.parsePlan(sql4) + val parsed5 = parser.parsePlan(sql5) val expected1 = CreateFunctionCommand( None, "helloworld", @@ -190,7 +211,7 @@ class DDLCommandSuite extends PlanTest { Seq( FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"), FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")), - isTemp = true) + isTemp = true, ifNotExists = false, replace = false) val expected2 = CreateFunctionCommand( Some("hello"), "world", @@ -198,9 +219,37 @@ class DDLCommandSuite extends PlanTest { Seq( FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"), FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")), - isTemp = false) + isTemp = false, ifNotExists = false, replace = false) + val expected3 = CreateFunctionCommand( + None, + "helloworld3", + "com.matthewrathbone.example.SimpleUDFExample", + Seq( + FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"), + FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")), + isTemp = true, ifNotExists = false, replace = true) + val expected4 = CreateFunctionCommand( + Some("hello"), + "world1", + "com.matthewrathbone.example.SimpleUDFExample", + Seq( + FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"), + FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")), + isTemp = false, ifNotExists = false, replace = true) + val expected5 = CreateFunctionCommand( + Some("hello"), + "world2", + "com.matthewrathbone.example.SimpleUDFExample", + Seq( + FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"), +
FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")), + isTemp = false, ifNotExists = true, replace = false) comparePlans(parsed1, expected1) comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + comparePlans(parsed4, expected4) + comparePlans(parsed5, expected5) + } test("drop function") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 6acac1a9aa31..e0a88ed067e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -72,7 +72,7 @@ class CatalogSuite } private def createFunction(name: String, db: Option[String] = None): Unit = { - sessionCatalog.createFunction(utils.newFunc(name, db), ignoreIfExists = false) + sessionCatalog.createFunction(utils.newFunc(name, db), ifNotExists = false) } private def createTempFunction(name: String): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 2a17849fa8a3..306b38048e3a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -1132,6 +1132,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.dropFunction(db, name) } + override protected def doAlterFunction( + db: String, funcDefinition: CatalogFunction): Unit = withClient { + requireDbExists(db) + val functionName = funcDefinition.identifier.funcName.toLowerCase(Locale.ROOT) + requireFunctionExists(db, functionName) + val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName) + client.alterFunction(db, funcDefinition.copy(identifier = functionIdentifier)) + } + override protected def doRenameFunction( db: String, oldName: String, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 5f15a705a2e9..1e8cabe4cca0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -607,7 +607,7 @@ object PermanentHiveUDFTest2 extends Logging { FunctionIdentifier("example_max"), "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax", FunctionResource(JarResource, jar) :: Nil) - hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false) + hiveContext.sessionState.catalog.createFunction(function, ifNotExists = false) val source = hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val") source.createOrReplaceTempView("sourceTable") From 5eb9379108a52824fd30ad529defa03ec984d285 Mon Sep 17 00:00:00 2001 From: ouyangxiaochen Date: Wed, 14 Jun 2017 10:36:23 +0800 Subject: [PATCH 2/3] Support creating [temporary] functions with the keywords 'OR REPLACE' and 'IF NOT EXISTS' --- .../catalyst/catalog/ExternalCatalog.scala | 2 +- .../catalyst/catalog/InMemoryCatalog.scala | 1 - .../sql/catalyst/catalog/SessionCatalog.scala | 6 ++- .../spark/sql/catalyst/catalog/events.scala | 11 ++-- .../catalog/ExternalCatalogEventSuite.scala | 9 ++++ .../catalog/ExternalCatalogSuite.scala | 8 +++ .../catalog/SessionCatalogSuite.scala | 23 +++------
.../sql/execution/command/functions.scala | 2 +- .../execution/command/DDLCommandSuite.scala | 1 - .../sql/execution/command/DDLSuite.scala | 51 +++++++++++++++++++ .../spark/sql/internal/CatalogSuite.scala | 2 +- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- 12 files changed, 90 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 2d2c9ff9fe26..6000d483db20 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -336,7 +336,7 @@ abstract class ExternalCatalog val name = funcDefinition.identifier.funcName postToAll(AlterFunctionPreEvent(db, name)) doAlterFunction(db, funcDefinition) - postToAll(DropFunctionEvent(db, name)) + postToAll(AlterFunctionEvent(db, name)) } protected def doAlterFunction(db: String, funcDefinition: CatalogFunction): Unit diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 6d2e39a730e1..8e6508067218 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -594,7 +594,6 @@ class InMemoryCatalog( requireDbExists(db) requireFunctionExists(db, func.identifier.funcName) catalog(db).functions.remove(func.identifier.funcName) - requireFunctionNotExists(db, func.identifier.funcName) catalog(db).functions.put(func.identifier.funcName, func) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index e0cac913df76..c40d5f6031a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1021,14 +1021,14 @@ class SessionCatalog( * Create a metastore function in the database specified in `funcDefinition`. * If no such database is specified, create it in the current database. 
*/ - def createFunction(funcDefinition: CatalogFunction, ifNotExists: Boolean): Unit = { + def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = { val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase)) requireDbExists(db) val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)) val newFuncDefinition = funcDefinition.copy(identifier = identifier) if (!functionExists(identifier)) { externalCatalog.createFunction(db, newFuncDefinition) - } else if (!ifNotExists) { + } else if (!ignoreIfExists) { throw new FunctionAlreadyExistsException(db = db, func = identifier.toString) } } @@ -1073,6 +1073,8 @@ class SessionCatalog( functionRegistry.dropFunction(identifier) } externalCatalog.alterFunction(db, newFuncDefinition) + } else { + throw new NoSuchFunctionException(db = db, func = identifier.toString) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala index 0c9a23e1c50e..742a51e64038 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala @@ -135,14 +135,19 @@ case class CreateFunctionEvent(database: String, name: String) extends FunctionE case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent /** - * Event fired before a function is dropped. + * Event fired after a function has been dropped. + */ +case class DropFunctionEvent(database: String, name: String) extends FunctionEvent + +/** + * Event fired before a function is altered. */ case class AlterFunctionPreEvent(database: String, name: String) extends FunctionEvent /** - * Event fired after a function has been dropped. + * Event fired after a function has been altered. */ -case class DropFunctionEvent(database: String, name: String) extends FunctionEvent +case class AlterFunctionEvent(database: String, name: String) extends FunctionEvent /** * Event fired before a function is renamed. 
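With the event pair corrected, the alter path can be observed through the catalog's listener bus; a minimal sketch, assuming the ExternalCatalogEventListener trait defined alongside these events (the listener class name is ours, not part of the patch):

    import org.apache.spark.sql.catalyst.catalog._

    // Logs the pre/post pair posted by ExternalCatalog.alterFunction:
    // AlterFunctionPreEvent fires before doAlterFunction, AlterFunctionEvent after.
    class AlterFunctionAuditListener extends ExternalCatalogEventListener {
      override def onEvent(event: ExternalCatalogEvent): Unit = event match {
        case AlterFunctionPreEvent(db, name) => println(s"About to alter function $db.$name")
        case AlterFunctionEvent(db, name) => println(s"Altered function $db.$name")
        case _ => // other catalog events are not of interest here
      }
    }

Registered with catalog.addListener(new AlterFunctionAuditListener), as ExternalCatalogEventSuite does below, this prints one line before and one line after every alterFunction call.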
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala index 2539ea615ff9..087c26f23f38 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala @@ -176,6 +176,15 @@ class ExternalCatalogEventSuite extends SparkFunSuite { } checkEvents(RenameFunctionPreEvent("db5", "fn7", "fn4") :: Nil) + // ALTER + val alteredFunctionDefinition = CatalogFunction( + identifier = FunctionIdentifier("fn4", Some("db5")), + className = "org.apache.spark.AlterFunction", + resources = Seq.empty) + catalog.alterFunction("db5", alteredFunctionDefinition) + checkEvents( + AlterFunctionPreEvent("db5", "fn4") :: AlterFunctionEvent("db5", "fn4") :: Nil) + // DROP intercept[AnalysisException] { catalog.dropFunction("db5", "fn7") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 9d0598bb8ea4..66e895a4690c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -752,6 +752,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } } + test("alter function") { + val catalog = newBasicCatalog() + assert(catalog.getFunction("db2", "func1").className == funcClass) + val myNewFunc = catalog.getFunction("db2", "func1").copy(className = newFuncClass) + catalog.alterFunction("db2", myNewFunc) + assert(catalog.getFunction("db2", "func1").className == newFuncClass) + } + test("list functions") { val catalog = newBasicCatalog() catalog.createFunction("db2", newFunc("func2")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index e8503ce1121c..96e7888f39e9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -1143,11 +1143,11 @@ abstract class SessionCatalogSuite extends AnalysisTest { test("basic create and list functions") { withEmptyCatalog { catalog => catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) - catalog.createFunction(newFunc("myfunc", Some("mydb")), ifNotExists = false) + catalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false) assert(catalog.externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc")) // Create function without explicitly specifying database catalog.setCurrentDatabase("mydb") - catalog.createFunction(newFunc("myfunc2"), ifNotExists = false) + catalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false) assert(catalog.externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2")) } } @@ -1156,7 +1156,7 @@ abstract class SessionCatalogSuite extends AnalysisTest { withBasicCatalog { catalog => intercept[NoSuchDatabaseException] { catalog.createFunction( - newFunc("func5", Some("does_not_exist")), ifNotExists = false) + newFunc("func5", Some("does_not_exist")), ignoreIfExists = false) } } } @@ -1164,9 
+1164,9 @@ abstract class SessionCatalogSuite extends AnalysisTest { test("create function that already exists") { withBasicCatalog { catalog => intercept[FunctionAlreadyExistsException] { - catalog.createFunction(newFunc("func1", Some("db2")), ifNotExists = false) + catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false) } - catalog.createFunction(newFunc("func1", Some("db2")), ifNotExists = true) + catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true) } } @@ -1227,17 +1227,6 @@ abstract class SessionCatalogSuite extends AnalysisTest { } } - test("alter function") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) - val myNewFunc = CatalogFunction(FunctionIdentifier("func1", Some("db2")), - newFuncClass, Seq.empty[FunctionResource]) - sessionCatalog.alterFunction(myNewFunc) - val newFunc = sessionCatalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) - assert(newFunc.className == newFuncClass) - } - test("drop function") { withBasicCatalog { catalog => assert(catalog.externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) @@ -1246,7 +1235,7 @@ abstract class SessionCatalogSuite extends AnalysisTest { assert(catalog.externalCatalog.listFunctions("db2", "*").isEmpty) // Drop function without explicitly specifying database catalog.setCurrentDatabase("db2") - catalog.createFunction(newFunc("func2", Some("db2")), ifNotExists = false) + catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) assert(catalog.externalCatalog.listFunctions("db2", "*").toSet == Set("func2")) catalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false) assert(catalog.externalCatalog.listFunctions("db2", "*").isEmpty) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 60c2a545cbdb..558789e3eacf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -75,7 +75,7 @@ case class CreateFunctionCommand( if (isTemp) { // We first load resources and then put the builder in the function registry. catalog.loadFunctionResources(resources) - catalog.registerFunction(func, overrideIfExists = false) + catalog.registerFunction(func, overrideIfExists = replace) } else { // Handles `CREATE OR REPLACE FUNCTION AS ... 
USING ...` if (replace && catalog.functionExists(func.identifier)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 33c00549f3ea..5643c58d9f84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -249,7 +249,6 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed3, expected3) comparePlans(parsed4, expected4) comparePlans(parsed5, expected5) - } test("drop function") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index e4dd077715d0..5c40d8bb4b1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2270,6 +2270,57 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } + test("create temporary function with if not exists") { + withUserDefinedFunction("func1" -> true) { + val sql1 = + """ + |CREATE TEMPORARY FUNCTION IF NOT EXISTS func1 as + |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1', + |JAR '/path/to/jar2' + """.stripMargin + val e = intercept[AnalysisException] { + sql(sql1) + }.getMessage + assert(e.contains("It is not allowed to define a TEMPORARY function with IF NOT EXISTS")) + } + } + + test("create function with both if not exists and replace") { + withUserDefinedFunction("func1" -> false) { + val sql1 = + """ + |CREATE OR REPLACE FUNCTION IF NOT EXISTS func1 as + |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1', + |JAR '/path/to/jar2' + """.stripMargin + val e = intercept[AnalysisException] { + sql(sql1) + }.getMessage + assert(e.contains("CREATE FUNCTION with both IF NOT EXISTS and REPLACE is not allowed")) + } + } + + test("create temporary function by specifying a database") { + val dbName = "mydb" + withDatabase(dbName) { + sql(s"CREATE DATABASE $dbName") + sql(s"USE $dbName") + withUserDefinedFunction("func1" -> true) { + val sql1 = + s""" + |CREATE TEMPORARY FUNCTION $dbName.func1 as + |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1', + |JAR '/path/to/jar2' + """.stripMargin + val e = intercept[AnalysisException] { + sql(sql1) + }.getMessage + assert(e.contains(s"Specifying a database in CREATE TEMPORARY FUNCTION " + + s"is not allowed: '$dbName'")) + } + } + } + Seq(true, false).foreach { caseSensitive => test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") { withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index e0a88ed067e1..6acac1a9aa31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -72,7 +72,7 @@ class CatalogSuite } private def createFunction(name: String, db: Option[String] = None): Unit = { - sessionCatalog.createFunction(utils.newFunc(name, db), ifNotExists = false) + sessionCatalog.createFunction(utils.newFunc(name, db), ignoreIfExists = false) } private def createTempFunction(name: String): Unit = { diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 1e8cabe4cca0..5f15a705a2e9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -607,7 +607,7 @@ object PermanentHiveUDFTest2 extends Logging { FunctionIdentifier("example_max"), "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax", FunctionResource(JarResource, jar) :: Nil) - hiveContext.sessionState.catalog.createFunction(function, ifNotExists = false) + hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false) val source = hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val") source.createOrReplaceTempView("sourceTable") From 257bce01d849c5e48ba88f8185913a028c978829 Mon Sep 17 00:00:00 2001 From: ouyangxiaochen Date: Wed, 5 Jul 2017 15:30:51 +0800 Subject: [PATCH 3/3] Simplify doAlterFunction, restore accidental test changes, and clean up comments --- .../apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 1 - .../spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 6 ++---- .../org/apache/spark/sql/execution/command/functions.scala | 3 +-- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 8e6508067218..d253c72a6273 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -593,7 +593,6 @@ class InMemoryCatalog( override protected def doAlterFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) requireFunctionExists(db, func.identifier.funcName) - catalog(db).functions.remove(func.identifier.funcName) catalog(db).functions.put(func.identifier.funcName, func) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 96e7888f39e9..8f856a0daad1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -1316,10 +1316,8 @@ abstract class SessionCatalogSuite extends AnalysisTest { val funcMeta2 = newFunc("yes_me", None) val tempFunc1 = (e: Seq[Expression]) => e.head val tempFunc2 = (e: Seq[Expression]) => e.last - catalog.registerFunction( - funcMeta1, overrideIfExists = false, functionBuilder = Some(tempFunc1)) - catalog.registerFunction( - funcMeta2, overrideIfExists = false, functionBuilder = Some(tempFunc2)) + catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) + catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false) catalog.registerFunction( funcMeta1, overrideIfExists = false, functionBuilder = Some(tempFunc1)) catalog.registerFunction( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 558789e3eacf..4f92ffee687a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -56,8 +56,7 @@ case class CreateFunctionCommand( " is not allowed.") } - // Disallows 'CREATE TEMPORARY FUNCTION IF NOT EXISTS' to be consistent - // with 'CREATE TEMPORARY FUNCTION' + // Disallow defining a temporary function with `IF NOT EXISTS` if (ifNotExists && isTemp) { throw new AnalysisException( "It is not allowed to define a TEMPORARY function with IF NOT EXISTS.")
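Taken together, the series gives CREATE FUNCTION the following observable semantics; a minimal end-to-end sketch, assuming a Hive-enabled session and placeholder class names and jar paths:

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object CreateOrReplaceFunctionDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
        // Permanent function: OR REPLACE alters the metastore entry in place and
        // evicts any cached FunctionRegistry entry, so the next use loads the new class.
        spark.sql("CREATE FUNCTION f1 AS 'com.example.UDFv1' USING JAR '/tmp/udf.jar'")
        spark.sql("CREATE OR REPLACE FUNCTION f1 AS 'com.example.UDFv2' USING JAR '/tmp/udf.jar'")
        spark.sql("CREATE FUNCTION IF NOT EXISTS f1 AS 'com.example.UDFv1'") // no-op
        // Temporary function: OR REPLACE maps to registerFunction(overrideIfExists = replace).
        spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION t1 AS 'com.example.UDFv1' " +
          "USING JAR '/tmp/udf.jar'")
        // IF NOT EXISTS is rejected for TEMPORARY functions, as is a database qualifier.
        try {
          spark.sql("CREATE TEMPORARY FUNCTION IF NOT EXISTS t1 AS 'com.example.UDFv1'")
        } catch {
          case e: AnalysisException => println(e.getMessage)
        }
        spark.stop()
      }
    }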