Commit bbdbe0f

yijiacui-db authored and HeartSaVioR committed
[SPARK-34854][SQL][SS] Expose source metrics via progress report and add Kafka use-case to report delay
### What changes were proposed in this pull request?
This pull request proposes a new API for streaming sources to signal that they can report metrics, and adds a use case in which the Kafka micro-batch stream reports how far the current offsets fall behind the latest available offsets. A public interface is added:

`metrics`: returns the metrics reported by the streaming source for the given offset.

### Why are the changes needed?
The new API can expose any custom metrics for the "current" offset of a streaming source. Unlike #31398, this PR makes the metrics available to users through the progress report, not through the Spark UI. One use case is knowing how far the current offset falls behind the latest offset.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests for the Kafka micro-batch source (v2) are added to cover the Kafka use case.

Closes #31944 from yijiacui-db/SPARK-34297.

Authored-by: Yijia Cui <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent f550e03 commit bbdbe0f
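
As a sketch of how the reported metrics can be consumed on the user side (assuming a running StreamingQuery named `query` whose only source is Kafka; the helper name is illustrative, not part of this commit):

    // Read the Kafka lag metrics from the most recent progress report.
    import org.apache.spark.sql.streaming.StreamingQuery

    def reportKafkaLag(query: StreamingQuery): Unit = {
      val progress = query.lastProgress
      if (progress != null) {
        val metrics = progress.sources.head.metrics // java.util.Map[String, String]
        if (!metrics.isEmpty) {
          println(s"offsets behind latest: min=${metrics.get("minOffsetsBehindLatest")}, " +
            s"max=${metrics.get("maxOffsetsBehindLatest")}, " +
            s"avg=${metrics.get("avgOffsetsBehindLatest")}")
        }
      }
    }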

File tree

6 files changed: +238 −28 lines


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala

Lines changed: 44 additions & 2 deletions
@@ -18,13 +18,16 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.util.Optional
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, ReportsSourceMetrics, SupportsAdmissionControl}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread
@@ -51,7 +54,8 @@ private[kafka010] class KafkaMicroBatchStream(
     options: CaseInsensitiveStringMap,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging {
+    failOnDataLoss: Boolean)
+  extends SupportsAdmissionControl with ReportsSourceMetrics with MicroBatchStream with Logging {
 
   private[kafka010] val pollTimeoutMs = options.getLong(
     KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -133,6 +137,10 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
 
+  override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
+    KafkaMicroBatchStream.metrics(latestConsumedOffset, latestPartitionOffsets)
+  }
+
   /**
    * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    * the checkpoint.
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 }
+
+object KafkaMicroBatchStream extends Logging {
+
+  /**
+   * Compute the per-partition difference between latestAvailablePartitionOffsets and the
+   * partition offsets in latestConsumedOffset, and report the min/max/avg number of offsets
+   * behind the latest across all partitions of the Kafka stream.
+   *
+   * Because of rate limiting, the latest consumed offset of a partition can be smaller than
+   * the latest available offset of that partition.
+   *
+   * @param latestConsumedOffset latest consumed offset
+   * @param latestAvailablePartitionOffsets latest available offset per partition
+   * @return the generated metrics map
+   */
+  def metrics(
+      latestConsumedOffset: Optional[Offset],
+      latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+    val offset = Option(latestConsumedOffset.orElse(null))
+
+    if (offset.nonEmpty && latestAvailablePartitionOffsets != null) {
+      val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
+      val offsetsBehindLatest = latestAvailablePartitionOffsets
+        .map(partitionOffset => partitionOffset._2 - consumedPartitionOffsets(partitionOffset._1))
+      if (offsetsBehindLatest.nonEmpty) {
+        val avgOffsetBehindLatest = offsetsBehindLatest.sum.toDouble / offsetsBehindLatest.size
+        return Map[String, String](
+          "minOffsetsBehindLatest" -> offsetsBehindLatest.min.toString,
+          "maxOffsetsBehindLatest" -> offsetsBehindLatest.max.toString,
+          "avgOffsetsBehindLatest" -> avgOffsetBehindLatest.toString).asJava
+      }
+    }
+    ju.Collections.emptyMap()
+  }
+}
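
As a worked illustration of the helper's arithmetic (the values match the corner-case test further down): with two partitions whose latest available offsets are 3 and 6 and whose latest consumed offsets are 1 and 2, the per-partition lags are 2 and 4, so the reported min/max/avg are 2, 4 and 3.0. A minimal sketch of the same computation, with illustrative partition names:

    // Per-partition lag = latest available offset - latest consumed offset.
    val latestAvailable = Map("topic-0" -> 3L, "topic-1" -> 6L)
    val latestConsumed  = Map("topic-0" -> 1L, "topic-1" -> 2L)
    val offsetsBehindLatest = latestAvailable.map { case (tp, latest) => latest - latestConsumed(tp) }
    val metrics = Map(
      "minOffsetsBehindLatest" -> offsetsBehindLatest.min.toString,  // "2"
      "maxOffsetsBehindLatest" -> offsetsBehindLatest.max.toString,  // "4"
      "avgOffsetsBehindLatest" ->
        (offsetsBehindLatest.sum.toDouble / offsetsBehindLatest.size).toString)  // "3.0"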

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala

Lines changed: 14 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
+import org.apache.spark.sql.connector.read.streaming
 import org.apache.spark.sql.connector.read.streaming.PartitionOffset
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
 
@@ -62,4 +63,17 @@ private[kafka010] object KafkaSourceOffset {
    */
   def apply(offset: SerializedOffset): KafkaSourceOffset =
     KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
+
+  /**
+   * Returns [[KafkaSourceOffset]] from a streaming.Offset.
+   */
+  def apply(offset: streaming.Offset): KafkaSourceOffset = {
+    offset match {
+      case k: KafkaSourceOffset => k
+      case so: SerializedOffset => apply(so)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
+    }
+  }
 }
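
A brief usage sketch of the new converter (hedged: these classes are private to the kafka010 package, so this only compiles there, and the topic name is illustrative). Any connector-level streaming.Offset that is in fact a KafkaSourceOffset, or a SerializedOffset wrapping one, can be normalized back to a KafkaSourceOffset:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.sql.connector.read.streaming

    val tp = new TopicPartition("topic", 0)
    val connectorOffset: streaming.Offset = KafkaSourceOffset(Map(tp -> 5L))
    val kafkaOffset = KafkaSourceOffset(connectorOffset) // returns the same KafkaSourceOffset
    // Any other Offset implementation results in an IllegalArgumentException.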

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 107 additions & 1 deletion
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.{Files, Paths}
-import java.util.Locale
+import java.util.{Locale, Optional}
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -1297,6 +1297,112 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query =>
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find { progress =>
+          // Find a progress entry whose average offsetsBehindLatest is greater than 0.
+          !progress.metrics.isEmpty && progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ===
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query =>
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ===
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong === 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong === 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble === 0)
+      }
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topicPartition1 = new TopicPartition(newTopic(), 0)
+    val topicPartition2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topicPartition1, 3L), (topicPartition2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset).isEmpty)
+
+    // test valid offsetsBehindLatest
+    val offset = KafkaSourceOffset(
+      Map[TopicPartition, Long]((topicPartition1, 1L), (topicPartition2, 2L)))
+    assert(
+      KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ===
+        Map[String, String](
+          "minOffsetsBehindLatest" -> "2",
+          "maxOffsetsBehindLatest" -> "4",
+          "avgOffsetsBehindLatest" -> "3.0").asJava)
+
+    // test null latestAvailablePartitionOffsets
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), null).isEmpty)
+  }
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReportsSourceMetrics.java

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A mix-in interface for {@link SparkDataStream} streaming sources to signal that they can report
+ * metrics.
+ */
+@Evolving
+public interface ReportsSourceMetrics extends SparkDataStream {
+  /**
+   * Returns the metrics reported by the streaming source with respect to
+   * the latest consumed offset.
+   *
+   * @param latestConsumedOffset the end offset (exclusive) of the latest triggered batch.
+   */
+  Map<String, String> metrics(Optional<Offset> latestConsumedOffset);
+}
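
To illustrate the mix-in, here is a hedged sketch of a hypothetical custom source implementing the interface; MyStream, its buffering logic, and the metric name are invented for illustration and are not part of this commit:

    import java.{util => ju}
    import java.util.Optional

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReportsSourceMetrics}

    abstract class MyStream extends MicroBatchStream with ReportsSourceMetrics {
      // Hypothetical bookkeeping: records received but not yet consumed up to the given offset.
      protected def bufferedRecords(latestConsumed: Option[Offset]): Long

      override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
        val consumed = Option(latestConsumedOffset.orElse(null))
        Map("bufferedRecords" -> bufferedRecords(consumed).toString).asJava
      }
    }

The map returned here is what ends up, verbatim, in the "metrics" field of the corresponding SourceProgress entry.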

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 12 additions & 3 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.text.SimpleDateFormat
-import java.util.{Date, UUID}
+import java.util.{Date, Optional, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalP
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReportsSourceMetrics, SparkDataStream}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
 import org.apache.spark.sql.streaming._
@@ -101,6 +101,8 @@ trait ProgressReporter extends Logging {
       isTriggerActive = false)
   }
 
+  private var latestStreamProgress: StreamProgress = _
+
   /** Returns the current status of the query. */
   def status: StreamingQueryStatus = currentStatus
 
@@ -136,6 +138,7 @@ trait ProgressReporter extends Logging {
     currentTriggerStartOffsets = from.mapValues(_.json).toMap
     currentTriggerEndOffsets = to.mapValues(_.json).toMap
    currentTriggerLatestOffsets = latest.mapValues(_.json).toMap
+    latestStreamProgress = to
   }
 
   private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
@@ -175,14 +178,20 @@ trait ProgressReporter extends Logging {
 
     val sourceProgress = sources.distinct.map { source =>
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+      val sourceMetrics = source match {
+        case withMetrics: ReportsSourceMetrics =>
+          withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
+        case _ => Map[String, String]().asJava
+      }
       new SourceProgress(
         description = source.toString,
         startOffset = currentTriggerStartOffsets.get(source).orNull,
         endOffset = currentTriggerEndOffsets.get(source).orNull,
         latestOffset = currentTriggerLatestOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec
+        processedRowsPerSecond = numRecords / processingTimeSec,
+        metrics = sourceMetrics
       )
     }

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 23 additions & 22 deletions
@@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
 import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
 
 /**
@@ -138,17 +139,6 @@ class StreamingQueryProgress private[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    def safeDoubleToJValue(value: Double): JValue = {
-      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
-    }
-
-    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
-    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
-      if (map.isEmpty) return JNothing
-      val keys = map.asScala.keySet.toSeq.sorted
-      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
-    }
-
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
@@ -188,7 +178,8 @@ class SourceProgress protected[sql](
     val latestOffset: String,
     val numInputRows: Long,
     val inputRowsPerSecond: Double,
-    val processedRowsPerSecond: Double) extends Serializable {
+    val processedRowsPerSecond: Double,
+    val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {
 
   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
@@ -199,17 +190,14 @@ class SourceProgress protected[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    def safeDoubleToJValue(value: Double): JValue = {
-      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
-    }
-
     ("description" -> JString(description)) ~
-    ("startOffset" -> tryParse(startOffset)) ~
-    ("endOffset" -> tryParse(endOffset)) ~
-    ("latestOffset" -> tryParse(latestOffset)) ~
-    ("numInputRows" -> JInt(numInputRows)) ~
-    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
-    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
+      ("startOffset" -> tryParse(startOffset)) ~
+      ("endOffset" -> tryParse(endOffset)) ~
+      ("latestOffset" -> tryParse(latestOffset)) ~
+      ("numInputRows" -> JInt(numInputRows)) ~
+      ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+      ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
+      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
   }
 
   private def tryParse(json: String) = try {
@@ -258,3 +246,16 @@ private[sql] object SinkProgress {
   def apply(description: String, numOutputRows: Option[Long]): SinkProgress =
     new SinkProgress(description, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS))
 }
+
+private object SafeJsonSerializer {
+  def safeDoubleToJValue(value: Double): JValue = {
+    if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+  }
+
+  /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
+  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+    if (map.isEmpty) return JNothing
+    val keys = map.asScala.keySet.toSeq.sorted
+    keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
+  }
+}
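
Because safeMapToJValue sorts the keys and returns JNothing for an empty map, the new "metrics" object only appears in a source's JSON when the source actually reported something. A hedged sketch of what the per-source JSON could then look like (assuming a running query `query` over a lagging Kafka source; the values are illustrative):

    // Prints the compact JSON of the first source's progress, e.g.:
    // {"description":"KafkaV2[Subscribe[topic-0]]", ..., "processedRowsPerSecond":12.5,
    //  "metrics":{"avgOffsetsBehindLatest":"3.0","maxOffsetsBehindLatest":"4","minOffsetsBehindLatest":"2"}}
    println(query.lastProgress.sources.head.json)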
