diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/AggregateFunction.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/AggregateFunction.java new file mode 100644 index 0000000000000..3442eba1317cc --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/AggregateFunction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.DataType; + +import java.io.Serializable; + +/** + * Interface for a function that produces a result value by aggregating over multiple input rows. + *
+ * Intermediate aggregation state must be {@link Serializable} so that state produced by parallel + * tasks can be sent to a single executor and merged to produce a final result. + *
+ * The JVM type of result values produced by this function must be the type used by Spark's
+ * InternalRow API for the {@link DataType SQL data type} returned by {@link #resultType()}.
+ *
+ * @param
+ * This method is called one or more times for every group of values to initialize intermediate
+ * aggregation state. More than one intermediate aggregation state variable may be used when the
+ * aggregation is run in parallel tasks.
+ *
+ * The object returned may passed to {@link #update(Serializable, InternalRow)},
+ * {@link #merge(Serializable, Serializable)}, and {@link #result(Serializable)}. Implementations
+ * that return null must support null state passed into all other methods.
+ *
+ * @return a state instance or null
+ */
+ S initialize();
+
+ /**
+ * Update the aggregation state with a new row.
+ *
+ * This is called for each row in a group to update an intermediate aggregation state.
+ *
+ * @param state intermediate aggregation state
+ * @param input an input row
+ * @return updated aggregation state
+ */
+ S update(S state, InternalRow input);
+
+ /**
+ * Merge two partial aggregation states.
+ *
+ * This is called to merge intermediate aggregation states that were produced by parallel tasks.
+ *
+ * @param leftState intermediate aggregation state
+ * @param rightState intermediate aggregation state
+ * @return combined aggregation state
+ */
+ S merge(S leftState, S rightState);
+
+ /**
+ * Produce the aggregation result based on intermediate state.
+ *
+ * @param state intermediate aggregation state
+ * @return a result value
+ */
+ R result(S state);
+
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/BoundFunction.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/BoundFunction.java
new file mode 100644
index 0000000000000..852f238f31383
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/BoundFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.IntegerType;
+
+/**
+ * Represents a function that is bound to an input type.
+ */
+public interface BoundFunction extends Function {
+
+ /**
+ * Returns the {@link DataType data type} of values produced by this function.
+ *
+ * For example, a "plus" function may return {@link IntegerType} when it is bound to arguments
+ * that are also {@link IntegerType}.
+ *
+ * @return a data type for values produced by this function
+ */
+ DataType resultType();
+
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Function.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Function.java
new file mode 100644
index 0000000000000..f7219f371cb2c
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Function.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import java.io.Serializable;
+
+/**
+ * Base class for user-defined functions.
+ */
+public interface Function extends Serializable {
+
+ /**
+ * A name to identify this function. Implementations should provide a meaningful name, like the
+ * database and function name from the catalog.
+ */
+ String name();
+
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/FunctionCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/FunctionCatalog.java
new file mode 100644
index 0000000000000..0174cd876108e
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/FunctionCatalog.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+
+/**
+ * Catalog methods for working with Functions.
+ */
+public interface FunctionCatalog extends CatalogPlugin {
+
+ /**
+ * List the functions in a namespace from the catalog.
+ *
+ * @param namespace a multi-part namespace
+ * @return an array of Identifiers for functions
+ * @throws NoSuchNamespaceException If the namespace does not exist (optional).
+ */
+ Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException;
+
+ /**
+ * Load a function by {@link Identifier identifier} from the catalog.
+ *
+ * @param ident a function identifier
+ * @return an unbound function instance
+ * @throws NoSuchFunctionException If the function doesn't exist
+ */
+ UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException;
+
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/ScalarFunction.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/ScalarFunction.java
new file mode 100644
index 0000000000000..454f4848c5384
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/ScalarFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Interface for a function that produces a result value for each input row.
+ *
+ * The JVM type of result values produced by this function must be the type used by Spark's
+ * InternalRow API for the {@link DataType SQL data type} returned by {@link #resultType()}.
+ *
+ * @param
+ * If the input type is not supported, implementations must throw
+ * {@link UnsupportedOperationException}.
+ *
+ * For example, a "length" function that only supports a single string argument should throw
+ * UnsupportedOperationException if the struct has more than one field or if that field is not a
+ * string, and it may optionally throw if the field is nullable.
+ *
+ * @param inputType a struct type for inputs that will be passed to the bound function
+ * @return a function that can process rows with the given input type
+ * @throws UnsupportedOperationException If the function cannot be applied to the input type
+ */
+ BoundFunction bind(StructType inputType);
+
+}
the JVM type for the aggregation's intermediate state; must be {@link Serializable}
+ * @param extends BoundFunction {
+
+ /**
+ * Initialize state for an aggregation.
+ *