25 changes: 23 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable
import scala.concurrent.Future

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
@@ -147,11 +148,31 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
}
}

/**
* Send ExecutorRegistered to the event loop to add a new executor. Only for testing.
*
* @return None if the HeartbeatReceiver is stopped; otherwise a Some(Future) that
* indicates whether this operation was successful.
*/
def addExecutor(executorId: String): Option[Future[Boolean]] = {
Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
}

/**
* If the heartbeat receiver is not stopped, notify it of executor registrations.
*/
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
Option(self).foreach(_.ask[Boolean](ExecutorRegistered(executorAdded.executorId)))
addExecutor(executorAdded.executorId)
}

/**
* Send ExecutorRemoved to the event loop to remove an executor. Only for testing.
*
* @return None if the HeartbeatReceiver is stopped; otherwise a Some(Future) that
* indicates whether this operation was successful.
*/
def removeExecutor(executorId: String): Option[Future[Boolean]] = {
Review thread on this line:

Contributor: It doesn't seem like this and addExecutor are strictly necessary; you can still call onExecutorRemoved and onExecutorAdded from the test and achieve what you want. Even if this is an internal API, reducing the amount of test-only code in the main classes is always good.

Member Author: I need to use the Future result in the tests. However, because onExecutorAdded and onExecutorRemoved are defined by the parent class, I cannot make them return the Future. So I added these internal methods.

Contributor: Ah, I missed the Future. Never mind then.

Option(self).map(_.ask[Boolean](ExecutorRemoved(executorId)))
}
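
For context, a minimal editorial sketch (not part of this patch) of how a caller such as a test can consume the Option[Future[Boolean]] returned by addExecutor and removeExecutor; the suite below does exactly this in its addExecutorAndVerify and removeExecutorAndVerify helpers. The receiver value and the executor id are assumed for illustration.

// Sketch only: `receiver` is an assumed HeartbeatReceiver instance.
import scala.concurrent.Await
import scala.concurrent.duration._

val registered: Option[Boolean] = receiver.addExecutor("executor-1").map { future =>
  // Block until the event loop has processed ExecutorRegistered, or time out.
  Await.result(future, 10.seconds)
}
// None means the receiver was already stopped; Some(true) means registration succeeded.
assert(registered == Some(true))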

/**
@@ -165,7 +186,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
* and expire it with loud error messages.
*/
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
Option(self).foreach(_.ask[Boolean](ExecutorRemoved(executorRemoved.executorId)))
removeExecutor(executorRemoved.executorId)
}

private def expireDeadHosts(): Unit = {
51 changes: 37 additions & 14 deletions core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -19,7 +19,10 @@ package org.apache.spark

import java.util.concurrent.{ExecutorService, TimeUnit}

import scala.collection.Map
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
@@ -96,18 +99,18 @@ class HeartbeatReceiverSuite

test("normal heartbeat") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = false)
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
val trackedExecutors = getTrackedExecutors
assert(trackedExecutors.size === 2)
assert(trackedExecutors.contains(executorId1))
assert(trackedExecutors.contains(executorId2))
}

test("reregister if scheduler is not ready yet") {
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
addExecutorAndVerify(executorId1)
// Task scheduler is not set yet in HeartbeatReceiver, so executors should reregister
triggerHeartbeat(executorId1, executorShouldReregister = true)
}
@@ -116,20 +119,20 @@ class HeartbeatReceiverSuite
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
// Received heartbeat from unknown executor, so we ask it to re-register
triggerHeartbeat(executorId1, executorShouldReregister = true)
assert(heartbeatReceiver.invokePrivate(_executorLastSeen()).isEmpty)
assert(getTrackedExecutors.isEmpty)
}

test("reregister if heartbeat from removed executor") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
// Remove the second executor but not the first
heartbeatReceiver.onExecutorRemoved(SparkListenerExecutorRemoved(0, executorId2, "bad boy"))
removeExecutorAndVerify(executorId2)
// Now trigger the heartbeats
// A heartbeat from the second executor should require reregistering
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = true)
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
val trackedExecutors = getTrackedExecutors
assert(trackedExecutors.size === 1)
assert(trackedExecutors.contains(executorId1))
assert(!trackedExecutors.contains(executorId2))
@@ -138,8 +141,8 @@
test("expire dead hosts") {
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = false)
// Advance the clock and only trigger a heartbeat for the first executor
@@ -149,7 +152,7 @@
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
// Only the second executor should be expired as a dead host
verify(scheduler).executorLost(Matchers.eq(executorId2), any())
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
val trackedExecutors = getTrackedExecutors
assert(trackedExecutors.size === 1)
assert(trackedExecutors.contains(executorId1))
assert(!trackedExecutors.contains(executorId2))
@@ -175,8 +178,8 @@
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = false)

@@ -222,6 +225,26 @@ class HeartbeatReceiverSuite
}
}

private def addExecutorAndVerify(executorId: String): Unit = {
assert(
heartbeatReceiver.addExecutor(executorId).map { f =>
Await.result(f, 10.seconds)
} === Some(true))
}

private def removeExecutorAndVerify(executorId: String): Unit = {
assert(
heartbeatReceiver.removeExecutor(executorId).map { f =>
Await.result(f, 10.seconds)
} === Some(true))
}

private def getTrackedExecutors: Map[String, Long] = {
// We may receive undesired SparkListenerExecutorAdded from LocalBackend, so exclude it from
// the map. See SPARK-10800.
heartbeatReceiver.invokePrivate(_executorLastSeen()).
filterKeys(_ != SparkContext.DRIVER_IDENTIFIER)
}
}
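
Editorial note: a minimal sketch of why the filter in getTrackedExecutors is needed. The assumption (per SPARK-10800) is that in local mode the driver itself reports a heartbeat, so _executorLastSeen can contain an entry keyed by SparkContext.DRIVER_IDENTIFIER (written below as the assumed literal "driver") that the tests must ignore; the map contents are hypothetical.

// Hypothetical snapshot of _executorLastSeen in local mode: the driver's own entry
// shows up alongside the real executors.
val lastSeen: Map[String, Long] = Map("driver" -> 100L, "executor-1" -> 200L)
// Dropping the driver entry leaves only the executors the tests care about.
val tracked = lastSeen.filterKeys(_ != "driver")
assert(tracked.keySet == Set("executor-1"))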

// TODO: use these classes to add end-to-end tests for dynamic allocation!