
Commit 9165ae8

Author: Andrew Or
Fall back to Akka if endpoint was not REST
In this commit we also introduce a new type of exception to serve this purpose and privatize as many classes as possible.
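
In outline, the change makes standalone cluster submission try the REST gateway first and fall back to the legacy Akka gateway on a connection-level failure. The sketch below is a condensed paraphrase of the SparkSubmit.scala diff further down, not the verbatim method body:

    // Condensed sketch of the failover flow introduced in this commit
    // (see the SparkSubmit.scala diff below for the real code).
    if (args.isStandaloneCluster) {
      try {
        val client = new StandaloneRestClient
        client.createSubmission(args) match {
          case s: CreateSubmissionResponse => handleRestResponse(s)
          case r => handleUnexpectedRestResponse(r)
        }
      } catch {
        // Thrown only when the endpoint could not be reached as a REST server,
        // so it is safe to retry through the legacy Akka gateway.
        case e: SubmitRestConnectionException =>
          runMain(childArgs, childClasspath, sysProps, childMainClass)
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass)
    }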
1 parent 252d53c commit 9165ae8

6 files changed: +61, -56 lines changed


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 26 additions & 14 deletions
@@ -124,22 +124,33 @@ object SparkSubmit {
    * running the child main class based on the cluster manager and the deploy mode.
    * Second, we use this launch environment to invoke the main method of the child
    * main class.
-   *
-   * As of Spark 1.3, a REST-based application submission gateway is introduced.
-   * If this is enabled, then we will run standalone cluster mode by passing the submit
-   * parameters directly to a REST client, which will submit the application using the
-   * REST protocol instead.
    */
   private[spark] def submit(args: SparkSubmitArguments): Unit = {
     val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
-    if (args.isStandaloneCluster && args.isRestEnabled) {
-      printStream.println("Running Spark using the REST application submission protocol.")
-      val client = new StandaloneRestClient
-      val response = client.createSubmission(args)
-      response match {
-        case s: CreateSubmissionResponse => handleRestResponse(s)
-        case r => handleUnexpectedRestResponse(r)
+    /*
+     * In standalone cluster mode, there are two submission gateways:
+     *   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+     *   (2) The new REST-based gateway introduced in Spark 1.3
+     * The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
+     * to use the legacy gateway if the master endpoint turns out to be not a REST server.
+     */
+    if (args.isStandaloneCluster) {
+      try {
+        printStream.println("Running Spark using the REST application submission protocol.")
+        val client = new StandaloneRestClient
+        val response = client.createSubmission(args)
+        response match {
+          case s: CreateSubmissionResponse => handleRestResponse(s)
+          case r => handleUnexpectedRestResponse(r)
+        }
+      } catch {
+        // Fail over to use the legacy submission gateway
+        case e: SubmitRestConnectionException =>
+          printStream.println(s"Master endpoint ${args.master} was not a " +
+            s"REST server. Falling back to legacy submission gateway instead.")
+          runMain(childArgs, childClasspath, sysProps, childMainClass)
       }
+    // In all other modes, just run the main class as prepared
     } else {
       runMain(childArgs, childClasspath, sysProps, childMainClass)
     }
@@ -152,6 +163,7 @@ object SparkSubmit {
    * (2) a list of classpath entries for the child,
    * (3) a list of system properties and env vars, and
    * (4) the main class for the child
+   * In standalone cluster mode, this mutates the original arguments passed in.
    * Exposed for testing.
    */
   private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)
@@ -347,7 +359,7 @@ object SparkSubmit {
 
     // In standalone-cluster mode, use Client as a wrapper around the user class
     // Note that we won't actually launch this class if we're using the REST protocol
-    if (args.isStandaloneCluster && !args.isRestEnabled) {
+    if (args.isStandaloneCluster) {
       childMainClass = "org.apache.spark.deploy.Client"
       if (args.supervise) {
         childArgs += "--supervise"
@@ -419,7 +431,7 @@ object SparkSubmit {
     // NOTE: If we are using the REST gateway, we will use the original arguments directly.
     // Since we mutate the values of some configs in this method, we must update the
     // corresponding fields in the original SparkSubmitArguments to reflect these changes.
-    if (args.isStandaloneCluster && args.isRestEnabled) {
+    if (args.isStandaloneCluster) {
       args.sparkProperties.clear()
       args.sparkProperties ++= sysProps
       sysProps.get("spark.jars").foreach { args.jars = _ }

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 2 additions & 17 deletions
@@ -61,8 +61,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var driverToKill: String = null
   var driverToRequestStatusFor: String = null
 
-  private val restEnabledKey = "spark.submit.rest.enabled"
-
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
@@ -220,10 +218,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     if (!isStandaloneCluster) {
       SparkSubmit.printErrorAndExit("Killing drivers is only supported in standalone cluster mode")
     }
-    if (!isRestEnabled) {
-      SparkSubmit.printErrorAndExit("Killing drivers is currently only supported " +
-        s"through the REST interface. Please set $restEnabledKey to true.")
-    }
     if (driverToKill == null) {
       SparkSubmit.printErrorAndExit("Please specify a driver to kill")
     }
@@ -234,10 +228,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       SparkSubmit.printErrorAndExit(
         "Requesting driver statuses is only supported in standalone cluster mode")
     }
-    if (!isRestEnabled) {
-      SparkSubmit.printErrorAndExit("Requesting driver statuses is currently only " +
-        s"supported through the REST interface. Please set $restEnabledKey to true.")
-    }
     if (driverToRequestStatusFor == null) {
       SparkSubmit.printErrorAndExit("Please specify a driver to request status for")
     }
@@ -247,11 +237,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     master.startsWith("spark://") && deployMode == "cluster"
   }
 
-  /** Return whether the REST application submission protocol is enabled. */
-  def isRestEnabled: Boolean = {
-    sparkProperties.get(restEnabledKey).getOrElse("false").toBoolean
-  }
-
   override def toString = {
     s"""Parsed arguments:
     |  master                  $master
@@ -472,8 +457,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     | Spark standalone with cluster deploy mode only:
     |  --driver-cores NUM          Cores for driver (Default: 1).
     |  --supervise                 If given, restarts the driver on failure.
-    |  --kill DRIVER_ID            If given, kills the driver specified.
-    |  --status DRIVER_ID          If given, requests the status of the driver specified.
+    |  --kill SUBMISSION_ID        If given, kills the driver specified.
+    |  --status SUBMISSION_ID      If given, requests the status of the driver specified.
     |
     | Spark standalone and Mesos only:
     |  --total-executor-cores NUM  Total cores for all executors.

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 20 additions & 18 deletions
@@ -17,14 +17,14 @@
 
 package org.apache.spark.deploy.rest
 
-import java.io.{FileNotFoundException, DataOutputStream}
-import java.net.{HttpURLConnection, URL}
+import java.io.{DataOutputStream, FileNotFoundException}
+import java.net.{HttpURLConnection, SocketException, URL}
 
 import scala.io.Source
 
 import com.google.common.base.Charsets
 
-import org.apache.spark.{Logging, SparkException, SPARK_VERSION => sparkVersion}
+import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion}
 import org.apache.spark.deploy.SparkSubmitArguments
 
 /**
@@ -43,8 +43,8 @@ import org.apache.spark.deploy.SparkSubmitArguments
  * Additionally, the base URL includes the version of the protocol. For instance:
  * http://1.2.3.4:6066/v1/submissions/create. Since the protocol is expected to be stable
  * across Spark versions, existing fields cannot be added or removed. In the rare event that
- * backward compatibility is broken, Spark must introduce a new protocol version (e.g. v2).
- * The client and the server must communicate on the same version of the protocol.
+ * forward or backward compatibility is broken, Spark must introduce a new protocol version
+ * (e.g. v2). The client and the server must communicate on the same version of the protocol.
  */
 private[spark] class StandaloneRestClient extends Logging {
   import StandaloneRestClient._
@@ -123,7 +123,7 @@ private[spark] class StandaloneRestClient extends Logging {
   private def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
     try {
       val responseJson = Source.fromInputStream(connection.getInputStream).mkString
-      logDebug(s"Response from the REST server:\n$responseJson")
+      logDebug(s"Response from the server:\n$responseJson")
       val response = SubmitRestProtocolMessage.fromJson(responseJson)
       // The response should have already been validated on the server.
       // In case this is not true, validate it ourselves to avoid potential NPEs.
@@ -139,39 +139,40 @@ private[spark] class StandaloneRestClient extends Logging {
         case error: ErrorResponse =>
           logError(s"Server responded with error:\n${error.message}")
           error
-        case response: SubmitRestProtocolResponse =>
-          response
+        case response: SubmitRestProtocolResponse => response
         case unexpected =>
           throw new SubmitRestProtocolException(
-            s"Unexpected message received from server:\n$unexpected")
+            s"Message received from server was not a response:\n${unexpected.toJson}")
       }
     } catch {
-      case e: FileNotFoundException =>
-        throw new SparkException(s"Unable to connect to server ${connection.getURL}", e)
+      case e @ (_: FileNotFoundException | _: SocketException) =>
+        throw new SubmitRestConnectionException(
+          s"Unable to connect to server ${connection.getURL}", e)
     }
   }
 
   /** Return the REST URL for creating a new submission. */
   private def getSubmitUrl(master: String): URL = {
     val baseUrl = getBaseUrl(master)
-    new URL(s"$baseUrl/submissions/create")
+    new URL(s"$baseUrl/create")
   }
 
   /** Return the REST URL for killing an existing submission. */
   private def getKillUrl(master: String, submissionId: String): URL = {
     val baseUrl = getBaseUrl(master)
-    new URL(s"$baseUrl/submissions/kill/$submissionId")
+    new URL(s"$baseUrl/kill/$submissionId")
   }
 
   /** Return the REST URL for requesting the status of an existing submission. */
   private def getStatusUrl(master: String, submissionId: String): URL = {
     val baseUrl = getBaseUrl(master)
-    new URL(s"$baseUrl/submissions/status/$submissionId")
+    new URL(s"$baseUrl/status/$submissionId")
   }
 
   /** Return the base URL for communicating with the server, including the protocol version. */
   private def getBaseUrl(master: String): String = {
-    "http://" + master.stripPrefix("spark://").stripSuffix("/") + "/" + PROTOCOL_VERSION
+    val masterUrl = master.stripPrefix("spark://").stripSuffix("/")
+    s"http://$masterUrl/$PROTOCOL_VERSION/submissions"
   }
 
   /** Throw an exception if this is not standalone mode. */
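
To make the URL refactoring concrete: the protocol version and the /submissions segment now live in the base URL, so each helper appends only its action. A self-contained sketch, using a made-up master address and submission ID (illustrative values, not part of this commit):

    // Mirrors the refactored getBaseUrl/getSubmitUrl/getKillUrl/getStatusUrl above.
    // The master address and submission ID are hypothetical examples.
    val master = "spark://1.2.3.4:6066"
    val protocolVersion = "v1" // stands in for PROTOCOL_VERSION
    val masterUrl = master.stripPrefix("spark://").stripSuffix("/")
    val baseUrl = s"http://$masterUrl/$protocolVersion/submissions"
    val submitUrl = s"$baseUrl/create"             // http://1.2.3.4:6066/v1/submissions/create
    val killUrl = s"$baseUrl/kill/driver-0000"     // http://1.2.3.4:6066/v1/submissions/kill/driver-0000
    val statusUrl = s"$baseUrl/status/driver-0000" // http://1.2.3.4:6066/v1/submissions/status/driver-0000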
@@ -223,10 +224,11 @@ private[spark] class StandaloneRestClient extends Logging {
     if (submitSuccess) {
       val submissionId = submitResponse.submissionId
       if (submissionId != null) {
-        logInfo(s"Driver successfully submitted as $submissionId. Polling driver state...")
+        logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
         pollSubmissionStatus(master, submissionId)
       } else {
-        logError("Application successfully submitted, but driver ID was not provided!")
+        // should never happen
+        logError("Application successfully submitted, but submission ID was not provided!")
       }
     } else {
       val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
@@ -267,7 +269,7 @@ private[spark] class StandaloneRestClient extends Logging {
       }
       Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
     }
-    logError(s"Error: Master did not recognize submission $submissionId.")
+    logError(s"Error: Master did not recognize driver $submissionId.")
   }
 }
 
core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolException.scala

Lines changed: 9 additions & 2 deletions
@@ -20,10 +20,17 @@ package org.apache.spark.deploy.rest
 /**
  * An exception thrown in the REST application submission protocol.
  */
-class SubmitRestProtocolException(message: String, cause: Exception = null)
+private[spark] class SubmitRestProtocolException(message: String, cause: Throwable = null)
   extends Exception(message, cause)
 
 /**
  * An exception thrown if a field is missing from a [[SubmitRestProtocolMessage]].
  */
-class SubmitRestMissingFieldException(message: String) extends SubmitRestProtocolException(message)
+private[spark] class SubmitRestMissingFieldException(message: String)
+  extends SubmitRestProtocolException(message)
+
+/**
+ * An exception thrown if the REST client cannot reach the REST server.
+ */
+private[spark] class SubmitRestConnectionException(message: String, cause: Throwable)
+  extends SubmitRestProtocolException(message, cause)
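
The resulting hierarchy lets a caller distinguish "could not reach a REST server at all" from "reached one, but the exchange failed". A minimal usage sketch with a hypothetical caller (SparkSubmit.submit does the real equivalent when deciding whether to fall back to Akka):

    try {
      // ... send a request to the REST server ...
    } catch {
      case e: SubmitRestConnectionException =>
        // Endpoint unreachable or not a REST server: safe to fail over to Akka.
      case e: SubmitRestMissingFieldException =>
        // The server spoke the protocol, but a required field was absent.
      case e: SubmitRestProtocolException =>
        // Any other protocol-level error: the endpoint is a REST server,
        // so failing over to the legacy gateway would not help.
    }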

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ import org.apache.spark.util.Utils
 @JsonInclude(Include.NON_NULL)
 @JsonAutoDetect(getterVisibility = Visibility.ANY, setterVisibility = Visibility.ANY)
 @JsonPropertyOrder(alphabetic = true)
-abstract class SubmitRestProtocolMessage {
+private[spark] abstract class SubmitRestProtocolMessage {
   @JsonIgnore
   val messageType = Utils.getFormattedClassName(this)
 

core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestProtocolSuite.scala renamed to core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala

Lines changed: 3 additions & 4 deletions
@@ -38,11 +38,11 @@ import org.apache.spark.deploy.worker.Worker
 /**
  * End-to-end tests for the REST application submission protocol in standalone mode.
  */
-class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
   private val systemsToStop = new ArrayBuffer[ActorSystem]
   private val masterRestUrl = startLocalCluster()
   private val client = new StandaloneRestClient
-  private val mainJar = StandaloneRestProtocolSuite.createJar()
+  private val mainJar = StandaloneRestSubmitSuite.createJar()
   private val mainClass = StandaloneRestApp.getClass.getName.stripSuffix("$")
 
   override def afterAll() {
@@ -125,7 +125,6 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
       "--master", masterRestUrl,
       "--name", mainClass,
       "--class", mainClass,
-      "--conf", "spark.submit.rest.enabled=true",
       mainJar) ++ appArgs
     val args = new SparkSubmitArguments(commandLineArgs)
     SparkSubmit.prepareSubmitEnvironment(args)
@@ -195,7 +194,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
   }
 }
 
-private object StandaloneRestProtocolSuite {
+private object StandaloneRestSubmitSuite {
   private val pathPrefix = "org/apache/spark/deploy/rest"
 
   /**
