From 8212e42cfd79b5b92c7c664a9ecff7da68e062a5 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 3 Feb 2015 23:08:05 +0900 Subject: [PATCH 1/3] Modified default port used in FlumeStreamSuite from 23456 to random value --- .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index f333e3891b5f0..6ce66c1f9bf64 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -40,7 +40,6 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.{Logging, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream} -import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted} import org.apache.spark.util.Utils class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { @@ -76,7 +75,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L /** Find a free port */ private def findFreePort(): Int = { - Utils.startServiceOnPort(23456, (trialPort: Int) => { + Utils.startServiceOnPort((math.random * Int.MaxValue).toInt, (trialPort: Int) => { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) From 33357e3cb25241727a42a6ed55346cca5e765361 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 18 Feb 2015 11:46:53 +0900 Subject: [PATCH 2/3] Changed "findFreePort" method in MQTTStreamSuite and FlumeStreamSuite so that it can choose valid random port --- core/src/main/scala/org/apache/spark/util/Utils.scala | 6 ++++++ .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 5 ++++- .../org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 5 ++++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index df21ed37e76b1..22b72fb6b5a04 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1814,6 +1814,12 @@ private[spark] object Utils extends Logging { startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { + + if (startPort != 0 && (startPort < 1024 || startPort >= 65536)) { + throw new IllegalArgumentException("startPort should be between " + + "1024(including) and 65536(excluding) or 0(for a random free port).") + } + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" val maxRetries = portMaxRetries(conf) for (offset <- 0 to maxRetries) { diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 351879fa40bdb..9e4353cf94266 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer +import org.apache.commons.lang3.RandomUtils + import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ @@ -75,7 +77,8 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L /** Find a free port */ private def findFreePort(): Int = { - Utils.startServiceOnPort((math.random * Int.MaxValue).toInt, (trialPort: Int) => { + val candidatePort = RandomUtils.nextInt(1024, 65536) + Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 19c9271af77be..6804099bf063f 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -21,6 +21,8 @@ import java.net.{URI, ServerSocket} import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import org.apache.commons.lang3.RandomUtils + import scala.concurrent.duration._ import scala.language.postfixOps @@ -113,7 +115,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { } private def findFreePort(): Int = { - Utils.startServiceOnPort(23456, (trialPort: Int) => { + val candidatePort = RandomUtils.nextInt(1024, 65536) + Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) From 16f109f13a90d28c3d187f47cb2d0dcd5fc782bc Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 23 Mar 2015 11:25:10 -0700 Subject: [PATCH 3/3] Added `require` to Utils#startServiceOnPort --- core/src/main/scala/org/apache/spark/util/Utils.scala | 5 ++--- .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9b369ec5014e8..ac6105a1addd9 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1879,10 +1879,9 @@ private[spark] object Utils extends Logging { conf: SparkConf, serviceName: String = ""): (T, Int) = { - if (startPort != 0 && (startPort < 1024 || startPort >= 65536)) { - throw new IllegalArgumentException("startPort should be between " + + require(startPort == 0 || 1024 <= startPort && startPort < 65536, + "startPort should be between " + "1024(including) and 65536(excluding) or 0(for a random free port).") - } val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" val maxRetries = portMaxRetries(conf) diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 9e4353cf94266..4f1938cfa1367 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -20,14 +20,13 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer -import org.apache.commons.lang3.RandomUtils - import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.base.Charsets +import org.apache.commons.lang3.RandomUtils import org.apache.avro.ipc.NettyTransceiver import org.apache.avro.ipc.specific.SpecificRequestor import org.apache.flume.source.avro