
Commit eebf9c6

Add custom metrics.
1 parent ac8307d commit eebf9c6

9 files changed: 150 additions, 6 deletions

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala

Lines changed: 9 additions & 0 deletions
@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.connector.read.streaming.{CustomMetric, CustomSumMetric}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
 /** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
@@ -105,4 +106,12 @@ private case class KafkaBatchPartitionReader(
       range
     }
   }
+
+  override def getCustomMetrics(): Array[CustomMetric] = {
+    Array(
+      CustomSumMetric("offsetOutOfRange", "estimated number of fetched offsets out of range",
+        consumer.getNumOffsetOutOfRange()),
+      CustomSumMetric("dataLoss", "number of data loss error",
+        consumer.getNumDataLoss()))
+  }
 }

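This reader-side pattern generalizes to any DSv2 source: the PartitionReader keeps plain counters while it reads and snapshots them in getCustomMetrics(). A minimal sketch under that assumption; RecordCountingReader, its delegate, and the recordsRead metric name are hypothetical and not part of this commit:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.connector.read.streaming.{CustomMetric, CustomSumMetric}

// Hypothetical wrapper that counts how many rows pass through it and exposes the
// count as a custom metric once the partition has been consumed.
class RecordCountingReader(delegate: PartitionReader[InternalRow])
  extends PartitionReader[InternalRow] {

  private var recordsRead = 0L

  override def next(): Boolean = {
    val hasNext = delegate.next()
    if (hasNext) recordsRead += 1
    hasNext
  }

  override def get(): InternalRow = delegate.get()

  override def close(): Unit = delegate.close()

  // Snapshot of the counter; read on the executor once the scan's iterator is drained.
  override def getCustomMetrics(): Array[CustomMetric] =
    Array(CustomSumMetric("recordsRead", "number of records read", recordsRead))
}
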
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala

Lines changed: 6 additions & 1 deletion
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
+import org.apache.spark.sql.connector.read.streaming.{CustomMetric, CustomSumMetric, MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread
@@ -217,4 +217,9 @@ private[kafka010] class KafkaMicroBatchStream(
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
   }
+
+  override def supportedCustomMetrics(): Array[CustomMetric] =
+    Array(
+      CustomSumMetric("offsetOutOfRange", "estimated number of fetched offsets out of range", 0L),
+      CustomSumMetric("dataLoss", "number of data loss error", 0L))
 }

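supportedCustomMetrics() only declares names and descriptions up front so the driver can register a SQLMetric for each one (see MicroBatchScanExec below); the 0L values are placeholders, and the names must match what the partition readers later report through getCustomMetrics(), because the driver looks metrics up by name. A minimal sketch of the declaration side, reusing the hypothetical recordsRead metric from the reader sketch above (the trait name is illustrative):

import org.apache.spark.sql.connector.read.streaming.{CustomMetric, CustomSumMetric}

// Hypothetical mix-in for a stream implementation: it only declares the metric's
// name and description; the real values come from the partition readers at runtime.
trait DeclaresRecordCount {
  def supportedCustomMetrics(): Array[CustomMetric] =
    Array(CustomSumMetric("recordsRead", "number of records read", 0L))
}
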
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala

Lines changed: 14 additions & 0 deletions
@@ -239,6 +239,9 @@ private[kafka010] class KafkaDataConsumer(
     fetchedDataPool: FetchedDataPool) extends Logging {
   import KafkaDataConsumer._
 
+  private var offsetOutOfRange = 0L
+  private var dataLoss = 0L
+
   private val isTokenProviderEnabled =
     HadoopDelegationTokenManager.isServiceEnabled(SparkEnv.get.conf, "kafka")
 
@@ -329,7 +332,14 @@ private[kafka010] class KafkaDataConsumer(
 
           reportDataLoss(topicPartition, groupId, failOnDataLoss,
             s"Cannot fetch offset $toFetchOffset", e)
+
+          val oldToFetchOffsetd = toFetchOffset
           toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
+          if (toFetchOffset == UNKNOWN_OFFSET) {
+            offsetOutOfRange += (untilOffset - oldToFetchOffsetd)
+          } else {
+            offsetOutOfRange += (toFetchOffset - oldToFetchOffsetd)
+          }
       }
     }
 
@@ -350,6 +360,9 @@ private[kafka010] class KafkaDataConsumer(
       consumer.getAvailableOffsetRange()
     }
 
+  def getNumOffsetOutOfRange(): Long = offsetOutOfRange
+  def getNumDataLoss(): Long = dataLoss
+
   /**
    * Release borrowed objects in data reader to the pool. Once the instance is created, caller
    * must call method after using the instance to make sure resources are not leaked.
@@ -596,6 +609,7 @@ private[kafka010] class KafkaDataConsumer(
       message: String,
       cause: Throwable = null): Unit = {
     val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
+    dataLoss += 1
     reportDataLoss0(failOnDataLoss, finalMessage, cause)
   }

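The offsetOutOfRange bookkeeping above estimates how many offsets were skipped after a failed fetch: if the consumer can resume from a later offset, the gap up to that offset is counted; if nothing in the requested range is still available (UNKNOWN_OFFSET), the remainder up to untilOffset is counted. A worked example with illustrative numbers; the sentinel value used for UNKNOWN_OFFSET below is assumed for the sketch:

// Fetching offset 100 failed while reading the range [100, 200).
val untilOffset = 200L
val oldToFetchOffset = 100L
val UNKNOWN_OFFSET = -2L  // assumed sentinel meaning "no offset left to fetch"

def skippedOffsets(newToFetchOffset: Long): Long =
  if (newToFetchOffset == UNKNOWN_OFFSET) untilOffset - oldToFetchOffset
  else newToFetchOffset - oldToFetchOffset

skippedOffsets(150L)            // earliest available offset is now 150 -> 50 offsets skipped
skippedOffsets(UNKNOWN_OFFSET)  // nothing left in the range           -> 100 offsets skipped
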
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java

Lines changed: 9 additions & 0 deletions
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.streaming.CustomMetric;
 
 /**
  * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
@@ -48,4 +49,12 @@ public interface PartitionReader<T> extends Closeable {
    * Return the current record. This method should return same value until `next` is called.
    */
   T get();
+
+  /**
+   * Returns an array of custom metrics. By default it returns empty array.
+   */
+  default CustomMetric[] getCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
 }
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CustomMetric.java (new file)

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A custom metric for {@link SparkDataStream}.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface CustomMetric {
+  /**
+   * Returns the name of custom metric.
+   */
+  String getName();
+
+  /**
+   * Returns the description of custom metric.
+   */
+  String getDescription();
+
+  /**
+   * Returns the value of custom metric.
+   */
+  Long getValue();
+}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/MicroBatchStream.java

Lines changed: 9 additions & 0 deletions
@@ -56,4 +56,13 @@ public interface MicroBatchStream extends SparkDataStream {
    * Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}.
    */
   PartitionReaderFactory createReaderFactory();
+
+  /**
+   * Returns an array of supported custom metrics with name and description.
+   * By default it returns empty array.
+   */
+  default CustomMetric[] supportedCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
 }
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/read/streaming/CustomSumMetric.scala (new file)

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming
+
+case class CustomSumMetric(name: String, desc: String, value: Long) extends CustomMetric {
+  override def getName(): String = name
+  override def getDescription: String = desc
+  override def getValue: java.lang.Long = value
+}

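CustomSumMetric is just an immutable (name, description, value) snapshot; the "sum" semantics come from the driver side, where the values reported by every partition reader are added into the SQLMetric registered under the same name. A small usage sketch:

import org.apache.spark.sql.connector.read.streaming.CustomSumMetric

// Construct a snapshot and read it back through the CustomMetric accessors.
val metric = CustomSumMetric("dataLoss", "number of data loss error", 3L)
assert(metric.getName == "dataLoss")
assert(metric.getDescription == "number of data loss error")
assert(metric.getValue == 3L)
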
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala

Lines changed: 15 additions & 3 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.CompletionIterator
 
 class DataSourceRDDPartition(val index: Int, val inputPartition: InputPartition)
   extends Partition with Serializable
@@ -36,7 +37,8 @@ class DataSourceRDD(
     sc: SparkContext,
     @transient private val inputPartitions: Seq[InputPartition],
     partitionReaderFactory: PartitionReaderFactory,
-    columnarReads: Boolean)
+    columnarReads: Boolean,
+    onCompletion: PartitionReader[_] => Unit = _ => {})
   extends RDD[InternalRow](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
@@ -55,11 +57,21 @@ class DataSourceRDD(
     val (iter, reader) = if (columnarReads) {
       val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)
       val iter = new MetricsBatchIterator(new PartitionIterator[ColumnarBatch](batchReader))
-      (iter, batchReader)
+      def completionFunction = {
+        onCompletion(batchReader)
+      }
+      val completionIterator = CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](
+        iter, completionFunction)
+      (completionIterator, batchReader)
     } else {
       val rowReader = partitionReaderFactory.createReader(inputPartition)
       val iter = new MetricsRowIterator(new PartitionIterator[InternalRow](rowReader))
-      (iter, rowReader)
+      def completionFunction = {
+        onCompletion(rowReader)
+      }
+      val completionIterator = CompletionIterator[InternalRow, Iterator[InternalRow]](
+        iter, completionFunction)
+      (completionIterator, rowReader)
     }
     context.addTaskCompletionListener[Unit](_ => reader.close())
     // TODO: SPARK-25083 remove the type erasure hack in data source scan

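The point of this change is when onCompletion runs: CompletionIterator fires the callback once the wrapped per-partition iterator is fully consumed, so the reader's final counter values can be harvested before the task-completion listener closes the reader. A minimal stand-in for that pattern, not Spark's actual CompletionIterator implementation:

// Wrap an iterator so that a callback runs exactly once when it is drained.
class OnDrainIterator[A](underlying: Iterator[A], onDrain: () => Unit) extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onDrain()  // e.g. read reader.getCustomMetrics() here and publish the values
    }
    more
  }

  override def next(): A = underlying.next()
}
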
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala

Lines changed: 21 additions & 2 deletions
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, Scan}
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * Physical plan node for scanning a micro-batch of data from a data source.
@@ -33,6 +34,14 @@ case class MicroBatchScanExec(
     @transient start: Offset,
     @transient end: Offset) extends DataSourceV2ScanExecBase {
 
+  override lazy val metrics = {
+    val customMetrics = stream.supportedCustomMetrics().map { customMetric =>
+      customMetric.getName -> SQLMetrics.createMetric(sparkContext, customMetric.getDescription)
+    }
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++
+      customMetrics
+  }
+
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
     case other: MicroBatchScanExec => this.stream == other.stream
@@ -45,7 +54,17 @@ case class MicroBatchScanExec(
 
   override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
 
+  /**
+   * The callback function which is called when the output iterator of input RDD is consumed
+   * completely.
+   */
+  private def onOutputCompletion(reader: PartitionReader[_]) = {
+    reader.getCustomMetrics.foreach { metric =>
+      longMetric(metric.getName) += metric.getValue
+    }
+  }
+
   override lazy val inputRDD: RDD[InternalRow] = {
-    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
+    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, onOutputCompletion)
   }
 }

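Taken together, the pieces above form the full path of a custom metric: the MicroBatchStream declares metric names and descriptions via supportedCustomMetrics(), MicroBatchScanExec registers a SQLMetric for each of them alongside numOutputRows, every PartitionReader accumulates its own values on the executor, and once a partition's output iterator is drained, DataSourceRDD's completion callback passes the reader to onOutputCompletion, which adds each reported value into the SQLMetric with the matching name.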