1717
1818package org .apache .spark .streaming .receiver
1919
20- import scala .collection .mutable .ArrayBuffer
21-
2220import org .apache .spark .SparkConf
2321import org .apache .spark .SparkFunSuite
24- import org .apache .spark .streaming .StreamingContext
25- import org .apache .spark .streaming .FakeBlockGeneratorListener
2622
2723/** Testsuite for testing the network receiver behavior */
2824class RateLimiterSuite extends SparkFunSuite {
@@ -47,108 +43,4 @@ class RateLimiterSuite extends SparkFunSuite {
4743 rateLimiter.updateRate(105 )
4844 assert(rateLimiter.getCurrentLimit === 100 )
4945 }
50-
51- def setupGenerator (blockInterval : Int ): (BlockGenerator , FakeBlockGeneratorListener ) = {
52- val blockGeneratorListener = new FakeBlockGeneratorListener
53- val conf = new SparkConf ().set(" spark.streaming.blockInterval" , s " ${blockInterval}ms " )
54- val blockGenerator = new BlockGenerator (blockGeneratorListener, 1 , conf)
55- (blockGenerator, blockGeneratorListener)
56- }
57-
58- test(" throttling block generator" ) {
59- val blockIntervalMs = 100
60- val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
61- val maxRate = 1000
62- blockGenerator.updateRate(maxRate)
63- blockGenerator.start()
64- throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs)
65- blockGenerator.stop()
66- }
67-
68- test(" throttling block generator changes rate up" ) {
69- val blockIntervalMs = 100
70- val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
71- val maxRate1 = 1000
72- blockGenerator.start()
73- blockGenerator.updateRate(maxRate1)
74- throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs)
75-
76- blockGeneratorListener.reset()
77- val maxRate2 = 5000
78- blockGenerator.updateRate(maxRate2)
79- throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs)
80- blockGenerator.stop()
81- }
82-
83- test(" throttling block generator changes rate up and down" ) {
84- val blockIntervalMs = 100
85- val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
86- val maxRate1 = 1000
87- blockGenerator.updateRate(maxRate1)
88- blockGenerator.start()
89- throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs)
90-
91- blockGeneratorListener.reset()
92- val maxRate2 = 5000
93- blockGenerator.updateRate(maxRate2)
94- throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs)
95-
96- blockGeneratorListener.reset()
97- val maxRate3 = 1000
98- blockGenerator.updateRate(maxRate3)
99- throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs)
100- blockGenerator.stop()
101- }
102-
103- def throttlingTest (
104- maxRate : Long ,
105- blockGenerator : BlockGenerator ,
106- blockGeneratorListener : FakeBlockGeneratorListener ,
107- blockIntervalMs : Int ) {
108- val expectedBlocks = 20
109- val waitTime = expectedBlocks * blockIntervalMs
110- val expectedMessages = maxRate * waitTime / 1000
111- val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
112- val generatedData = new ArrayBuffer [Int ]
113-
114- // Generate blocks
115- val startTime = System .currentTimeMillis()
116- var count = 0
117- while (System .currentTimeMillis - startTime < waitTime) {
118- blockGenerator.addData(count)
119- generatedData += count
120- count += 1
121- }
122-
123- val recordedBlocks = blockGeneratorListener.arrayBuffers
124- val recordedData = recordedBlocks.flatten
125- assert(blockGeneratorListener.arrayBuffers.size > 0 , " No blocks received" )
126-
127- // recordedData size should be close to the expected rate; use an error margin proportional to
128- // the value, so that rate changes don't cause a brittle test
129- val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
130- val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
131- val numMessages = recordedData.size
132- assert(
133- numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
134- s " #records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
135- )
136-
137- // XXX Checking every block would require an even distribution of messages across blocks,
138- // which throttling code does not control. Therefore, test against the average.
139- val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
140- val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
141- val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(" ," )
142-
143- // the first and last block may be incomplete, so we slice them out
144- val validBlocks = recordedBlocks.drop(1 ).dropRight(1 )
145- val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
146-
147- assert(
148- averageBlockSize >= minExpectedMessagesPerBlock &&
149- averageBlockSize <= maxExpectedMessagesPerBlock,
150- s " # records in received blocks = [ $receivedBlockSizes], not between " +
151- s " $minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average "
152- )
153- }
15446}
0 commit comments