2 files changed: +7 −5 lines changed

main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

@@ -44,11 +44,13 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext
   /**
    * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
    */
-  override protected[streaming] val rateController: Option[RateController] =
-    if (RateController.isBackPressureEnabled(ssc.conf))
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(ssc.conf)) {
       RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
-    else
+    } else {
       None
+    }
+  }
 
   /**
    * Gets the receiver object that will be sent to the worker nodes
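
For context, the rateController wired up above only comes into play when back-pressure is enabled in the StreamingContext's configuration, which is what RateController.isBackPressureEnabled(ssc.conf) checks. Below is a minimal sketch, not part of this diff, of a driver program that turns the flag on; the app name, master, and socket source are placeholder choices.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BackPressureExample {
  def main(args: Array[String]): Unit = {
    // Placeholder app name and master; the key line is the back-pressure flag.
    val conf = new SparkConf()
      .setAppName("backpressure-example")
      .setMaster("local[2]")
      .set("spark.streaming.backpressure.enabled", "true")

    val ssc = new StreamingContext(conf, Seconds(1))

    // socketTextStream is a receiver-based input, so the ReceiverInputDStream
    // above will attach a ReceiverRateController that asynchronously publishes
    // new rate limits to its receiver through the receiver tracker.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
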
test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala

@@ -46,7 +46,7 @@ class RateControllerSuite extends TestSuiteBase {
     }
   }
 
-  test("receiver rate controller updates reach receivers") {
+  test("publish rates reach receivers") {
     val ssc = new StreamingContext(conf, batchDuration)
     withStreamingContext(ssc) { ssc =>
       val dstream = new RateLimitInputDStream(ssc) {
@@ -65,7 +65,7 @@ class RateControllerSuite extends TestSuiteBase {
     }
   }
 
-  test("multiple rate controller updates reach receivers") {
+  test("multiple publish rates reach receivers") {
     val ssc = new StreamingContext(conf, batchDuration)
     withStreamingContext(ssc) { ssc =>
       val rates = Seq(100L, 200L, 300L)