This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 6ee3be5

mccheah authored and ash211 committed
Retry the submit-application request to multiple nodes (#69)
* Retry the submit-application request to multiple nodes.
* Fix doc style comment.
* Check node unschedulable, log retry failures.
1 parent b2e6877 commit 6ee3be5

File tree

5 files changed: +117 −28 lines


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 13 additions & 14 deletions
@@ -361,11 +361,13 @@ private[spark] class Client(
       DEFAULT_BLOCKMANAGER_PORT.toString)
     val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, service,
       driverSubmitSslOptions)
-    val ping = Retry.retry(5, 5.seconds) {
+    val ping = Retry.retry(5, 5.seconds,
+      Some("Failed to contact the driver server")) {
       driverSubmitter.ping()
     }
     ping onFailure {
       case t: Throwable =>
+        logError("Ping failed to the driver server", t)
         submitCompletedFuture.setException(t)
         kubernetesClient.services().delete(service)
     }
@@ -532,17 +534,6 @@ private[spark] class Client(
       kubernetesClient: KubernetesClient,
       service: Service,
       driverSubmitSslOptions: SSLOptions): KubernetesSparkRestApi = {
-    val servicePort = service
-      .getSpec
-      .getPorts
-      .asScala
-      .filter(_.getName == SUBMISSION_SERVER_PORT_NAME)
-      .head
-      .getNodePort
-    // NodePort is exposed on every node, so just pick one of them.
-    // TODO be resilient to node failures and try all of them
-    val node = kubernetesClient.nodes.list.getItems.asScala.head
-    val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress
     val urlScheme = if (driverSubmitSslOptions.enabled) {
       "https"
     } else {
@@ -551,15 +542,23 @@ private[spark] class Client(
         " to secure this step.")
       "http"
     }
+    val servicePort = service.getSpec.getPorts.asScala
+      .filter(_.getName == SUBMISSION_SERVER_PORT_NAME)
+      .head.getNodePort
+    val nodeUrls = kubernetesClient.nodes.list.getItems.asScala
+      .filterNot(_.getSpec.getUnschedulable)
+      .flatMap(_.getStatus.getAddresses.asScala.map(address => {
+        s"$urlScheme://${address.getAddress}:$servicePort"
+      })).toArray
+    require(nodeUrls.nonEmpty, "No nodes found to contact the driver!")
     val (trustManager, sslContext): (X509TrustManager, SSLContext) =
       if (driverSubmitSslOptions.enabled) {
         buildSslConnectionConfiguration(driverSubmitSslOptions)
       } else {
         (null, SSLContext.getDefault)
       }
-    val url = s"$urlScheme://$nodeAddress:$servicePort"
     HttpClientUtil.createClient[KubernetesSparkRestApi](
-      url,
+      uris = nodeUrls,
       sslSocketFactory = sslContext.getSocketFactory,
       trustContext = trustManager)
   }
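
The key behavioral change above: instead of picking the first address of the first node, the client now fans out to every address of every schedulable node. A self-contained sketch of that fan-out, using a plain case class where the real code reads fabric8 Node objects (NodeInfo and the sample addresses are illustrative only, not part of the patch):

// Illustrative stand-in for the fabric8 Node objects read by the real code.
case class NodeInfo(unschedulable: Boolean, addresses: Seq[String])

object NodeUrlFanOut extends App {
  val urlScheme = "http"
  val servicePort = 31000 // hypothetical NodePort value
  val nodes = Seq(
    NodeInfo(unschedulable = false, addresses = Seq("10.0.0.1")),
    NodeInfo(unschedulable = true, addresses = Seq("10.0.0.2")), // filtered out
    NodeInfo(unschedulable = false, addresses = Seq("10.0.0.3", "192.168.1.3")))

  // Mirrors the filterNot/flatMap in buildDriverSubmissionClient.
  val nodeUrls = nodes
    .filterNot(_.unschedulable)
    .flatMap(_.addresses.map(address => s"$urlScheme://$address:$servicePort"))
    .toArray
  require(nodeUrls.nonEmpty, "No nodes found to contact the driver!")

  nodeUrls.foreach(println)
  // http://10.0.0.1:31000
  // http://10.0.0.3:31000
  // http://192.168.1.3:31000
}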

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Retry.scala

Lines changed: 20 additions & 8 deletions
@@ -19,24 +19,36 @@ package org.apache.spark.deploy.kubernetes
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
-private[spark] object Retry {
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+
+private[spark] object Retry extends Logging {
 
   private def retryableFuture[T]
-      (times: Int, interval: Duration)
+      (attempt: Int, maxAttempts: Int, interval: Duration, retryMessage: Option[String])
       (f: => Future[T])
       (implicit executionContext: ExecutionContext): Future[T] = {
     f recoverWith {
-      case _ if times > 0 => {
-        Thread.sleep(interval.toMillis)
-        retryableFuture(times - 1, interval)(f)
-      }
+      case error: Throwable =>
+        if (attempt <= maxAttempts) {
+          retryMessage.foreach { message =>
+            logWarning(s"$message - attempt $attempt of $maxAttempts", error)
+          }
+          Thread.sleep(interval.toMillis)
+          retryableFuture(attempt + 1, maxAttempts, interval, retryMessage)(f)
+        } else {
+          Future.failed(retryMessage.map(message =>
+            new SparkException(s"$message - reached $maxAttempts attempts," +
+              s" and aborting task.", error)
+          ).getOrElse(error))
+        }
     }
   }
 
   def retry[T]
-      (times: Int, interval: Duration)
+      (times: Int, interval: Duration, retryMessage: Option[String] = None)
       (f: => T)
       (implicit executionContext: ExecutionContext): Future[T] = {
-    retryableFuture(times, interval)(Future[T] { f })
+    retryableFuture(1, times, interval, retryMessage)(Future[T] { f })
  }
 }
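
For reference, a hedged usage sketch of the revised helper, assuming the Retry object above is on the classpath (it is private[spark], so calling code would live under org.apache.spark); flakyPing is a stand-in for driverSubmitter.ping():

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RetryDemo extends App {
  private val attempts = new AtomicInteger(0)

  // Fails twice, then succeeds - simulates a driver server that is slow to come up.
  def flakyPing(): String = {
    val n = attempts.incrementAndGet()
    if (n < 3) throw new RuntimeException(s"connection refused on attempt $n")
    "pong"
  }

  // The new optional message is logged as a warning on each failed attempt
  // and wrapped in a SparkException once all attempts are exhausted.
  val ping: Future[String] =
    Retry.retry(5, 1.second, Some("Failed to contact the driver server")) {
      flakyPing()
    }

  println(Await.result(ping, 1.minute)) // "pong", after two logged retries
}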

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala

Lines changed: 16 additions & 5 deletions
@@ -20,7 +20,7 @@ import javax.net.ssl.{SSLContext, SSLSocketFactory, X509TrustManager}
 
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import feign.Feign
+import feign.{Client, Feign, Request, Response}
 import feign.Request.Options
 import feign.jackson.{JacksonDecoder, JacksonEncoder}
 import feign.jaxrs.JAXRSContract
@@ -32,7 +32,7 @@ import org.apache.spark.status.api.v1.JacksonMessageWriter
 private[spark] object HttpClientUtil {
 
   def createClient[T: ClassTag](
-      uri: String,
+      uris: Array[String],
       sslSocketFactory: SSLSocketFactory = SSLContext.getDefault.getSocketFactory,
       trustContext: X509TrustManager = null,
       readTimeoutMillis: Int = 20000,
@@ -45,13 +45,24 @@ private[spark] object HttpClientUtil {
       .registerModule(new DefaultScalaModule)
       .setDateFormat(JacksonMessageWriter.makeISODateFormat)
     objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
-    val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+    val target = new MultiServerFeignTarget[T](uris)
+    val baseHttpClient = new feign.okhttp.OkHttpClient(httpClientBuilder.build())
+    val resetTargetHttpClient = new Client {
+      override def execute(request: Request, options: Options): Response = {
+        val response = baseHttpClient.execute(request, options)
+        if (response.status() >= 200 && response.status() < 300) {
+          target.reset()
+        }
+        response
+      }
+    }
     Feign.builder()
-      .client(new feign.okhttp.OkHttpClient(httpClientBuilder.build()))
+      .client(resetTargetHttpClient)
       .contract(new JAXRSContract)
       .encoder(new JacksonEncoder(objectMapper))
       .decoder(new JacksonDecoder(objectMapper))
       .options(new Options(connectTimeoutMillis, readTimeoutMillis))
-      .target(clazz, uri)
+      .retryer(target)
+      .target(target)
   }
 }
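
Two design points are worth calling out. The MultiServerFeignTarget is registered both as the Feign Target (it picks the URL for each request) and as the Retryer (it decides when to fail over), so one object owns the rotation state. And the wrapping Client resets that per-thread rotation after any 2xx response, so the next request starts from a fresh shuffle rather than the shrunken list left behind by earlier failovers. A hypothetical call site, mirroring what Client.scala does above (the addresses are placeholders for real NodePort endpoints):

import javax.net.ssl.SSLContext

// Placeholder URIs; real callers derive these from the Kubernetes node list
// as in Client.buildDriverSubmissionClient.
val submitter = HttpClientUtil.createClient[KubernetesSparkRestApi](
  uris = Array("http://10.0.0.1:31000", "http://10.0.0.3:31000"),
  sslSocketFactory = SSLContext.getDefault.getSocketFactory)

// A connection-level failure against one node rotates to the next candidate
// before any error is surfaced to the caller.
submitter.ping()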
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/MultiServerFeignTarget.scala

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.kubernetes
+
+import feign.{Request, RequestTemplate, RetryableException, Retryer, Target}
+import scala.reflect.ClassTag
+import scala.util.Random
+
+private[kubernetes] class MultiServerFeignTarget[T : ClassTag](
+    private val servers: Seq[String]) extends Target[T] with Retryer {
+  require(servers.nonEmpty, "Must provide at least one server URI.")
+
+  private val threadLocalShuffledServers = new ThreadLocal[Seq[String]] {
+    override def initialValue(): Seq[String] = Random.shuffle(servers)
+  }
+
+  override def `type`(): Class[T] = {
+    implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+  }
+
+  override def url(): String = threadLocalShuffledServers.get.head
+
+  /**
+   * Cloning the target is done on every request, for use on the current
+   * thread - thus it's important that clone returns a "fresh" target.
+   */
+  override def clone(): Retryer = {
+    reset()
+    this
+  }
+
+  override def name(): String = {
+    s"${getClass.getSimpleName} with servers [${servers.mkString(",")}]"
+  }
+
+  override def apply(requestTemplate: RequestTemplate): Request = {
+    if (!requestTemplate.url().startsWith("http")) {
+      requestTemplate.insert(0, url())
+    }
+    requestTemplate.request()
+  }
+
+  override def continueOrPropagate(e: RetryableException): Unit = {
+    threadLocalShuffledServers.set(threadLocalShuffledServers.get.drop(1))
+    if (threadLocalShuffledServers.get.isEmpty) {
+      throw e
+    }
+  }
+
+  def reset(): Unit = {
+    threadLocalShuffledServers.set(Random.shuffle(servers))
+  }
+}
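
An illustrative-only walk-through of the Retryer contract, driving by hand what Feign does internally. It assumes code sitting in the same org.apache.spark.deploy.rest.kubernetes package (the class is private[kubernetes]); PingApi and the URLs are hypothetical, and the three-argument RetryableException constructor shown is the Feign 8/9 form:

import java.util.Date

import feign.RetryableException

trait PingApi { def ping(): String } // hypothetical stand-in for KubernetesSparkRestApi

object FailoverWalkthrough extends App {
  val target = new MultiServerFeignTarget[PingApi](
    Seq("http://node-a:31000", "http://node-b:31000"))

  target.clone()           // Feign clones the Retryer before each request; this reshuffles
  val first = target.url() // head of this thread's shuffled list

  // Simulate a connection failure: the current server is dropped, and the
  // next url() points at the remaining candidate.
  val failure = new RetryableException("connection refused", null, new Date())
  target.continueOrPropagate(failure)
  assert(target.url() != first)

  // With the per-thread list now exhausted, another call would rethrow:
  // target.continueOrPropagate(failure)
}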

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ private[spark] object Minikube extends Logging {
       .build()
     val sslContext = SSLUtils.sslContext(kubernetesConf)
     val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager]
-    HttpClientUtil.createClient[T](url, sslContext.getSocketFactory, trustManager)
+    HttpClientUtil.createClient[T](Array(url), sslContext.getSocketFactory, trustManager)
   }
 
   def executeMinikubeSsh(command: String): Unit = {
