From 2450fefe49f0cd8a109b556e5b4efe4c3bf7d9fa Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sat, 8 Aug 2015 14:10:36 -0700 Subject: [PATCH 1/6] cleanUp unused imports and random import --- core/src/main/scala/org/apache/spark/util/Utils.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c4012d0e83f7d..2fe00cc98eb1e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -17,15 +17,10 @@ package org.apache.spark.util -import java.io._ -import java.lang.management.ManagementFactory -import java.net._ -import java.nio.ByteBuffer + import java.util.{PriorityQueue, Properties, Locale, Random, UUID} -import java.util.concurrent._ -import javax.net.ssl.HttpsURLConnection -import scala.collection.JavaConversions._ + import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.io.Source From dcd512627c48911228ed0f3649d486fdaa0b1ce8 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sat, 8 Aug 2015 15:51:18 -0700 Subject: [PATCH 2/6] Initialized port --- .../scala/org/apache/spark/util/Utils.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2fe00cc98eb1e..556eae813a99b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -20,12 +20,11 @@ package org.apache.spark.util import java.util.{PriorityQueue, Properties, Locale, Random, UUID} - import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.ClassTag -import scala.util.{Failure, Success, Try} +import scala.util.{Random, Failure, Success, Try} import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.{ByteStreams, Files} @@ -188,7 +187,7 @@ private[spark] object Utils extends Logging { } /** - * Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]] + * Primitive often used when writing java.nio.ByteBuffer to java.io.DataOutput */ def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput): Unit = { if (bb.hasArray) { @@ -2001,6 +2000,8 @@ private[spark] object Utils extends Logging { * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). * * @param startPort The initial port to start the service on. + * @param port This array will hold the minimum and the maximum port to start a service on + * If the maximum is not specified, it will be minPort + maxRetries * @param startService Function to start service on a given port. * This is expected to throw java.net.BindException on port collision. * @param conf A SparkConf used to get the maximum number of retries when binding to a port. @@ -2008,6 +2009,7 @@ private[spark] object Utils extends Logging { */ def startServiceOnPort[T]( startPort: Int, + port: Array[Int], startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { @@ -2015,8 +2017,19 @@ private[spark] object Utils extends Logging { require(startPort == 0 || (1024 <= startPort && startPort < 65536), "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.") + var minPort, maxPort = 0 + + if(port.length == 2) { + minPort = port{0} + maxPort = port{1} + } + require(minPort <= maxPort) + require(minPort == 0 || minPort >=1024 && minPort <= 65536) + require(maxPort >=1024 && maxPort <= 65536) + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" val maxRetries = portMaxRetries(conf) + for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) { From cf4af1a76ddbda7e9f83a64d1d2c323ba6eeb82a Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sat, 8 Aug 2015 18:12:32 -0700 Subject: [PATCH 3/6] Added logic for port range --- .../scala/org/apache/spark/util/Utils.scala | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 556eae813a99b..d56d0def2f52f 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2009,34 +2009,41 @@ private[spark] object Utils extends Logging { */ def startServiceOnPort[T]( startPort: Int, - port: Array[Int], + maxPort: Int, + failedPorts: ArrayBuffer[Int], startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { require(startPort == 0 || (1024 <= startPort && startPort < 65536), "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.") - - var minPort, maxPort = 0 - - if(port.length == 2) { - minPort = port{0} - maxPort = port{1} - } - require(minPort <= maxPort) - require(minPort == 0 || minPort >=1024 && minPort <= 65536) - require(maxPort >=1024 && maxPort <= 65536) - val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" val maxRetries = portMaxRetries(conf) - + if(startPort + maxRetries <= 65536) { + maxPort = startPort + maxRetries + } for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) { startPort - } else { + } + else { // If the new port wraps around, do not try a privilege port - ((startPort + offset - 1024) % (65536 - 1024)) + 1024 + + if(!failedPorts.contains((startPort + Math.random() * (maxPort - startPort + 1)))) { + (startPort + Math.random() * (maxPort - startPort + 1)) + } + else { + // This condition here checks whether there are sufficient successful ports to try the retry attempt. + // If yes, tryPort is adjusted, else increasing the maxPort limit by 1 to help accommodate the maxRetries + if ((maxPort - startPort) - failedPorts.length < (maxRetries - offset)) + if (maxPort + 1 > 65536) { + maxPort + 1 + } + else { + maxPort = 65536 + } + } } try { val (service, port) = startService(tryPort) @@ -2047,6 +2054,7 @@ private[spark] object Utils extends Logging { if (offset >= maxRetries) { val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" + failedPorts.insert(tryPort) val exception = new BindException(exceptionMessage) // restore original stack trace exception.setStackTrace(e.getStackTrace) From 52326701bef8e22ab70a6c050f210c71de234fd7 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sat, 8 Aug 2015 18:49:21 -0700 Subject: [PATCH 4/6] Modified logic to include privileged ports --- .../scala/org/apache/spark/util/Utils.scala | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d56d0def2f52f..90f23a4b1147d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2029,21 +2029,20 @@ private[spark] object Utils extends Logging { } else { // If the new port wraps around, do not try a privilege port - - if(!failedPorts.contains((startPort + Math.random() * (maxPort - startPort + 1)))) { + while(!failedPorts.contains((startPort + Math.random() * (maxPort - startPort + 1))) && + (((startPort + offset - 1024) % (65536 - 1024)) + 1024) != (startPort + Math.random() * (maxPort - startPort + 1))) { (startPort + Math.random() * (maxPort - startPort + 1)) } + // This condition here checks whether there are sufficient successful ports to try the retry attempt. + // If yes, tryPort is adjusted, else increasing the maxPort limit by 1 to help accommodate the maxRetries + if ((maxPort - startPort) - failedPorts.length < (maxRetries - offset)) { + if (maxPort + 1 > 65536) { + maxPort + 1 + } else { - // This condition here checks whether there are sufficient successful ports to try the retry attempt. - // If yes, tryPort is adjusted, else increasing the maxPort limit by 1 to help accommodate the maxRetries - if ((maxPort - startPort) - failedPorts.length < (maxRetries - offset)) - if (maxPort + 1 > 65536) { - maxPort + 1 - } - else { - maxPort = 65536 - } + maxPort = 65536 } + } } try { val (service, port) = startService(tryPort) @@ -2056,7 +2055,7 @@ private[spark] object Utils extends Logging { s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" failedPorts.insert(tryPort) val exception = new BindException(exceptionMessage) - // restore original stack trace + // restore original stack traceg exception.setStackTrace(e.getStackTrace) throw exception } From f8866a34f743598fc2c0a19a6bb922104bfb6b3b Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sat, 8 Aug 2015 19:20:37 -0700 Subject: [PATCH 5/6] Corrected typo on 2058 --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 90f23a4b1147d..a4e0982e0a5ef 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2055,7 +2055,7 @@ private[spark] object Utils extends Logging { s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" failedPorts.insert(tryPort) val exception = new BindException(exceptionMessage) - // restore original stack traceg + // restore original stack trace exception.setStackTrace(e.getStackTrace) throw exception } From 9456cb561226ff3e74b62f04a3ff0b199e47e6cc Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Sun, 9 Aug 2015 13:24:03 -0700 Subject: [PATCH 6/6] Corrected unnecessary changes. Initialized maxPort and failedPorts correctly. Made maxPort 65536 to allow the range to be port to 65536. Implemented logic --- .../scala/org/apache/spark/util/Utils.scala | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a4e0982e0a5ef..71b0f28de9ff5 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -187,7 +187,7 @@ private[spark] object Utils extends Logging { } /** - * Primitive often used when writing java.nio.ByteBuffer to java.io.DataOutput + * Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]] */ def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput): Unit = { if (bb.hasArray) { @@ -2000,8 +2000,6 @@ private[spark] object Utils extends Logging { * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). * * @param startPort The initial port to start the service on. - * @param port This array will hold the minimum and the maximum port to start a service on - * If the maximum is not specified, it will be minPort + maxRetries * @param startService Function to start service on a given port. * This is expected to throw java.net.BindException on port collision. * @param conf A SparkConf used to get the maximum number of retries when binding to a port. @@ -2009,40 +2007,27 @@ private[spark] object Utils extends Logging { */ def startServiceOnPort[T]( startPort: Int, - maxPort: Int, - failedPorts: ArrayBuffer[Int], startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { + val maxPort = 65536 + val failedPorts = new ArrayBuffer[Int]() require(startPort == 0 || (1024 <= startPort && startPort < 65536), "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.") val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" val maxRetries = portMaxRetries(conf) - if(startPort + maxRetries <= 65536) { - maxPort = startPort + maxRetries - } for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) { startPort } else { - // If the new port wraps around, do not try a privilege port + // If the new port wraps around, do not try a privilege port or a failedPort while(!failedPorts.contains((startPort + Math.random() * (maxPort - startPort + 1))) && (((startPort + offset - 1024) % (65536 - 1024)) + 1024) != (startPort + Math.random() * (maxPort - startPort + 1))) { (startPort + Math.random() * (maxPort - startPort + 1)) } - // This condition here checks whether there are sufficient successful ports to try the retry attempt. - // If yes, tryPort is adjusted, else increasing the maxPort limit by 1 to help accommodate the maxRetries - if ((maxPort - startPort) - failedPorts.length < (maxRetries - offset)) { - if (maxPort + 1 > 65536) { - maxPort + 1 - } - else { - maxPort = 65536 - } - } } try { val (service, port) = startService(tryPort)