@@ -19,35 +19,64 @@ package org.apache.spark.streaming.scheduler.rate
1919
2020import scala .util .Random
2121
22- import org .scalatest ._
22+ import org .scalatest .Inspectors . forAll
2323import org .scalatest .Matchers
24- import org .scalatest .Inspectors ._
2524
26- import org .apache .spark .SparkFunSuite
25+ import org .apache .spark .{SparkConf , SparkFunSuite }
26+ import org .apache .spark .streaming .Seconds
2727
2828class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
2929
30+ test(" the right estimator is created" ) {
31+ val conf = new SparkConf
32+ conf.set(" spark.streaming.backpressure.rateEstimator" , " pid" )
33+ val pid = RateEstimator .create(conf, Seconds (1 ))
34+ pid.getClass should equal(classOf [PIDRateEstimator ])
35+ }
36+
37+ test(" estimator checks ranges" ) {
38+ intercept[IllegalArgumentException ] {
39+ new PIDRateEstimator (0 , 1 , 2 , 3 )
40+ }
41+ intercept[IllegalArgumentException ] {
42+ new PIDRateEstimator (100 , - 1 , 2 , 3 )
43+ }
44+ intercept[IllegalArgumentException ] {
45+ new PIDRateEstimator (100 , 0 , - 1 , 3 )
46+ }
47+ intercept[IllegalArgumentException ] {
48+ new PIDRateEstimator (100 , 0 , 0 , - 1 )
49+ }
50+ }
51+
52+ private def createDefaultEstimator : PIDRateEstimator = {
53+ new PIDRateEstimator (20 , 1D , 0D , 0D )
54+ }
55+
3056 test(" first bound is None" ) {
31- val p = new PIDRateEstimator ( 20 , - 1D , 0D , 0D )
57+ val p = createDefaultEstimator
3258 p.compute(0 , 10 , 10 , 0 ) should equal(None )
3359 }
3460
3561 test(" second bound is rate" ) {
36- val p = new PIDRateEstimator ( 20 , - 1D , 0D , 0D )
62+ val p = createDefaultEstimator
3763 p.compute(0 , 10 , 10 , 0 )
3864 // 1000 elements / s
3965 p.compute(10 , 10 , 10 , 0 ) should equal(Some (1000 ))
4066 }
4167
4268 test(" works even with no time between updates" ) {
43- val p = new PIDRateEstimator ( 20 , - 1D , 0D , 0D )
69+ val p = createDefaultEstimator
4470 p.compute(0 , 10 , 10 , 0 )
4571 p.compute(10 , 10 , 10 , 0 )
4672 p.compute(10 , 10 , 10 , 0 ) should equal(None )
4773 }
4874
4975 test(" bound is never negative" ) {
50- val p = new PIDRateEstimator (20 , - 1D , - 1D , 0D )
76+ val p = new PIDRateEstimator (20 , 1D , 1D , 0D )
77+ // prepare a series of batch updates, one every 20ms, 0 processed elements, 2ms of processing
78+ // this might point the estimator to try and decrease the bound, but we test it never
79+ // goes below zero, which would be nonsensical.
5180 val times = List .tabulate(50 )(x => x * 20 ) // every 20ms
5281 val elements = List .fill(50 )(0 ) // no processing
5382 val proc = List .fill(50 )(20 ) // 20ms of processing
@@ -58,7 +87,10 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
5887 }
5988
6089 test(" with no accumulated or positive error, |I| > 0, follow the processing speed" ) {
61- val p = new PIDRateEstimator (20 , - 1D , - 1D , 0D )
90+ val p = new PIDRateEstimator (20 , 1D , 1D , 0D )
91+ // prepare a series of batch updates, one every 20ms with an increasing number of processed
92+ // elements in each batch, but constant processing time, and no accumulated error. Even though
93+ // the integral part is non-zero, the estimated rate should follow only the proportional term
6294 val times = List .tabulate(50 )(x => x * 20 ) // every 20ms
6395 val elements = List .tabulate(50 )(x => x * 20 ) // increasing
6496 val proc = List .fill(50 )(20 ) // 20ms of processing
@@ -69,7 +101,11 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
69101 }
70102
71103 test(" with no accumulated but some positive error, |I| > 0, follow the processing speed" ) {
72- val p = new PIDRateEstimator (20 , - 1D , - 1D , 0D )
104+ val p = new PIDRateEstimator (20 , 1D , 1D , 0D )
105+ // prepare a series of batch updates, one every 20ms with an decreasing number of processed
106+ // elements in each batch, but constant processing time, and no accumulated error. Even though
107+ // the integral part is non-zero, the estimated rate should follow only the proportional term,
108+ // asking for less and less elements
73109 val times = List .tabulate(50 )(x => x * 20 ) // every 20ms
74110 val elements = List .tabulate(50 )(x => (50 - x) * 20 ) // decreasing
75111 val proc = List .fill(50 )(20 ) // 20ms of processing
@@ -80,7 +116,7 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
80116 }
81117
82118 test(" with some accumulated and some positive error, |I| > 0, stay below the processing speed" ) {
83- val p = new PIDRateEstimator (20 , - 1D , - .01D , 0D )
119+ val p = new PIDRateEstimator (20 , 1D , .01D , 0D )
84120 val times = List .tabulate(50 )(x => x * 20 ) // every 20ms
85121 val rng = new Random ()
86122 val elements = List .tabulate(50 )(x => rng.nextInt(1000 ))
0 commit comments