From 158e178237703c8f16a69039656e8d767e0c8dbd Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 30 Sep 2015 15:33:56 +0800 Subject: [PATCH 1/2] Fix the flaky tests in HeartbeatReceiverSuite --- .../org/apache/spark/HeartbeatReceiver.scala | 34 +++++++++++++++--- .../apache/spark/HeartbeatReceiverSuite.scala | 36 +++++++++++++------ 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index ee60d697d8799..d510ca61f51f4 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -20,12 +20,13 @@ package org.apache.spark import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable +import scala.concurrent.Future import org.apache.spark.executor.TaskMetrics import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext} import org.apache.spark.storage.BlockManagerId import org.apache.spark.scheduler._ -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util._ /** * A heartbeat from executors to the driver. This is a shared message used by several internal @@ -61,7 +62,12 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) this(sc, new SystemClock) } - sc.addSparkListener(this) + if (clock.isInstanceOf[SystemClock]) { + sc.addSparkListener(this) + } else { + // We are in HeartbeatReceiverSuite. So don't add HeartbeatReceiver to SparkContext to avoid + // receiving undesired events (SPARK-10058). + } override val rpcEnv: RpcEnv = sc.env.rpcEnv @@ -147,11 +153,31 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) } } + /** + * Send ExecutorRegistered to the event loop to add a new executor. Only for test. + * + * @return if HeartbeatReceiver is stopped, return None. 
Otherwise, return a Some(Future) that + * indicates whether this operation is successful. + */ + def addExecutor(executorId: String): Option[Future[Boolean]] = { + Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId))) + } + /** * If the heartbeat receiver is not stopped, notify it of executor registrations. */ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { - Option(self).foreach(_.ask[Boolean](ExecutorRegistered(executorAdded.executorId))) + addExecutor(executorAdded.executorId) + } + + /** + * Send ExecutorRemoved to the event loop to remove an executor. Only for test. + * + * @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that + * indicates whether this operation is successful. + */ + def removeExecutor(executorId: String): Option[Future[Boolean]] = { + Option(self).map(_.ask[Boolean](ExecutorRemoved(executorId))) } /** @@ -165,7 +191,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) * and expire it with loud error messages. 
*/ override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { - Option(self).foreach(_.ask[Boolean](ExecutorRemoved(executorRemoved.executorId))) + removeExecutor(executorRemoved.executorId) } private def expireDeadHosts(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 139b8dc25f4b4..fdbffe7e68150 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark import java.util.concurrent.{ExecutorService, TimeUnit} import scala.collection.mutable +import scala.concurrent.Await +import scala.concurrent.duration._ import scala.language.postfixOps import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} @@ -96,8 +98,8 @@ class HeartbeatReceiverSuite test("normal heartbeat") { heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null)) - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null)) + addExecutorAndVerify(executorId1) + addExecutorAndVerify(executorId2) triggerHeartbeat(executorId1, executorShouldReregister = false) triggerHeartbeat(executorId2, executorShouldReregister = false) val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen()) @@ -107,7 +109,7 @@ class HeartbeatReceiverSuite } test("reregister if scheduler is not ready yet") { - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null)) + addExecutorAndVerify(executorId1) // Task scheduler is not set yet in HeartbeatReceiver, so executors should reregister triggerHeartbeat(executorId1, executorShouldReregister = true) } @@ -121,10 +123,10 @@ class HeartbeatReceiverSuite test("reregister if heartbeat from removed executor") { 
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null)) - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null)) + addExecutorAndVerify(executorId1) + addExecutorAndVerify(executorId2) // Remove the second executor but not the first - heartbeatReceiver.onExecutorRemoved(SparkListenerExecutorRemoved(0, executorId2, "bad boy")) + removeExecutorAndVerify(executorId2) // Now trigger the heartbeats // A heartbeat from the second executor should require reregistering triggerHeartbeat(executorId1, executorShouldReregister = false) @@ -138,8 +140,8 @@ class HeartbeatReceiverSuite test("expire dead hosts") { val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs()) heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null)) - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null)) + addExecutorAndVerify(executorId1) + addExecutorAndVerify(executorId2) triggerHeartbeat(executorId1, executorShouldReregister = false) triggerHeartbeat(executorId2, executorShouldReregister = false) // Advance the clock and only trigger a heartbeat for the first executor @@ -175,8 +177,8 @@ class HeartbeatReceiverSuite fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type]( RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty)) heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null)) - heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null)) + addExecutorAndVerify(executorId1) + addExecutorAndVerify(executorId2) triggerHeartbeat(executorId1, executorShouldReregister = false) triggerHeartbeat(executorId2, executorShouldReregister = false) @@ -222,6 +224,20 @@ class 
HeartbeatReceiverSuite } } + private def addExecutorAndVerify(executorId: String): Unit = { + assert( + heartbeatReceiver.addExecutor(executorId).map { f => + Await.result(f, 10.seconds) + } === Some(true)) + } + + private def removeExecutorAndVerify(executorId: String): Unit = { + assert( + heartbeatReceiver.removeExecutor(executorId).map { f => + Await.result(f, 10.seconds) + } === Some(true)) + } + } // TODO: use these classes to add end-to-end tests for dynamic allocation! From 0debc465ee76335508fa065f2b23b36ab4462584 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 1 Oct 2015 00:08:50 +0800 Subject: [PATCH 2/2] Filter out 'driver' in the tests --- .../org/apache/spark/HeartbeatReceiver.scala | 9 ++------- .../org/apache/spark/HeartbeatReceiverSuite.scala | 15 +++++++++++---- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index d510ca61f51f4..1f1f0b75de5f1 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -26,7 +26,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext} import org.apache.spark.storage.BlockManagerId import org.apache.spark.scheduler._ -import org.apache.spark.util._ +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} /** * A heartbeat from executors to the driver. This is a shared message used by several internal @@ -62,12 +62,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) this(sc, new SystemClock) } - if (clock.isInstanceOf[SystemClock]) { - sc.addSparkListener(this) - } else { - // We are in HeartbeatReceiverSuite. So don't add HeartbeatReceiver to SparkContext to avoid - // receiving undesired events (SPARK-10058). 
- } + sc.addSparkListener(this) override val rpcEnv: RpcEnv = sc.env.rpcEnv diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index fdbffe7e68150..18f2229fea39b 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.util.concurrent.{ExecutorService, TimeUnit} +import scala.collection.Map import scala.collection.mutable import scala.concurrent.Await import scala.concurrent.duration._ @@ -102,7 +103,7 @@ class HeartbeatReceiverSuite addExecutorAndVerify(executorId2) triggerHeartbeat(executorId1, executorShouldReregister = false) triggerHeartbeat(executorId2, executorShouldReregister = false) - val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen()) + val trackedExecutors = getTrackedExecutors assert(trackedExecutors.size === 2) assert(trackedExecutors.contains(executorId1)) assert(trackedExecutors.contains(executorId2)) @@ -118,7 +119,7 @@ class HeartbeatReceiverSuite heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) // Received heartbeat from unknown executor, so we ask it to re-register triggerHeartbeat(executorId1, executorShouldReregister = true) - assert(heartbeatReceiver.invokePrivate(_executorLastSeen()).isEmpty) + assert(getTrackedExecutors.isEmpty) } test("reregister if heartbeat from removed executor") { @@ -131,7 +132,7 @@ class HeartbeatReceiverSuite // A heartbeat from the second executor should require reregistering triggerHeartbeat(executorId1, executorShouldReregister = false) triggerHeartbeat(executorId2, executorShouldReregister = true) - val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen()) + val trackedExecutors = getTrackedExecutors assert(trackedExecutors.size === 1) assert(trackedExecutors.contains(executorId1)) 
assert(!trackedExecutors.contains(executorId2)) @@ -151,7 +152,7 @@ class HeartbeatReceiverSuite heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts) // Only the second executor should be expired as a dead host verify(scheduler).executorLost(Matchers.eq(executorId2), any()) - val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen()) + val trackedExecutors = getTrackedExecutors assert(trackedExecutors.size === 1) assert(trackedExecutors.contains(executorId1)) assert(!trackedExecutors.contains(executorId2)) @@ -238,6 +239,12 @@ class HeartbeatReceiverSuite } === Some(true)) } + private def getTrackedExecutors: Map[String, Long] = { + // We may receive undesired SparkListenerExecutorAdded from LocalBackend, so exclude it from + // the map. See SPARK-10800. + heartbeatReceiver.invokePrivate(_executorLastSeen()). + filterKeys(_ != SparkContext.DRIVER_IDENTIFIER) + } } // TODO: use these classes to add end-to-end tests for dynamic allocation!