@@ -23,12 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper
2323import com .fasterxml .jackson .module .scala .DefaultScalaModule
2424import com .google .common .io .ByteStreams
2525import okhttp3 .{RequestBody , ResponseBody }
26+ import org .eclipse .jetty .server .Server
2627import org .scalatest .BeforeAndAfter
2728import org .scalatest .mock .MockitoSugar .mock
2829import retrofit2 .Call
2930
3031import org .apache .spark .{SparkFunSuite , SSLOptions }
3132import org .apache .spark .deploy .kubernetes .SSLUtils
33+ import org .apache .spark .internal .Logging
3234import org .apache .spark .util .Utils
3335
3436/**
@@ -40,30 +42,37 @@ import org.apache.spark.util.Utils
4042 * we've configured the Jetty server correctly and that the endpoints reached over HTTP can
4143 * receive streamed uploads and can stream downloads.
4244 */
43- class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
45+ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with Logging {
46+
47+ private val MAX_SERVER_START_ATTEMPTS = 5
4448 private var serviceImpl : ResourceStagingService = _
4549 private var stagedResourcesCleaner : StagedResourcesCleaner = _
46- private var server : ResourceStagingServer = _
50+ private var server : Option [ ResourceStagingServer ] = None
4751 private val OBJECT_MAPPER = new ObjectMapper ().registerModule(new DefaultScalaModule )
4852
49- private val serverPort = new ServerSocket (0 ).getLocalPort
50-
5153 private val sslOptionsProvider = new SettableReferenceSslOptionsProvider ()
5254
5355 before {
5456 stagedResourcesCleaner = mock[StagedResourcesCleaner ]
5557 serviceImpl = new ResourceStagingServiceImpl (
5658 new StagedResourcesStoreImpl (Utils .createTempDir()), stagedResourcesCleaner)
57- server = new ResourceStagingServer (serverPort, serviceImpl, sslOptionsProvider)
5859 }
5960
6061 after {
61- server.stop()
62+ server.foreach { s =>
63+ try {
64+ s.stop()
65+ } catch {
66+ case e : Throwable =>
67+ log.warn(" Failed to stop the resource staging server." , e)
68+ }
69+ }
70+ server = None
6271 }
6372
6473 test(" Accept file and jar uploads and downloads" ) {
65- server.start ()
66- runUploadAndDownload(SSLOptions ())
74+ val serverPort = startServer ()
75+ runUploadAndDownload(SSLOptions (), serverPort )
6776 }
6877
6978 test(" Enable SSL on the server" ) {
@@ -80,11 +89,11 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
8089 trustStore = Some (keyStoreAndTrustStore.trustStore),
8190 trustStorePassword = Some (" trustStore" ))
8291 sslOptionsProvider.setOptions(sslOptions)
83- server.start ()
84- runUploadAndDownload(sslOptions)
92+ val serverPort = startServer ()
93+ runUploadAndDownload(sslOptions, serverPort )
8594 }
8695
87- private def runUploadAndDownload (sslOptions : SSLOptions ): Unit = {
96+ private def runUploadAndDownload (sslOptions : SSLOptions , serverPort : Int ): Unit = {
8897 val scheme = if (sslOptions.enabled) " https" else " http"
8998 val retrofitService = RetrofitClientFactoryImpl .createRetrofitClient(
9099 s " $scheme://127.0.0.1: $serverPort/ " ,
@@ -125,6 +134,44 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
125134 val downloadedBytes = ByteStreams .toByteArray(responseBody.byteStream())
126135 assert(downloadedBytes.toSeq === bytes)
127136 }
137+
138+ private def startServer (): Int = {
139+ var currentAttempt = 0
140+ var successfulStart = false
141+ var latestServerPort = new ServerSocket (0 ).getLocalPort
142+ while (currentAttempt < MAX_SERVER_START_ATTEMPTS && ! successfulStart) {
143+ val newServer = new ResourceStagingServer (latestServerPort, serviceImpl, sslOptionsProvider)
144+ try {
145+ newServer.start()
146+ successfulStart = true
147+ server = Some (newServer)
148+ } catch {
149+ case e : Throwable =>
150+ try {
151+ newServer.stop()
152+ } catch {
153+ case e1 : Throwable =>
154+ log.warn(" Failed to stop a resource staging server that failed to start." , e1)
155+ }
156+
157+ if (Utils .isBindCollision(e)) {
158+ currentAttempt += 1
159+ latestServerPort = latestServerPort + 1
160+ if (currentAttempt == MAX_SERVER_START_ATTEMPTS ) {
161+ throw new RuntimeException (s " Failed to bind to a random port " +
162+ s " $MAX_SERVER_START_ATTEMPTS times. Last attempted port: $latestServerPort" , e)
163+ } else {
164+ logWarning(s " Attempt $currentAttempt/ $MAX_SERVER_START_ATTEMPTS failed to start " +
165+ s " server on port $latestServerPort. " , e)
166+ }
167+ } else {
168+ throw e
169+ }
170+ }
171+ }
172+ logInfo(s " Started resource staging server on port $latestServerPort. " )
173+ latestServerPort
174+ }
128175}
129176
130177private class SettableReferenceSslOptionsProvider extends ResourceStagingServerSslOptionsProvider {
0 commit comments