Skip to content
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1878,6 +1878,11 @@ private[spark] object Utils extends Logging {
startService: Int => (T, Int),
conf: SparkConf,
serviceName: String = ""): (T, Int) = {

require(startPort == 0 || 1024 <= startPort && startPort < 65536,
"startPort should be between " +
"1024(including) and 65536(excluding) or 0(for a random free port).")

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
for (offset <- 0 to maxRetries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.base.Charsets
import org.apache.commons.lang3.RandomUtils
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro
Expand All @@ -40,7 +41,6 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
import org.apache.spark.util.Utils

class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
Expand Down Expand Up @@ -76,7 +76,8 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L

/** Find a free port */
private def findFreePort(): Int = {
Utils.startServiceOnPort(23456, (trialPort: Int) => {
val candidatePort = RandomUtils.nextInt(1024, 65536)
Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.net.{URI, ServerSocket}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import org.apache.commons.lang3.RandomUtils

import scala.concurrent.duration._
import scala.language.postfixOps

Expand Down Expand Up @@ -113,7 +115,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
}

private def findFreePort(): Int = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this can be refactored rather than duplicated? don't bother unless there is clearly a good common home for it in test code. I don't know if there is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a test utilities subproject, so this ends up getting duplicated, but note that we also have duplication of classes like LocalSparkContext; fixing this broader issue is outside the scope of this PR (there's a few JIRAs to track the creation of a "test utilities" project, though).

Utils.startServiceOnPort(23456, (trialPort: Int) => {
val candidatePort = RandomUtils.nextInt(1024, 65536)
Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
Expand Down