Skip to content

Commit 4e540b8

Browse files
jinxingcmonkey
authored andcommitted
[SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite.
## What changes were proposed in this pull request? The current code in `HeartbeatReceiverSuite`, executorId is set as below: ``` private val executorId1 = "executor-1" private val executorId2 = "executor-2" ``` The executorId is sent to driver when register as below: ``` test("expire dead hosts should kill executors with replacement (SPARK-8119)") { ... fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty)) ... } ``` Receiving `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, the executorId will be compared with `currentExecutorIdCounter` as below: ``` case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => if (executorDataMap.contains(executorId)) { executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) } else { ... executorDataMap.put(executorId, data) if (currentExecutorIdCounter < executorId.toInt) { currentExecutorIdCounter = executorId.toInt } ... ``` `executorId.toInt` will cause NumberformatException. This unit test can pass currently because of `askWithRetry`, when catching exception, RPC will call again, thus it will go `if` branch and return true. **To fix** Rectify executorId and replace `askWithRetry` with `askSync`, refer to apache#16690 ## How was this patch tested? This fix is for unit test and no need to add another one.(If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: jinxing <[email protected]> Closes apache#16779 from jinxing64/SPARK-19437.
1 parent 7e43739 commit 4e540b8

File tree

1 file changed

+13
-13
lines changed

1 file changed

+13
-13
lines changed

core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ class HeartbeatReceiverSuite
4646
with PrivateMethodTester
4747
with LocalSparkContext {
4848

49-
private val executorId1 = "executor-1"
50-
private val executorId2 = "executor-2"
49+
private val executorId1 = "1"
50+
private val executorId2 = "2"
5151

5252
// Shared state that must be reset before and after each test
5353
private var scheduler: TaskSchedulerImpl = null
@@ -93,12 +93,12 @@ class HeartbeatReceiverSuite
9393

9494
test("task scheduler is set correctly") {
9595
assert(heartbeatReceiver.scheduler === null)
96-
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
96+
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
9797
assert(heartbeatReceiver.scheduler !== null)
9898
}
9999

100100
test("normal heartbeat") {
101-
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
101+
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
102102
addExecutorAndVerify(executorId1)
103103
addExecutorAndVerify(executorId2)
104104
triggerHeartbeat(executorId1, executorShouldReregister = false)
@@ -116,14 +116,14 @@ class HeartbeatReceiverSuite
116116
}
117117

118118
test("reregister if heartbeat from unregistered executor") {
119-
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
119+
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
120120
// Received heartbeat from unknown executor, so we ask it to re-register
121121
triggerHeartbeat(executorId1, executorShouldReregister = true)
122122
assert(getTrackedExecutors.isEmpty)
123123
}
124124

125125
test("reregister if heartbeat from removed executor") {
126-
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
126+
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
127127
addExecutorAndVerify(executorId1)
128128
addExecutorAndVerify(executorId2)
129129
// Remove the second executor but not the first
@@ -140,7 +140,7 @@ class HeartbeatReceiverSuite
140140

141141
test("expire dead hosts") {
142142
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
143-
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
143+
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
144144
addExecutorAndVerify(executorId1)
145145
addExecutorAndVerify(executorId2)
146146
triggerHeartbeat(executorId1, executorShouldReregister = false)
@@ -149,7 +149,7 @@ class HeartbeatReceiverSuite
149149
heartbeatReceiverClock.advance(executorTimeout / 2)
150150
triggerHeartbeat(executorId1, executorShouldReregister = false)
151151
heartbeatReceiverClock.advance(executorTimeout)
152-
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
152+
heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
153153
// Only the second executor should be expired as a dead host
154154
verify(scheduler).executorLost(Matchers.eq(executorId2), any())
155155
val trackedExecutors = getTrackedExecutors
@@ -173,11 +173,11 @@ class HeartbeatReceiverSuite
173173
val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
174174
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
175175
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
176-
fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
176+
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
177177
RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
178-
fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
178+
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
179179
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty))
180-
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
180+
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
181181
addExecutorAndVerify(executorId1)
182182
addExecutorAndVerify(executorId2)
183183
triggerHeartbeat(executorId1, executorShouldReregister = false)
@@ -195,7 +195,7 @@ class HeartbeatReceiverSuite
195195
// Here we use a timeout of O(seconds), but in practice this whole test takes O(10ms).
196196
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
197197
heartbeatReceiverClock.advance(executorTimeout * 2)
198-
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
198+
heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
199199
val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread())
200200
killThread.shutdown() // needed for awaitTermination
201201
killThread.awaitTermination(10L, TimeUnit.SECONDS)
@@ -213,7 +213,7 @@ class HeartbeatReceiverSuite
213213
executorShouldReregister: Boolean): Unit = {
214214
val metrics = TaskMetrics.empty
215215
val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
216-
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
216+
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
217217
Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId))
218218
if (executorShouldReregister) {
219219
assert(response.reregisterBlockManager)

0 commit comments

Comments
 (0)