From fa1fa806460de532c25ec715d8bc8886ab990a98 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 25 Mar 2015 11:46:53 +0800 Subject: [PATCH 01/10] submit app to HA cluster in standalone cluster mode --- .../org/apache/spark/deploy/Client.scala | 82 ++++++++------- .../apache/spark/deploy/ClientArguments.scala | 6 +- .../org/apache/spark/deploy/SparkSubmit.scala | 8 +- .../apache/spark/deploy/master/Master.scala | 17 ++-- .../deploy/rest/StandaloneRestClient.scala | 99 +++++++++++-------- .../rest/StandaloneRestSubmitSuite.scala | 40 ++++---- 6 files changed, 149 insertions(+), 103 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 237d26fc6bd0e..c5572bc4189fd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -35,16 +35,15 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with ActorLogReceive with Logging { - var masterActor: ActorSelection = _ + var mastersActor = driverArgs.masters.map { m => + context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system))) + } val timeout = AkkaUtils.askTimeout(conf) override def preStart() = { - masterActor = context.actorSelection( - Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system))) - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}") + println(s"Sending ${driverArgs.cmd} command to ${driverArgs.masters}") driverArgs.cmd match { case "launch" => @@ -79,10 +78,12 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.supervise, command) + for (masterActor <- mastersActor) masterActor ! RequestSubmitDriver(driverDescription) case "kill" => val driverId = driverArgs.driverId + for (masterActor <- mastersActor) masterActor ! RequestKillDriver(driverId) } } @@ -92,29 +93,33 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) println(s"... waiting before polling master for driver state") Thread.sleep(5000) println("... polling master for driver state") - val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout) - .mapTo[DriverStatusResponse] - val statusResponse = Await.result(statusFuture, timeout) - - statusResponse.found match { - case false => - println(s"ERROR: Cluster master did not recognize $driverId") - System.exit(-1) - case true => - println(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { - case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - println(s"Driver running on $hostPort ($id)") - case _ => - } - // Exception, if present - statusResponse.exception.map { e => - println(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) - } - System.exit(0) + for (masterActor <- mastersActor) { + val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout) + .mapTo[DriverStatusResponse] + val statusResponse = Await.result(statusFuture, timeout) + + statusResponse.found match { + case false => + statusResponse.exception.getOrElse { + println(s"ERROR: Cluster master did not recognize $driverId") + System.exit(-1) + } + case true => + println(s"State of $driverId is ${statusResponse.state.get}") + // Worker node, if present + (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { + case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => + println(s"Driver running on $hostPort ($id)") + case _ => + } + // Exception, if present + statusResponse.exception.map { e => + println(s"Exception from cluster was: $e") + e.printStackTrace() + System.exit(-1) + } + System.exit(0) + } } } @@ -122,18 +127,27 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) case SubmitDriverResponse(success, driverId, message) => println(message) - if (success) pollAndReportStatus(driverId.get) else System.exit(-1) + if (success) { + pollAndReportStatus(driverId.get) + } else if (!message.contains("Can only")) { + System.exit(-1) + } + case KillDriverResponse(driverId, success, message) => println(message) - if (success) pollAndReportStatus(driverId) else System.exit(-1) + if (success) { + pollAndReportStatus(driverId) + } else if (!message.contains("Can only")) { + System.exit(-1) + } case DisassociatedEvent(_, remoteAddress, _) => - println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") + println(s"Error connecting to master ${driverArgs.masters} ($remoteAddress), exiting.") System.exit(-1) case AssociationErrorEvent(cause, _, remoteAddress, _, _) => - println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") + println(s"Error connecting to master ${driverArgs.masters} ($remoteAddress), exiting.") println(s"Cause was: $cause") System.exit(-1) } @@ -163,7 +177,9 @@ object Client { "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely - Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem)) + for (m <- driverArgs.masters) { + Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem)) + } actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) actorSystem.awaitTermination() diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 53bc62aff7395..7a0e5ac9033f0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -35,7 +35,7 @@ private[deploy] class ClientArguments(args: Array[String]) { var logLevel = Level.WARN // launch parameters - var master: String = "" + var masters: Array[String] = null var jarUrl: String = "" var mainClass: String = "" var supervise: Boolean = DEFAULT_SUPERVISE @@ -80,13 +80,13 @@ private[deploy] class ClientArguments(args: Array[String]) { } jarUrl = _jarUrl - master = _master + masters = _master.stripPrefix("spark://").split(",").map("spark://" + _) mainClass = _mainClass _driverOptions ++= tail case "kill" :: _master :: _driverId :: tail => cmd = "kill" - master = _master + masters = _master.stripPrefix("spark://").split(",").map("spark://" + _) driverId = _driverId case _ => diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4f506be63fe59..ab30b8724854c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -115,8 +115,8 @@ object SparkSubmit { /** Kill an existing submission using the REST protocol. Standalone cluster mode only. */ private def kill(args: SparkSubmitArguments): Unit = { - new StandaloneRestClient() - .killSubmission(args.master, args.submissionToKill) + new StandaloneRestClient(args.master) + .killSubmission(args.submissionToKill) } /** @@ -124,8 +124,8 @@ object SparkSubmit { * Standalone cluster mode only. */ private def requestStatus(args: SparkSubmitArguments): Unit = { - new StandaloneRestClient() - .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor) + new StandaloneRestClient(args.master) + .requestSubmissionStatus(args.submissionToRequestStatusFor) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 1b42121c8db05..df7ef2e833082 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -305,12 +305,17 @@ private[master] class Master( } case RequestDriverStatus(driverId) => { - (drivers ++ completedDrivers).find(_.id == driverId) match { - case Some(driver) => - sender ! DriverStatusResponse(found = true, Some(driver.state), - driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception) - case None => - sender ! DriverStatusResponse(found = false, None, None, None, None) + if (state != RecoveryState.ALIVE) { + val msg = s"Can only request driver status in ALIVE state. Current state: $state." + sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))) + } else { + (drivers ++ completedDrivers).find(_.id == driverId) match { + case Some(driver) => + sender ! DriverStatusResponse(found = true, Some(driver.state), + driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception) + case None => + sender ! DriverStatusResponse(found = false, None, None, None, None) + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala index 420442f7564cc..a09b1d3a55175 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala @@ -52,9 +52,11 @@ import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion} * implementation of this client can use that information to retry using the version specified * by the server. */ -private[deploy] class StandaloneRestClient extends Logging { +private[deploy] class StandaloneRestClient(master: String) extends Logging { import StandaloneRestClient._ + val masters: Array[String] = master.stripPrefix("spark://").split(",").map("spark://" + _) + /** * Submit an application specified by the parameters in the provided request. * @@ -62,45 +64,70 @@ private[deploy] class StandaloneRestClient extends Logging { * it to the user. Otherwise, report the error message provided by the server. */ private[rest] def createSubmission( - master: String, request: CreateSubmissionRequest): SubmitRestProtocolResponse = { logInfo(s"Submitting a request to launch an application in $master.") - validateMaster(master) - val url = getSubmitUrl(master) - val response = postJson(url, request.toJson) - response match { - case s: CreateSubmissionResponse => - reportSubmissionStatus(master, s) - handleRestResponse(s) - case unexpected => - handleUnexpectedRestResponse(unexpected) + var suc: Boolean = false + var response: SubmitRestProtocolResponse = null + for (m <- masters if !suc) { + validateMaster(m) + val url = getSubmitUrl(m) + response = postJson(url, request.toJson) + response match { + case s: CreateSubmissionResponse => + if (s.success) { + reportSubmissionStatus(s) + handleRestResponse(s) + suc = true + } + case unexpected => + handleUnexpectedRestResponse(unexpected) + } } response } /** Request that the server kill the specified submission. */ - def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = { + def killSubmission(submissionId: String): SubmitRestProtocolResponse = { logInfo(s"Submitting a request to kill submission $submissionId in $master.") - validateMaster(master) - val response = post(getKillUrl(master, submissionId)) - response match { - case k: KillSubmissionResponse => handleRestResponse(k) - case unexpected => handleUnexpectedRestResponse(unexpected) + var suc: Boolean = false + var response: SubmitRestProtocolResponse = null + for (m <- masters if !suc) { + validateMaster(m) + response = post(getKillUrl(m, submissionId)) + response match { + case k: KillSubmissionResponse => + if (!k.message.contains("Can only")) { + handleRestResponse(k) + suc = true + } + case unexpected => + handleUnexpectedRestResponse(unexpected) + } } response } /** Request the status of a submission from the server. */ def requestSubmissionStatus( - master: String, submissionId: String, quiet: Boolean = false): SubmitRestProtocolResponse = { logInfo(s"Submitting a request for the status of submission $submissionId in $master.") - validateMaster(master) - val response = get(getStatusUrl(master, submissionId)) - response match { - case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) } - case unexpected => handleUnexpectedRestResponse(unexpected) + var suc: Boolean = false + var response: SubmitRestProtocolResponse = null + for (m <- masters) { + validateMaster(m) + response = get(getStatusUrl(m, submissionId)) + response match { + case s: SubmissionStatusResponse => + if (!s.message.contains("Can only")) { + if (!quiet) { + handleRestResponse(s) + } + suc = true + } + case unexpected => + handleUnexpectedRestResponse(unexpected) + } } response } @@ -228,20 +255,14 @@ private[deploy] class StandaloneRestClient extends Logging { /** Report the status of a newly created submission. */ private def reportSubmissionStatus( - master: String, submitResponse: CreateSubmissionResponse): Unit = { - if (submitResponse.success) { - val submissionId = submitResponse.submissionId - if (submissionId != null) { - logInfo(s"Submission successfully created as $submissionId. Polling submission state...") - pollSubmissionStatus(master, submissionId) - } else { - // should never happen - logError("Application successfully submitted, but submission ID was not provided!") - } + val submissionId = submitResponse.submissionId + if (submissionId != null) { + logInfo(s"Submission successfully created as $submissionId. Polling submission state...") + pollSubmissionStatus(submissionId) } else { - val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("") - logError("Application submission failed" + failMessage) + // should never happen + logError("Application successfully submitted, but submission ID was not provided!") } } @@ -249,9 +270,9 @@ private[deploy] class StandaloneRestClient extends Logging { * Poll the status of the specified submission and log it. * This retries up to a fixed number of times before giving up. */ - private def pollSubmissionStatus(master: String, submissionId: String): Unit = { + private def pollSubmissionStatus(submissionId: String): Unit = { (1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ => - val response = requestSubmissionStatus(master, submissionId, quiet = true) + val response = requestSubmissionStatus(submissionId, quiet = true) val statusResponse = response match { case s: SubmissionStatusResponse => s case _ => return // unexpected type, let upstream caller handle it @@ -311,10 +332,10 @@ private[rest] object StandaloneRestClient { } val sparkProperties = conf.getAll.toMap val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") } - val client = new StandaloneRestClient + val client = new StandaloneRestClient(master) val submitRequest = client.constructSubmitRequest( appResource, mainClass, appArgs, sparkProperties, environmentVariables) - client.createSubmission(master, submitRequest) + client.createSubmission(submitRequest) } def main(args: Array[String]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 2fa90e3bd1c63..2e7201b58c7ac 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -39,7 +39,6 @@ import org.apache.spark.deploy.master.DriverState._ * Tests for the REST application submission protocol used in standalone cluster mode. */ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { - private val client = new StandaloneRestClient private var actorSystem: Option[ActorSystem] = None private var server: Option[StandaloneRestServer] = None @@ -52,7 +51,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val appArgs = Array("one", "two", "three") val sparkProperties = Map("spark.app.name" -> "pi") val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX") - val request = client.constructSubmitRequest( + val request = new StandaloneRestClient("spark://host:port").constructSubmitRequest( "my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables) assert(request.action === Utils.getFormattedClassName(request)) assert(request.clientSparkVersion === SPARK_VERSION) @@ -71,7 +70,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val request = constructSubmitRequest(masterUrl, appArgs) assert(request.appArgs === appArgs) assert(request.sparkProperties("spark.master") === masterUrl) - val response = client.createSubmission(masterUrl, request) + val response = new StandaloneRestClient(masterUrl).createSubmission(request) val submitResponse = getSubmitResponse(response) assert(submitResponse.action === Utils.getFormattedClassName(submitResponse)) assert(submitResponse.serverSparkVersion === SPARK_VERSION) @@ -102,7 +101,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionId = "my-lyft-driver" val killMessage = "your driver is killed" val masterUrl = startDummyServer(killMessage = killMessage) - val response = client.killSubmission(masterUrl, submissionId) + val response = new StandaloneRestClient(masterUrl).killSubmission(submissionId) val killResponse = getKillResponse(response) assert(killResponse.action === Utils.getFormattedClassName(killResponse)) assert(killResponse.serverSparkVersion === SPARK_VERSION) @@ -116,7 +115,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionState = KILLED val submissionException = new Exception("there was an irresponsible mix of alcohol and cars") val masterUrl = startDummyServer(state = submissionState, exception = Some(submissionException)) - val response = client.requestSubmissionStatus(masterUrl, submissionId) + val response = new StandaloneRestClient(masterUrl).requestSubmissionStatus(submissionId) val statusResponse = getStatusResponse(response) assert(statusResponse.action === Utils.getFormattedClassName(statusResponse)) assert(statusResponse.serverSparkVersion === SPARK_VERSION) @@ -129,13 +128,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then kill") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) + val client = new StandaloneRestClient(masterUrl) + val response1 = client.createSubmission(request) val submitResponse = getSubmitResponse(response1) assert(submitResponse.success) assert(submitResponse.submissionId != null) // kill submission that was just created val submissionId = submitResponse.submissionId - val response2 = client.killSubmission(masterUrl, submissionId) + val response2 = client.killSubmission(submissionId) val killResponse = getKillResponse(response2) assert(killResponse.success) assert(killResponse.submissionId === submissionId) @@ -144,13 +144,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then request status") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) + val client = new StandaloneRestClient(masterUrl) + val response1 = client.createSubmission(request) val submitResponse = getSubmitResponse(response1) assert(submitResponse.success) assert(submitResponse.submissionId != null) // request status of submission that was just created val submissionId = submitResponse.submissionId - val response2 = client.requestSubmissionStatus(masterUrl, submissionId) + val response2 = client.requestSubmissionStatus(submissionId) val statusResponse = getStatusResponse(response2) assert(statusResponse.success) assert(statusResponse.submissionId === submissionId) @@ -160,8 +161,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then kill then request status") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val response1 = client.createSubmission(masterUrl, request) - val response2 = client.createSubmission(masterUrl, request) + val client = new StandaloneRestClient(masterUrl) + val response1 = client.createSubmission(request) + val response2 = client.createSubmission(request) val submitResponse1 = getSubmitResponse(response1) val submitResponse2 = getSubmitResponse(response2) assert(submitResponse1.success) @@ -171,13 +173,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionId1 = submitResponse1.submissionId val submissionId2 = submitResponse2.submissionId // kill only submission 1, but not submission 2 - val response3 = client.killSubmission(masterUrl, submissionId1) + val response3 = client.killSubmission(submissionId1) val killResponse = getKillResponse(response3) assert(killResponse.success) assert(killResponse.submissionId === submissionId1) // request status for both submissions: 1 should be KILLED but 2 should be RUNNING still - val response4 = client.requestSubmissionStatus(masterUrl, submissionId1) - val response5 = client.requestSubmissionStatus(masterUrl, submissionId2) + val response4 = client.requestSubmissionStatus(submissionId1) + val response5 = client.requestSubmissionStatus(submissionId2) val statusResponse1 = getStatusResponse(response4) val statusResponse2 = getStatusResponse(response5) assert(statusResponse1.submissionId === submissionId1) @@ -189,13 +191,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("kill or request status before create") { val masterUrl = startSmartServer() val doesNotExist = "does-not-exist" + val client = new StandaloneRestClient(masterUrl) // kill a non-existent submission - val response1 = client.killSubmission(masterUrl, doesNotExist) + val response1 = client.killSubmission(doesNotExist) val killResponse = getKillResponse(response1) assert(!killResponse.success) assert(killResponse.submissionId === doesNotExist) // request status for a non-existent submission - val response2 = client.requestSubmissionStatus(masterUrl, doesNotExist) + val response2 = client.requestSubmissionStatus(doesNotExist) val statusResponse = getStatusResponse(response2) assert(!statusResponse.success) assert(statusResponse.submissionId === doesNotExist) @@ -339,6 +342,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("client handles faulty server") { val masterUrl = startFaultyServer() + val client = new StandaloneRestClient(masterUrl) val httpUrl = masterUrl.replace("spark://", "http://") val v = StandaloneRestServer.PROTOCOL_VERSION val submitRequestPath = s"$httpUrl/$v/submissions/create" @@ -425,7 +429,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { mainJar) ++ appArgs val args = new SparkSubmitArguments(commandLineArgs) val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args) - client.constructSubmitRequest( + new StandaloneRestClient("spark://host:port").constructSubmitRequest( mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty) } @@ -492,7 +496,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { method: String, body: String = ""): (SubmitRestProtocolResponse, Int) = { val conn = sendHttpRequest(url, method, body) - (client.readResponse(conn), conn.getResponseCode) + (new StandaloneRestClient("spark://host:port").readResponse(conn), conn.getResponseCode) } } From 2b011c9d0faa5e54818464cb19d6270e46b3823b Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 25 Mar 2015 15:55:15 +0800 Subject: [PATCH 02/10] fix broken tests --- .../spark/deploy/rest/StandaloneRestClient.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala index a09b1d3a55175..d2430136f2512 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala @@ -114,17 +114,16 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { logInfo(s"Submitting a request for the status of submission $submissionId in $master.") var suc: Boolean = false var response: SubmitRestProtocolResponse = null + masters.foreach(println(_)) for (m <- masters) { validateMaster(m) response = get(getStatusUrl(m, submissionId)) response match { - case s: SubmissionStatusResponse => - if (!s.message.contains("Can only")) { - if (!quiet) { - handleRestResponse(s) - } - suc = true + case s: SubmissionStatusResponse if s.success => + if (!quiet) { + handleRestResponse(s) } + suc = true case unexpected => handleUnexpectedRestResponse(unexpected) } From 7a881b3b111e3b2b5f5c915d6c5ab0501586e515 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Thu, 26 Mar 2015 11:52:11 +0800 Subject: [PATCH 03/10] when one of masters is gone, we still can submit --- .../org/apache/spark/deploy/Client.scala | 89 ++++++++++-------- .../deploy/rest/StandaloneRestClient.scala | 90 +++++++++++++------ 2 files changed, 115 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 3b6361ef41d22..6a140d610fa0c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -28,6 +28,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} +import scala.collection.mutable.HashSet /** * Proxy that relays messages to the driver. @@ -35,16 +36,17 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with ActorLogReceive with Logging { - var mastersActor = driverArgs.masters.map { m => + val mastersActor = driverArgs.masters.map { m => context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system))) } + val lostMasters = new HashSet[Address] + var activeMasterActor: ActorSelection = null + val timeout = AkkaUtils.askTimeout(conf) override def preStart(): Unit = { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - println(s"Sending ${driverArgs.cmd} command to ${driverArgs.masters}") - driverArgs.cmd match { case "launch" => // TODO: We could add an env variable here and intercept it in `sc.addJar` that would @@ -78,13 +80,15 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.supervise, command) - for (masterActor <- mastersActor) + for (masterActor <- mastersActor) { masterActor ! RequestSubmitDriver(driverDescription) + } case "kill" => val driverId = driverArgs.driverId - for (masterActor <- mastersActor) + for (masterActor <- mastersActor) { masterActor ! RequestKillDriver(driverId) + } } } @@ -93,34 +97,31 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) println(s"... waiting before polling master for driver state") Thread.sleep(5000) println("... polling master for driver state") - for (masterActor <- mastersActor) { - val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout) - .mapTo[DriverStatusResponse] - val statusResponse = Await.result(statusFuture, timeout) - - statusResponse.found match { - case false => - statusResponse.exception.getOrElse { - println(s"ERROR: Cluster master did not recognize $driverId") - System.exit(-1) - } - case true => - println(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { - case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - println(s"Driver running on $hostPort ($id)") - case _ => - } - // Exception, if present - statusResponse.exception.map { e => - println(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) - } - System.exit(0) + val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout) + .mapTo[DriverStatusResponse] + val statusResponse = Await.result(statusFuture, timeout) + statusResponse.found match { + case false => + statusResponse.exception.getOrElse { + println(s"ERROR: Cluster master did not recognize $driverId") + System.exit(-1) + } + case true => + println(s"State of $driverId is ${statusResponse.state.get}") + // Worker node, if present + (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { + case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => + println(s"Driver running on $hostPort ($id)") + case _ => + } + // Exception, if present + statusResponse.exception.map { e => + println(s"Exception from cluster was: $e") + e.printStackTrace() + System.exit(-1) + } + System.exit(0) } - } } override def receiveWithLogging: PartialFunction[Any, Unit] = { @@ -128,6 +129,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) case SubmitDriverResponse(success, driverId, message) => println(message) if (success) { + activeMasterActor = context.actorSelection(sender.path) pollAndReportStatus(driverId.get) } else if (!message.contains("Can only")) { System.exit(-1) @@ -137,19 +139,32 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) case KillDriverResponse(driverId, success, message) => println(message) if (success) { + activeMasterActor = context.actorSelection(sender.path) pollAndReportStatus(driverId) } else if (!message.contains("Can only")) { System.exit(-1) } case DisassociatedEvent(_, remoteAddress, _) => - println(s"Error connecting to master ${driverArgs.masters} ($remoteAddress), exiting.") - System.exit(-1) + if (!lostMasters.contains(remoteAddress)) { + println(s"Error connecting to master $remoteAddress.") + lostMasters += remoteAddress + if (lostMasters.size >= mastersActor.size) { + println(s"No master is available, exiting.") + System.exit(-1) + } + } case AssociationErrorEvent(cause, _, remoteAddress, _, _) => - println(s"Error connecting to master ${driverArgs.masters} ($remoteAddress), exiting.") - println(s"Cause was: $cause") - System.exit(-1) + if (!lostMasters.contains(remoteAddress)) { + println(s"Error connecting to master ($remoteAddress).") + println(s"Cause was: $cause") + lostMasters += remoteAddress + if (lostMasters.size >= mastersActor.size) { + println(s"No master is available, exiting.") + System.exit(-1) + } + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala index d2430136f2512..e7541e3dd3c23 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.rest import java.io.{DataOutputStream, FileNotFoundException} -import java.net.{HttpURLConnection, SocketException, URL} +import java.net.{ConnectException, HttpURLConnection, SocketException, URL} import javax.servlet.http.HttpServletResponse import scala.io.Source @@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException import com.google.common.base.Charsets import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion} +import scala.collection.mutable /** * A client that submits applications to the standalone Master using a REST protocol. @@ -57,6 +58,8 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { val masters: Array[String] = master.stripPrefix("spark://").split(",").map("spark://" + _) + private val lostMasters = new mutable.HashSet[String] + /** * Submit an application specified by the parameters in the provided request. * @@ -71,16 +74,24 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { for (m <- masters if !suc) { validateMaster(m) val url = getSubmitUrl(m) - response = postJson(url, request.toJson) - response match { - case s: CreateSubmissionResponse => - if (s.success) { - reportSubmissionStatus(s) - handleRestResponse(s) - suc = true + try { + response = postJson(url, request.toJson) + response match { + case s: CreateSubmissionResponse => + if (s.success) { + reportSubmissionStatus(s) + handleRestResponse(s) + suc = true + } + case unexpected => + handleUnexpectedRestResponse(unexpected) + } + } catch { + case e @ (_: SubmitRestConnectionException | _: ConnectException) => + if(handleSubmitRestConnectionException(m)) { + throw new SubmitRestConnectionException( + "No master is available for createSubmission.", new Throwable("")) } - case unexpected => - handleUnexpectedRestResponse(unexpected) } } response @@ -93,15 +104,24 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { var response: SubmitRestProtocolResponse = null for (m <- masters if !suc) { validateMaster(m) - response = post(getKillUrl(m, submissionId)) - response match { - case k: KillSubmissionResponse => - if (!k.message.contains("Can only")) { - handleRestResponse(k) - suc = true + val url = getKillUrl(m, submissionId) + try { + response = post(url) + response match { + case k: KillSubmissionResponse => + if (!k.message.contains("Can only")) { + handleRestResponse(k) + suc = true + } + case unexpected => + handleUnexpectedRestResponse(unexpected) + } + } catch { + case e @ (_: SubmitRestConnectionException | _: ConnectException) => + if(handleSubmitRestConnectionException(m)) { + throw new SubmitRestConnectionException( + "No master is available for killSubmission.", new Throwable("")) } - case unexpected => - handleUnexpectedRestResponse(unexpected) } } response @@ -114,18 +134,26 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { logInfo(s"Submitting a request for the status of submission $submissionId in $master.") var suc: Boolean = false var response: SubmitRestProtocolResponse = null - masters.foreach(println(_)) for (m <- masters) { validateMaster(m) - response = get(getStatusUrl(m, submissionId)) - response match { - case s: SubmissionStatusResponse if s.success => - if (!quiet) { - handleRestResponse(s) + val url = getStatusUrl(m, submissionId) + try { + response = get(url) + response match { + case s: SubmissionStatusResponse if s.success => + if (!quiet) { + handleRestResponse(s) + } + suc = true + case unexpected => + handleUnexpectedRestResponse(unexpected) + } + } catch { + case e @ (_: SubmitRestConnectionException | _: ConnectException) => + if(handleSubmitRestConnectionException(m)) { + throw new SubmitRestConnectionException( + "No master is available for requestSubmissionStatus.", new Throwable("")) } - suc = true - case unexpected => - handleUnexpectedRestResponse(unexpected) } } response @@ -309,6 +337,14 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = { logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.") } + + private def handleSubmitRestConnectionException(url: String): Boolean = { + if (!lostMasters.contains(url)) { + logWarning(s"Unable to connect to server ${url}.") + lostMasters += url + } + lostMasters.size >= masters.size + } } private[rest] object StandaloneRestClient { From 5d23958f0f103831d4804773a8bdc5f08f62eba9 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 7 Apr 2015 20:29:49 +0800 Subject: [PATCH 04/10] refact some duplicated code, style and comments --- .../org/apache/spark/deploy/Client.scala | 39 ++++--- .../apache/spark/deploy/ClientArguments.scala | 7 +- .../apache/spark/deploy/master/Master.scala | 8 +- .../deploy/rest/StandaloneRestClient.scala | 107 ++++++++++-------- .../spark/deploy/worker/WorkerArguments.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 5 + 6 files changed, 96 insertions(+), 72 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 6a140d610fa0c..bbd39a17ca71a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy +import scala.collection.mutable.HashSet import scala.concurrent._ import akka.actor._ @@ -28,19 +29,21 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} -import scala.collection.mutable.HashSet /** * Proxy that relays messages to the driver. + * + * Now we don't support retry in case submission failed. In HA mode, client will submit request to + * all masters and see which one could handle it. */ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with ActorLogReceive with Logging { - val mastersActor = driverArgs.masters.map { m => + private val masterActors = driverArgs.masters.map { m => context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system))) } - val lostMasters = new HashSet[Address] - var activeMasterActor: ActorSelection = null + private val lostMasters = new HashSet[Address] + private var activeMasterActor: ActorSelection = null val timeout = AkkaUtils.askTimeout(conf) @@ -80,15 +83,17 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.supervise, command) - for (masterActor <- mastersActor) { - masterActor ! RequestSubmitDriver(driverDescription) - } + // This assumes only one Master is active at a time + for (masterActor <- masterActors) { + masterActor ! RequestSubmitDriver(driverDescription) + } case "kill" => val driverId = driverArgs.driverId - for (masterActor <- mastersActor) { - masterActor ! RequestKillDriver(driverId) - } + // This assumes only one Master is active at a time + for (masterActor <- masterActors) { + masterActor ! RequestKillDriver(driverId) + } } } @@ -121,7 +126,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) System.exit(-1) } System.exit(0) - } + } } override def receiveWithLogging: PartialFunction[Any, Unit] = { @@ -131,7 +136,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) if (success) { activeMasterActor = context.actorSelection(sender.path) pollAndReportStatus(driverId.get) - } else if (!message.contains("Can only")) { + } else if (!message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) { System.exit(-1) } @@ -141,7 +146,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) if (success) { activeMasterActor = context.actorSelection(sender.path) pollAndReportStatus(driverId) - } else if (!message.contains("Can only")) { + } else if (!message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) { System.exit(-1) } @@ -149,8 +154,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) if (!lostMasters.contains(remoteAddress)) { println(s"Error connecting to master $remoteAddress.") lostMasters += remoteAddress - if (lostMasters.size >= mastersActor.size) { - println(s"No master is available, exiting.") + if (lostMasters.size >= masterActors.size) { + println("No master is available, exiting.") System.exit(-1) } } @@ -160,8 +165,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) println(s"Error connecting to master ($remoteAddress).") println(s"Cause was: $cause") lostMasters += remoteAddress - if (lostMasters.size >= mastersActor.size) { - println(s"No master is available, exiting.") + if (lostMasters.size >= masterActors.size) { + println("No master is available, exiting.") System.exit(-1) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 645cf60be50c9..e2ae84c97049c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -22,8 +22,7 @@ import java.net.{URI, URISyntaxException} import scala.collection.mutable.ListBuffer import org.apache.log4j.Level - -import org.apache.spark.util.{IntParam, MemoryParam} +import org.apache.spark.util.{IntParam, MemoryParam, Utils} /** * Command-line parser for the driver client. @@ -80,13 +79,13 @@ private[deploy] class ClientArguments(args: Array[String]) { } jarUrl = _jarUrl - masters = _master.stripPrefix("spark://").split(",").map("spark://" + _) + masters = Utils.splitMasterAdress(_master) mainClass = _mainClass _driverOptions ++= tail case "kill" :: _master :: _driverId :: tail => cmd = "kill" - masters = _master.stripPrefix("spark://").split(",").map("spark://" + _) + masters = Utils.splitMasterAdress(_master) driverId = _driverId case _ => diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9a1ebf1b83270..c587428128db8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -254,7 +254,8 @@ private[master] class Master( case RequestSubmitDriver(description) => { if (state != RecoveryState.ALIVE) { - val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." + val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. " + + "Can only accept driver submissions in ALIVE state." sender ! SubmitDriverResponse(false, None, msg) } else { logInfo("Driver submitted " + description.command.mainClass) @@ -274,7 +275,7 @@ private[master] class Master( case RequestKillDriver(driverId) => { if (state != RecoveryState.ALIVE) { - val msg = s"Can only kill drivers in ALIVE state. Current state: $state." + val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. Can only kill drivers in ALIVE state." sender ! KillDriverResponse(driverId, success = false, msg) } else { logInfo("Asked to kill driver " + driverId) @@ -306,7 +307,8 @@ private[master] class Master( case RequestDriverStatus(driverId) => { if (state != RecoveryState.ALIVE) { - val msg = s"Can only request driver status in ALIVE state. Current state: $state." + val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. " + + "Can only request driver status in ALIVE state." sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))) } else { (drivers ++ completedDrivers).find(_.id == driverId) match { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala index e7541e3dd3c23..7f4b7be6e0b0d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala @@ -21,13 +21,14 @@ import java.io.{DataOutputStream, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} import javax.servlet.http.HttpServletResponse +import scala.collection.mutable import scala.io.Source import com.fasterxml.jackson.core.JsonProcessingException import com.google.common.base.Charsets import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion} -import scala.collection.mutable +import org.apache.spark.util.Utils /** * A client that submits applications to the standalone Master using a REST protocol. @@ -52,11 +53,14 @@ import scala.collection.mutable * is a mismatch, the server will respond with the highest protocol version it supports. A future * implementation of this client can use that information to retry using the version specified * by the server. + * + * Now we don't support retry in case submission failed. In HA mode, client will submit request to + * all masters and see which one could handle it. */ private[deploy] class StandaloneRestClient(master: String) extends Logging { import StandaloneRestClient._ - val masters: Array[String] = master.stripPrefix("spark://").split(",").map("spark://" + _) + private val masters: Array[String] = Utils.splitMasterAdress(master) private val lostMasters = new mutable.HashSet[String] @@ -87,10 +91,15 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { handleUnexpectedRestResponse(unexpected) } } catch { - case e @ (_: SubmitRestConnectionException | _: ConnectException) => - if(handleSubmitRestConnectionException(m)) { + case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => + if(handleConnectionException(m)) { throw new SubmitRestConnectionException( - "No master is available for createSubmission.", new Throwable("")) + s"Unable to connect to server", unreachable) + } + case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => + if(handleConnectionException(m)) { + throw new SubmitRestProtocolException( + "Malformed response received from server", malformed) } } } @@ -109,7 +118,7 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { response = post(url) response match { case k: KillSubmissionResponse => - if (!k.message.contains("Can only")) { + if (!k.message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) { handleRestResponse(k) suc = true } @@ -117,10 +126,15 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { handleUnexpectedRestResponse(unexpected) } } catch { - case e @ (_: SubmitRestConnectionException | _: ConnectException) => - if(handleSubmitRestConnectionException(m)) { + case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => + if(handleConnectionException(m)) { throw new SubmitRestConnectionException( - "No master is available for killSubmission.", new Throwable("")) + s"Unable to connect to server", unreachable) + } + case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => + if(handleConnectionException(m)) { + throw new SubmitRestProtocolException( + "Malformed response received from server", malformed) } } } @@ -149,10 +163,15 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { handleUnexpectedRestResponse(unexpected) } } catch { - case e @ (_: SubmitRestConnectionException | _: ConnectException) => - if(handleSubmitRestConnectionException(m)) { + case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => + if(handleConnectionException(m)) { throw new SubmitRestConnectionException( - "No master is available for requestSubmissionStatus.", new Throwable("")) + s"Unable to connect to server", unreachable) + } + case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => + if(handleConnectionException(m)) { + throw new SubmitRestProtocolException( + "Malformed response received from server", malformed) } } } @@ -213,39 +232,30 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { * Exposed for testing. */ private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = { - try { - val dataStream = - if (connection.getResponseCode == HttpServletResponse.SC_OK) { - connection.getInputStream - } else { - connection.getErrorStream - } - // If the server threw an exception while writing a response, it will not have a body - if (dataStream == null) { - throw new SubmitRestProtocolException("Server returned empty body") - } - val responseJson = Source.fromInputStream(dataStream).mkString - logDebug(s"Response from the server:\n$responseJson") - val response = SubmitRestProtocolMessage.fromJson(responseJson) - response.validate() - response match { - // If the response is an error, log the message - case error: ErrorResponse => - logError(s"Server responded with error:\n${error.message}") - error - // Otherwise, simply return the response - case response: SubmitRestProtocolResponse => response - case unexpected => - throw new SubmitRestProtocolException( - s"Message received from server was not a response:\n${unexpected.toJson}") + val dataStream = + if (connection.getResponseCode == HttpServletResponse.SC_OK) { + connection.getInputStream + } else { + connection.getErrorStream } - } catch { - case unreachable @ (_: FileNotFoundException | _: SocketException) => - throw new SubmitRestConnectionException( - s"Unable to connect to server ${connection.getURL}", unreachable) - case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => + // If the server threw an exception while writing a response, it will not have a body + if (dataStream == null) { + throw new SubmitRestProtocolException("Server returned empty body") + } + val responseJson = Source.fromInputStream(dataStream).mkString + logDebug(s"Response from the server:\n$responseJson") + val response = SubmitRestProtocolMessage.fromJson(responseJson) + response.validate() + response match { + // If the response is an error, log the message + case error: ErrorResponse => + logError(s"Server responded with error:\n${error.message}") + error + // Otherwise, simply return the response + case response: SubmitRestProtocolResponse => response + case unexpected => throw new SubmitRestProtocolException( - "Malformed response received from server", malformed) + s"Message received from server was not a response:\n${unexpected.toJson}") } } @@ -338,10 +348,13 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.") } - private def handleSubmitRestConnectionException(url: String): Boolean = { - if (!lostMasters.contains(url)) { - logWarning(s"Unable to connect to server ${url}.") - lostMasters += url + /** + * When a connection exception was caught, we see whether all masters are lost. + */ + private def handleConnectionException(masterUrl: String): Boolean = { + if (!lostMasters.contains(masterUrl)) { + logWarning(s"Unable to connect to server ${masterUrl}.") + lostMasters += masterUrl } lostMasters.size >= masters.size } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 88f9d880ac209..83f98617777e4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -105,7 +105,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { if (masters != null) { // Two positional arguments were given printUsageAndExit(1) } - masters = value.stripPrefix("spark://").split(",").map("spark://" + _) + masters = Utils.splitMasterAdress(value) parse(tail) case Nil => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0b5a914e7dbbf..3190cd4e2a60c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2058,6 +2058,11 @@ private[spark] object Utils extends Logging { .getOrElse(UserGroupInformation.getCurrentUser().getUserName()) } + def splitMasterAdress(masterAddr: String): Array[String] = { + masterAddr.stripPrefix("spark://").split(",").map("spark://" + _) + } + + val MASTER_NOT_ALIVE_STRING = "Current state is not alive: " } /** From e4f4ece5efa8096f06aff918b1b7fc1311445768 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 7 Apr 2015 22:30:40 +0800 Subject: [PATCH 05/10] fix failed test --- .../apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 2e7201b58c7ac..806f80824cc0e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -28,6 +28,7 @@ import com.google.common.base.Charsets import org.scalatest.{BeforeAndAfterEach, FunSuite} import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ +import com.fasterxml.jackson.core.JsonParseException import org.apache.spark._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -352,7 +353,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { // server returns malformed response unwittingly // client should throw an appropriate exception to indicate server failure val conn1 = sendHttpRequest(submitRequestPath, "POST", json) - intercept[SubmitRestProtocolException] { client.readResponse(conn1) } + intercept[JsonParseException] { client.readResponse(conn1) } // server attempts to send invalid response, but fails internally on validation // client should receive an error response as server is able to recover val conn2 = sendHttpRequest(killRequestPath, "POST") @@ -362,7 +363,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { // server explodes internally beyond recovery // client should throw an appropriate exception to indicate server failure val conn3 = sendHttpRequest(statusRequestPath, "GET") - intercept[SubmitRestProtocolException] { client.readResponse(conn3) } // empty response + intercept[JsonParseException] { client.readResponse(conn3) } // empty response assert(conn3.getResponseCode === HttpServletResponse.SC_INTERNAL_SERVER_ERROR) } From 9d636be07020a1767e023e5ee7d29cad9f51b332 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Thu, 16 Apr 2015 11:11:44 +0800 Subject: [PATCH 06/10] per Andrew's comments --- .../org/apache/spark/deploy/Client.scala | 15 +-- .../apache/spark/deploy/ClientArguments.scala | 4 +- .../apache/spark/deploy/master/Master.scala | 6 +- .../deploy/rest/StandaloneRestClient.scala | 105 ++++++++---------- .../spark/deploy/worker/WorkerArguments.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 16 ++- .../rest/StandaloneRestSubmitSuite.scala | 2 +- 7 files changed, 77 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 29cb1348e4845..ef76de426ea3f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -33,7 +33,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} /** * Proxy that relays messages to the driver. * - * Now we don't support retry in case submission failed. In HA mode, client will submit request to + * We currently don't support retry if submission fails. In HA mode, client will submit request to * all masters and see which one could handle it. */ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) @@ -107,10 +107,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) val statusResponse = Await.result(statusFuture, timeout) statusResponse.found match { case false => - statusResponse.exception.getOrElse { - println(s"ERROR: Cluster master did not recognize $driverId") - System.exit(-1) - } + println(s"ERROR: Cluster master did not recognize $driverId") + System.exit(-1) case true => println(s"State of $driverId is ${statusResponse.state.get}") // Worker node, if present @@ -136,7 +134,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) if (success) { activeMasterActor = context.actorSelection(sender.path) pollAndReportStatus(driverId.get) - } else if (!message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) { + } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } @@ -146,7 +144,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) if (success) { activeMasterActor = context.actorSelection(sender.path) pollAndReportStatus(driverId) - } else if (!message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) { + } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } @@ -154,6 +152,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) if (!lostMasters.contains(remoteAddress)) { println(s"Error connecting to master $remoteAddress.") lostMasters += remoteAddress + // Note that this heuristic does not account for the fact that a Master can recover within + // the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This + // is not currently a concern, however, because this client does not retry submissions. if (lostMasters.size >= masterActors.size) { println("No master is available, exiting.") System.exit(-1) diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index e2ae84c97049c..316e2d59f01b8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -79,13 +79,13 @@ private[deploy] class ClientArguments(args: Array[String]) { } jarUrl = _jarUrl - masters = Utils.splitMasterAdress(_master) + masters = Utils.parseStandaloneMasterUrls(_master) mainClass = _mainClass _driverOptions ++= tail case "kill" :: _master :: _driverId :: tail => cmd = "kill" - masters = Utils.splitMasterAdress(_master) + masters = Utils.parseStandaloneMasterUrls(_master) driverId = _driverId case _ => diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9d699ec83492e..7171e925b62bc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -254,7 +254,7 @@ private[master] class Master( case RequestSubmitDriver(description) => { if (state != RecoveryState.ALIVE) { - val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. " + + val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + "Can only accept driver submissions in ALIVE state." sender ! SubmitDriverResponse(false, None, msg) } else { @@ -275,7 +275,7 @@ private[master] class Master( case RequestKillDriver(driverId) => { if (state != RecoveryState.ALIVE) { - val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. Can only kill drivers in ALIVE state." + val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. Can only kill drivers in ALIVE state." sender ! KillDriverResponse(driverId, success = false, msg) } else { logInfo("Asked to kill driver " + driverId) @@ -307,7 +307,7 @@ private[master] class Master( case RequestDriverStatus(driverId) => { if (state != RecoveryState.ALIVE) { - val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. " + + val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + "Can only request driver status in ALIVE state." sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))) } else { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala index c52a5709c3994..1a25c7938c781 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala @@ -53,15 +53,14 @@ import org.apache.spark.util.Utils * is a mismatch, the server will respond with the highest protocol version it supports. A future * implementation of this client can use that information to retry using the version specified * by the server. - * - * Now we don't support retry in case submission failed. In HA mode, client will submit request to - * all masters and see which one could handle it. */ private[deploy] class StandaloneRestClient(master: String) extends Logging { import StandaloneRestClient._ - private val masters: Array[String] = Utils.splitMasterAdress(master) + private val masters: Array[String] = Utils.parseStandaloneMasterUrls(master) + // Set of masters that lost contact with us, used to keep track of + // whether there are masters still alive for us to communicate with private val lostMasters = new mutable.HashSet[String] /** @@ -73,9 +72,9 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { private[rest] def createSubmission( request: CreateSubmissionRequest): SubmitRestProtocolResponse = { logInfo(s"Submitting a request to launch an application in $master.") - var suc: Boolean = false + var handled: Boolean = false var response: SubmitRestProtocolResponse = null - for (m <- masters if !suc) { + for (m <- masters if !handled) { validateMaster(m) val url = getSubmitUrl(m) try { @@ -85,22 +84,17 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { if (s.success) { reportSubmissionStatus(s) handleRestResponse(s) - suc = true + handled = true } case unexpected => handleUnexpectedRestResponse(unexpected) } } catch { case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => - if(handleConnectionException(m)) { + if (handleConnectionException(m)) { throw new SubmitRestConnectionException( s"Unable to connect to server", unreachable) } - case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => - if(handleConnectionException(m)) { - throw new SubmitRestProtocolException( - "Malformed response received from server", malformed) - } } } response @@ -109,33 +103,28 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { /** Request that the server kill the specified submission. */ def killSubmission(submissionId: String): SubmitRestProtocolResponse = { logInfo(s"Submitting a request to kill submission $submissionId in $master.") - var suc: Boolean = false + var handled: Boolean = false var response: SubmitRestProtocolResponse = null - for (m <- masters if !suc) { + for (m <- masters if !handled) { validateMaster(m) val url = getKillUrl(m, submissionId) try { response = post(url) response match { case k: KillSubmissionResponse => - if (!k.message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) { + if (!Utils.responseFromBackup(k.message)) { handleRestResponse(k) - suc = true + handled = true } case unexpected => handleUnexpectedRestResponse(unexpected) } } catch { case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => - if(handleConnectionException(m)) { + if (handleConnectionException(m)) { throw new SubmitRestConnectionException( s"Unable to connect to server", unreachable) } - case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => - if(handleConnectionException(m)) { - throw new SubmitRestProtocolException( - "Malformed response received from server", malformed) - } } } response @@ -146,9 +135,9 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { submissionId: String, quiet: Boolean = false): SubmitRestProtocolResponse = { logInfo(s"Submitting a request for the status of submission $submissionId in $master.") - var suc: Boolean = false + var handled: Boolean = false var response: SubmitRestProtocolResponse = null - for (m <- masters) { + for (m <- masters if !handled) { validateMaster(m) val url = getStatusUrl(m, submissionId) try { @@ -158,21 +147,16 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { if (!quiet) { handleRestResponse(s) } - suc = true + handled = true case unexpected => handleUnexpectedRestResponse(unexpected) } } catch { case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => - if(handleConnectionException(m)) { + if (handleConnectionException(m)) { throw new SubmitRestConnectionException( s"Unable to connect to server", unreachable) } - case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => - if(handleConnectionException(m)) { - throw new SubmitRestProtocolException( - "Malformed response received from server", malformed) - } } } response @@ -235,30 +219,35 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { * Exposed for testing. */ private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = { - val dataStream = - if (connection.getResponseCode == HttpServletResponse.SC_OK) { - connection.getInputStream - } else { - connection.getErrorStream + try { + val dataStream = + if (connection.getResponseCode == HttpServletResponse.SC_OK) { + connection.getInputStream + } else { + connection.getErrorStream + } + // If the server threw an exception while writing a response, it will not have a body + if (dataStream == null) { + throw new SubmitRestProtocolException("Server returned empty body") } - // If the server threw an exception while writing a response, it will not have a body - if (dataStream == null) { - throw new SubmitRestProtocolException("Server returned empty body") - } - val responseJson = Source.fromInputStream(dataStream).mkString - logDebug(s"Response from the server:\n$responseJson") - val response = SubmitRestProtocolMessage.fromJson(responseJson) - response.validate() - response match { - // If the response is an error, log the message - case error: ErrorResponse => - logError(s"Server responded with error:\n${error.message}") - error - // Otherwise, simply return the response - case response: SubmitRestProtocolResponse => response - case unexpected => - throw new SubmitRestProtocolException( - s"Message received from server was not a response:\n${unexpected.toJson}") + val responseJson = Source.fromInputStream(dataStream).mkString + logDebug(s"Response from the server:\n$responseJson") + val response = SubmitRestProtocolMessage.fromJson(responseJson) + response.validate() + response match { + // If the response is an error, log the message + case error: ErrorResponse => + logError(s"Server responded with error:\n${error.message}") + error + // Otherwise, simply return the response + case response: SubmitRestProtocolResponse => response + case unexpected => + throw new SubmitRestProtocolException( + s"Message received from server was not a response:\n${unexpected.toJson}") + } + } catch { + case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => + throw new SubmitRestProtocolException("Malformed response received from server", malformed) } } @@ -357,7 +346,11 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { } /** - * When a connection exception was caught, we see whether all masters are lost. + * When a connection exception is caught, return true if all masters are lost. + * Note that the heuristic used here does not take into account that masters + * can recover during the lifetime of this client. This assumption should be + * harmless because this client currently does not support retrying submission + * on failure yet (SPARK-6443). */ private def handleConnectionException(masterUrl: String): Boolean = { if (!lostMasters.contains(masterUrl)) { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 83f98617777e4..9678631da9f6f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -105,7 +105,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { if (masters != null) { // Two positional arguments were given printUsageAndExit(1) } - masters = Utils.splitMasterAdress(value) + masters = Utils.parseStandaloneMasterUrls(value) parse(tail) case Nil => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a7bf8c2215c86..64eb8a31958b9 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2106,11 +2106,21 @@ private[spark] object Utils extends Logging { .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName()) } - def splitMasterAdress(masterAddr: String): Array[String] = { - masterAddr.stripPrefix("spark://").split(",").map("spark://" + _) + /** + * Split the comma delimited string of master URLs into a list. + * For instance, "spark://abc,def" becomes [spark://abc, spark://def]. + */ + def parseStandaloneMasterUrls(masterUrls: String): Array[String] = { + masterUrls.stripPrefix("spark://").split(",").map("spark://" + _) } - val MASTER_NOT_ALIVE_STRING = "Current state is not alive: " + /** An identifier that backup masters use in their responses. */ + val BACKUP_STANDALONE_MASTER_PREFIX = "Current state is not alive" + + /** Return true if the response message is sent from a backup Master on standby. */ + def responseFromBackup(msg: String): Boolean = { + message.startsWith(BACKUP_STANDALONE_MASTER_PREFIX) + } } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 806f80824cc0e..036d656899fd3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -24,11 +24,11 @@ import javax.servlet.http.HttpServletResponse import scala.collection.mutable import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import com.fasterxml.jackson.core.JsonParseException import com.google.common.base.Charsets import org.scalatest.{BeforeAndAfterEach, FunSuite} import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ -import com.fasterxml.jackson.core.JsonParseException import org.apache.spark._ import org.apache.spark.util.{AkkaUtils, Utils} From 35119a07bd1d4097f202645540f66e989d2384b1 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Thu, 16 Apr 2015 12:06:53 +0800 Subject: [PATCH 07/10] style and compile issues --- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 3 ++- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7171e925b62bc..c7c2de85099fb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -275,7 +275,8 @@ private[master] class Master( case RequestKillDriver(driverId) => { if (state != RecoveryState.ALIVE) { - val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. Can only kill drivers in ALIVE state." + val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + + s"Can only kill drivers in ALIVE state." sender ! KillDriverResponse(driverId, success = false, msg) } else { logInfo("Asked to kill driver " + driverId) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 64eb8a31958b9..f67ce2b7be15c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2119,7 +2119,7 @@ private[spark] object Utils extends Logging { /** Return true if the response message is sent from a backup Master on standby. */ def responseFromBackup(msg: String): Boolean = { - message.startsWith(BACKUP_STANDALONE_MASTER_PREFIX) + msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX) } } From 220cb3cf4c475b3ee279c4b170201cebc7574db6 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Thu, 16 Apr 2015 17:22:19 +0800 Subject: [PATCH 08/10] move connect exception inside --- .../deploy/rest/StandaloneRestClient.scala | 33 +++++++++++-------- .../rest/StandaloneRestSubmitSuite.scala | 5 ++- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala index 1a25c7938c781..63f5d36be0a36 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala @@ -90,10 +90,9 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { handleUnexpectedRestResponse(unexpected) } } catch { - case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => + case e: SubmitRestConnectionException => if (handleConnectionException(m)) { - throw new SubmitRestConnectionException( - s"Unable to connect to server", unreachable) + throw new SubmitRestConnectionException("Unable to connect to server", e) } } } @@ -120,10 +119,9 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { handleUnexpectedRestResponse(unexpected) } } catch { - case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => + case e: SubmitRestConnectionException => if (handleConnectionException(m)) { - throw new SubmitRestConnectionException( - s"Unable to connect to server", unreachable) + throw new SubmitRestConnectionException("Unable to connect to server", e) } } } @@ -135,6 +133,7 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { submissionId: String, quiet: Boolean = false): SubmitRestProtocolResponse = { logInfo(s"Submitting a request for the status of submission $submissionId in $master.") + var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -152,10 +151,9 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { handleUnexpectedRestResponse(unexpected) } } catch { - case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) => + case e: SubmitRestConnectionException => if (handleConnectionException(m)) { - throw new SubmitRestConnectionException( - s"Unable to connect to server", unreachable) + throw new SubmitRestConnectionException("Unable to connect to server", e) } } } @@ -204,11 +202,16 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { conn.setRequestProperty("Content-Type", "application/json") conn.setRequestProperty("charset", "utf-8") conn.setDoOutput(true) - val out = new DataOutputStream(conn.getOutputStream) - Utils.tryWithSafeFinally { - out.write(json.getBytes(Charsets.UTF_8)) - } { - out.close() + try { + val out = new DataOutputStream(conn.getOutputStream) + Utils.tryWithSafeFinally { + out.write(json.getBytes(Charsets.UTF_8)) + } { + out.close() + } + } catch { + case e: ConnectException => + throw new SubmitRestConnectionException("Connect Exception when connect to server", e) } readResponse(conn) } @@ -246,6 +249,8 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging { s"Message received from server was not a response:\n${unexpected.toJson}") } } catch { + case unreachable @ (_: FileNotFoundException | _: SocketException) => + throw new SubmitRestConnectionException("Unable to connect to server", unreachable) case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => throw new SubmitRestProtocolException("Malformed response received from server", malformed) } diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 036d656899fd3..2e7201b58c7ac 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -24,7 +24,6 @@ import javax.servlet.http.HttpServletResponse import scala.collection.mutable import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import com.fasterxml.jackson.core.JsonParseException import com.google.common.base.Charsets import org.scalatest.{BeforeAndAfterEach, FunSuite} import org.json4s.JsonAST._ @@ -353,7 +352,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { // server returns malformed response unwittingly // client should throw an appropriate exception to indicate server failure val conn1 = sendHttpRequest(submitRequestPath, "POST", json) - intercept[JsonParseException] { client.readResponse(conn1) } + intercept[SubmitRestProtocolException] { client.readResponse(conn1) } // server attempts to send invalid response, but fails internally on validation // client should receive an error response as server is able to recover val conn2 = sendHttpRequest(killRequestPath, "POST") @@ -363,7 +362,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { // server explodes internally beyond recovery // client should throw an appropriate exception to indicate server failure val conn3 = sendHttpRequest(statusRequestPath, "GET") - intercept[JsonParseException] { client.readResponse(conn3) } // empty response + intercept[SubmitRestProtocolException] { client.readResponse(conn3) } // empty response assert(conn3.getResponseCode === HttpServletResponse.SC_INTERNAL_SERVER_ERROR) } From f4f972ba31c021ecbbdb957033b9368d5c2e3640 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 22 Apr 2015 11:16:25 +0800 Subject: [PATCH 09/10] rebase...again --- core/src/main/scala/org/apache/spark/util/Utils.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index e206e5bf3aca2..9edceb0331b4d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2137,7 +2137,6 @@ private[spark] object Utils extends Logging { } /** -<<<<<<< HEAD * Split the comma delimited string of master URLs into a list. * For instance, "spark://abc,def" becomes [spark://abc, spark://def]. */ @@ -2152,7 +2151,8 @@ private[spark] object Utils extends Logging { def responseFromBackup(msg: String): Boolean = { msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX) } -======= + + /** * Adds a shutdown hook with default priority. * * @param hook The code to run during shutdown. @@ -2248,7 +2248,6 @@ private class SparkShutdownHook(private val priority: Int, hook: () => Unit) def run(): Unit = hook() ->>>>>>> master } /** From 2a28aab2e2cb17891642d0076dd8d9fb9fcfedc5 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 29 Apr 2015 14:15:18 +0800 Subject: [PATCH 10/10] based the newest change https://github.com/apache/spark/pull/5144 --- .../rest/StandaloneRestSubmitSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 505b843044294..f4d548d9e7720 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -51,7 +51,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val appArgs = Array("one", "two", "three") val sparkProperties = Map("spark.app.name" -> "pi") val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX") - val request = new StandaloneRestClient("spark://host:port").constructSubmitRequest( + val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest( "my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables) assert(request.action === Utils.getFormattedClassName(request)) assert(request.clientSparkVersion === SPARK_VERSION) @@ -70,7 +70,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val request = constructSubmitRequest(masterUrl, appArgs) assert(request.appArgs === appArgs) assert(request.sparkProperties("spark.master") === masterUrl) - val response = new StandaloneRestClient(masterUrl).createSubmission(request) + val response = new RestSubmissionClient(masterUrl).createSubmission(request) val submitResponse = getSubmitResponse(response) assert(submitResponse.action === Utils.getFormattedClassName(submitResponse)) assert(submitResponse.serverSparkVersion === SPARK_VERSION) @@ -101,7 +101,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionId = "my-lyft-driver" val killMessage = "your driver is killed" val masterUrl = startDummyServer(killMessage = killMessage) - val response = new StandaloneRestClient(masterUrl).killSubmission(submissionId) + val response = new RestSubmissionClient(masterUrl).killSubmission(submissionId) val killResponse = getKillResponse(response) assert(killResponse.action === Utils.getFormattedClassName(killResponse)) assert(killResponse.serverSparkVersion === SPARK_VERSION) @@ -115,7 +115,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val submissionState = KILLED val submissionException = new Exception("there was an irresponsible mix of alcohol and cars") val masterUrl = startDummyServer(state = submissionState, exception = Some(submissionException)) - val response = new StandaloneRestClient(masterUrl).requestSubmissionStatus(submissionId) + val response = new RestSubmissionClient(masterUrl).requestSubmissionStatus(submissionId) val statusResponse = getStatusResponse(response) assert(statusResponse.action === Utils.getFormattedClassName(statusResponse)) assert(statusResponse.serverSparkVersion === SPARK_VERSION) @@ -128,7 +128,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then kill") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val client = new StandaloneRestClient(masterUrl) + val client = new RestSubmissionClient(masterUrl) val response1 = client.createSubmission(request) val submitResponse = getSubmitResponse(response1) assert(submitResponse.success) @@ -144,7 +144,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then request status") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val client = new StandaloneRestClient(masterUrl) + val client = new RestSubmissionClient(masterUrl) val response1 = client.createSubmission(request) val submitResponse = getSubmitResponse(response1) assert(submitResponse.success) @@ -161,7 +161,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("create then kill then request status") { val masterUrl = startSmartServer() val request = constructSubmitRequest(masterUrl) - val client = new StandaloneRestClient(masterUrl) + val client = new RestSubmissionClient(masterUrl) val response1 = client.createSubmission(request) val response2 = client.createSubmission(request) val submitResponse1 = getSubmitResponse(response1) @@ -191,7 +191,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("kill or request status before create") { val masterUrl = startSmartServer() val doesNotExist = "does-not-exist" - val client = new StandaloneRestClient(masterUrl) + val client = new RestSubmissionClient(masterUrl) // kill a non-existent submission val response1 = client.killSubmission(doesNotExist) val killResponse = getKillResponse(response1) @@ -342,7 +342,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { test("client handles faulty server") { val masterUrl = startFaultyServer() - val client = new StandaloneRestClient(masterUrl) + val client = new RestSubmissionClient(masterUrl) val httpUrl = masterUrl.replace("spark://", "http://") val v = RestSubmissionServer.PROTOCOL_VERSION val submitRequestPath = s"$httpUrl/$v/submissions/create" @@ -429,7 +429,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { mainJar) ++ appArgs val args = new SparkSubmitArguments(commandLineArgs) val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args) - new StandaloneRestClient("spark://host:port").constructSubmitRequest( + new RestSubmissionClient("spark://host:port").constructSubmitRequest( mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty) } @@ -496,7 +496,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { method: String, body: String = ""): (SubmitRestProtocolResponse, Int) = { val conn = sendHttpRequest(url, method, body) - (new StandaloneRestClient("spark://host:port").readResponse(conn), conn.getResponseCode) + (new RestSubmissionClient("spark://host:port").readResponse(conn), conn.getResponseCode) } }