diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index 0c0908da20d89..1bcd85a611e00 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -23,12 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.io.ByteStreams import okhttp3.{RequestBody, ResponseBody} +import org.eclipse.jetty.server.Server import org.scalatest.BeforeAndAfter import org.scalatest.mock.MockitoSugar.mock import retrofit2.Call import org.apache.spark.{SparkFunSuite, SSLOptions} import org.apache.spark.deploy.kubernetes.SSLUtils +import org.apache.spark.internal.Logging import org.apache.spark.util.Utils /** @@ -40,30 +42,37 @@ import org.apache.spark.util.Utils * we've configured the Jetty server correctly and that the endpoints reached over HTTP can * receive streamed uploads and can stream downloads. */ -class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter { +class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with Logging { + + private val MAX_SERVER_START_ATTEMPTS = 5 private var serviceImpl: ResourceStagingService = _ private var stagedResourcesCleaner: StagedResourcesCleaner = _ - private var server: ResourceStagingServer = _ + private var server: Option[ResourceStagingServer] = None private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) - private val serverPort = new ServerSocket(0).getLocalPort - private val sslOptionsProvider = new SettableReferenceSslOptionsProvider() before { stagedResourcesCleaner = mock[StagedResourcesCleaner] serviceImpl = new ResourceStagingServiceImpl( new StagedResourcesStoreImpl(Utils.createTempDir()), stagedResourcesCleaner) - server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider) } after { - server.stop() + server.foreach { s => + try { + s.stop() + } catch { + case e: Throwable => + log.warn("Failed to stop the resource staging server.", e) + } + } + server = None } test("Accept file and jar uploads and downloads") { - server.start() - runUploadAndDownload(SSLOptions()) + val serverPort = startServer() + runUploadAndDownload(SSLOptions(), serverPort) } test("Enable SSL on the server") { @@ -80,11 +89,11 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter { trustStore = Some(keyStoreAndTrustStore.trustStore), trustStorePassword = Some("trustStore")) sslOptionsProvider.setOptions(sslOptions) - server.start() - runUploadAndDownload(sslOptions) + val serverPort = startServer() + runUploadAndDownload(sslOptions, serverPort) } - private def runUploadAndDownload(sslOptions: SSLOptions): Unit = { + private def runUploadAndDownload(sslOptions: SSLOptions, serverPort: Int): Unit = { val scheme = if (sslOptions.enabled) "https" else "http" val retrofitService = RetrofitClientFactoryImpl.createRetrofitClient( s"$scheme://127.0.0.1:$serverPort/", @@ -125,6 +134,44 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter { val downloadedBytes = ByteStreams.toByteArray(responseBody.byteStream()) assert(downloadedBytes.toSeq === bytes) } + + private def startServer(): Int = { + var currentAttempt = 0 + var successfulStart = false + var latestServerPort = new ServerSocket(0).getLocalPort + while (currentAttempt < MAX_SERVER_START_ATTEMPTS && !successfulStart) { + val newServer = new ResourceStagingServer(latestServerPort, serviceImpl, sslOptionsProvider) + try { + newServer.start() + successfulStart = true + server = Some(newServer) + } catch { + case e: Throwable => + try { + newServer.stop() + } catch { + case e1: Throwable => + log.warn("Failed to stop a resource staging server that failed to start.", e1) + } + + if (Utils.isBindCollision(e)) { + currentAttempt += 1 + latestServerPort = latestServerPort + 1 + if (currentAttempt == MAX_SERVER_START_ATTEMPTS) { + throw new RuntimeException(s"Failed to bind to a random port" + + s" $MAX_SERVER_START_ATTEMPTS times. Last attempted port: $latestServerPort", e) + } else { + logWarning(s"Attempt $currentAttempt/$MAX_SERVER_START_ATTEMPTS failed to start" + + s" server on port $latestServerPort.", e) + } + } else { + throw e + } + } + } + logInfo(s"Started resource staging server on port $latestServerPort.") + latestServerPort + } } private class SettableReferenceSslOptionsProvider extends ResourceStagingServerSslOptionsProvider {