Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f2433c2
init commit
lianhuiwang Jun 16, 2016
0b93636
fix unit test
lianhuiwang Jun 16, 2016
301e950
update
lianhuiwang Jun 18, 2016
808a5fa
update createTempMacro
lianhuiwang Jun 18, 2016
f4ed3bc
address comments
lianhuiwang Jun 20, 2016
af0136d
update
lianhuiwang Jun 20, 2016
5550496
based master
lianhuiwang Nov 10, 2016
9fe1881
update code
lianhuiwang Nov 10, 2016
b8ffdc9
fix function
lianhuiwang Nov 10, 2016
fb8b57a
update comment
lianhuiwang Nov 11, 2016
e895a9c
update comments
lianhuiwang Nov 11, 2016
8d520eb
Merge branch 'master' of https://github.com/apache/spark into macro
lianhuiwang May 27, 2017
651b485
Merge branch 'master' of https://github.com/apache/spark into macro
lianhuiwang May 27, 2017
277ba9f
Merge branch 'macro' of https://github.com/lianhuiwang/spark into macro
lianhuiwang May 27, 2017
314913d
Merge branch 'macro' of https://github.com/lianhuiwang/spark into macro
lianhuiwang May 27, 2017
3d05e4f
reformat code.
lianhuiwang May 27, 2017
22d8b1a
reformat code.
lianhuiwang May 27, 2017
d91f633
reformat code.
lianhuiwang May 27, 2017
1eb23c7
reformat code.
lianhuiwang May 27, 2017
ad85109
remove type check for macro as same with hive.
lianhuiwang May 27, 2017
b52698f
add import
lianhuiwang May 27, 2017
3eacebc
treat macro as temp function like hive
lianhuiwang May 27, 2017
fce1121
add Modifier for FunctionRegistry.
lianhuiwang May 27, 2017
eaff4e9
update comments.
lianhuiwang May 27, 2017
97632a9
add dropMacro().
lianhuiwang May 27, 2017
4ee32e9
reformat code style
lianhuiwang May 27, 2017
b539e94
address some comments.
lianhuiwang May 30, 2017
1563f12
address some comments.
lianhuiwang May 30, 2017
4d8e843
update
lianhuiwang May 30, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ statement
| CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
(USING resource (',' resource)*)? #createFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName #dropFunction
| CREATE TEMPORARY MACRO macroName=identifier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Hive also support non-temporary macros?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, currently Hive only supports temporary macros.

'(' colTypeList? ')' expression #createMacro
| DROP TEMPORARY MACRO (IF EXISTS)? macroName=identifier #dropMacro
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
statement #explain
| SHOW TABLES ((FROM | IN) db=identifier)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@
* Expression information, which will be used to describe an expression.
*/
public class ExpressionInfo {

public enum FunctionType {
BUILTIN, PERSISTENT, TEMPORARY;
}

private String className;
private String usage;
private String name;
private String extended;
private String db;
private FunctionType functionType;

public String getClassName() {
return className;
Expand All @@ -47,19 +53,32 @@ public String getDb() {
return db;
}

public ExpressionInfo(String className, String db, String name, String usage, String extended) {
public FunctionType getFunctionType() {
return functionType;
}

public ExpressionInfo(String className, String db, String name, String usage, String extended, FunctionType functionType) {
this.className = className;
this.db = db;
this.name = name;
this.usage = usage;
this.extended = extended;
this.functionType = functionType;
}

public ExpressionInfo(String className, String name) {
this(className, null, name, null, null);
this(className, null, name, null, null, FunctionType.TEMPORARY);
}

public ExpressionInfo(String className, String name, FunctionType functionType) {
this(className, null, name, null, null, functionType);
}

public ExpressionInfo(String className, String db, String name) {
this(className, db, name, null, null);
this(className, db, name, null, null, FunctionType.TEMPORARY);
}

public ExpressionInfo(String className, String db, String name, FunctionType functionType) {
this(className, db, name, null, null, functionType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,12 @@ class AnalysisException protected[sql] (
s"$message;$lineAnnotation$positionAnnotation"
}
}

// Companion object holding factory methods for commonly thrown analysis errors.
object AnalysisException {
/**
* Creates the exception thrown when a temporary macro with the given name
* cannot be found in the session's function registry.
*
* @param func the macro name that failed to resolve
* @return a new [[AnalysisException]] with a "Temporary macro ... not found" message
*/
def noSuchTempMacroException(func: String): AnalysisException = {
new AnalysisException(s"Temporary macro '$func' not found")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo.FunctionType
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.xml._
import org.apache.spark.sql.catalyst.util.StringKeyHashMap
Expand Down Expand Up @@ -120,6 +121,58 @@ class SimpleFunctionRegistry extends FunctionRegistry {
}
}

/**
* A function registry that layers session-local (temporary) entries on top of a shared
* built-in registry. Registrations whose [[ExpressionInfo]] is flagged BUILTIN are
* delegated to `builtin`; all other registrations live in this instance's inherited
* `functionBuilders` map. Lookups consult the session-local map first and fall back
* to the built-ins.
*
* NOTE(review): `builtin` is shared mutable state supplied by the caller; `clear()`
* below also clears it, which would wipe the shared built-in registry for every
* holder of that reference — confirm this is intended.
*/
class SystemFunctionRegistry(builtin: SimpleFunctionRegistry) extends SimpleFunctionRegistry {

// Routes BUILTIN registrations to the shared registry; everything else is session-local.
override def registerFunction(
name: String,
info: ExpressionInfo,
builder: FunctionBuilder): Unit = synchronized {
if (info.getFunctionType.equals(FunctionType.BUILTIN)) {
builtin.registerFunction(name, info, builder)
} else {
functionBuilders.put(name, (info, builder))
}
}

// Resolves `name` to a builder (session-local first, then builtin) and applies it to
// `children`. Only the lookup is done under the lock; the builder runs outside it.
override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
val func = synchronized {
functionBuilders.get(name).map(_._2).orElse(builtin.lookupFunctionBuilder(name)).getOrElse {
throw new AnalysisException(s"undefined function $name")
}
}
func(children)
}

// Union of session-local and built-in names, de-duplicated and sorted.
override def listFunction(): Seq[String] = synchronized {
(functionBuilders.iterator.map(_._1).toList ++ builtin.listFunction()).distinct.sorted
}

// Session-local info wins over the built-in one for the same name.
override def lookupFunction(name: String): Option[ExpressionInfo] = synchronized {
functionBuilders.get(name).map(_._1).orElse(builtin.lookupFunction(name))
}

// Session-local builder wins over the built-in one for the same name.
override def lookupFunctionBuilder(name: String): Option[FunctionBuilder] = synchronized {
functionBuilders.get(name).map(_._2).orElse(builtin.lookupFunctionBuilder(name))
}

// Only removes session-local entries; built-ins are never dropped here.
override def dropFunction(name: String): Boolean = synchronized {
functionBuilders.remove(name).isDefined
}

// Clears BOTH the shared builtin registry and the session-local map (see class note).
override def clear(): Unit = synchronized {
builtin.clear()
functionBuilders.clear()
}

// Deep-copies the builtin registry, then re-registers every session-local entry into
// the copy. Session-local entries are never BUILTIN, so they stay session-local.
override def clone(): SimpleFunctionRegistry = synchronized {
val registry = new SystemFunctionRegistry(builtin.clone())
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
registry.registerFunction(name, info, builder)
}
registry
}
}

/**
* A trivial catalog that returns an error when a function is requested. Used for testing when all
* functions are already filled in and the analyzer needs only to resolve attribute references.
Expand Down Expand Up @@ -456,6 +509,8 @@ object FunctionRegistry {
fr
}

val systemRegistry = new SystemFunctionRegistry(builtin)

val functionSet: Set[String] = builtin.listFunction().toSet

/** See usage above. */
Expand Down Expand Up @@ -519,7 +574,8 @@ object FunctionRegistry {
}
val clazz = scala.reflect.classTag[Cast].runtimeClass
val usage = "_FUNC_(expr) - Casts the value `expr` to the target data type `_FUNC_`."
(name, (new ExpressionInfo(clazz.getCanonicalName, null, name, usage, null), builder))
(name, (new ExpressionInfo(clazz.getCanonicalName, null, name, usage, null,
FunctionType.BUILTIN), builder))
}

/**
Expand All @@ -529,9 +585,10 @@ object FunctionRegistry {
val clazz = scala.reflect.classTag[T].runtimeClass
val df = clazz.getAnnotation(classOf[ExpressionDescription])
if (df != null) {
new ExpressionInfo(clazz.getCanonicalName, null, name, df.usage(), df.extended())
new ExpressionInfo(clazz.getCanonicalName, null, name, df.usage(), df.extended(),
ExpressionInfo.FunctionType.BUILTIN)
} else {
new ExpressionInfo(clazz.getCanonicalName, name)
new ExpressionInfo(clazz.getCanonicalName, name, ExpressionInfo.FunctionType.BUILTIN)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo.FunctionType
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
Expand Down Expand Up @@ -1090,6 +1091,21 @@ class SessionCatalog(
}
}

/**
* Create a temporary macro by registering it in the session's function registry.
*
* @param name the macro's name
* @param info metadata describing the macro (class name, usage, function type)
* @param functionBuilder builds the macro's expression from its argument expressions
*/
def createTempMacro(
name: String,
info: ExpressionInfo,
functionBuilder: FunctionBuilder): Unit = {
functionRegistry.registerFunction(name, info, functionBuilder)
}

/**
 * Drop a temporary macro from the session's function registry.
 *
 * @param name the macro's name
 * @param ignoreIfNotExists when true, a missing macro is silently ignored;
 *                          when false, an [[AnalysisException]] is thrown.
 */
def dropTempMacro(name: String, ignoreIfNotExists: Boolean): Unit = {
  val removed = functionRegistry.dropFunction(name)
  if (!(removed || ignoreIfNotExists)) {
    throw AnalysisException.noSuchTempMacroException(name)
  }
}

/**
* Returns whether it is a temporary function. If not existed, returns false.
*/
Expand Down Expand Up @@ -1126,7 +1142,8 @@ class SessionCatalog(
new ExpressionInfo(
metadata.className,
qualifiedName.database.orNull,
qualifiedName.identifier)
qualifiedName.identifier,
FunctionType.PERSISTENT)
} else {
failFunctionLookup(name.funcName)
}
Expand Down Expand Up @@ -1248,7 +1265,11 @@ class SessionCatalog(
if (func.database.isDefined) {
dropFunction(func, ignoreIfNotExists = false)
} else {
dropTempFunction(func.funcName, ignoreIfNotExists = false)
val functionType = functionRegistry.lookupFunction(func.funcName)
.map(_.getFunctionType).getOrElse(FunctionType.TEMPORARY)
if (!functionType.equals(FunctionType.BUILTIN)) {
dropTempFunction(func.funcName, ignoreIfNotExists = false)
}
}
}
clearTempTables()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ object CreateStruct extends FunctionBuilder {
null,
"struct",
"_FUNC_(col1, col2, col3, ...) - Creates a struct with the given field values.",
"")
"",
ExpressionInfo.FunctionType.BUILTIN)
("struct", (info, this))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructField, StructType}

/**
* Concrete parser for Spark SQL statements.
Expand Down Expand Up @@ -715,6 +715,36 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
ctx.TEMPORARY != null)
}

/**
 * Build a [[CreateMacroCommand]] from a CREATE TEMPORARY MACRO statement.
 *
 * For example:
 * {{{
 *   CREATE TEMPORARY MACRO macro_name([col_name col_type, ...]) expression;
 * }}}
 */
override def visitCreateMacro(ctx: CreateMacroContext): LogicalPlan = withOrigin(ctx) {
  // Parse the (possibly empty) argument list and the macro body expression.
  val macroColumns = createSchema(ctx.colTypeList)
  val macroBody = expression(ctx.expression)
  CreateMacroCommand(ctx.macroName.getText, MacroFunctionWrapper(macroColumns, macroBody))
}

/**
 * Build a [[DropMacroCommand]] from a DROP TEMPORARY MACRO statement.
 *
 * For example:
 * {{{
 *   DROP TEMPORARY MACRO [IF EXISTS] macro_name;
 * }}}
 */
override def visitDropMacro(ctx: DropMacroContext): LogicalPlan = withOrigin(ctx) {
  // IF EXISTS was present iff the EXISTS token was matched by the parser.
  val ifExists = ctx.EXISTS != null
  DropMacroCommand(ctx.macroName.getText, ifExists)
}

/**
* Create a [[DropTableCommand]] command.
*/
Expand Down
Loading