Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,3 @@ class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[Tabl

class FunctionAlreadyExistsException(db: String, func: String)
extends AnalysisException(s"Function '$func' already exists in database '$db'")

class TempFunctionAlreadyExistsException(func: String)
extends AnalysisException(s"Temporary function '$func' already exists")
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ class SessionCatalog(
*
* This performs reflection to decide what type of [[Expression]] to return in the builder.
*/
def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

registerFunction is the only caller of makeFunctionBuilder after this PR.

// TODO: at least support UDAFs here
throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
}
Expand All @@ -1064,18 +1064,20 @@ class SessionCatalog(
}

/**
* Create a temporary function.
* This assumes no database is specified in `funcDefinition`.
* Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
*/
def createTempFunction(
name: String,
info: ExpressionInfo,
funcDefinition: FunctionBuilder,
ignoreIfExists: Boolean): Unit = {
if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
throw new TempFunctionAlreadyExistsException(name)
def registerFunction(
funcDefinition: CatalogFunction,
ignoreIfExists: Boolean,
functionBuilder: Option[FunctionBuilder] = None): Unit = {
val func = funcDefinition.identifier
if (functionRegistry.functionExists(func.unquotedString) && !ignoreIfExists) {
throw new AnalysisException(s"Function $func already exists")
}
functionRegistry.registerFunction(name, info, funcDefinition)
val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
val builder =
functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className))
functionRegistry.registerFunction(func.unquotedString, info, builder)
}

/**
Expand Down Expand Up @@ -1180,12 +1182,7 @@ class SessionCatalog(
// catalog. So, it is possible that qualifiedName is not exactly the same as
// catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
// At here, we preserve the input from the user.
val info = new ExpressionInfo(
catalogFunction.className,
qualifiedName.database.orNull,
qualifiedName.funcName)
val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
registerFunction(catalogFunction.copy(identifier = qualifiedName), ignoreIfExists = false)
// Now, we need to create the Expression.
functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1162,10 +1162,10 @@ abstract class SessionCatalogSuite extends PlanTest {
withBasicCatalog { catalog =>
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
val info1 = new ExpressionInfo("tempFunc1", "temp1")
val info2 = new ExpressionInfo("tempFunc2", "temp2")
catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false)
catalog.registerFunction(
newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
catalog.registerFunction(
newFunc("temp2", None), ignoreIfExists = false, functionBuilder = Some(tempFunc2))
val arguments = Seq(Literal(1), Literal(2), Literal(3))
assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1))
assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3))
Expand All @@ -1174,13 +1174,15 @@ abstract class SessionCatalogSuite extends PlanTest {
catalog.lookupFunction(FunctionIdentifier("temp3"), arguments)
}
val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
val info3 = new ExpressionInfo("tempFunc3", "temp1")
// Temporary function already exists
intercept[TempFunctionAlreadyExistsException] {
catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false)
}
val e = intercept[AnalysisException] {
catalog.registerFunction(
newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc3))
}.getMessage
assert(e.contains("Function temp1 already exists"))
// Temporary function is overridden
catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true)
catalog.registerFunction(
newFunc("temp1", None), ignoreIfExists = true, functionBuilder = Some(tempFunc3))
assert(
catalog.lookupFunction(
FunctionIdentifier("temp1"), arguments) === Literal(arguments.length))
Expand All @@ -1193,8 +1195,8 @@ abstract class SessionCatalogSuite extends PlanTest {
assert(!catalog.isTemporaryFunction(FunctionIdentifier("temp1")))

val tempFunc1 = (e: Seq[Expression]) => e.head
val info1 = new ExpressionInfo("tempFunc1", "temp1")
catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
catalog.registerFunction(
newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))

// Returns true when the function is temporary
assert(catalog.isTemporaryFunction(FunctionIdentifier("temp1")))
Expand Down Expand Up @@ -1243,9 +1245,9 @@ abstract class SessionCatalogSuite extends PlanTest {

test("drop temp function") {
withBasicCatalog { catalog =>
val info = new ExpressionInfo("tempFunc", "func1")
val tempFunc = (e: Seq[Expression]) => e.head
catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false)
catalog.registerFunction(
newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc))
val arguments = Seq(Literal(1), Literal(2), Literal(3))
assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
Expand Down Expand Up @@ -1284,9 +1286,9 @@ abstract class SessionCatalogSuite extends PlanTest {

test("lookup temp function") {
withBasicCatalog { catalog =>
val info1 = new ExpressionInfo("tempFunc1", "func1")
val tempFunc1 = (e: Seq[Expression]) => e.head
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
catalog.registerFunction(
newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
assert(catalog.lookupFunction(
FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
Expand All @@ -1298,14 +1300,14 @@ abstract class SessionCatalogSuite extends PlanTest {

test("list functions") {
withBasicCatalog { catalog =>
val info1 = new ExpressionInfo("tempFunc1", "func1")
val info2 = new ExpressionInfo("tempFunc2", "yes_me")
val funcMeta1 = newFunc("func1", None)
val funcMeta2 = newFunc("yes_me", None)
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
catalog.registerFunction(funcMeta1, ignoreIfExists = false, functionBuilder = Some(tempFunc1))
catalog.registerFunction(funcMeta2, ignoreIfExists = false, functionBuilder = Some(tempFunc2))
assert(catalog.listFunctions("db1", "*").map(_._1).toSet ==
Set(FunctionIdentifier("func1"),
FunctionIdentifier("yes_me")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ case class CreateFunctionCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
if (isTemp) {
if (databaseName.isDefined) {
throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
Expand All @@ -59,17 +60,13 @@ case class CreateFunctionCommand(
// We first load resources and then put the builder in the function registry.
// Please note that it is allowed to overwrite an existing temp function.
catalog.loadFunctionResources(resources)
val info = new ExpressionInfo(className, functionName)
val builder = catalog.makeFunctionBuilder(functionName, className)
catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
catalog.registerFunction(func, ignoreIfExists = false)
} else {
// For a permanent, we will store the metadata into underlying external catalog.
// This function will be loaded into the FunctionRegistry when a query uses it.
// We do not load it into FunctionRegistry right now.
// TODO: should we also parse "IF NOT EXISTS"?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support it? @cloud-fan @yhuai @rxin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Hive support it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. : )

Oracle has the OR REPLACE clause.

catalog.createFunction(
CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources),
ignoreIfExists = false)
catalog.createFunction(func, ignoreIfExists = false)
}
Seq.empty[Row]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ class CatalogSuite
}

private def createTempFunction(name: String): Unit = {
val info = new ExpressionInfo("className", name)
val tempFunc = (e: Seq[Expression]) => e.head
sessionCatalog.createTempFunction(name, info, tempFunc, ignoreIfExists = false)
val funcMeta = CatalogFunction(FunctionIdentifier(name, None), "className", Nil)
sessionCatalog.registerFunction(
funcMeta, ignoreIfExists = false, functionBuilder = Some(tempFunc))
}

private def dropFunction(name: String, db: Option[String] = None): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -124,13 +124,6 @@ private[sql] class HiveSessionCatalog(
}

private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
// TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to
// if (super.functionExists(name)) {
// super.lookupFunction(name, children)
// } else {
// // This function is a Hive builtin function.
// ...
// }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lookupFunction already accepts a FunctionIdentifier, but we cannot refactor it in the way sketched above because functionExists does not distinguish among Hive built-in, Spark temporary, and permanent functions. Further clean-ups are needed; I will try to do them in a follow-up.

val database = name.database.map(formatDatabaseName)
val funcName = name.copy(database = database)
Try(super.lookupFunction(funcName, children)) match {
Expand Down Expand Up @@ -164,10 +157,11 @@ private[sql] class HiveSessionCatalog(
}
}
val className = functionInfo.getFunctionClass.getName
val builder = makeFunctionBuilder(functionName, className)
val functionIdentifier =
FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database)
val func = CatalogFunction(functionIdentifier, className, Nil)
// Put this Hive built-in function to our function registry.
val info = new ExpressionInfo(className, functionName)
createTempFunction(functionName, info, builder, ignoreIfExists = false)
registerFunction(func, ignoreIfExists = false)
// Now, we need to create the Expression.
functionRegistry.lookupFunction(functionName, children)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import scala.concurrent.duration._
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo, Literal}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.hive.HiveSessionCatalog
import org.apache.spark.sql.hive.execution.TestingTypedCount
Expand Down Expand Up @@ -217,9 +219,9 @@ class ObjectHashAggregateExecBenchmark extends BenchmarkBase with TestHiveSingle

private def registerHiveFunction(functionName: String, clazz: Class[_]): Unit = {
val sessionCatalog = sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
val builder = sessionCatalog.makeFunctionBuilder(functionName, clazz.getName)
val info = new ExpressionInfo(clazz.getName, functionName)
sessionCatalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
val functionIdentifier = FunctionIdentifier(functionName, database = None)
val func = CatalogFunction(functionIdentifier, clazz.getName, resources = Nil)
sessionCatalog.registerFunction(func, ignoreIfExists = false)
}

private def percentile_approx(
Expand Down