Closed
22 commits
77983f3
Add tracker status and stop to receive messages when stopping tracker.
viirya Feb 9, 2015
ae0d9fd
Merge branch 'master' into tracker_status_timeout
viirya Feb 11, 2015
3d568e8
Let receivers get registered first before going started.
viirya Feb 11, 2015
355f9ce
Separate register and start events for receivers.
viirya Feb 11, 2015
9e1a760
Merge remote-tracking branch 'upstream/master' into tracker_status_ti…
viirya Apr 7, 2015
c419677
Fix style.
viirya Apr 7, 2015
34c18dc
Merge remote-tracking branch 'upstream/master' into tracker_status_ti…
viirya May 5, 2015
19b76d9
Remove timeout.
viirya May 10, 2015
e0ef72a
Merge remote-tracking branch 'upstream/master' into tracker_status_ti…
viirya May 10, 2015
fff63f9
Use a lock to eliminate the race condition when stopping receivers an…
zsxwing May 20, 2015
15ed4a1
Register before starting the receiver
zsxwing May 21, 2015
7b1d9af
"case Throwable" => "case NonFatal"
zsxwing May 22, 2015
a8120c0
Merge branch 'master' into pr4467
zsxwing May 22, 2015
7c73c1f
Use trackerStateLock to protect trackerState
zsxwing May 22, 2015
c41ee94
Make stopReceivers private
zsxwing May 22, 2015
b4c29e7
Stop receiver only if we start it
zsxwing May 25, 2015
3cb19a3
Merge branch 'master' into pr4467
zsxwing Jul 16, 2015
51fb07e
Fix the code style
zsxwing Jul 16, 2015
a178d37
Move 'stopReceivers' to the event loop to resolve the race condition
zsxwing Jul 16, 2015
f637142
Address minor code style comments
zsxwing Jul 17, 2015
e103e8a
Move ReceiverTracker.stop into ReceiverTracker.stop
zsxwing Jul 17, 2015
709ac1f
Fix the comment
zsxwing Jul 17, 2015
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch

import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.util.control.NonFatal

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
@@ -36,7 +37,7 @@ private[streaming] abstract class ReceiverSupervisor(
conf: SparkConf
) extends Logging {

/** Enumeration to identify current state of the StreamingContext */
/** Enumeration to identify current state of the Receiver */
object ReceiverState extends Enumeration {
type CheckpointState = Value
val Initialized, Started, Stopped = Value
@@ -97,8 +98,8 @@ private[streaming] abstract class ReceiverSupervisor(
/** Called when supervisor is stopped */
protected def onStop(message: String, error: Option[Throwable]) { }

/** Called when receiver is started */
protected def onReceiverStart() { }
/** Called when receiver is started. Return true if the driver accepts us */
protected def onReceiverStart(): Boolean

/** Called when receiver is stopped */
protected def onReceiverStop(message: String, error: Option[Throwable]) { }
@@ -121,13 +122,17 @@ private[streaming] abstract class ReceiverSupervisor(
/** Start receiver */
def startReceiver(): Unit = synchronized {
try {
logInfo("Starting receiver")
receiver.onStart()
logInfo("Called receiver onStart")
onReceiverStart()
receiverState = Started
if (onReceiverStart()) {
logInfo("Starting receiver")
receiverState = Started
receiver.onStart()
logInfo("Called receiver onStart")
} else {
// The driver refused us
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
}
} catch {
case t: Throwable =>
case NonFatal(t) =>
stop("Error starting receiver " + streamId, Some(t))
}
}
@@ -136,12 +141,19 @@ private[streaming] abstract class ReceiverSupervisor(
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
try {
logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
receiverState = Stopped
receiver.onStop()
logInfo("Called receiver onStop")
onReceiverStop(message, error)
receiverState match {
case Initialized =>
logWarning("Skip stopping receiver because it has not yet started")
case Started =>
Member Author
@tdas now we stop the receiver only if receiverState is Started. What do you think?

Contributor
That's the right way to do this. But by that logic we should start only when the state is Initialized or Stopped, and we are not doing that. In fact, that leads to the question of locking the state correctly. I don't want to get into fixing those in this PR.

I think it's better not to change stopReceiver() and rather to change stop() to call stopReceiver() only when the state is Started.

Member Author
I changed it here because stopReceiver is called by both stop and restartReceiver.
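
The guard discussed in this thread can be sketched in isolation. The following is a minimal, self-contained illustration, not Spark's code: `SupervisorSketch`, `onStopCalls`, and `currentState` are hypothetical names, and a counter stands in for the real `receiver.onStop()` call.

```scala
// Hypothetical stand-in for ReceiverSupervisor; not Spark's actual class.
object ReceiverState extends Enumeration {
  val Initialized, Started, Stopped = Value
}

class SupervisorSketch {
  import ReceiverState._

  private var state = Initialized

  // Counts how many times the simulated receiver.onStop() ran.
  var onStopCalls = 0

  def startReceiver(): Unit = synchronized {
    state = Started
  }

  // Stops the receiver only if it was actually started.
  def stopReceiver(): Unit = synchronized {
    state match {
      case Initialized => // never started: nothing to stop
      case Started =>
        state = Stopped
        onStopCalls += 1
      case Stopped => // already stopped: do not run onStop() twice
    }
  }

  def currentState: ReceiverState.Value = synchronized { state }
}
```

Because the check lives inside `stopReceiver()`, every caller gets the same guard without having to repeat it, which is the reason given above for placing it there.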

receiverState = Stopped
receiver.onStop()
logInfo("Called receiver onStop")
onReceiverStop(message, error)
case Stopped =>
logWarning("Receiver has been stopped")
}
} catch {
case t: Throwable =>
case NonFatal(t) =>
logError("Error stopping receiver " + streamId + t.getStackTraceString)
}
}
@@ -167,7 +179,7 @@ private[streaming] abstract class ReceiverSupervisor(
}(futureExecutionContext)
}

/** Check if receiver has been marked for stopping */
/** Check if receiver has been marked for starting */
def isReceiverStarted(): Boolean = {
logDebug("state = " + receiverState)
receiverState == Started
@@ -162,7 +162,7 @@ private[streaming] class ReceiverSupervisorImpl(
env.rpcEnv.stop(endpoint)
}

override protected def onReceiverStart() {
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
import scala.language.existentials
import scala.math.max
import org.apache.spark.rdd._

import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{Logging, SparkEnv, SparkException}
@@ -47,6 +46,8 @@ private[streaming] case class ReportError(streamId: Int, message: String, error:
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
extends ReceiverTrackerMessage

private[streaming] case object StopAllReceivers extends ReceiverTrackerMessage

/**
* This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
* this class must be created after all input streams have been added and StreamingContext.start()
@@ -71,13 +72,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
)
private val listenerBus = ssc.scheduler.listenerBus

/** Enumeration to identify current state of the ReceiverTracker */
object TrackerState extends Enumeration {
type TrackerState = Value
val Initialized, Started, Stopping, Stopped = Value
}
import TrackerState._

/** State of the tracker. Protected by "trackerStateLock" */
@volatile private var trackerState = Initialized

// endpoint is created when generator starts.
// This not being null means the tracker has been started and not stopped
private var endpoint: RpcEndpointRef = null

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (endpoint != null) {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}

@@ -86,20 +97,46 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) receiverExecutor.start()
logInfo("ReceiverTracker started")
trackerState = Started
}
}

/** Stop the receiver execution thread. */
def stop(graceful: Boolean): Unit = synchronized {
if (!receiverInputStreams.isEmpty && endpoint != null) {
if (isTrackerStarted) {
// First, stop the receivers
if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
trackerState = Stopping
if (!skipReceiverLaunch) {
// Send the stop signal to all the receivers
endpoint.askWithRetry[Boolean](StopAllReceivers)

// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.
receiverExecutor.awaitTermination(10000)

if (graceful) {
val pollTime = 100
logInfo("Waiting for receiver job to terminate gracefully")
while (receiverInfo.nonEmpty || receiverExecutor.running) {
Thread.sleep(pollTime)
Contributor
This needs to be fixed; it could get stuck indefinitely. But that belongs in a different PR.
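
One way to avoid an unbounded wait like the loop above is to poll against a deadline. The sketch below is only an assumption about what such a fix could look like, not what any later PR does; `BoundedWait.waitUntil` and its parameters are hypothetical.

```scala
object BoundedWait {
  /**
   * Polls `done` every `pollMillis` ms until it returns true or `timeoutMillis` elapses.
   * Returns true if the condition was met, false if the wait timed out.
   */
  def waitUntil(timeoutMillis: Long, pollMillis: Long)(done: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMillis * 1000000L
    var finished = done
    // Compare via subtraction so the check stays correct if nanoTime wraps around.
    while (!finished && System.nanoTime() - deadline < 0) {
      Thread.sleep(pollMillis)
      finished = done
    }
    finished
  }
}
```

A caller could pass the loop's exit condition as `done` and log a warning when the result is `false` instead of blocking forever.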

}
logInfo("Waited for receiver job to terminate gracefully")
}

// Check if all the receivers have been deregistered or not
if (receiverInfo.nonEmpty) {
logWarning("Not all of the receivers have deregistered, " + receiverInfo)
} else {
logInfo("All of the receivers have deregistered successfully")
}
}

// Finally, stop the endpoint
ssc.env.rpcEnv.stop(endpoint)
endpoint = null
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
}
}

@@ -145,14 +182,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
host: String,
receiverEndpoint: RpcEndpointRef,
senderAddress: RpcAddress
) {
): Boolean = {
if (!receiverInputStreamIds.contains(streamId)) {
throw new SparkException("Register received for unexpected id " + streamId)
}
receiverInfo(streamId) = ReceiverInfo(
streamId, s"${typ}-${streamId}", receiverEndpoint, true, host)
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)

if (isTrackerStopping || isTrackerStopped) {
false
} else {
// "stopReceivers" won't happen at the same time because both "registerReceiver" and
// "stopReceivers" are called in the event loop. So here we can assume "stopReceivers" has not
// yet been called. If "stopReceivers" is called later, it should be able to see this receiver.
receiverInfo(streamId) = ReceiverInfo(
streamId, s"${typ}-${streamId}", receiverEndpoint, true, host)
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
true
}
}

/** Deregister a receiver */
@@ -220,20 +266,33 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterReceiver(streamId, typ, host, receiverEndpoint) =>
registerReceiver(streamId, typ, host, receiverEndpoint, context.sender.address)
context.reply(true)
val successful =
registerReceiver(streamId, typ, host, receiverEndpoint, context.sender.address)
context.reply(successful)
case AddBlock(receivedBlockInfo) =>
context.reply(addBlock(receivedBlockInfo))
case DeregisterReceiver(streamId, message, error) =>
deregisterReceiver(streamId, message, error)
context.reply(true)
case StopAllReceivers =>
assert(isTrackerStopping || isTrackerStopped)
stopReceivers()
context.reply(true)
}

/** Send stop signal to the receivers. */
private def stopReceivers() {
// Signal the receivers to stop
receiverInfo.values.flatMap { info => Option(info.endpoint)}
.foreach { _.send(StopReceiver) }
logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
}
}

/** This thread class runs all the receivers on the cluster. */
class ReceiverLauncher {
@transient val env = ssc.env
@volatile @transient private var running = false
@volatile @transient var running = false
@transient val thread = new Thread() {
override def run() {
try {
@@ -249,31 +308,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
thread.start()
}

def stop(graceful: Boolean) {
// Send the stop signal to all the receivers
stopReceivers()

// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.
thread.join(10000)

if (graceful) {
val pollTime = 100
logInfo("Waiting for receiver job to terminate gracefully")
while (receiverInfo.nonEmpty || running) {
Thread.sleep(pollTime)
}
logInfo("Waited for receiver job to terminate gracefully")
}

// Check if all the receivers have been deregistered or not
if (receiverInfo.nonEmpty) {
logWarning("Not all of the receivers have deregistered, " + receiverInfo)
} else {
logInfo("All of the receivers have deregistered successfully")
}
}

/**
* Get the list of executors excluding driver
*/
@@ -358,17 +392,30 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
running = true
ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
running = false
logInfo("All of the receivers have been terminated")
try {
ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
logInfo("All of the receivers have been terminated")
} finally {
running = false
}
}

/** Stops the receivers. */
private def stopReceivers() {
// Signal the receivers to stop
receiverInfo.values.flatMap { info => Option(info.endpoint)}
.foreach { _.send(StopReceiver) }
logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
/**
* Wait until the Spark job that runs the receivers is terminated, or return when
* `milliseconds` elapses
*/
def awaitTermination(milliseconds: Long): Unit = {
thread.join(milliseconds)
}
}

/** Check if tracker has been marked for starting */
private def isTrackerStarted(): Boolean = trackerState == Started

/** Check if tracker has been marked for stopping */
private def isTrackerStopping(): Boolean = trackerState == Stopping

/** Check if tracker has been marked as stopped */
private def isTrackerStopped(): Boolean = trackerState == Stopped

}
@@ -346,6 +346,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
def reportError(message: String, throwable: Throwable) {
errors += throwable
}

override protected def onReceiverStart(): Boolean = true
}

/**
@@ -285,6 +285,21 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
}
}

test("stop gracefully even if a receiver misses StopReceiver") {
// This is not a deterministic unit test. But if this unit test is flaky, then there is definitely
// something wrong. See SPARK-5681
val conf = new SparkConf().setMaster(master).setAppName(appName)
sc = new SparkContext(conf)
ssc = new StreamingContext(sc, Milliseconds(100))
val input = ssc.receiverStream(new TestReceiver)
input.foreachRDD(_ => {})
ssc.start()
// Call `ssc.stop` at once so that it's possible that the receiver will miss "StopReceiver"
failAfter(30000 millis) {
Contributor
How is this test deterministic? I don't see how it is guaranteed to test the situation where the context is stopped before the receiver has started.

Member Author
This test cannot always reproduce the issue. But the probability is high.

Contributor
Please add a comment at the top of this unit test saying that it is not deterministic, and that if the test is flaky, then there is definitely something wrong. Point to the JIRA.

Contributor
Also, could you run this unit test locally repeatedly, like 1000 times, to see if it fails at any time?

Member Author
I ran StreamingContextSuite 100 times locally and it didn't fail. StreamingContextSuite takes about 40 seconds, so I only ran it 100 times.
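
Repeated local runs like the ones described here can be scripted. This is a minimal sketch, assuming the test body can be invoked as a plain function; `RepeatSketch.countFailures` is a hypothetical helper, not part of ScalaTest or Spark.

```scala
import scala.util.control.NonFatal

object RepeatSketch {
  /** Runs `body` `times` times and returns how many of those runs threw a non-fatal error. */
  def countFailures(times: Int)(body: => Unit): Int =
    (1 to times).count { _ =>
      try {
        body
        false // this run passed
      } catch {
        case NonFatal(_) => true // this run failed
      }
    }
}
```

For a probabilistic test such as this one, a result of zero over many runs raises confidence but does not prove the race is gone.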

ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

test("stop slow receiver gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
conf.set("spark.streaming.gracefulStopTimeout", "20000s")