Skip to content

Commit b08b500

Browse files
maropusrowen
authored andcommitted
[SPARK-18620][STREAMING][KINESIS] Flatten input rates in timeline for streaming + kinesis
## What changes were proposed in this pull request? This pr is to make input rates in timeline more flat for spark streaming + kinesis. Since kinesis workers fetch records and push them into block generators in bulk, timeline in web UI has many spikes when `maxRates` applied (See a Figure.1 below). This fix splits fetched input records into multiple `adRecords` calls. Figure.1 Apply `maxRates=500` in vanilla Spark <img width="1084" alt="apply_limit in_vanilla_spark" src="https://cloud.githubusercontent.com/assets/692303/20823861/4602f300-b89b-11e6-95f3-164a37061305.png"> Figure.2 Apply `maxRates=500` in Spark with my patch <img width="1056" alt="apply_limit in_spark_with_my_patch" src="https://cloud.githubusercontent.com/assets/692303/20823882/6c46352c-b89b-11e6-81ab-afd8abfe0cfe.png"> ## How was this patch tested? Add tests to check to split input records into multiple `addRecords` calls. Author: Takeshi YAMAMURO <[email protected]> Closes #16114 from maropu/SPARK-18620.
1 parent be5fc6e commit b08b500

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T](
221221
}
222222
}
223223

224+
/** Return the current rate limit defined in [[BlockGenerator]]. */
225+
private[kinesis] def getCurrentLimit: Int = {
226+
assert(blockGenerator != null)
227+
math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt
228+
}
229+
224230
/** Get the latest sequence number for the given shard that can be checkpointed through KCL */
225231
private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
226232
Option(shardIdToLatestStoredSeqNum.get(shardId))

external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,18 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
6868
override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
6969
if (!receiver.isStopped()) {
7070
try {
71-
receiver.addRecords(shardId, batch)
72-
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
71+
// Limit the number of processed records from Kinesis stream. This is because the KCL cannot
72+
// control the number of aggregated records to be fetched even if we set `MaxRecords`
73+
// in `KinesisClientLibConfiguration`. For example, if we set 10 to the number of max
74+
// records in a worker and a producer aggregates two records into one message, the worker
75+
// possibly 20 records every callback function called.
76+
val maxRecords = receiver.getCurrentLimit
77+
for (start <- 0 until batch.size by maxRecords) {
78+
val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
79+
receiver.addRecords(shardId, miniBatch)
80+
logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records " +
81+
s"for shardId $shardId")
82+
}
7383
receiver.setCheckpointer(shardId, checkpointer)
7484
} catch {
7585
case NonFatal(e) =>

external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
6969

7070
test("process records including store and set checkpointer") {
7171
when(receiverMock.isStopped()).thenReturn(false)
72+
when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
7273

7374
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
7475
recordProcessor.initialize(shardId)
@@ -79,8 +80,23 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
7980
verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
8081
}
8182

83+
test("split into multiple processes if a limitation is set") {
84+
when(receiverMock.isStopped()).thenReturn(false)
85+
when(receiverMock.getCurrentLimit).thenReturn(1)
86+
87+
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
88+
recordProcessor.initialize(shardId)
89+
recordProcessor.processRecords(batch, checkpointerMock)
90+
91+
verify(receiverMock, times(1)).isStopped()
92+
verify(receiverMock, times(1)).addRecords(shardId, batch.subList(0, 1))
93+
verify(receiverMock, times(1)).addRecords(shardId, batch.subList(1, 2))
94+
verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
95+
}
96+
8297
test("shouldn't store and update checkpointer when receiver is stopped") {
8398
when(receiverMock.isStopped()).thenReturn(true)
99+
when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
84100

85101
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
86102
recordProcessor.processRecords(batch, checkpointerMock)
@@ -92,6 +108,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
92108

93109
test("shouldn't update checkpointer when exception occurs during store") {
94110
when(receiverMock.isStopped()).thenReturn(false)
111+
when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
95112
when(
96113
receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
97114
).thenThrow(new RuntimeException())

0 commit comments

Comments
 (0)