Skip to content

Commit f637142

Browse files
committed
Address minor code style comments
1 parent a178d37 commit f637142

File tree

4 files changed

+17
-12
lines changed

4 files changed

+17
-12
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private[streaming] abstract class ReceiverSupervisor(
9999
protected def onStop(message: String, error: Option[Throwable]) { }
100100

101101
/** Called when receiver is started. Return true if the driver accepts us */
102-
protected def onReceiverStart(): Boolean = true
102+
protected def onReceiverStart(): Boolean
103103

104104
/** Called when receiver is stopped */
105105
protected def onReceiverStop(message: String, error: Option[Throwable]) { }
@@ -129,7 +129,7 @@ private[streaming] abstract class ReceiverSupervisor(
129129
logInfo("Called receiver onStart")
130130
} else {
131131
// The driver refused us
132-
stop("Registered unsuccessfully because the driver refused" + streamId, None)
132+
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
133133
}
134134
} catch {
135135
case NonFatal(t) =>

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
7474

7575
/** Enumeration to identify current state of the ReceiverTracker */
7676
object TrackerState extends Enumeration {
77-
type CheckpointState = Value
77+
type TrackerState = Value
7878
val Initialized, Started, Stopping, Stopped = Value
7979
}
8080
import TrackerState._
@@ -86,15 +86,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
8686
// This not being null means the tracker has been started and not stopped
8787
private var endpoint: RpcEndpointRef = null
8888

89-
/** Check if tracker has been marked for starting */
90-
private def isTrackerStarted(): Boolean = trackerState == Started
91-
92-
/** Check if tracker has been marked for stopping */
93-
private def isTrackerStopping(): Boolean = trackerState == Stopping
94-
95-
/** Check if tracker has been marked for stopped */
96-
private def isTrackerStopped(): Boolean = trackerState == Stopped
97-
9889
/** Start the endpoint and receiver execution thread. */
9990
def start(): Unit = synchronized {
10091
if (isTrackerStarted) {
@@ -412,4 +403,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
412403
}
413404

414405
}
406+
407+
/** Check if tracker has been marked for starting */
408+
private def isTrackerStarted(): Boolean = trackerState == Started
409+
410+
/** Check if tracker has been marked for stopping */
411+
private def isTrackerStopping(): Boolean = trackerState == Stopping
412+
413+
/** Check if tracker has been marked for stopped */
414+
private def isTrackerStopped(): Boolean = trackerState == Stopped
415+
415416
}

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
346346
def reportError(message: String, throwable: Throwable) {
347347
errors += throwable
348348
}
349+
350+
override protected def onReceiverStart(): Boolean = true
349351
}
350352

351353
/**

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
286286
}
287287

288288
test("stop gracefully even if a receiver misses StopReceiver") {
289+
// This is not a deterministic unit. But if this unit test is flaky, then there is definitely
290+
// something wrong. See SPARK-5681
289291
val conf = new SparkConf().setMaster(master).setAppName(appName)
290292
sc = new SparkContext(conf)
291293
ssc = new StreamingContext(sc, Milliseconds(100))

0 commit comments

Comments
 (0)