
Commit a398905

BryanCutler authored and rxin committed
[SPARK-10827][CORE] AppClient should not use askWithReply in receiveAndReply
Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for the response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface.

Author: Bryan Cutler <[email protected]>

Closes #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827.
1 parent 21c562f commit a398905
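
Sketched in isolation, the pattern the commit describes looks like the following. This is a minimal, hypothetical Scala sketch: CallContext and EndpointRef are simplified stand-ins for Spark's RpcCallContext and RpcEndpointRef, and NonBlockingReplySketch is an invented name, not part of Spark.

import java.util.concurrent.Executors
import scala.util.control.NonFatal

// Simplified stand-ins for Spark's RpcCallContext and RpcEndpointRef.
trait CallContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

trait EndpointRef {
  def askWithRetry(msg: Any): Boolean // blocking; may retry for a long time
}

object NonBlockingReplySketch {
  // Cached pool: threads are created on demand and reused across requests.
  private val pool = Executors.newCachedThreadPool()

  // Called from the event loop; returns immediately, so other messages keep
  // flowing while the blocking ask runs on a pool thread.
  def askAndReplyAsync(ref: EndpointRef, context: CallContext, msg: Any): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = {
        try {
          context.reply(ref.askWithRetry(msg))
        } catch {
          case _: InterruptedException => // pool is shutting down; drop the reply
          case NonFatal(e) => context.sendFailure(e)
        }
      }
    })
  }
}

The actual patch differs in the details: it uses ThreadUtils.newDaemonCachedThreadPool so pool threads cannot keep the JVM alive, and it shuts the pool down with shutdownNow() during endpoint shutdown, as the last hunk of the diff below shows.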

File tree

2 files changed: +238 -4 lines

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 29 additions & 4 deletions

@@ -49,8 +49,8 @@ private[spark] class AppClient(
   private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3

-  private var endpoint: RpcEndpointRef = null
-  private var appId: String = null
+  @volatile private var endpoint: RpcEndpointRef = null
+  @volatile private var appId: String = null
   @volatile private var registered = false

   private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
@@ -77,6 +77,11 @@ private[spark] class AppClient(
     private val registrationRetryThread =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")

+    // A thread pool to perform receive then reply actions in a thread so as not to block the
+    // event loop.
+    private val askAndReplyThreadPool =
+      ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
+
     override def onStart(): Unit = {
       try {
         registerWithMaster(1)
@@ -200,21 +205,40 @@ private[spark] class AppClient(

       case r: RequestExecutors =>
         master match {
-          case Some(m) => context.reply(m.askWithRetry[Boolean](r))
+          case Some(m) => askAndReplyAsync(m, context, r)
           case None =>
             logWarning("Attempted to request executors before registering with Master.")
             context.reply(false)
         }

       case k: KillExecutors =>
         master match {
-          case Some(m) => context.reply(m.askWithRetry[Boolean](k))
+          case Some(m) => askAndReplyAsync(m, context, k)
           case None =>
             logWarning("Attempted to kill executors before registering with Master.")
             context.reply(false)
         }
     }

+    private def askAndReplyAsync[T](
+        endpointRef: RpcEndpointRef,
+        context: RpcCallContext,
+        msg: T): Unit = {
+      // Create a thread to ask a message and reply with the result. Allow thread to be
+      // interrupted during shutdown, otherwise context must be notified of NonFatal errors.
+      askAndReplyThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          try {
+            context.reply(endpointRef.askWithRetry[Boolean](msg))
+          } catch {
+            case ie: InterruptedException => // Cancelled
+            case NonFatal(t) =>
+              context.sendFailure(t)
+          }
+        }
+      })
+    }
+
     override def onDisconnected(address: RpcAddress): Unit = {
       if (master.exists(_.address == address)) {
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
@@ -252,6 +276,7 @@
       registrationRetryThread.shutdownNow()
       registerMasterFutures.foreach(_.cancel(true))
       registerMasterThreadPool.shutdownNow()
+      askAndReplyThreadPool.shutdownNow()
     }

   }
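
To make the motivation concrete, here is a toy, self-contained dispatcher showing the failure mode the old code had: while one handler sits in a blocking ask, every message queued behind it waits. The names and structure below are illustrative assumptions, not Spark's real dispatcher.

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object BlockedLoopDemo {
  def main(args: Array[String]): Unit = {
    val inbox = new LinkedBlockingQueue[Runnable]()
    val loop = Executors.newSingleThreadExecutor()

    // The "event loop": handles exactly two messages, one at a time, in order.
    loop.execute(new Runnable {
      override def run(): Unit = for (_ <- 1 to 2) inbox.take().run()
    })

    // Message 1 stands in for a blocking askWithRetry to the Master.
    inbox.put(new Runnable {
      override def run(): Unit = Thread.sleep(3000)
    })
    // Message 2 is instant work, yet it waits ~3s behind message 1.
    inbox.put(new Runnable {
      override def run(): Unit = println("handled only after the block clears")
    })

    loop.shutdown()
    loop.awaitTermination(10, TimeUnit.SECONDS)
  }
}

Running it prints the second message only after the three-second block clears, which is exactly what happened to AppClient's event loop whenever askWithRetry stalled in receiveAndReply.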
core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala

Lines changed: 209 additions & 0 deletions

@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.deploy.{ApplicationDescription, Command}
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.{ApplicationInfo, Master}
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.util.Utils
+
+/**
+ * End-to-end tests for application client in standalone mode.
+ */
+class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll {
+  private val numWorkers = 2
+  private val conf = new SparkConf()
+  private val securityManager = new SecurityManager(conf)
+
+  private var masterRpcEnv: RpcEnv = null
+  private var workerRpcEnvs: Seq[RpcEnv] = null
+  private var master: Master = null
+  private var workers: Seq[Worker] = null
+
+  /**
+   * Start the local cluster.
+   * Note: local-cluster mode is insufficient because we want a reference to the Master.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
+    workerRpcEnvs = (0 until numWorkers).map { i =>
+      RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
+    }
+    master = makeMaster()
+    workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    workerRpcEnvs.foreach(_.shutdown())
+    masterRpcEnv.shutdown()
+    workers.foreach(_.stop())
+    master.stop()
+    workerRpcEnvs = null
+    masterRpcEnv = null
+    workers = null
+    master = null
+    super.afterAll()
+  }
+
+  test("interface methods of AppClient using local Master") {
+    val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
+
+    ci.client.start()
+
+    // Client should connect with one Master which registers the application
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection")
+      assert(apps.size === 1, "master should have 1 registered app")
+    }
+
+    // Send message to Master to request Executors, verify request by change in executor limit
+    val numExecutorsRequested = 1
+    assert(ci.client.requestTotalExecutors(numExecutorsRequested))
+
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed")
+    }
+
+    // Send request to kill executor, verify request was made
+    assert {
+      val apps = getApplications()
+      val executorId: String = apps.head.executors.head._2.fullId
+      ci.client.killExecutors(Seq(executorId))
+    }
+
+    // Issue stop command for Client to disconnect from Master
+    ci.client.stop()
+
+    // Verify Client is marked dead and unregistered from Master
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(ci.listener.deadReasonList.size === 1, "client should have been marked dead")
+      assert(apps.isEmpty, "master should have 0 registered apps")
+    }
+  }
+
+  test("request from AppClient before initialized with master") {
+    val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
+
+    // requests to master should fail immediately
+    assert(ci.client.requestTotalExecutors(3) === false)
+  }
+
+  // ===============================
+  // | Utility methods for testing |
+  // ===============================
+
+  /** Return a SparkConf for applications that want to talk to our Master. */
+  private def appConf: SparkConf = {
+    new SparkConf()
+      .setMaster(masterRpcEnv.address.toSparkURL)
+      .setAppName("test")
+      .set("spark.executor.memory", "256m")
+  }
+
+  /** Make a master to which our application will send executor requests. */
+  private def makeMaster(): Master = {
+    val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf)
+    masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    master
+  }
+
+  /** Make a few workers that talk to our master. */
+  private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
+    (0 until numWorkers).map { i =>
+      val rpcEnv = workerRpcEnvs(i)
+      val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
+        Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
+      rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
+      worker
+    }
+  }
+
+  /** Get the Master state */
+  private def getMasterState: MasterStateResponse = {
+    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+  }
+
+  /** Get the applications that are active from Master */
+  private def getApplications(): Seq[ApplicationInfo] = {
+    getMasterState.activeApps
+  }
+
+  /** Application Listener to collect events */
+  private class AppClientCollector extends AppClientListener with Logging {
+    val connectedIdList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+    @volatile var disconnectedCount: Int = 0
+    val deadReasonList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+    val execAddedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+    val execRemovedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+
+    def connected(id: String): Unit = {
+      connectedIdList += id
+    }
+
+    def disconnected(): Unit = {
+      synchronized {
+        disconnectedCount += 1
+      }
+    }
+
+    def dead(reason: String): Unit = {
+      deadReasonList += reason
+    }
+
+    def executorAdded(
+        id: String,
+        workerId: String,
+        hostPort: String,
+        cores: Int,
+        memory: Int): Unit = {
+      execAddedList += id
+    }
+
+    def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
+      execRemovedList += id
+    }
+  }
+
+  /** Create AppClient and supporting objects */
+  private class AppClientInst(masterUrl: String) {
+    val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, securityManager)
+    private val cmd = new Command(TestExecutor.getClass.getCanonicalName.stripSuffix("$"),
+      List(), Map(), Seq(), Seq(), Seq())
+    private val desc = new ApplicationDescription("AppClientSuite", Some(1), 512, cmd, "ignored")
+    val listener = new AppClientCollector
+    val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf)
+  }

}
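
Assuming a Spark source checkout from this era, the new suite can typically be exercised on its own through the repository's sbt wrapper (the exact task name may vary with the sbt version in use):

  build/sbt "core/test-only *AppClientSuite"

The suite starts its own in-process Master and Workers in beforeAll, so no external cluster is required.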
