diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c4012d0e83f7d..71b0f28de9ff5 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -17,20 +17,14 @@ package org.apache.spark.util -import java.io._ -import java.lang.management.ManagementFactory -import java.net._ -import java.nio.ByteBuffer + import java.util.{PriorityQueue, Properties, Locale, Random, UUID} -import java.util.concurrent._ -import javax.net.ssl.HttpsURLConnection -import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.ClassTag -import scala.util.{Failure, Success, Try} +import scala.util.{Random, Failure, Success, Try} import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.{ByteStreams, Files} @@ -2017,18 +2011,23 @@ private[spark] object Utils extends Logging { conf: SparkConf, serviceName: String = ""): (T, Int) = { + val maxPort = 65536 + val failedPorts = new ArrayBuffer[Int]() require(startPort == 0 || (1024 <= startPort && startPort < 65536), "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.") - val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" val maxRetries = portMaxRetries(conf) for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) { startPort - } else { - // If the new port wraps around, do not try a privilege port - ((startPort + offset - 1024) % (65536 - 1024)) + 1024 + } + else { + // If the new port wraps around, do not try a privilege port or a failedPort + while(!failedPorts.contains((startPort + Math.random() * (maxPort - startPort + 1))) && + (((startPort + offset - 1024) % (65536 - 1024)) + 1024) != (startPort + Math.random() * (maxPort - startPort + 1))) { + (startPort + Math.random() * (maxPort - startPort + 1)) + } } try { val (service, port) = startService(tryPort) @@ -2039,6 +2038,7 @@ private[spark] object Utils extends Logging { if (offset >= maxRetries) { val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" + failedPorts.insert(tryPort) val exception = new BindException(exceptionMessage) // restore original stack trace exception.setStackTrace(e.getStackTrace)