Skip to content

Commit ae8a2b1

Browse files
aosagie authored and cloud-fan committed
[SPARK-21176][WEB UI] Use a single ProxyServlet to proxy all workers and applications
## What changes were proposed in this pull request?

Currently, each application and each worker creates its own proxy servlet. Each proxy servlet is backed by its own HTTP client and a relatively large number of selector threads. This is excessive but was fixed (to an extent) by #18437. However, a single HTTP client (backed by a single selector thread) should be enough to handle all proxy requests. This PR creates a single proxy servlet no matter how many applications and workers there are.

## How was this patch tested?

The unit tests for rewriting proxied locations and headers were updated. I then spun up a 100-node cluster to ensure that proxying worked correctly.

@jiangxb1987 Please let me know if there's anything else I can do to help push this through. Thanks!

Author: Anderson Osagie <[email protected]>

Closes #18499 from aosagie/fix/minimize-proxy-threads.
1 parent f016f5c commit ae8a2b1

File tree

4 files changed

+46
-55
lines changed

4 files changed

+46
-55
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ private[deploy] class Master(
133133
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
134134
if (reverseProxy) {
135135
masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
136+
webUi.addProxy()
136137
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
137138
s"Applications UIs are available at $masterWebUiUrl")
138139
}
@@ -769,9 +770,6 @@ private[deploy] class Master(
769770
workers += worker
770771
idToWorker(worker.id) = worker
771772
addressToWorker(workerAddress) = worker
772-
if (reverseProxy) {
773-
webUi.addProxyTargets(worker.id, worker.webUiAddress)
774-
}
775773
true
776774
}
777775

@@ -780,9 +778,7 @@ private[deploy] class Master(
780778
worker.setState(WorkerState.DEAD)
781779
idToWorker -= worker.id
782780
addressToWorker -= worker.endpoint.address
783-
if (reverseProxy) {
784-
webUi.removeProxyTargets(worker.id)
785-
}
781+
786782
for (exec <- worker.executors.values) {
787783
logInfo("Telling app of lost executor: " + exec.id)
788784
exec.application.driver.send(ExecutorUpdated(
@@ -844,9 +840,6 @@ private[deploy] class Master(
844840
endpointToApp(app.driver) = app
845841
addressToApp(appAddress) = app
846842
waitingApps += app
847-
if (reverseProxy) {
848-
webUi.addProxyTargets(app.id, app.desc.appUiUrl)
849-
}
850843
}
851844

852845
private def finishApplication(app: ApplicationInfo) {
@@ -860,9 +853,7 @@ private[deploy] class Master(
860853
idToApp -= app.id
861854
endpointToApp -= app.driver
862855
addressToApp -= app.driver.address
863-
if (reverseProxy) {
864-
webUi.removeProxyTargets(app.id)
865-
}
856+
866857
if (completedApps.size >= RETAINED_APPLICATIONS) {
867858
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
868859
completedApps.take(toRemove).foreach { a =>

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717

1818
package org.apache.spark.deploy.master.ui
1919

20-
import scala.collection.mutable.HashMap
21-
22-
import org.eclipse.jetty.servlet.ServletContextHandler
23-
20+
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
2421
import org.apache.spark.deploy.master.Master
2522
import org.apache.spark.internal.Logging
2623
import org.apache.spark.ui.{SparkUI, WebUI}
@@ -38,7 +35,6 @@ class MasterWebUI(
3835

3936
val masterEndpointRef = master.self
4037
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
41-
private val proxyHandlers = new HashMap[String, ServletContextHandler]
4238

4339
initialize()
4440

@@ -54,16 +50,19 @@ class MasterWebUI(
5450
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
5551
}
5652

57-
def addProxyTargets(id: String, target: String): Unit = {
58-
val endTarget = target.stripSuffix("/")
59-
val handler = createProxyHandler("/proxy/" + id, endTarget)
53+
def addProxy(): Unit = {
54+
val handler = createProxyHandler(idToUiAddress)
6055
attachHandler(handler)
61-
proxyHandlers(id) = handler
6256
}
6357

64-
def removeProxyTargets(id: String): Unit = {
65-
proxyHandlers.remove(id).foreach(detachHandler)
58+
def idToUiAddress(id: String): Option[String] = {
59+
val state = masterEndpointRef.askSync[MasterStateResponse](RequestMasterState)
60+
val maybeWorkerUiAddress = state.workers.find(_.id == id).map(_.webUiAddress)
61+
val maybeAppUiAddress = state.activeApps.find(_.id == id).map(_.desc.appUiUrl)
62+
63+
maybeWorkerUiAddress.orElse(maybeAppUiAddress)
6664
}
65+
6766
}
6867

6968
private[master] object MasterWebUI {

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -194,28 +194,32 @@ private[spark] object JettyUtils extends Logging {
194194
}
195195

196196
/** Create a handler for proxying request to Workers and Application Drivers */
197-
def createProxyHandler(
198-
prefix: String,
199-
target: String): ServletContextHandler = {
197+
def createProxyHandler(idToUiAddress: String => Option[String]): ServletContextHandler = {
200198
val servlet = new ProxyServlet {
201199
override def rewriteTarget(request: HttpServletRequest): String = {
202-
val rewrittenURI = createProxyURI(
203-
prefix, target, request.getRequestURI(), request.getQueryString())
204-
if (rewrittenURI == null) {
205-
return null
206-
}
207-
if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) {
208-
return null
200+
val path = request.getPathInfo
201+
if (path == null) return null
202+
203+
val prefixTrailingSlashIndex = path.indexOf('/', 1)
204+
val prefix = if (prefixTrailingSlashIndex == -1) {
205+
path
206+
} else {
207+
path.substring(0, prefixTrailingSlashIndex)
209208
}
210-
rewrittenURI.toString()
209+
val id = prefix.drop(1)
210+
211+
// Query master state for id's corresponding UI address
212+
// If that address exists, turn it into a valid, target URI string or return null
213+
idToUiAddress(id)
214+
.map(createProxyURI(prefix, _, path, request.getQueryString))
215+
.filter(uri => uri != null && validateDestination(uri.getHost, uri.getPort))
216+
.map(_.toString)
217+
.orNull
211218
}
212219

213220
override def newHttpClient(): HttpClient = {
214221
// SPARK-21176: Use the Jetty logic to calculate the number of selector threads (#CPUs/2),
215222
// but limit it to 8 max.
216-
// Otherwise, it might happen that we exhaust the threadpool since in reverse proxy mode
217-
// a proxy is instantiated for each executor. If the head node has many processors, this
218-
// can quickly add up to an unreasonably high number of threads.
219223
val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors() / 2))
220224
new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null)
221225
}
@@ -226,8 +230,8 @@ private[spark] object JettyUtils extends Logging {
226230
headerName: String,
227231
headerValue: String): String = {
228232
if (headerName.equalsIgnoreCase("location")) {
229-
val newHeader = createProxyLocationHeader(
230-
prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
233+
val newHeader = createProxyLocationHeader(headerValue, clientRequest,
234+
serverResponse.getRequest().getURI())
231235
if (newHeader != null) {
232236
return newHeader
233237
}
@@ -239,8 +243,8 @@ private[spark] object JettyUtils extends Logging {
239243

240244
val contextHandler = new ServletContextHandler
241245
val holder = new ServletHolder(servlet)
242-
contextHandler.setContextPath(prefix)
243-
contextHandler.addServlet(holder, "/")
246+
contextHandler.setContextPath("/proxy")
247+
contextHandler.addServlet(holder, "/*")
244248
contextHandler
245249
}
246250

@@ -438,7 +442,7 @@ private[spark] object JettyUtils extends Logging {
438442
val rest = path.substring(prefix.length())
439443

440444
if (!rest.isEmpty()) {
441-
if (!rest.startsWith("/")) {
445+
if (!rest.startsWith("/") && !uri.endsWith("/")) {
442446
uri.append("/")
443447
}
444448
uri.append(rest)
@@ -458,14 +462,13 @@ private[spark] object JettyUtils extends Logging {
458462
}
459463

460464
def createProxyLocationHeader(
461-
prefix: String,
462465
headerValue: String,
463466
clientRequest: HttpServletRequest,
464467
targetUri: URI): String = {
465468
val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority()
466469
if (headerValue.startsWith(toReplace)) {
467470
clientRequest.getScheme() + "://" + clientRequest.getHeader("host") +
468-
prefix + headerValue.substring(toReplace.length())
471+
clientRequest.getPathInfo() + headerValue.substring(toReplace.length())
469472
} else {
470473
null
471474
}

core/src/test/scala/org/apache/spark/ui/UISuite.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -200,36 +200,34 @@ class UISuite extends SparkFunSuite {
200200
}
201201

202202
test("verify proxy rewrittenURI") {
203-
val prefix = "/proxy/worker-id"
203+
val prefix = "/worker-id"
204204
val target = "http://localhost:8081"
205-
val path = "/proxy/worker-id/json"
205+
val path = "/worker-id/json"
206206
var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null)
207207
assert(rewrittenURI.toString() === "http://localhost:8081/json")
208208
rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done")
209209
assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done")
210-
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null)
210+
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id", null)
211211
assert(rewrittenURI.toString() === "http://localhost:8081")
212-
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/test%2F", null)
212+
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/test%2F", null)
213213
assert(rewrittenURI.toString() === "http://localhost:8081/test%2F")
214-
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/%F0%9F%98%84", null)
214+
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/%F0%9F%98%84", null)
215215
assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84")
216-
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null)
216+
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-noid/json", null)
217217
assert(rewrittenURI === null)
218218
}
219219

220220
test("verify rewriting location header for reverse proxy") {
221221
val clientRequest = mock(classOf[HttpServletRequest])
222222
var headerValue = "http://localhost:4040/jobs"
223-
val prefix = "/proxy/worker-id"
224223
val targetUri = URI.create("http://localhost:4040")
225224
when(clientRequest.getScheme()).thenReturn("http")
226225
when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
227-
var newHeader = JettyUtils.createProxyLocationHeader(
228-
prefix, headerValue, clientRequest, targetUri)
226+
when(clientRequest.getPathInfo()).thenReturn("/proxy/worker-id")
227+
var newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri)
229228
assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs")
230229
headerValue = "http://localhost:4041/jobs"
231-
newHeader = JettyUtils.createProxyLocationHeader(
232-
prefix, headerValue, clientRequest, targetUri)
230+
newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri)
233231
assert(newHeader === null)
234232
}
235233

0 commit comments

Comments
 (0)