@@ -21,11 +21,10 @@ import java.net.URL
2121
2222import org .apache .spark .{SPARK_VERSION => sparkVersion }
2323import org .apache .spark .deploy .SparkSubmitArguments
24- import org .apache .spark .util .Utils
2524
2625/**
27- * A client that submits applications to the standalone Master using the stable REST protocol.
28- * This client is intended to communicate with the StandaloneRestServer. Cluster mode only.
26+ * A client that submits applications to the standalone Master using the REST protocol
27+ * This client is intended to communicate with the [[ StandaloneRestServer ]] . Cluster mode only.
2928 */
3029private [spark] class StandaloneRestClient extends SubmitRestClient {
3130 import StandaloneRestClient ._
@@ -38,7 +37,8 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
3837 * this reports failure and logs an error message provided by the REST server.
3938 */
4039 override def submitDriver (args : SparkSubmitArguments ): SubmitDriverResponse = {
41- val submitResponse = super .submitDriver(args).asInstanceOf [SubmitDriverResponse ]
40+ validateSubmitArgs(args)
41+ val submitResponse = super .submitDriver(args)
4242 val submitSuccess = submitResponse.getSuccess.toBoolean
4343 if (submitSuccess) {
4444 val driverId = submitResponse.getDriverId
@@ -51,14 +51,25 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
5151 submitResponse
5252 }
5353
54+ /** Request that the REST server kill the specified driver. */
55+ override def killDriver (master : String , driverId : String ): KillDriverResponse = {
56+ validateMaster(master)
57+ super .killDriver(master, driverId)
58+ }
59+
60+ /** Request the status of the specified driver from the REST server. */
61+ override def requestDriverStatus (master : String , driverId : String ): DriverStatusResponse = {
62+ validateMaster(master)
63+ super .requestDriverStatus(master, driverId)
64+ }
65+
5466 /**
55- * Poll the status of the driver that was just submitted and report it.
56- * This retries up to a fixed number of times until giving up.
67+ * Poll the status of the driver that was just submitted and log it.
68+ * This retries up to a fixed number of times before giving up.
5769 */
5870 private def pollSubmittedDriverStatus (master : String , driverId : String ): Unit = {
5971 (1 to REPORT_DRIVER_STATUS_MAX_TRIES ).foreach { _ =>
6072 val statusResponse = requestDriverStatus(master, driverId)
61- .asInstanceOf [DriverStatusResponse ]
6273 val statusSuccess = statusResponse.getSuccess.toBoolean
6374 if (statusSuccess) {
6475 val driverState = statusResponse.getDriverState
@@ -75,13 +86,13 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
7586 exception.foreach { e => logError(e) }
7687 return
7788 }
89+ Thread .sleep(REPORT_DRIVER_STATUS_INTERVAL )
7890 }
7991 logError(s " Error: Master did not recognize driver $driverId. " )
8092 }
8193
8294 /** Construct a submit driver request message. */
83- override protected def constructSubmitRequest (
84- args : SparkSubmitArguments ): SubmitDriverRequest = {
95+ protected override def constructSubmitRequest (args : SparkSubmitArguments ): SubmitDriverRequest = {
8596 val message = new SubmitDriverRequest ()
8697 .setSparkVersion(sparkVersion)
8798 .setAppName(args.name)
@@ -99,12 +110,14 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
99110 .setTotalExecutorCores(args.totalExecutorCores)
100111 args.childArgs.foreach(message.addAppArg)
101112 args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) }
102- // TODO: send special environment variables?
113+ sys.env.foreach { case (k, v) =>
114+ if (k.startsWith(" SPARK_" )) { message.setEnvironmentVariable(k, v) }
115+ }
103116 message
104117 }
105118
106119 /** Construct a kill driver request message. */
107- override protected def constructKillRequest (
120+ protected override def constructKillRequest (
108121 master : String ,
109122 driverId : String ): KillDriverRequest = {
110123 new KillDriverRequest ()
@@ -113,33 +126,34 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
113126 }
114127
115128 /** Construct a driver status request message. */
116- override protected def constructStatusRequest (
129+ protected override def constructStatusRequest (
117130 master : String ,
118131 driverId : String ): DriverStatusRequest = {
119132 new DriverStatusRequest ()
120133 .setSparkVersion(sparkVersion)
121134 .setDriverId(driverId)
122135 }
123136
137+ /** Extract the URL portion of the master address. */
138+ protected override def getHttpUrl (master : String ): URL = {
139+ validateMaster(master)
140+ new URL (" http://" + master.stripPrefix(" spark://" ))
141+ }
142+
124143 /** Throw an exception if this is not standalone mode. */
125- override protected def validateMaster (master : String ): Unit = {
144+ private def validateMaster (master : String ): Unit = {
126145 if (! master.startsWith(" spark://" )) {
127146 throw new IllegalArgumentException (" This REST client is only supported in standalone mode." )
128147 }
129148 }
130149
131- /** Throw an exception if this is not cluster deploy mode. */
132- override protected def validateDeployMode (deployMode : String ): Unit = {
133- if (deployMode != " cluster" ) {
134- throw new IllegalArgumentException (" This REST client is only supported in cluster mode." )
150+ /** Throw an exception if this is not standalone cluster mode. */
151+ private def validateSubmitArgs (args : SparkSubmitArguments ): Unit = {
152+ if (! args.isStandaloneCluster) {
153+ throw new IllegalArgumentException (
154+ " This REST client is only supported in standalone cluster mode." )
135155 }
136156 }
137-
138- /** Extract the URL portion of the master address. */
139- override protected def getHttpUrl (master : String ): URL = {
140- validateMaster(master)
141- new URL (" http://" + master.stripPrefix(" spark://" ))
142- }
143157}
144158
145159private object StandaloneRestClient {
0 commit comments