
Commit 6fb1f73

koeninger authored and zsxwing committed
[SPARK-17813][SQL][KAFKA] Maximum data per trigger
## What changes were proposed in this pull request?

maxOffsetsPerTrigger option for rate limiting, proportionally based on volume of different topicpartitions.

## How was this patch tested?

Added unit test.

Author: cody koeninger <[email protected]>

Closes #15527 from koeninger/SPARK-17813.

(cherry picked from commit 1042325)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 1a4be51 · commit 6fb1f73

3 files changed (+157, -27 lines)

docs/structured-streaming-kafka-integration.md

Lines changed: 6 additions & 0 deletions
@@ -221,6 +221,12 @@ The following configurations are optional:
   <td>10</td>
   <td>milliseconds to wait before retrying to fetch Kafka offsets</td>
 </tr>
+<tr>
+  <td>maxOffsetsPerTrigger</td>
+  <td>long</td>
+  <td>none</td>
+  <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
+</tr>
 </table>

 Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g.,
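For reference, a minimal usage sketch of the new option (not part of this patch; the broker address and topic name below are placeholders):

```scala
// Hypothetical usage: "host1:9092" and "topic1" are placeholder values.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  // Cap each trigger at ~10000 offsets, split proportionally
  // across topic-partitions by their available volume.
  .option("maxOffsetsPerTrigger", "10000")
  .load()
```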

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 82 additions & 25 deletions
@@ -96,6 +96,9 @@ private[kafka010] case class KafkaSource(
   private val offsetFetchAttemptIntervalMs =
     sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong

+  private val maxOffsetsPerTrigger =
+    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
   /**
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
@@ -121,16 +124,63 @@ private[kafka010] case class KafkaSource(
     }.partitionToOffsets
   }

+  private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
   override def schema: StructType = KafkaSource.kafkaSchema

   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets

-    val offset = KafkaSourceOffset(fetchLatestOffsets())
-    logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
-    Some(offset)
+    val latest = fetchLatestOffsets()
+    val offsets = maxOffsetsPerTrigger match {
+      case None =>
+        latest
+      case Some(limit) if currentPartitionOffsets.isEmpty =>
+        rateLimit(limit, initialPartitionOffsets, latest)
+      case Some(limit) =>
+        rateLimit(limit, currentPartitionOffsets.get, latest)
+    }
+
+    currentPartitionOffsets = Some(offsets)
+    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
+    Some(KafkaSourceOffset(offsets))
+  }
+
+  /** Proportionally distribute limit number of offsets among topicpartitions */
+  private def rateLimit(
+      limit: Long,
+      from: Map[TopicPartition, Long],
+      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap {
+      case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map {
+        case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>
+            val begin = from.get(tp).getOrElse(fromNew(tp))
+            val prorate = limit * (size / total)
+            logDebug(s"rateLimit $tp prorated amount is $prorate")
+            // Don't completely starve small topicpartitions
+            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            logDebug(s"rateLimit $tp new offset is $off")
+            // Paranoia, make sure not to return an offset that's past end
+            Math.min(end, off)
+          }.getOrElse(end)
+      }
+    }
   }

   /**
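To make the proration concrete, here is a standalone sketch of the same arithmetic, using the per-partition volumes from the unit test below (101, 11, and 1 available offsets, limit 10); the partition keys are illustrative strings rather than real TopicPartitions:

```scala
// Illustrative only: mirrors rateLimit's proration for sizes 101, 11, 1 and limit 10.
val limit = 10L
val sizes = Map("tp-0" -> 101L, "tp-1" -> 11L, "tp-2" -> 1L)
val total = sizes.values.sum.toDouble            // 113.0

val taken = sizes.map { case (tp, size) =>
  val prorate = limit * (size / total)           // ~8.94, ~0.97, ~0.09
  // Shares below 1 are rounded up so small topicpartitions aren't starved
  tp -> (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
}
// taken == Map("tp-0" -> 8, "tp-1" -> 1, "tp-2" -> 1), matching the test's
// "1 from smallest, 1 from middle, 8 from biggest"
```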
@@ -153,11 +203,7 @@ private[kafka010] case class KafkaSource(

     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionOffsets = if (newPartitions.nonEmpty) {
-      fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
-    } else {
-      Map.empty[TopicPartition, Long]
-    }
+    val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
     if (newPartitionOffsets.keySet != newPartitions) {
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
@@ -221,6 +267,12 @@ private[kafka010] case class KafkaSource(

     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
+
+    // On recovery, getBatch will get called before getOffset
+    if (currentPartitionOffsets.isEmpty) {
+      currentPartitionOffsets = Some(untilPartitionOffsets)
+    }
+
     sqlContext.createDataFrame(rdd, schema)
   }

@@ -305,23 +357,28 @@ private[kafka010] case class KafkaSource(
    * some partitions if they are deleted.
    */
   private def fetchNewPartitionEarliestOffsets(
-      newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Poll to get the latest assigned partitions
-    consumer.poll(0)
-    val partitions = consumer.assignment()
-    consumer.pause(partitions)
-    logDebug(s"\tPartitions assigned to consumer: $partitions")
-
-    // Get the earliest offset of each partition
-    consumer.seekToBeginning(partitions)
-    val partitionOffsets = newPartitions.filter { p =>
-      // When deleting topics happen at the same time, some partitions may not be in `partitions`.
-      // So we need to ignore them
-      partitions.contains(p)
-    }.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
-    partitionOffsets
-  }
+      newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
+    if (newPartitions.isEmpty) {
+      Map.empty[TopicPartition, Long]
+    } else {
+      withRetriesWithoutInterrupt {
+        // Poll to get the latest assigned partitions
+        consumer.poll(0)
+        val partitions = consumer.assignment()
+        consumer.pause(partitions)
+        logDebug(s"\tPartitions assigned to consumer: $partitions")
+
+        // Get the earliest offset of each partition
+        consumer.seekToBeginning(partitions)
+        val partitionOffsets = newPartitions.filter { p =>
+          // When deleting topics happen at the same time, some partitions may not be in
+          // `partitions`. So we need to ignore them
+          partitions.contains(p)
+        }.map(p => p -> consumer.position(p)).toMap
+        logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
+        partitionOffsets
+      }
+    }

   /**
    * Helper function that does multiple retries on a body of code that returns offsets.

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 69 additions & 2 deletions
@@ -23,13 +23,14 @@ import scala.util.Random

 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._

 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{ ProcessingTime, StreamTest }
 import org.apache.spark.sql.test.SharedSQLContext

-
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {

   protected var testUtils: KafkaTestUtils = _
@@ -133,6 +134,72 @@ class KafkaSourceSuite extends KafkaSourceTest {

   private val topicId = new AtomicInteger(0)

+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
+      ),
+      StopStream,
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
+      ),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
+        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
+      )
+    )
+  }
+
   test("cannot stop Kafka stream") {
     val topic = newTopic()
     testUtils.createTopic(newTopic(), partitions = 5)
