
Commit b539e94

address some comments.

1 parent 4ee32e9 commit b539e94

6 files changed, +87 -34 lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java

Lines changed: 18 additions & 10 deletions

@@ -21,12 +21,16 @@
  * Expression information, will be used to describe a expression.
  */
 public class ExpressionInfo {
+
+    public enum FunctionType {
+        BUILTIN, PERSISTENT, TEMPORARY;
+    }
     private String className;
     private String usage;
     private String name;
     private String extended;
     private String db;
-    private boolean macro;
+    private FunctionType functionType;
 
     public String getClassName() {
         return className;
@@ -48,32 +52,36 @@ public String getDb() {
         return db;
     }
 
-    public boolean isMacro() {
-        return macro;
+    public FunctionType getFunctionType() {
+        return functionType;
     }
 
-    public ExpressionInfo(String className, String db, String name, String usage, String extended, boolean macro) {
+    public ExpressionInfo(String className, String db, String name, String usage, String extended, FunctionType functionType) {
         this.className = className;
         this.db = db;
         this.name = name;
         this.usage = usage;
         this.extended = extended;
-        this.macro = macro;
+        this.functionType = functionType;
     }
 
     public ExpressionInfo(String className, String db, String name, String usage, String extended) {
-        this(className, db, name, usage, extended, false);
+        this(className, db, name, usage, extended, FunctionType.TEMPORARY);
     }
 
     public ExpressionInfo(String className, String name) {
-        this(className, null, name, null, null, false);
+        this(className, null, name, null, null, FunctionType.TEMPORARY);
     }
 
-    public ExpressionInfo(String className, String name, boolean macro) {
-        this(className, null, name, null, null, macro);
+    public ExpressionInfo(String className, String name, FunctionType functionType) {
+        this(className, null, name, null, null, functionType);
     }
 
     public ExpressionInfo(String className, String db, String name) {
-        this(className, db, name, null, null, false);
+        this(className, db, name, null, null, FunctionType.TEMPORARY);
+    }
+
+    public ExpressionInfo(String className, String db, String name, FunctionType functionType) {
+        this(className, db, name, null, null, functionType);
     }
 }
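
Note: a minimal usage sketch of the reworked constructors (not part of this commit; the Abs class name and usage string are illustrative). The six-argument constructor takes the new FunctionType explicitly, while the shorter overloads keep defaulting to TEMPORARY, which is what the macro path further below relies on.

import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo.FunctionType

// Built-in functions are now tagged explicitly ...
val absInfo = new ExpressionInfo(
  "org.apache.spark.sql.catalyst.expressions.Abs", null, "abs",
  "_FUNC_(expr) - Returns the absolute value of `expr`.", "", FunctionType.BUILTIN)

// ... while the short constructors keep the old call sites compiling and default to TEMPORARY.
val macroInfo = new ExpressionInfo("c -> c + 1", "plus_one")
assert(absInfo.getFunctionType == FunctionType.BUILTIN)
assert(macroInfo.getFunctionType == FunctionType.TEMPORARY)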

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 56 additions & 15 deletions

@@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo.FunctionType
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.xml._
 import org.apache.spark.sql.catalyst.util.StringKeyHashMap
@@ -60,9 +61,6 @@ trait FunctionRegistry {
   /** Drop a function and return whether the function existed. */
   def dropFunction(name: String): Boolean
 
-  /** Drop a macro and return whether the macro existed. */
-  def dropMacro(name: String): Boolean
-
   /** Checks if a function with a given name exists. */
   def functionExists(name: String): Boolean = lookupFunction(name).isDefined
 
@@ -110,20 +108,63 @@ class SimpleFunctionRegistry extends FunctionRegistry {
     functionBuilders.remove(name).isDefined
   }
 
-  override def dropMacro(name: String): Boolean = synchronized {
-    if (functionBuilders.get(name).map(_._1).filter(_.isMacro).isDefined) {
-      functionBuilders.remove(name).isDefined
+  override def clear(): Unit = synchronized {
+    functionBuilders.clear()
+  }
+
+  override def clone(): SimpleFunctionRegistry = synchronized {
+    val registry = new SimpleFunctionRegistry
+    functionBuilders.iterator.foreach { case (name, (info, builder)) =>
+      registry.registerFunction(name, info, builder)
+    }
+    registry
+  }
+}
+
+class SystemFunctionRegistry(builtin: SimpleFunctionRegistry) extends SimpleFunctionRegistry {
+
+  override def registerFunction(
+      name: String,
+      info: ExpressionInfo,
+      builder: FunctionBuilder): Unit = synchronized {
+    if (info.getFunctionType.equals(FunctionType.BUILTIN)) {
+      builtin.registerFunction(name, info, builder)
     } else {
-      false
+      functionBuilders.put(name, (info, builder))
+    }
+  }
+
+  override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    val func = synchronized {
+      functionBuilders.get(name).map(_._2).orElse(builtin.lookupFunctionBuilder(name)).getOrElse {
+        throw new AnalysisException(s"undefined function $name")
+      }
     }
+    func(children)
+  }
+
+  override def listFunction(): Seq[String] = synchronized {
+    (functionBuilders.iterator.map(_._1).toList ++ builtin.listFunction()).sorted
+  }
+
+  override def lookupFunction(name: String): Option[ExpressionInfo] = synchronized {
+    functionBuilders.get(name).map(_._1).orElse(builtin.lookupFunction(name))
+  }
+
+  override def lookupFunctionBuilder(name: String): Option[FunctionBuilder] = synchronized {
+    functionBuilders.get(name).map(_._2).orElse(builtin.lookupFunctionBuilder(name))
+  }
+
+  override def dropFunction(name: String): Boolean = synchronized {
+    functionBuilders.remove(name).isDefined
   }
 
   override def clear(): Unit = synchronized {
     functionBuilders.clear()
   }
 
   override def clone(): SimpleFunctionRegistry = synchronized {
-    val registry = new SimpleFunctionRegistry
+    val registry = new SystemFunctionRegistry(builtin.clone())
     functionBuilders.iterator.foreach { case (name, (info, builder)) =>
       registry.registerFunction(name, info, builder)
     }
@@ -157,10 +198,6 @@ object EmptyFunctionRegistry extends FunctionRegistry {
     throw new UnsupportedOperationException
   }
 
-  override def dropMacro(name: String): Boolean = {
-    throw new UnsupportedOperationException
-  }
-
   override def dropFunction(name: String): Boolean = {
     throw new UnsupportedOperationException
   }
@@ -471,6 +508,8 @@ object FunctionRegistry {
     fr
   }
 
+  val systemRegistry = new SystemFunctionRegistry(builtin)
+
   val functionSet: Set[String] = builtin.listFunction().toSet
 
   /** See usage above. */
@@ -534,7 +573,8 @@
     }
     val clazz = scala.reflect.classTag[Cast].runtimeClass
     val usage = "_FUNC_(expr) - Casts the value `expr` to the target data type `_FUNC_`."
-    (name, (new ExpressionInfo(clazz.getCanonicalName, null, name, usage, null), builder))
+    (name, (new ExpressionInfo(clazz.getCanonicalName, null, name, usage, null,
+      FunctionType.BUILTIN), builder))
   }
 
   /**
@@ -544,9 +584,10 @@
     val clazz = scala.reflect.classTag[T].runtimeClass
     val df = clazz.getAnnotation(classOf[ExpressionDescription])
     if (df != null) {
-      new ExpressionInfo(clazz.getCanonicalName, null, name, df.usage(), df.extended())
+      new ExpressionInfo(clazz.getCanonicalName, null, name, df.usage(), df.extended(),
+        ExpressionInfo.FunctionType.BUILTIN)
     } else {
-      new ExpressionInfo(clazz.getCanonicalName, name)
+      new ExpressionInfo(clazz.getCanonicalName, name, ExpressionInfo.FunctionType.BUILTIN)
     }
   }
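
Note: a rough sketch of how the new SystemFunctionRegistry behaves (illustrative only; myUpper and its builder are made up, the other names come from this patch). BUILTIN registrations are forwarded to the wrapped builtin registry, everything else stays in the session-side map, lookups fall through to the builtin delegate, and dropFunction can no longer remove a built-in.

import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Upper}

val registry = FunctionRegistry.systemRegistry.clone()

// The two-argument ExpressionInfo constructor defaults to TEMPORARY,
// so this entry lands in the session-side functionBuilders map.
registry.registerFunction(
  "myUpper",
  new ExpressionInfo(classOf[Upper].getCanonicalName, "myUpper"),
  (children: Seq[Expression]) => Upper(children.head))

// Lookups fall back to the wrapped builtin registry ...
assert(registry.lookupFunction("abs").isDefined)

// ... and dropFunction only clears session-side entries, so built-ins survive.
registry.dropFunction("abs")
assert(registry.lookupFunction("abs").isDefined)
assert(registry.dropFunction("myUpper"))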

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 9 additions & 6 deletions

@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo.FunctionType
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.StringUtils
@@ -1095,15 +1096,12 @@
       name: String,
       info: ExpressionInfo,
       functionBuilder: FunctionBuilder): Unit = {
-    if (functionRegistry.functionExists(name)) {
-      throw new AnalysisException(s"Function $name already exists")
-    }
     functionRegistry.registerFunction(name, info, functionBuilder)
   }
 
   /** Drop a temporary macro. */
   def dropTempMacro(name: String, ignoreIfNotExists: Boolean): Unit = {
-    if (!functionRegistry.dropMacro(name) && !ignoreIfNotExists) {
+    if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) {
       throw new NoSuchTempMacroException(name)
     }
   }
@@ -1144,7 +1142,8 @@
       new ExpressionInfo(
         metadata.className,
         qualifiedName.database.orNull,
-        qualifiedName.identifier)
+        qualifiedName.identifier,
+        FunctionType.PERSISTENT)
     } else {
       failFunctionLookup(name.funcName)
     }
@@ -1266,7 +1265,11 @@
       if (func.database.isDefined) {
         dropFunction(func, ignoreIfNotExists = false)
       } else {
-        dropTempFunction(func.funcName, ignoreIfNotExists = false)
+        val functionType = functionRegistry.lookupFunction(func.funcName).map(_.getFunctionType)
+          .getOrElse(FunctionType.TEMPORARY)
+        if (!functionType.equals(FunctionType.BUILTIN)) {
+          dropTempFunction(func.funcName, ignoreIfNotExists = false)
+        }
       }
     }
     clearTempTables()
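
Note: the last hunk (presumably SessionCatalog.reset(), given the clearTempTables() call) boils down to the predicate below, restated here as a hypothetical helper that is not in the patch: anything whose registered info is not tagged BUILTIN, temporary functions and macros included, gets dropped, while built-ins registered through the shared registry are left in place.

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo.FunctionType

// Hypothetical helper mirroring the new branch: drop unless the entry is BUILTIN.
def shouldDropOnReset(registry: FunctionRegistry, func: FunctionIdentifier): Boolean = {
  val functionType = registry.lookupFunction(func.funcName)
    .map(_.getFunctionType)
    .getOrElse(FunctionType.TEMPORARY)
  !functionType.equals(FunctionType.BUILTIN)
}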

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala

Lines changed: 2 additions & 1 deletion

@@ -260,7 +260,8 @@ object CreateStruct extends FunctionBuilder {
       null,
       "struct",
       "_FUNC_(col1, col2, col3, ...) - Creates a struct with the given field values.",
-      "")
+      "",
+      ExpressionInfo.FunctionType.BUILTIN)
     ("struct", (info, this))
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/command/macros.scala

Lines changed: 1 addition & 1 deletion

@@ -65,7 +65,7 @@ case class CreateMacroCommand(
     }
 
     val macroInfo = columns.mkString(",") + " -> " + funcWrapper.macroFunction.toString
-    val info = new ExpressionInfo(macroInfo, macroName, true)
+    val info = new ExpressionInfo(macroInfo, macroName)
     val builder = (children: Seq[Expression]) => {
       if (children.size != columns.size) {
         throw new AnalysisException(s"Actual number of columns: ${children.size} != " +

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 1 addition & 1 deletion

@@ -95,7 +95,7 @@ abstract class BaseSessionStateBuilder(
   * This either gets cloned from a pre-existing version or cloned from the built-in registry.
   */
  protected lazy val functionRegistry: FunctionRegistry = {
-    parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
+    parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.systemRegistry).clone()
  }
 
  /**
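
Note: sketch of what that one-line swap buys (names from the patch, assertions illustrative). When there is no parent state, a session registry is now a clone of the SystemFunctionRegistry wrapper rather than a flat copy of the builtin map, so built-ins resolve through the wrapped delegate and are not lost when the session-side entries are cleared.

import org.apache.spark.sql.catalyst.analysis.FunctionRegistry

// What BaseSessionStateBuilder now starts from when parentState is empty:
val sessionRegistry = FunctionRegistry.systemRegistry.clone()

// Built-ins are resolved through the cloned builtin delegate ...
assert(sessionRegistry.lookupFunction("abs").isDefined)

// ... and clearing the session side does not wipe them out.
sessionRegistry.clear()
assert(sessionRegistry.lookupFunction("abs").isDefined)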
