From af3ddc9397cecd6eeb350778f8fcb5671891bbe6 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 23:59:11 -0700 Subject: [PATCH 1/3] Handle port collisions in flume polling test --- .../flume/FlumePollingStreamSuite.scala | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 27bf2ac962721..d8d5bc6d38537 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -45,8 +45,37 @@ class FlumePollingStreamSuite extends TestSuiteBase { val eventsPerBatch = 100 val totalEventsPerChannel = batchCount * eventsPerBatch val channelCapacity = 5000 + val maxAttempts = 5 test("flume polling test") { + testMultipleTimes(testFlumePolling) + } + + test("flume polling test multiple hosts") { + testMultipleTimes(testFlumePollingMultipleHost) + } + + /** + * Run the given test until no more java.net.BindException's are thrown. + * Do this only up to a certain attempt limit. + */ + private def testMultipleTimes(test: () => Unit): Unit = { + var testPassed = false + var attempt = 0 + while (!testPassed && attempt < maxAttempts) { + try { + test() + testPassed = true + } catch { + case e: java.net.BindException => + logError("Exception when running flume polling test", e) + attempt += 1 + } + } + assert(testPassed, s"Test failed after $attempt attempts!") + } + + private def testFlumePolling(): Unit = { val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) @@ -80,7 +109,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { channel.stop() } - test("flume polling test multiple hosts") { + private def testFlumePollingMultipleHost(): Unit = { val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) From 664095c2d8bc3344c03c3fd88628c50e7169f673 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Aug 2014 12:20:08 -0700 Subject: [PATCH 2/3] Tone down bind exception message --- .../apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index d8d5bc6d38537..488a0b1f827f0 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -68,7 +68,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { testPassed = true } catch { case e: java.net.BindException => - logError("Exception when running flume polling test", e) + logWarning("Exception when running flume polling test: " + e) attempt += 1 } } From ea11a0351fa89cfb775a2a6a0913326fb39e6880 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Aug 2014 12:21:22 -0700 Subject: [PATCH 3/3] Catch all exceptions caused by BindExceptions Looks like the actual exception thrown sometimes is org.jboss.netty.channel.ChannelException: Failed to bind to: localhost/127.0.0.1:60550, which is not a java.net.BindException. We should catch these too. --- .../apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 488a0b1f827f0..a69baa16981a1 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.util.ManualClock import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext} import org.apache.spark.streaming.flume.sink._ +import org.apache.spark.util.Utils class FlumePollingStreamSuite extends TestSuiteBase { @@ -67,7 +68,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { test() testPassed = true } catch { - case e: java.net.BindException => + case e: Exception if Utils.isBindCollision(e) => logWarning("Exception when running flume polling test: " + e) attempt += 1 }