
Commit 158e178

Fix the flaky tests in HeartbeatReceiverSuite
1 parent c1ad373 commit 158e178

2 files changed, +56 -14 lines changed

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 30 additions & 4 deletions
@@ -20,12 +20,13 @@ package org.apache.spark
 import java.util.concurrent.{ScheduledFuture, TimeUnit}

 import scala.collection.mutable
+import scala.concurrent.Future

 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util._

 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -61,7 +62,12 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     this(sc, new SystemClock)
   }

-  sc.addSparkListener(this)
+  if (clock.isInstanceOf[SystemClock]) {
+    sc.addSparkListener(this)
+  } else {
+    // We are in HeartbeatReceiverSuite, so don't add HeartbeatReceiver to SparkContext to avoid
+    // receiving undesired events (SPARK-10058).
+  }

   override val rpcEnv: RpcEnv = sc.env.rpcEnv

@@ -147,11 +153,31 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     }
   }

+  /**
+   * Send ExecutorRegistered to the event loop to add a new executor. Only for tests.
+   *
+   * @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
+   *         indicates whether this operation was successful.
+   */
+  def addExecutor(executorId: String): Option[Future[Boolean]] = {
+    Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
+  }
+
   /**
    * If the heartbeat receiver is not stopped, notify it of executor registrations.
    */
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
-    Option(self).foreach(_.ask[Boolean](ExecutorRegistered(executorAdded.executorId)))
+    addExecutor(executorAdded.executorId)
+  }
+
+  /**
+   * Send ExecutorRemoved to the event loop to remove an executor. Only for tests.
+   *
+   * @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
+   *         indicates whether this operation was successful.
+   */
+  def removeExecutor(executorId: String): Option[Future[Boolean]] = {
+    Option(self).map(_.ask[Boolean](ExecutorRemoved(executorId)))
   }

   /**
@@ -165,7 +191,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
    * and expire it with loud error messages.
    */
   override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
-    Option(self).foreach(_.ask[Boolean](ExecutorRemoved(executorRemoved.executorId)))
+    removeExecutor(executorRemoved.executorId)
   }

   private def expireDeadHosts(): Unit = {
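
One half of the fix is the clock check added to the constructor above: when the test suite injects a manual clock, the receiver is not registered on the listener bus, so real executor events cannot race with the events the suite sends directly (SPARK-10058). Below is a minimal standalone sketch of that gating idea; the Clock, ManualClock, ListenerBus and Receiver types here are simplified stand-ins for illustration, not the Spark classes.

import scala.collection.mutable

// Simplified stand-ins for Spark's Clock hierarchy and listener bus (assumptions, not Spark APIs).
trait Clock { def getTimeMillis: Long }
class SystemClock extends Clock { def getTimeMillis: Long = System.currentTimeMillis() }
class ManualClock(var now: Long = 0L) extends Clock { def getTimeMillis: Long = now }

class ListenerBus { val listeners = mutable.Buffer.empty[AnyRef] }

// Register on the bus only when driven by a real clock; under a manual (test) clock the
// receiver stays unregistered, so the test alone decides which events it sees.
class Receiver(bus: ListenerBus, clock: Clock) {
  if (clock.isInstanceOf[SystemClock]) {
    bus.listeners += this
  }
}

object ClockGateSketch {
  def main(args: Array[String]): Unit = {
    val bus = new ListenerBus
    new Receiver(bus, new SystemClock)   // production path: registered
    new Receiver(bus, new ManualClock()) // test path: not registered
    assert(bus.listeners.size == 1)
  }
}

Gating on the clock type keeps the production path unchanged while giving the suite full control over which events the receiver observes, without adding a test-only constructor flag.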

core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala

Lines changed: 26 additions & 10 deletions
@@ -20,6 +20,8 @@ package org.apache.spark
 import java.util.concurrent.{ExecutorService, TimeUnit}

 import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
 import scala.language.postfixOps

 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
@@ -96,8 +98,8 @@ class HeartbeatReceiverSuite

   test("normal heartbeat") {
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
+    addExecutorAndVerify(executorId1)
+    addExecutorAndVerify(executorId2)
     triggerHeartbeat(executorId1, executorShouldReregister = false)
     triggerHeartbeat(executorId2, executorShouldReregister = false)
     val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
@@ -107,7 +109,7 @@ class HeartbeatReceiverSuite
   }

   test("reregister if scheduler is not ready yet") {
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
+    addExecutorAndVerify(executorId1)
     // Task scheduler is not set yet in HeartbeatReceiver, so executors should reregister
     triggerHeartbeat(executorId1, executorShouldReregister = true)
   }
@@ -121,10 +123,10 @@ class HeartbeatReceiverSuite

   test("reregister if heartbeat from removed executor") {
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
+    addExecutorAndVerify(executorId1)
+    addExecutorAndVerify(executorId2)
     // Remove the second executor but not the first
-    heartbeatReceiver.onExecutorRemoved(SparkListenerExecutorRemoved(0, executorId2, "bad boy"))
+    removeExecutorAndVerify(executorId2)
     // Now trigger the heartbeats
     // A heartbeat from the second executor should require reregistering
     triggerHeartbeat(executorId1, executorShouldReregister = false)
@@ -138,8 +140,8 @@ class HeartbeatReceiverSuite
   test("expire dead hosts") {
     val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
+    addExecutorAndVerify(executorId1)
+    addExecutorAndVerify(executorId2)
     triggerHeartbeat(executorId1, executorShouldReregister = false)
     triggerHeartbeat(executorId2, executorShouldReregister = false)
     // Advance the clock and only trigger a heartbeat for the first executor
@@ -175,8 +177,8 @@ class HeartbeatReceiverSuite
     fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
-    heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
+    addExecutorAndVerify(executorId1)
+    addExecutorAndVerify(executorId2)
     triggerHeartbeat(executorId1, executorShouldReregister = false)
     triggerHeartbeat(executorId2, executorShouldReregister = false)

@@ -222,6 +224,20 @@ class HeartbeatReceiverSuite
     }
   }

+  private def addExecutorAndVerify(executorId: String): Unit = {
+    assert(
+      heartbeatReceiver.addExecutor(executorId).map { f =>
+        Await.result(f, 10.seconds)
+      } === Some(true))
+  }
+
+  private def removeExecutorAndVerify(executorId: String): Unit = {
+    assert(
+      heartbeatReceiver.removeExecutor(executorId).map { f =>
+        Await.result(f, 10.seconds)
+      } === Some(true))
+  }
+
 }

 // TODO: use these classes to add end-to-end tests for dynamic allocation!
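
The other half of the fix is visible in the test helpers above: addExecutorAndVerify and removeExecutorAndVerify block until the receiver's event loop has actually processed the message, which removes the race that made these tests flaky. The following standalone sketch mirrors that guard-and-await pattern; the EndpointRef class and its ask method are hypothetical stand-ins for Spark's RpcEndpointRef, and plain == replaces ScalaTest's ===.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object GuardAndAwaitSketch {
  // Hypothetical stand-in for an RpcEndpointRef; a null reference models a stopped endpoint.
  final class EndpointRef {
    def ask(message: Any): Future[Boolean] = Future {
      // A real endpoint would enqueue the message and reply from its event loop.
      true
    }
  }

  // Mirrors HeartbeatReceiver.addExecutor: None once stopped, otherwise a Future holding the reply.
  def addExecutor(self: EndpointRef, executorId: String): Option[Future[Boolean]] =
    Option(self).map(_.ask(("ExecutorRegistered", executorId)))

  // Mirrors the suite's addExecutorAndVerify: block until the message has been handled.
  def addExecutorAndVerify(self: EndpointRef, executorId: String): Unit =
    assert(addExecutor(self, executorId).map(Await.result(_, 10.seconds)) == Some(true))

  def main(args: Array[String]): Unit = {
    addExecutorAndVerify(new EndpointRef, "executor-1") // running endpoint: reply awaited
    assert(addExecutor(null, "executor-1").isEmpty)     // stopped endpoint: None, no dangling ask
  }
}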
