From 623f1932133cb7e4d77ab5bd3f1356e7e7b8da78 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 2 Feb 2021 20:06:12 -0800
Subject: [PATCH 1/8] Add interface for DS v2 metrics.

---
 .../spark/sql/connector/CustomMetric.java     | 38 +++++++++++++++++++
 .../spark/sql/connector/LongMetric.java       | 33 ++++++++++++++++
 .../sql/connector/read/PartitionReader.java   |  9 +++++
 .../apache/spark/sql/connector/read/Scan.java | 10 +++++
 4 files changed, 90 insertions(+)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/LongMetric.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
new file mode 100644
index 000000000000..d8e800f0f85f
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A general custom metric.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface CustomMetric {
+  /**
+   * Returns the name of the custom metric.
+   */
+  String name();
+
+  /**
+   * Returns the description of the custom metric.
+   */
+  String description();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/LongMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/LongMetric.java
new file mode 100644
index 000000000000..51565caead0c
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/LongMetric.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A custom metric that reports a long value.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface LongMetric extends CustomMetric {
+  /**
+   * Returns the value of the custom metric.
+   */
+  long value();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
index 23fbd95800e2..b7fd3f48e726 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.CustomMetric;
 
 /**
  * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
@@ -48,4 +49,12 @@ public interface PartitionReader<T> extends Closeable {
    * Return the current record. This method should return same value until `next` is called.
    */
   T get();
+
+  /**
+   * Returns an array of custom metrics. By default it returns an empty array.
+   */
+  default CustomMetric[] getCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index 4146f217985b..b70a656c492a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.read;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.CustomMetric;
 import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
@@ -102,4 +103,13 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
   default ContinuousStream toContinuousStream(String checkpointLocation) {
     throw new UnsupportedOperationException(description() + ": Continuous scan are not supported");
   }
+
+  /**
+   * Returns an array of supported custom metrics with name and description.
+   * By default it returns an empty array.
+   */
+  default CustomMetric[] supportedCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
 }

From 89e2f3d7e66d4b9aae702600656af5b54d9cdeb5 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 4 Feb 2021 13:59:29 -0800
Subject: [PATCH 2/8] Modify comment.

---
 .../main/java/org/apache/spark/sql/connector/CustomMetric.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
index d8e800f0f85f..67c763190d43 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
@@ -20,7 +20,7 @@
 import org.apache.spark.annotation.Evolving;
 
 /**
- * A general custom metric.
+ * A custom metric.
  *
  * @since 3.2.0
  */

From 38fb9667cfcdfc7f103b574e80684380a95d19e4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 4 Feb 2021 15:15:55 -0800
Subject: [PATCH 3/8] Make new interfaces.
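
An illustrative sketch of how a connector would opt in to metric reporting
with the new mix-in interface. The reader and metric classes below are
hypothetical examples, not part of this patch:

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.connector.CustomMetric;
    import org.apache.spark.sql.connector.LongMetric;
    import org.apache.spark.sql.connector.read.PartitionReaderWithMetrics;

    // A hypothetical long-valued metric counting rows read so far.
    class RowsReadMetric implements LongMetric {
      private final long rowsRead;
      RowsReadMetric(long rowsRead) { this.rowsRead = rowsRead; }
      @Override public String name() { return "rowsRead"; }
      @Override public String description() { return "number of rows read"; }
      @Override public long value() { return rowsRead; }
    }

    // A hypothetical reader; abstract so the sketch can omit next()/get()/close().
    abstract class MyPartitionReader implements PartitionReaderWithMetrics<InternalRow> {
      private long rowsRead = 0L;

      @Override
      public CustomMetric[] getCustomMetrics() {
        return new CustomMetric[] { new RowsReadMetric(rowsRead) };
      }
    }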
---
 .../sql/connector/SupportsReportMetrics.java  | 39 +++++++++++++++++++
 .../sql/connector/read/PartitionReader.java   |  9 -----
 .../read/PartitionReaderWithMetrics.java      | 39 +++++++++++++++++++
 .../apache/spark/sql/connector/read/Scan.java | 10 -----
 4 files changed, 78 insertions(+), 19 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/SupportsReportMetrics.java
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderWithMetrics.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/SupportsReportMetrics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/SupportsReportMetrics.java
new file mode 100644
index 000000000000..80186cb78126
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/SupportsReportMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Data sources can implement this interface to
+ * report supported custom metrics to Spark in the read/write path.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface SupportsReportMetrics {
+
+  /**
+   * Returns an array of supported custom metrics with name and description.
+   * By default it returns an empty array.
+   */
+  default CustomMetric[] supportedCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
index b7fd3f48e726..23fbd95800e2 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomMetric;
 
 /**
  * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
@@ -49,12 +48,4 @@ public interface PartitionReader<T> extends Closeable {
    * Return the current record. This method should return same value until `next` is called.
    */
   T get();
-
-  /**
-   * Returns an array of custom metrics. By default it returns an empty array.
-   */
-  default CustomMetric[] getCustomMetrics() {
-    CustomMetric[] NO_METRICS = {};
-    return NO_METRICS;
-  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderWithMetrics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderWithMetrics.java
new file mode 100644
index 000000000000..b8dc7b239fbf
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderWithMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.CustomMetric;
+
+/**
+ * A mix-in interface for {@link PartitionReader}. Partition readers can implement this
+ * interface to report custom metrics to Spark.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface PartitionReaderWithMetrics<T> extends PartitionReader<T> {
+
+  /**
+   * Returns an array of custom metrics. By default it returns an empty array.
+   */
+  default CustomMetric[] getCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index b70a656c492a..4146f217985b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.connector.read;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomMetric;
 import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
@@ -103,13 +102,4 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
   default ContinuousStream toContinuousStream(String checkpointLocation) {
     throw new UnsupportedOperationException(description() + ": Continuous scan are not supported");
   }
-
-  /**
-   * Returns an array of supported custom metrics with name and description.
-   * By default it returns an empty array.
-   */
-  default CustomMetric[] supportedCustomMetrics() {
-    CustomMetric[] NO_METRICS = {};
-    return NO_METRICS;
-  }
 }

From 00842c45f723ca60a72b921991ebc900654e2d10 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 13 Feb 2021 00:24:15 -0800
Subject: [PATCH 4/8] Go back to optional methods.
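
With the optional default methods back on Scan and PartitionReader, no
marker interface is needed. A hypothetical scan (illustrative names, not
part of this patch) simply overrides the defaults:

    import org.apache.spark.sql.connector.CustomMetric;
    import org.apache.spark.sql.connector.read.Scan;
    import org.apache.spark.sql.types.StructType;

    // A hypothetical metric descriptor; at this point CustomMetric only
    // carries a name and a description.
    class RowsReadMetric implements CustomMetric {
      @Override public String name() { return "rowsRead"; }
      @Override public String description() { return "number of rows read"; }
    }

    // A hypothetical scan advertising the metric it can report.
    class MyScan implements Scan {
      @Override public StructType readSchema() { return new StructType(); }

      @Override
      public CustomMetric[] supportedCustomMetrics() {
        return new CustomMetric[] { new RowsReadMetric() };
      }
    }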
---
 .../sql/connector/SupportsReportMetrics.java  | 39 -------------------
 .../sql/connector/read/PartitionReader.java   |  9 +++++
 .../read/PartitionReaderWithMetrics.java      | 39 -------------------
 .../apache/spark/sql/connector/read/Scan.java | 10 +++++
 4 files changed, 19 insertions(+), 78 deletions(-)
 delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/SupportsReportMetrics.java
 delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderWithMetrics.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/SupportsReportMetrics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/SupportsReportMetrics.java
deleted file mode 100644
index 80186cb78126..000000000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/SupportsReportMetrics.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * Data sources can implement this interface to
- * report supported custom metrics to Spark in the read/write path.
- *
- * @since 3.2.0
- */
-@Evolving
-public interface SupportsReportMetrics {
-
-  /**
-   * Returns an array of supported custom metrics with name and description.
-   * By default it returns an empty array.
-   */
-  default CustomMetric[] supportedCustomMetrics() {
-    CustomMetric[] NO_METRICS = {};
-    return NO_METRICS;
-  }
-}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
index 23fbd95800e2..b7fd3f48e726 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.CustomMetric;
 
 /**
  * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
@@ -48,4 +49,12 @@ public interface PartitionReader<T> extends Closeable {
    * Return the current record. This method should return same value until `next` is called.
    */
   T get();
+
+  /**
+   * Returns an array of custom metrics. By default it returns an empty array.
+   */
+  default CustomMetric[] getCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderWithMetrics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderWithMetrics.java
deleted file mode 100644
index b8dc7b239fbf..000000000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderWithMetrics.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector.read;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomMetric;
-
-/**
- * A mix-in interface for {@link PartitionReader}. Partition readers can implement this
- * interface to report custom metrics to Spark.
- *
- * @since 3.2.0
- */
-@Evolving
-public interface PartitionReaderWithMetrics<T> extends PartitionReader<T> {
-
-  /**
-   * Returns an array of custom metrics. By default it returns an empty array.
-   */
-  default CustomMetric[] getCustomMetrics() {
-    CustomMetric[] NO_METRICS = {};
-    return NO_METRICS;
-  }
-}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index 4146f217985b..b70a656c492a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.read;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.CustomMetric;
 import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
@@ -102,4 +103,13 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
   default ContinuousStream toContinuousStream(String checkpointLocation) {
     throw new UnsupportedOperationException(description() + ": Continuous scan are not supported");
   }
+
+  /**
+   * Returns an array of supported custom metrics with name and description.
+   * By default it returns an empty array.
+   */
+  default CustomMetric[] supportedCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
 }

From b8c762a776d535a1083c424a61922dc8d1401ccd Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 13 Feb 2021 10:58:10 -0800
Subject: [PATCH 5/8] Enrich metric comment.
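
With the metric type in place, a hypothetical SUM metric (illustrative
names, not part of this patch) would look like:

    import org.apache.spark.sql.connector.CustomMetric;

    // A hypothetical SUM-typed metric: Spark sums up the values collected
    // from all partitions to produce the final result.
    class RowsReadMetric implements CustomMetric {
      @Override public String name() { return "rowsRead"; }
      @Override public String description() { return "number of rows read"; }
      @Override public MetricType type() { return MetricType.SUM; }
    }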
---
 .../spark/sql/connector/CustomMetric.java     | 25 ++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
index 67c763190d43..0d79c392bb6b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
@@ -18,9 +18,19 @@
 package org.apache.spark.sql.connector;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.Scan;
 
 /**
- * A custom metric.
+ * A custom metric. This is a logical representation of a metric reported by data sources during
+ * the read path. Data sources can report a list of supported metrics to Spark via {@link Scan}
+ * during query planning. During query execution, Spark collects the metrics per partition via
+ * {@link PartitionReader} and combines the metrics from all partitions into the final result. How
+ * Spark combines metrics depends on the metric type. For streaming queries, Spark collects and
+ * combines the metrics into a final result once per micro-batch.
+ *
+ * The metrics are gathered back to the driver during query execution and then combined. The
+ * final result is shown in the physical operator in the Spark UI.
  *
  * @since 3.2.0
  */
@@ -35,4 +45,17 @@ public interface CustomMetric {
    * Returns the description of the custom metric.
    */
   String description();
+
+  /**
+   * Supported metric type. The metric types must be supported by Spark SQL internal metrics.
+   * SUM: Spark sums up metrics from partitions as the final result.
+   */
+  enum MetricType {
+    SUM
+  }
+
+  /**
+   * Returns the type of the custom metric.
+   */
+  MetricType type();
 }

From cce58a56d5c48cb351688411d65159779550b5de Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 18 Feb 2021 19:18:27 -0800
Subject: [PATCH 6/8] Move value method to CustomMetric.

---
 .../spark/sql/connector/CustomMetric.java     |  5 +++
 .../spark/sql/connector/LongMetric.java       | 33 -------------------
 2 files changed, 5 insertions(+), 33 deletions(-)
 delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/LongMetric.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
index 0d79c392bb6b..2f5843263071 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
@@ -46,6 +46,11 @@ public interface CustomMetric {
    */
   String description();
 
+  /**
+   * Returns the long value of the custom metric.
+   */
+  long value();
+
   /**
    * Supported metric type. The metric types must be supported by Spark SQL internal metrics.
    * SUM: Spark sums up metrics from partitions as the final result.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/LongMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/LongMetric.java
deleted file mode 100644
index 51565caead0c..000000000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/LongMetric.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * A custom metric that reports a long value.
- *
- * @since 3.2.0
- */
-@Evolving
-public interface LongMetric extends CustomMetric {
-  /**
-   * Returns the value of the custom metric.
-   */
-  long value();
-}

From f46b733c2ec276dad31aa7732ff2349fd4363e52 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 22 Mar 2021 17:04:29 -0700
Subject: [PATCH 7/8] Update.

---
 .../spark/sql/connector/CustomMetric.java     | 31 ++++---------
 .../spark/sql/connector/CustomTaskMetric.java | 46 +++++++++++++++++++
 .../sql/connector/read/PartitionReader.java   |  8 ++--
 3 files changed, 58 insertions(+), 27 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
index 2f5843263071..bbd35ac94677 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
@@ -18,19 +18,12 @@
 package org.apache.spark.sql.connector;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.read.PartitionReader;
-import org.apache.spark.sql.connector.read.Scan;
 
 /**
- * A custom metric. This is a logical representation of a metric reported by data sources during
- * the read path. Data sources can report a list of supported metrics to Spark via {@link Scan}
- * during query planning. During query execution, Spark collects the metrics per partition via
- * {@link PartitionReader} and combines the metrics from all partitions into the final result. How
- * Spark combines metrics depends on the metric type. For streaming queries, Spark collects and
- * combines the metrics into a final result once per micro-batch.
- *
- * The metrics are gathered back to the driver during query execution and then combined. The
- * final result is shown in the physical operator in the Spark UI.
+ * A custom metric. Data sources can define supported custom metrics using this interface.
+ * During query execution, Spark will collect the task metrics using {@link CustomTaskMetric}
+ * and combine the metrics at the driver side. How to combine task metrics is defined by the
+ * metric class with the same metric name.
  *
  * @since 3.2.0
  */
@@ -47,20 +40,12 @@ public interface CustomMetric {
   String description();
 
   /**
-   * Returns the long value of the custom metric.
-   */
-  long value();
-
-  /**
-   * Supported metric type. The metric types must be supported by Spark SQL internal metrics.
-   * SUM: Spark sums up metrics from partitions as the final result.
+   * The initial value of this metric.
    */
-  enum MetricType {
-    SUM
-  }
+  long initialValue = 0L;
 
   /**
-   * Returns the type of the custom metric.
+   * Given an array of task metric values, returns the aggregated final metric value.
    */
-  MetricType type();
+  String aggregateTaskMetrics(long[] taskMetrics);
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
new file mode 100644
index 000000000000..caa508698ff2
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.PartitionReader;
+
+/**
+ * A custom task metric. This is a logical representation of a metric reported by data sources
+ * at the executor side. During query execution, Spark will collect the task metrics per partition
+ * by {@link PartitionReader} and update internal metrics based on the collected metric values.
+ * For streaming queries, Spark will collect and combine the metrics once per micro-batch.
+ *
+ * The metrics will be gathered back to the driver during query execution and then combined. How
+ * the task metrics are combined is defined by the corresponding {@link CustomMetric} with the
+ * same metric name. The final result will be shown in the physical operator in the Spark UI.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface CustomTaskMetric {
+  /**
+   * Returns the name of the custom task metric.
+   */
+  String name();
+
+  /**
+   * Returns the long value of the custom task metric.
+   */
+  long value();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
index b7fd3f48e726..dfecb77c669b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
@@ -21,7 +21,7 @@
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomMetric;
+import org.apache.spark.sql.connector.CustomTaskMetric;
 
 /**
  * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
@@ -51,10 +51,10 @@ public interface PartitionReader<T> extends Closeable {
   T get();
 
   /**
-   * Returns an array of custom metrics. By default it returns an empty array.
+   * Returns an array of custom task metrics. By default it returns an empty array.
    */
-  default CustomMetric[] getCustomMetrics() {
-    CustomMetric[] NO_METRICS = {};
+  default CustomTaskMetric[] currentMetricsValues() {
+    CustomTaskMetric[] NO_METRICS = {};
     return NO_METRICS;
   }
 }

From eb9d94a07a38d6efcd71f12cc1c2db22bf8a7019 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 23 Mar 2021 02:00:42 -0700
Subject: [PATCH 8/8] Update comment.

---
 .../java/org/apache/spark/sql/connector/CustomTaskMetric.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
index caa508698ff2..47644a3267ed 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
@@ -28,7 +28,7 @@
  *
  * The metrics will be gathered back to the driver during query execution and then combined. How
  * the task metrics are combined is defined by the corresponding {@link CustomMetric} with the
- * same metric name. The final result will be shown in the physical operator in the Spark UI.
+ * same metric name. The final result will be shown in the data source scan operator in the Spark UI.
  *
  * @since 3.2.0
  */
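
After PATCH 8/8, the API settles on a driver-side CustomMetric that defines
how per-task values are combined, plus an executor-side CustomTaskMetric
reported by each PartitionReader. A sketch of a hypothetical connector using
the final API (the class and metric names are illustrative, not part of the
patches):

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.connector.CustomMetric;
    import org.apache.spark.sql.connector.CustomTaskMetric;
    import org.apache.spark.sql.connector.read.PartitionReader;

    // Hypothetical driver-side metric: sums the per-task values into the final result.
    class BytesReadMetric implements CustomMetric {
      @Override public String name() { return "bytesRead"; }
      @Override public String description() { return "number of bytes read"; }

      @Override
      public String aggregateTaskMetrics(long[] taskMetrics) {
        long sum = 0L;
        for (long v : taskMetrics) sum += v;
        return String.valueOf(sum);
      }
    }

    // Hypothetical executor-side value; the name ties it to BytesReadMetric.
    class BytesReadTaskMetric implements CustomTaskMetric {
      private final long bytesRead;
      BytesReadTaskMetric(long bytesRead) { this.bytesRead = bytesRead; }
      @Override public String name() { return "bytesRead"; }
      @Override public long value() { return bytesRead; }
    }

    // Hypothetical reader; abstract so the sketch can omit next()/get()/close().
    abstract class MyPartitionReader implements PartitionReader<InternalRow> {
      private long bytesRead = 0L;

      @Override
      public CustomTaskMetric[] currentMetricsValues() {
        return new CustomTaskMetric[] { new BytesReadTaskMetric(bytesRead) };
      }
    }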