Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,14 @@

package org.apache.spark.util

import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer

import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection

import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
import scala.util.{Random, Failure, Success, Try}
import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.io.{ByteStreams, Files}
Expand Down Expand Up @@ -2017,18 +2011,23 @@ private[spark] object Utils extends Logging {
conf: SparkConf,
serviceName: String = ""): (T, Int) = {

val maxPort = 65536
val failedPorts = new ArrayBuffer[Int]()
require(startPort == 0 || (1024 <= startPort && startPort < 65536),
"startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
startPort
} else {
// If the new port wraps around, do not try a privilege port
((startPort + offset - 1024) % (65536 - 1024)) + 1024
}
else {
// If the new port wraps around, do not try a privilege port or a failedPort
while(!failedPorts.contains((startPort + Math.random() * (maxPort - startPort + 1))) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an array buffer, so contains is linear time, which is pretty expensive. Also this code is very difficult to understand.

(((startPort + offset - 1024) % (65536 - 1024)) + 1024) != (startPort + Math.random() * (maxPort - startPort + 1))) {
(startPort + Math.random() * (maxPort - startPort + 1))
}
}
try {
val (service, port) = startService(tryPort)
Expand All @@ -2039,6 +2038,7 @@ private[spark] object Utils extends Logging {
if (offset >= maxRetries) {
val exceptionMessage =
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
failedPorts.insert(tryPort)
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
Expand Down