diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java new file mode 100644 index 000000000000..bbd35ac94677 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector; + +import org.apache.spark.annotation.Evolving; + +/** + * A custom metric. Data sources can define supported custom metrics using this interface. + * During query execution, Spark will collect the task metrics using {@link CustomTaskMetric} + * and combine the metrics at the driver side. How to combine task metrics is defined by the + * metric class with the same metric name. + * + * @since 3.2.0 + */ +@Evolving +public interface CustomMetric { + /** + * Returns the name of this custom metric. + */ + String name(); + + /** + * Returns the description of this custom metric. + */ + String description(); + + /** + * The initial value of this metric. Interface fields are implicitly public static final. + */ + long initialValue = 0L; + + /** + * Given an array of task metric values, returns the aggregated final metric value. 
+ */ + String aggregateTaskMetrics(long[] taskMetrics); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java new file mode 100644 index 000000000000..47644a3267ed --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.read.PartitionReader; + +/** + * A custom task metric. This is a logical representation of a metric reported by data sources + * at the executor side. During query execution, Spark will collect the task metrics per partition + * by {@link PartitionReader} and update internal metrics based on collected metric values. + * For streaming queries, Spark will collect and combine metrics for a final result per micro batch. + * + * The metrics will be gathered during query execution back to the driver and then combined. How + * the task metrics are combined is defined by the corresponding {@link CustomMetric} with the + * same metric name. 
The final result will be shown in the data source scan operator in Spark UI. + * + * @since 3.2.0 + */ +@Evolving +public interface CustomTaskMetric { + /** + * Returns the name of this custom task metric. + */ + String name(); + + /** + * Returns the long value of this custom task metric. + */ + long value(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java index 23fbd95800e2..dfecb77c669b 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.CustomTaskMetric; /** * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or @@ -48,4 +49,12 @@ public interface PartitionReader extends Closeable { * Return the current record. This method should return same value until `next` is called. */ T get(); + + /** + * Returns an array of custom task metrics. By default it returns an empty array. 
+ */ + default CustomTaskMetric[] currentMetricsValues() { + CustomTaskMetric[] NO_METRICS = {}; + return NO_METRICS; + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java index 4146f217985b..b70a656c492a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector.read; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.CustomMetric; import org.apache.spark.sql.connector.read.streaming.ContinuousStream; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; import org.apache.spark.sql.types.StructType; @@ -102,4 +103,13 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) { default ContinuousStream toContinuousStream(String checkpointLocation) { throw new UnsupportedOperationException(description() + ": Continuous scan are not supported"); } + + /** + * Returns an array of supported custom metrics with name and description. + * By default it returns an empty array. + */ + default CustomMetric[] supportedCustomMetrics() { + CustomMetric[] NO_METRICS = {}; + return NO_METRICS; + } }