1717
1818package org .apache .spark .streaming .receiver
1919
20+ import scala .collection .mutable .ArrayBuffer
21+
2022import org .apache .spark .SparkConf
2123import org .apache .spark .SparkFunSuite
24+ import org .apache .spark .streaming .StreamingContext
25+ import org .apache .spark .streaming .FakeBlockGeneratorListener
2226
2327/** Testsuite for testing the network receiver behavior */
2428class RateLimiterSuite extends SparkFunSuite {
@@ -43,4 +47,108 @@ class RateLimiterSuite extends SparkFunSuite {
4347 rateLimiter.updateRate(105 )
4448 assert(rateLimiter.getCurrentLimit === 100 )
4549 }
50+
51+ def setupGenerator (blockInterval : Int ): (BlockGenerator , FakeBlockGeneratorListener ) = {
52+ val blockGeneratorListener = new FakeBlockGeneratorListener
53+ val conf = new SparkConf ().set(" spark.streaming.blockInterval" , s " ${blockInterval}ms " )
54+ val blockGenerator = new BlockGenerator (blockGeneratorListener, 1 , conf)
55+ (blockGenerator, blockGeneratorListener)
56+ }
57+
58+ test(" throttling block generator" ) {
59+ val blockIntervalMs = 100
60+ val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
61+ val maxRate = 1000
62+ blockGenerator.updateRate(maxRate)
63+ blockGenerator.start()
64+ throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs)
65+ blockGenerator.stop()
66+ }
67+
68+ test(" throttling block generator changes rate up" ) {
69+ val blockIntervalMs = 100
70+ val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
71+ val maxRate1 = 1000
72+ blockGenerator.start()
73+ blockGenerator.updateRate(maxRate1)
74+ throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs)
75+
76+ blockGeneratorListener.reset()
77+ val maxRate2 = 5000
78+ blockGenerator.updateRate(maxRate2)
79+ throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs)
80+ blockGenerator.stop()
81+ }
82+
83+ test(" throttling block generator changes rate up and down" ) {
84+ val blockIntervalMs = 100
85+ val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
86+ val maxRate1 = 1000
87+ blockGenerator.updateRate(maxRate1)
88+ blockGenerator.start()
89+ throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs)
90+
91+ blockGeneratorListener.reset()
92+ val maxRate2 = 5000
93+ blockGenerator.updateRate(maxRate2)
94+ throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs)
95+
96+ blockGeneratorListener.reset()
97+ val maxRate3 = 1000
98+ blockGenerator.updateRate(maxRate3)
99+ throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs)
100+ blockGenerator.stop()
101+ }
102+
103+ def throttlingTest (
104+ maxRate : Long ,
105+ blockGenerator : BlockGenerator ,
106+ blockGeneratorListener : FakeBlockGeneratorListener ,
107+ blockIntervalMs : Int ) {
108+ val expectedBlocks = 20
109+ val waitTime = expectedBlocks * blockIntervalMs
110+ val expectedMessages = maxRate * waitTime / 1000
111+ val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
112+ val generatedData = new ArrayBuffer [Int ]
113+
114+ // Generate blocks
115+ val startTime = System .currentTimeMillis()
116+ var count = 0
117+ while (System .currentTimeMillis - startTime < waitTime) {
118+ blockGenerator.addData(count)
119+ generatedData += count
120+ count += 1
121+ }
122+
123+ val recordedBlocks = blockGeneratorListener.arrayBuffers
124+ val recordedData = recordedBlocks.flatten
125+ assert(blockGeneratorListener.arrayBuffers.size > 0 , " No blocks received" )
126+
127+ // recordedData size should be close to the expected rate; use an error margin proportional to
128+ // the value, so that rate changes don't cause a brittle test
129+ val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
130+ val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
131+ val numMessages = recordedData.size
132+ assert(
133+ numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
134+ s " #records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
135+ )
136+
137+ // XXX Checking every block would require an even distribution of messages across blocks,
138+ // which throttling code does not control. Therefore, test against the average.
139+ val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
140+ val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
141+ val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(" ," )
142+
143+ // the first and last block may be incomplete, so we slice them out
144+ val validBlocks = recordedBlocks.drop(1 ).dropRight(1 )
145+ val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
146+
147+ assert(
148+ averageBlockSize >= minExpectedMessagesPerBlock &&
149+ averageBlockSize <= maxExpectedMessagesPerBlock,
150+ s " # records in received blocks = [ $receivedBlockSizes], not between " +
151+ s " $minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average "
152+ )
153+ }
46154}
0 commit comments