KafkaBatchPartitionReader.scala
@@ -22,6 +22,7 @@ import java.{util => ju}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.CustomTaskMetric
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer

@@ -105,4 +106,16 @@ private case class KafkaBatchPartitionReader(
range
}
}

override def currentMetricsValues(): Array[CustomTaskMetric] = {
val offsetOutOfRange = new CustomTaskMetric {
override def name(): String = "offsetOutOfRange"
override def value(): Long = consumer.getNumOffsetOutOfRange()
}
val dataLoss = new CustomTaskMetric {
override def name(): String = "dataLoss"
override def value(): Long = consumer.getNumDataLoss()
}
Array(offsetOutOfRange, dataLoss)
}
}
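
For readers unfamiliar with the hook used above: `currentMetricsValues()` is how a DSv2 `PartitionReader` reports per-task metric values to Spark. A minimal sketch outside Kafka (the reader class, metric name, and row-count logic here are illustrative, not part of this PR):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.CustomTaskMetric
import org.apache.spark.sql.connector.read.PartitionReader

// Hypothetical reader that counts the rows it returns and surfaces the
// count as a custom task metric, mirroring KafkaBatchPartitionReader above.
class CountingPartitionReader(rows: Iterator[InternalRow])
  extends PartitionReader[InternalRow] {

  private var rowCount = 0L
  private var current: InternalRow = _

  override def next(): Boolean = {
    if (rows.hasNext) {
      current = rows.next()
      rowCount += 1
      true
    } else {
      false
    }
  }

  override def get(): InternalRow = current

  override def close(): Unit = {}

  override def currentMetricsValues(): Array[CustomTaskMetric] = {
    // The name must match a CustomMetric declared on the scan side
    // (see the supportedCustomMetrics sketch after the next diff).
    val rowsRead = new CustomTaskMetric {
      override def name(): String = "rowsRead"
      override def value(): Long = rowCount
    }
    Array(rowsRead)
  }
}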
KafkaSourceProvider.scala
@@ -30,11 +30,13 @@ import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.CustomMetric
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.metric.CustomSumMetric
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
import org.apache.spark.sql.sources._
@@ -503,9 +505,23 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
startingStreamOffsets,
failOnDataLoss(caseInsensitiveOptions))
}

override def supportedCustomMetrics(): Array[CustomMetric] = {
Array(new OffsetOutOfRangeMetric, new DataLossMetric)
}
}
}

private[spark] class OffsetOutOfRangeMetric extends CustomSumMetric {
override def name(): String = "offsetOutOfRange"
override def description(): String = "estimated number of fetched offsets out of range"
}

private[spark] class DataLossMetric extends CustomSumMetric {
override def name(): String = "dataLoss"
override def description(): String = "number of data loss error"
}

private[kafka010] object KafkaSourceProvider extends Logging {
private val ASSIGN = "assign"
private val SUBSCRIBE_PATTERN = "subscribepattern"
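
The scan-side declaration above (`supportedCustomMetrics`) is what tells the driver which metrics to expect: Spark pairs each `CustomTaskMetric` with a declared `CustomMetric` by `name()` and combines the per-task values with `aggregateTaskMetrics`. A sketch of a scan declaring the metric from the reader sketch earlier (class and metric names are illustrative):

import org.apache.spark.sql.connector.CustomMetric
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.metric.CustomSumMetric
import org.apache.spark.sql.types.StructType

// Illustrative metric: same name as the CustomTaskMetric reported by the
// reader sketch, so Spark can pair the two and sum the per-task values.
class RowsReadMetric extends CustomSumMetric {
  override def name(): String = "rowsRead"
  override def description(): String = "number of rows read"
}

// Illustrative scan declaring the metric it supports.
class CountingScan(schema: StructType) extends Scan {
  override def readSchema(): StructType = schema
  override def supportedCustomMetrics(): Array[CustomMetric] =
    Array(new RowsReadMetric)
}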
KafkaDataConsumer.scala
@@ -239,6 +239,9 @@ private[kafka010] class KafkaDataConsumer(
fetchedDataPool: FetchedDataPool) extends Logging {
import KafkaDataConsumer._

private var offsetOutOfRange = 0L
private var dataLoss = 0L

private val isTokenProviderEnabled =
HadoopDelegationTokenManager.isServiceEnabled(SparkEnv.get.conf, "kafka")

@@ -329,7 +332,14 @@

reportDataLoss(topicPartition, groupId, failOnDataLoss,
s"Cannot fetch offset $toFetchOffset", e)

// Count the offsets skipped over because they were out of range.
val oldToFetchOffset = toFetchOffset
toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
if (toFetchOffset == UNKNOWN_OFFSET) {
// Nothing in the remaining range is available; count the whole range.
offsetOutOfRange += (untilOffset - oldToFetchOffset)
} else {
offsetOutOfRange += (toFetchOffset - oldToFetchOffset)
}
}
}

@@ -350,6 +360,9 @@
consumer.getAvailableOffsetRange()
}

def getNumOffsetOutOfRange(): Long = offsetOutOfRange
def getNumDataLoss(): Long = dataLoss

/**
* Release borrowed objects in the data reader back to the pool. Once the instance is created,
* the caller must call this method after using the instance to make sure resources are not leaked.
@@ -596,6 +609,7 @@ private[kafka010] class KafkaDataConsumer(
message: String,
cause: Throwable = null): Unit = {
val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
dataLoss += 1
reportDataLoss0(failOnDataLoss, finalMessage, cause)
}

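
The bookkeeping in the getEarliestAvailableOffsetBetween hunk above can be sanity-checked with a small worked example (the offset values are illustrative):

// Suppose the task still had to read offsets [100, 110) when the fetch failed.
val oldToFetchOffset = 100L
val untilOffset = 110L

// Case 1: the earliest offset still available is 105. Offsets 100..104 were
// lost, so 105 - 100 = 5 offsets are counted as out of range.
val resumedAt = 105L
assert(resumedAt - oldToFetchOffset == 5L)

// Case 2: nothing in [100, 110) is available (UNKNOWN_OFFSET), so the whole
// remaining range is counted: 110 - 100 = 10 offsets out of range.
assert(untilOffset - oldToFetchOffset == 10L)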
CustomMetrics.scala
@@ -47,25 +47,21 @@ object CustomMetrics {
}

/**
* Built-in `CustomMetric` that sums up metric values.
* Built-in `CustomMetric` that sums up metric values. Extend this class and override
* `name` and `description` to create a custom metric for real usage.
*/
class CustomSumMetric extends CustomMetric {
override def name(): String = "CustomSumMetric"

override def description(): String = "Sum up CustomMetric"
abstract class CustomSumMetric extends CustomMetric {
Review comment thread on abstract class CustomSumMetric:

Contributor: This is in the internal package. Shall we move it to the v2 package and rewrite it using Java?

Contributor: Or do we plan to use it for internal sources only?

Member (Author): I think we should move it to the v2 public package using Java.

Member (Author): I will create a followup to do it.

override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
taskMetrics.sum.toString
}
}

/**
* Built-in `CustomMetric` that computes average of metric values.
* Built-in `CustomMetric` that computes average of metric values. Extend this class and
* override `name` and `description` to create a custom metric for real usage.
*/
class CustomAvgMetric extends CustomMetric {
override def name(): String = "CustomAvgMetric"

override def description(): String = "Average CustomMetric"
abstract class CustomAvgMetric extends CustomMetric {

override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
val average = if (taskMetrics.isEmpty) {
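
Since `CustomSumMetric` and `CustomAvgMetric` are now abstract, a connector supplies only the name and description and inherits the aggregation. A quick sketch (class and metric names are illustrative; the expected strings follow the formatting the suite below asserts):

import org.apache.spark.sql.execution.metric.{CustomAvgMetric, CustomSumMetric}

// Illustrative concrete metrics, as the updated doc comments prescribe.
class BytesReadMetric extends CustomSumMetric {
  override def name(): String = "bytesRead"
  override def description(): String = "total bytes read"
}

class LatencyMetric extends CustomAvgMetric {
  override def name(): String = "avgLatencyMs"
  override def description(): String = "average fetch latency in ms"
}

// Driver-side aggregation over per-task values.
assert(new BytesReadMetric().aggregateTaskMetrics(Array(2L, 3L, 5L)) == "10")
assert(new LatencyMetric().aggregateTaskMetrics(Array(1L, 2L)) == "1.5")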
SQLMetrics.scala
@@ -113,7 +113,7 @@ object SQLMetrics {
*/
def createV2CustomMetric(sc: SparkContext, customMetric: CustomMetric): SQLMetric = {
val acc = new SQLMetric(CustomMetrics.buildV2CustomMetricTypeName(customMetric))
acc.register(sc, name = Some(customMetric.name()), countFailedValues = false)
acc.register(sc, name = Some(customMetric.description()), countFailedValues = false)
acc
}

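
The one-line change above registers the accumulator under the metric's `description()` rather than its `name()`, so the human-readable text is what appears in the SQL UI; the metric type still encodes the implementing class via `buildV2CustomMetricTypeName`. A sketch of the call (assuming a live `SparkContext`; `DataLossMetric` is the class added in KafkaSourceProvider.scala above):

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.SQLMetrics

// Sketch: register a connector metric on the driver. After this change the
// resulting SQLMetric carries "number of data loss errors" (the description)
// as its display name rather than "dataLoss" (the name).
def registerDataLossMetric(sc: SparkContext) =
  SQLMetrics.createV2CustomMetric(sc, new DataLossMetric)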
CustomMetricsSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SparkFunSuite
class CustomMetricsSuite extends SparkFunSuite {

test("Build/parse custom metric metric type") {
Seq(new CustomSumMetric, new CustomAvgMetric).foreach { customMetric =>
Seq(new TestCustomSumMetric, new TestCustomAvgMetric).foreach { customMetric =>
val metricType = CustomMetrics.buildV2CustomMetricTypeName(customMetric)

assert(metricType == CustomMetrics.V2_CUSTOM + "_" + customMetric.getClass.getCanonicalName)
@@ -33,7 +33,7 @@ class CustomMetricsSuite extends SparkFunSuite {
}

test("Built-in CustomSumMetric") {
val metric = new CustomSumMetric
val metric = new TestCustomSumMetric

val metricValues1 = Array(0L, 1L, 5L, 5L, 7L, 10L)
assert(metric.aggregateTaskMetrics(metricValues1) == metricValues1.sum.toString)
@@ -43,7 +43,7 @@ class CustomMetricsSuite extends SparkFunSuite {
}

test("Built-in CustomAvgMetric") {
val metric = new CustomAvgMetric
val metric = new TestCustomAvgMetric

val metricValues1 = Array(0L, 1L, 5L, 5L, 7L, 10L)
assert(metric.aggregateTaskMetrics(metricValues1) == "4.667")
@@ -52,3 +52,13 @@
assert(metric.aggregateTaskMetrics(metricValues2) == "0")
}
}

private[spark] class TestCustomSumMetric extends CustomSumMetric {
override def name(): String = "CustomSumMetric"
override def description(): String = "Sum up CustomMetric"
}

private[spark] class TestCustomAvgMetric extends CustomAvgMetric {
override def name(): String = "CustomAvgMetric"
override def description(): String = "Average CustomMetric"
}