From 7d34d43704e0245d73e50de7fc4c0cb669a21872 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 15:59:38 -0700 Subject: [PATCH 01/10] Retry binding server to random port in the resource staging server test. --- .../ResourceStagingServerSuite.scala | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index 0c0908da20d89..e4dea7bbdb7a2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -29,6 +29,7 @@ import retrofit2.Call import org.apache.spark.{SparkFunSuite, SSLOptions} import org.apache.spark.deploy.kubernetes.SSLUtils +import org.apache.spark.internal.Logging import org.apache.spark.util.Utils /** @@ -40,13 +41,15 @@ import org.apache.spark.util.Utils * we've configured the Jetty server correctly and that the endpoints reached over HTTP can * receive streamed uploads and can stream downloads. 
*/ -class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter { +class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with Logging { + + private val MAX_SERVER_START_ATTEMPTS = 5 private var serviceImpl: ResourceStagingService = _ private var stagedResourcesCleaner: StagedResourcesCleaner = _ private var server: ResourceStagingServer = _ private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) - private val serverPort = new ServerSocket(0).getLocalPort + private var serverPort : Int = _ private val sslOptionsProvider = new SettableReferenceSslOptionsProvider() @@ -54,7 +57,22 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter { stagedResourcesCleaner = mock[StagedResourcesCleaner] serviceImpl = new ResourceStagingServiceImpl( new StagedResourcesStoreImpl(Utils.createTempDir()), stagedResourcesCleaner) - server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider) + for (i <- 1 to MAX_SERVER_START_ATTEMPTS) { + serverPort = new ServerSocket(0).getLocalPort + try { + server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider) + } catch { + case e: Exception if Utils.isBindCollision(e) => + if (i == MAX_SERVER_START_ATTEMPTS) { + throw new RuntimeException(s"Failed to bind to a random port" + + s" $MAX_SERVER_START_ATTEMPTS times. 
Last attempted port: $serverPort", e) + } else { + logWarning(s"Attempt $i/$MAX_SERVER_START_ATTEMPTS failed to start server on" + + s" port $serverPort.", e) + } + } + logInfo(s"Started resource staging server on port $serverPort.") + } } after { From 30148d5732c8fc5d9b26e6477abc5cbbe4c47dab Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 16:09:56 -0700 Subject: [PATCH 02/10] Break if successful start --- .../kubernetes/ResourceStagingServerSuite.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index e4dea7bbdb7a2..08fc527619b71 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -57,22 +57,26 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with stagedResourcesCleaner = mock[StagedResourcesCleaner] serviceImpl = new ResourceStagingServiceImpl( new StagedResourcesStoreImpl(Utils.createTempDir()), stagedResourcesCleaner) - for (i <- 1 to MAX_SERVER_START_ATTEMPTS) { + var currentAttempt = 0 + var successfulStart = false + while (currentAttempt < MAX_SERVER_START_ATTEMPTS && !successfulStart) { serverPort = new ServerSocket(0).getLocalPort try { server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider) + successfulStart = true } catch { case e: Exception if Utils.isBindCollision(e) => - if (i == MAX_SERVER_START_ATTEMPTS) { + currentAttempt += 1 + if (currentAttempt == MAX_SERVER_START_ATTEMPTS) { throw new RuntimeException(s"Failed to bind to a random port" + s" $MAX_SERVER_START_ATTEMPTS times. 
Last attempted port: $serverPort", e) } else { - logWarning(s"Attempt $i/$MAX_SERVER_START_ATTEMPTS failed to start server on" + - s" port $serverPort.", e) + logWarning(s"Attempt $currentAttempt/$MAX_SERVER_START_ATTEMPTS failed to start" + + s" server on port $serverPort.", e) } } - logInfo(s"Started resource staging server on port $serverPort.") } + logInfo(s"Started resource staging server on port $serverPort.") } after { From c6fc1240ff40298aee8aa949d5318f2b4d21a285 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 17:06:28 -0700 Subject: [PATCH 03/10] Start server in try block. --- .../ResourceStagingServerSuite.scala | 68 +++++++++++-------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index 08fc527619b71..b84e8f43d15cf 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -23,11 +23,12 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.io.ByteStreams import okhttp3.{RequestBody, ResponseBody} +import org.eclipse.jetty.server.Server import org.scalatest.BeforeAndAfter import org.scalatest.mock.MockitoSugar.mock import retrofit2.Call -import org.apache.spark.{SparkFunSuite, SSLOptions} +import org.apache.spark.{SSLOptions, SparkFunSuite} import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -46,46 +47,26 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with private val 
MAX_SERVER_START_ATTEMPTS = 5 private var serviceImpl: ResourceStagingService = _ private var stagedResourcesCleaner: StagedResourcesCleaner = _ - private var server: ResourceStagingServer = _ + private var server: Option[ResourceStagingServer] = None private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) - private var serverPort : Int = _ - private val sslOptionsProvider = new SettableReferenceSslOptionsProvider() before { stagedResourcesCleaner = mock[StagedResourcesCleaner] serviceImpl = new ResourceStagingServiceImpl( new StagedResourcesStoreImpl(Utils.createTempDir()), stagedResourcesCleaner) - var currentAttempt = 0 - var successfulStart = false - while (currentAttempt < MAX_SERVER_START_ATTEMPTS && !successfulStart) { - serverPort = new ServerSocket(0).getLocalPort - try { - server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider) - successfulStart = true - } catch { - case e: Exception if Utils.isBindCollision(e) => - currentAttempt += 1 - if (currentAttempt == MAX_SERVER_START_ATTEMPTS) { - throw new RuntimeException(s"Failed to bind to a random port" + - s" $MAX_SERVER_START_ATTEMPTS times. 
Last attempted port: $serverPort", e) - } else { - logWarning(s"Attempt $currentAttempt/$MAX_SERVER_START_ATTEMPTS failed to start" + - s" server on port $serverPort.", e) - } - } - } - logInfo(s"Started resource staging server on port $serverPort.") + } after { - server.stop() + server.foreach(_.stop()) + server = None } test("Accept file and jar uploads and downloads") { - server.start() - runUploadAndDownload(SSLOptions()) + val serverPort = startServer() + runUploadAndDownload(SSLOptions(), serverPort) } test("Enable SSL on the server") { @@ -102,11 +83,11 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with trustStore = Some(keyStoreAndTrustStore.trustStore), trustStorePassword = Some("trustStore")) sslOptionsProvider.setOptions(sslOptions) - server.start() - runUploadAndDownload(sslOptions) + val serverPort = startServer() + runUploadAndDownload(sslOptions, serverPort) } - private def runUploadAndDownload(sslOptions: SSLOptions): Unit = { + private def runUploadAndDownload(sslOptions: SSLOptions, serverPort: Int): Unit = { val scheme = if (sslOptions.enabled) "https" else "http" val retrofitService = RetrofitClientFactoryImpl.createRetrofitClient( s"$scheme://127.0.0.1:$serverPort/", @@ -147,6 +128,33 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with val downloadedBytes = ByteStreams.toByteArray(responseBody.byteStream()) assert(downloadedBytes.toSeq === bytes) } + + private def startServer(): Int = { + var currentAttempt = 0 + var successfulStart = false + var latestServerPort = -1 + while (currentAttempt < MAX_SERVER_START_ATTEMPTS && !successfulStart) { + latestServerPort = new ServerSocket(0).getLocalPort + try { + val newServer = new ResourceStagingServer(latestServerPort, serviceImpl, sslOptionsProvider) + newServer.start() + successfulStart = true + server = Some(newServer) + } catch { + case e: Exception if Utils.isBindCollision(e) => + currentAttempt += 1 + if (currentAttempt == 
MAX_SERVER_START_ATTEMPTS) { + throw new RuntimeException(s"Failed to bind to a random port" + + s" $MAX_SERVER_START_ATTEMPTS times. Last attempted port: $latestServerPort", e) + } else { + logWarning(s"Attempt $currentAttempt/$MAX_SERVER_START_ATTEMPTS failed to start" + + s" server on port $latestServerPort.", e) + } + } + } + logInfo(s"Started resource staging server on port $latestServerPort.") + latestServerPort + } } private class SettableReferenceSslOptionsProvider extends ResourceStagingServerSslOptionsProvider { From 639c4ba768d71186f615249ab6e078994f96aa35 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 17:08:33 -0700 Subject: [PATCH 04/10] Fix scalastyle --- .../deploy/rest/kubernetes/ResourceStagingServerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index b84e8f43d15cf..c637b3f35bf10 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.mock.MockitoSugar.mock import retrofit2.Call -import org.apache.spark.{SSLOptions, SparkFunSuite} +import org.apache.spark.{SparkFunSuite, SSLOptions} import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.internal.Logging import org.apache.spark.util.Utils From c2c410d17ba5a2fc9c1480cabecc6a315e90faae Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 17:16:26 -0700 Subject: [PATCH 05/10] More rigorous cleanup logic. Increment port numbers. 
--- .../ResourceStagingServerSuite.scala | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index c637b3f35bf10..531e045db8d04 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -60,7 +60,16 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with } after { - server.foreach(_.stop()) + server.foreach { s => + try { + s.stop() + } catch { + case e: RuntimeException => + log.warn("Failed to stop the resource staging server.", e) + case e: Exception => + log.warn("Failed to stop the resource staging server.", e) + } + } server = None } @@ -132,23 +141,34 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with private def startServer(): Int = { var currentAttempt = 0 var successfulStart = false - var latestServerPort = -1 + var latestServerPort = new ServerSocket(0).getLocalPort while (currentAttempt < MAX_SERVER_START_ATTEMPTS && !successfulStart) { - latestServerPort = new ServerSocket(0).getLocalPort + val newServer = new ResourceStagingServer(latestServerPort, serviceImpl, sslOptionsProvider) try { - val newServer = new ResourceStagingServer(latestServerPort, serviceImpl, sslOptionsProvider) newServer.start() successfulStart = true server = Some(newServer) } catch { - case e: Exception if Utils.isBindCollision(e) => - currentAttempt += 1 - if (currentAttempt == MAX_SERVER_START_ATTEMPTS) { - throw new RuntimeException(s"Failed to bind to a random port" + - s" $MAX_SERVER_START_ATTEMPTS times. 
Last attempted port: $latestServerPort", e) - } else { - logWarning(s"Attempt $currentAttempt/$MAX_SERVER_START_ATTEMPTS failed to start" + - s" server on port $latestServerPort.", e) + case e: Exception => + try { + newServer.stop() + } catch { + case e2: RuntimeException => + log.warn("Failed to stop a resource staging server that failed to start.", e2) + case e3: Exception => + log.warn("Failed to stop a resource staging server that failed to start.", e3) + } + + if (Utils.isBindCollision(e)) { + currentAttempt += 1 + latestServerPort = latestServerPort + 1 + if (currentAttempt == MAX_SERVER_START_ATTEMPTS) { + throw new RuntimeException(s"Failed to bind to a random port" + + s" $MAX_SERVER_START_ATTEMPTS times. Last attempted port: $latestServerPort", e) + } else { + logWarning(s"Attempt $currentAttempt/$MAX_SERVER_START_ATTEMPTS failed to start" + + s" server on port $latestServerPort.", e) + } } } } From b8002b9abbfd692f60ddb60f0e7feb2ed01365f6 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 17:19:28 -0700 Subject: [PATCH 06/10] Move around more exception logic. 
--- .../rest/kubernetes/ResourceStagingServerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index 531e045db8d04..1a5134a12c959 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -64,9 +64,7 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with try { s.stop() } catch { - case e: RuntimeException => - log.warn("Failed to stop the resource staging server.", e) - case e: Exception => + case e: Throwable => log.warn("Failed to stop the resource staging server.", e) } } @@ -149,7 +147,7 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with successfulStart = true server = Some(newServer) } catch { - case e: Exception => + case e: Throwable => try { newServer.stop() } catch { @@ -169,6 +167,8 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with logWarning(s"Attempt $currentAttempt/$MAX_SERVER_START_ATTEMPTS failed to start" + s" server on port $latestServerPort.", e) } + } else { + throw e } } } From 8b3ff07494d3cdc3d7b6cb79ef8537e4894529d0 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 17:20:07 -0700 Subject: [PATCH 07/10] More exception refactoring. 
--- .../deploy/rest/kubernetes/ResourceStagingServerSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index 1a5134a12c959..a5bfcb243dd62 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -151,9 +151,7 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with try { newServer.stop() } catch { - case e2: RuntimeException => - log.warn("Failed to stop a resource staging server that failed to start.", e2) - case e3: Exception => + case e2: Throwable => log.warn("Failed to stop a resource staging server that failed to start.", e3) } From 68831f8fa47dc95a0e2031a25b550a9341d1ee1c Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 17:21:55 -0700 Subject: [PATCH 08/10] Remove whitespace --- .../deploy/rest/kubernetes/ResourceStagingServerSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index a5bfcb243dd62..a879f410707f9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -56,7 +56,6 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with stagedResourcesCleaner = 
mock[StagedResourcesCleaner] serviceImpl = new ResourceStagingServiceImpl( new StagedResourcesStoreImpl(Utils.createTempDir()), stagedResourcesCleaner) - } after { From dbb9fbb370dd9c998d1ee9226e95fa8a474b5227 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 17:23:08 -0700 Subject: [PATCH 09/10] Fix test --- .../deploy/rest/kubernetes/ResourceStagingServerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index a879f410707f9..bae22fe126b8d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -151,7 +151,7 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with newServer.stop() } catch { case e2: Throwable => - log.warn("Failed to stop a resource staging server that failed to start.", e3) + log.warn("Failed to stop a resource staging server that failed to start.", e2) } if (Utils.isBindCollision(e)) { From 2290891e5182e82031854fa159ca67968eef6e2a Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 18 Jul 2017 17:23:37 -0700 Subject: [PATCH 10/10] Rename variable --- .../deploy/rest/kubernetes/ResourceStagingServerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index bae22fe126b8d..1bcd85a611e00 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -150,8 +150,8 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with try { newServer.stop() } catch { - case e2: Throwable => - log.warn("Failed to stop a resource staging server that failed to start.", e2) + case e1: Throwable => + log.warn("Failed to stop a resource staging server that failed to start.", e1) } if (Utils.isBindCollision(e)) {