diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 8b37fd6e7e2b..0aa51a940160 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -22,6 +22,7 @@ import java.{util => ju}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.connector.CustomTaskMetric
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
@@ -105,4 +106,16 @@ private case class KafkaBatchPartitionReader(
       range
     }
   }
+
+  override def currentMetricsValues(): Array[CustomTaskMetric] = {
+    val offsetOutOfRange = new CustomTaskMetric {
+      override def name(): String = "offsetOutOfRange"
+      override def value(): Long = consumer.getNumOffsetOutOfRange()
+    }
+    val dataLoss = new CustomTaskMetric {
+      override def name(): String = "dataLoss"
+      override def value(): Long = consumer.getNumDataLoss()
+    }
+    Array(offsetOutOfRange, dataLoss)
+  }
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 7299b182ae1c..c34c43563014 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,11 +30,13 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.CustomMetric
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
+import org.apache.spark.sql.execution.metric.CustomSumMetric
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
 import org.apache.spark.sql.sources._
@@ -503,9 +505,23 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         startingStreamOffsets,
         failOnDataLoss(caseInsensitiveOptions))
     }
+
+    override def supportedCustomMetrics(): Array[CustomMetric] = {
+      Array(new OffsetOutOfRangeMetric, new DataLossMetric)
+    }
   }
 }
 
+private[spark] class OffsetOutOfRangeMetric extends CustomSumMetric {
+  override def name(): String = "offsetOutOfRange"
+  override def description(): String = "estimated number of fetched offsets out of range"
+}
+
+private[spark] class DataLossMetric extends CustomSumMetric {
+  override def name(): String = "dataLoss"
+  override def description(): String = "number of data loss error"
+}
+
 private[kafka010] object KafkaSourceProvider extends Logging {
   private val ASSIGN = "assign"
   private val SUBSCRIBE_PATTERN = "subscribepattern"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 5c92d110a630..37fe38ea94ec 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -239,6 +239,9 @@ private[kafka010] class KafkaDataConsumer(
     fetchedDataPool: FetchedDataPool) extends Logging {
   import KafkaDataConsumer._
 
+  private var offsetOutOfRange = 0L
+  private var dataLoss = 0L
+
   private val isTokenProviderEnabled =
     HadoopDelegationTokenManager.isServiceEnabled(SparkEnv.get.conf, "kafka")
 
@@ -329,7 +332,14 @@ private[kafka010] class KafkaDataConsumer(
 
           reportDataLoss(topicPartition, groupId, failOnDataLoss,
             s"Cannot fetch offset $toFetchOffset", e)
+
+          val oldToFetchOffsetd = toFetchOffset
           toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
+          if (toFetchOffset == UNKNOWN_OFFSET) {
+            offsetOutOfRange += (untilOffset - oldToFetchOffsetd)
+          } else {
+            offsetOutOfRange += (toFetchOffset - oldToFetchOffsetd)
+          }
       }
     }
 
@@ -350,6 +360,9 @@ private[kafka010] class KafkaDataConsumer(
     consumer.getAvailableOffsetRange()
   }
 
+  def getNumOffsetOutOfRange(): Long = offsetOutOfRange
+  def getNumDataLoss(): Long = dataLoss
+
   /**
    * Release borrowed objects in data reader to the pool. Once the instance is created, caller
    * must call method after using the instance to make sure resources are not leaked.
@@ -596,6 +609,7 @@ private[kafka010] class KafkaDataConsumer(
       message: String,
       cause: Throwable = null): Unit = {
     val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
+    dataLoss += 1
     reportDataLoss0(failOnDataLoss, finalMessage, cause)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
index 3cb20f87ae63..cc28be3ca8ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
@@ -47,12 +47,10 @@ object CustomMetrics {
 }
 
 /**
- * Built-in `CustomMetric` that sums up metric values.
+ * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
+ * and override `name` and `description` to create your custom metric for real usage.
  */
-class CustomSumMetric extends CustomMetric {
-  override def name(): String = "CustomSumMetric"
-
-  override def description(): String = "Sum up CustomMetric"
+abstract class CustomSumMetric extends CustomMetric {
 
   override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
     taskMetrics.sum.toString
@@ -60,12 +58,10 @@ class CustomSumMetric extends CustomMetric {
 }
 
 /**
- * Built-in `CustomMetric` that computes average of metric values.
+ * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
+ * class and override `name` and `description` to create your custom metric for real usage.
  */
-class CustomAvgMetric extends CustomMetric {
-  override def name(): String = "CustomAvgMetric"
-
-  override def description(): String = "Average CustomMetric"
+abstract class CustomAvgMetric extends CustomMetric {
 
   override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
     val average = if (taskMetrics.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index da39e8c455e3..0e48e6efeee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -113,7 +113,7 @@ object SQLMetrics {
    */
   def createV2CustomMetric(sc: SparkContext, customMetric: CustomMetric): SQLMetric = {
     val acc = new SQLMetric(CustomMetrics.buildV2CustomMetricTypeName(customMetric))
-    acc.register(sc, name = Some(customMetric.name()), countFailedValues = false)
+    acc.register(sc, name = Some(customMetric.description()), countFailedValues = false)
     acc
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
index e2fa03ff23c9..020f3f494a2e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SparkFunSuite
 class CustomMetricsSuite extends SparkFunSuite {
 
   test("Build/parse custom metric metric type") {
-    Seq(new CustomSumMetric, new CustomAvgMetric).foreach { customMetric =>
+    Seq(new TestCustomSumMetric, new TestCustomAvgMetric).foreach { customMetric =>
       val metricType = CustomMetrics.buildV2CustomMetricTypeName(customMetric)
 
       assert(metricType == CustomMetrics.V2_CUSTOM + "_" + customMetric.getClass.getCanonicalName)
@@ -33,7 +33,7 @@ class CustomMetricsSuite extends SparkFunSuite {
   }
 
   test("Built-in CustomSumMetric") {
-    val metric = new CustomSumMetric
+    val metric = new TestCustomSumMetric
 
     val metricValues1 = Array(0L, 1L, 5L, 5L, 7L, 10L)
     assert(metric.aggregateTaskMetrics(metricValues1) == metricValues1.sum.toString)
@@ -43,7 +43,7 @@ class CustomMetricsSuite extends SparkFunSuite {
   }
 
   test("Built-in CustomAvgMetric") {
-    val metric = new CustomAvgMetric
+    val metric = new TestCustomAvgMetric
 
     val metricValues1 = Array(0L, 1L, 5L, 5L, 7L, 10L)
     assert(metric.aggregateTaskMetrics(metricValues1) == "4.667")
@@ -52,3 +52,13 @@ class CustomMetricsSuite extends SparkFunSuite {
     assert(metric.aggregateTaskMetrics(metricValues2) == "0")
   }
 }
+
+private[spark] class TestCustomSumMetric extends CustomSumMetric {
+  override def name(): String = "CustomSumMetric"
+  override def description(): String = "Sum up CustomMetric"
+}
+
+private[spark] class TestCustomAvgMetric extends CustomAvgMetric {
+  override def name(): String = "CustomAvgMetric"
+  override def description(): String = "Average CustomMetric"
+}
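
Below is a minimal, hypothetical sketch (not part of the patch above) of the pattern the Kafka changes follow: a connector pairs a driver-side CustomSumMetric subclass, exposed from Scan.supportedCustomMetrics(), with per-task values reported as CustomTaskMetric from PartitionReader.currentMetricsValues(), matched by name(). RecordsSkippedMetric and "recordsSkipped" are made-up names; the imports mirror the ones used in this patch.

// Hypothetical example; RecordsSkippedMetric / RecordsSkippedTaskMetric and the
// "recordsSkipped" name are illustrative, not part of this patch.
import org.apache.spark.sql.connector.CustomTaskMetric
import org.apache.spark.sql.execution.metric.CustomSumMetric

// Driver side: returned from a Scan's supportedCustomMetrics(), e.g.
//   override def supportedCustomMetrics(): Array[CustomMetric] = Array(new RecordsSkippedMetric)
// With this patch CustomSumMetric is abstract, so name() and description() must be overridden.
class RecordsSkippedMetric extends CustomSumMetric {
  override def name(): String = "recordsSkipped"
  override def description(): String = "number of records skipped by the reader"
}

// Task side: returned from PartitionReader.currentMetricsValues(); name() must match the
// driver-side metric so the per-task values are aggregated (summed, for CustomSumMetric).
class RecordsSkippedTaskMetric(recordsSkipped: Long) extends CustomTaskMetric {
  override def name(): String = "recordsSkipped"
  override def value(): Long = recordsSkipped
}

Note that, per the SQLMetrics change above, the aggregated value is now labeled in the SQL UI with the metric's description() rather than its name().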