Skip to content

Conversation

@sarutak
Copy link
Member

@sarutak sarutak commented Feb 3, 2015

When we run FlumeStreamSuite on Jenkins, sometimes we get error like as follows.

sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 52 times over 10.094849836 seconds. Last failure message: Error connecting to localhost/127.0.0.1:23456.
    at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
   at org.apache.spark.streaming.flume.FlumeStreamSuite.writeAndVerify(FlumeStreamSuite.scala:116)
       at org.apache.spark.streaming.flume.FlumeStreamSuite.org$apache$spark$streaming$flume$FlumeStreamSuite$$testFlumeStream(FlumeStreamSuite.scala:74)
   at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply$mcV$sp(FlumeStreamSuite.scala:66)
    at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
    at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)

This error is caused by check-then-act logic when it find free-port .

  /** Find a free port */
  private def findFreePort(): Int = {
    Utils.startServiceOnPort(23456, (trialPort: Int) => {
      val socket = new ServerSocket(trialPort)
      socket.close()
      (null, trialPort)
    }, conf)._2
  }

Removing the check-then-act is not easy but we can reduce the chance of having the error by choosing random value for initial port instead of 23456.

@SparkQA
Copy link

SparkQA commented Feb 3, 2015

Test build #26654 has finished for PR 4337 at commit 8212e42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor

I agree that the randomization will probably make this more stable, but is there any reason that we can't bind to 0 and figure out the ephemeral port that was used? That might be less brittle.

I think there's another flaky Flume suite test, but it looks like it might be unrelated to the issue addressed by this patch: https://issues.apache.org/jira/browse/SPARK-4905

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea but surely you don't mean to choose a random port from 0 to 2 billion? surely something in [1024,65536)? 1024 + new Random().nextInt(65536 - 1024) or similar?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Utils#startServiceOnPort adjusts the range of port so I simply pass the random value to the method. But I noticed we should pass the value > 0.

  val tryPort = if (startPort == 0) {
    startPort
  } else {
    // If the new port wraps around, do not try a privilege port
    ((startPort + offset - 1024) % (65536 - 1024)) + 1024
  }

So, I'll fix it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will actually fail if the very large random port, plus offset - 1024, rolls over to a negative value. The returned port will be negative. Although it's a rare case, I think writing the correct version is also clearer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Should we modify Utils#startServiceOnPort ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If anything, it should be changed to reject an invalid startPort, and this code should not pass invalid port values in any event.

@JoshRosen
Copy link
Contributor

I noticed that this findFreePort method is duplicated verbatim in MQQTSuite, so we should probably apply whatever fix we decide on there, too.

@sarutak
Copy link
Member Author

sarutak commented Feb 8, 2015

@JoshRosen It's good if we could use the port 0 but I found there are no simple way to get the port where FlumeReceiver#server (NettyServer) bound.
I'm also investigating another flakiness in FlumeStreamSuite but I've not found so far...

And as you said, I found findFreePort in MQTTStreamSuite so I'll also fix it.

@SparkQA
Copy link

SparkQA commented Feb 18, 2015

Test build #27671 has finished for PR 4337 at commit 33357e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):
    • class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):
    • class SparkStageInfo(namedtuple("SparkStageInfo",
    • class StatusTracker(object):

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use require(); you can write in [1024,65536) to express the meaning succinctly; missing space after 0 and I don't think the last parens are needed.

@andrewor14
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Feb 19, 2015

Test build #27733 has finished for PR 4337 at commit 33357e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor

@sarutak If you've got time, mind addressing Sean's minor comments? Other than those, this LGTM.

@sarutak
Copy link
Member Author

sarutak commented Mar 23, 2015

Sorry, I had no time until last weekend but now I have. I'll address that soon.

@SparkQA
Copy link

SparkQA commented Mar 23, 2015

Test build #29015 has finished for PR 4337 at commit 16f109f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason
    • class JavaDoubleRDD(val srdd: RDD[scala.Double])
    • class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source
    • class MLPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Serializable
    • class NaiveBayesModel(Saveable, Loader):
    • class KMeansModel(Saveable, Loader):
    • class SqlParser extends AbstractSparkSQLParser with DataTypeParser
    • case class CombineSum(child: Expression) extends AggregateExpression
    • case class CombineSumFunction(expr: Expression, base: AggregateExpression)
    • protected[sql] class DataTypeException(message: String) extends Exception(message)
    • class CheckpointWriteHandler(
    • logError("User class threw exception: " + cause, cause)

@sarutak
Copy link
Member Author

sarutak commented Mar 23, 2015

retest this please.

@srowen
Copy link
Member

srowen commented Mar 23, 2015

OK I think the import is still in slightly the wrong place in both cases and the text could be edited a bit, but I can carefully adjust that on merge.

@SparkQA
Copy link

SparkQA commented Mar 23, 2015

Test build #29024 has finished for PR 4337 at commit 16f109f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Mar 24, 2015
…en running FlumeStreamSuite

When we run FlumeStreamSuite on Jenkins, sometimes we get error like as follows.

    sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 52 times over 10.094849836 seconds. Last failure message: Error connecting to localhost/127.0.0.1:23456.
	    at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
	   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	   at org.apache.spark.streaming.flume.FlumeStreamSuite.writeAndVerify(FlumeStreamSuite.scala:116)
           at org.apache.spark.streaming.flume.FlumeStreamSuite.org$apache$spark$streaming$flume$FlumeStreamSuite$$testFlumeStream(FlumeStreamSuite.scala:74)
	   at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply$mcV$sp(FlumeStreamSuite.scala:66)
	    at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
	    at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
	    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	    at org.scalatest.Transformer.apply(Transformer.scala:22)
	    at org.scalatest.Transformer.apply(Transformer.scala:20)
    	    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
	    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
	    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
	    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
	   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)

This error is caused by check-then-act logic  when it find free-port .

      /** Find a free port */
      private def findFreePort(): Int = {
        Utils.startServiceOnPort(23456, (trialPort: Int) => {
          val socket = new ServerSocket(trialPort)
          socket.close()
          (null, trialPort)
        }, conf)._2
      }

Removing the check-then-act is not easy but we can reduce the chance of having the error by choosing random value for initial port instead of 23456.

Author: Kousuke Saruta <[email protected]>

Closes #4337 from sarutak/SPARK-5559 and squashes the following commits:

16f109f [Kousuke Saruta] Added `require` to Utils#startServiceOnPort
c39d8b6 [Kousuke Saruta] Merge branch 'SPARK-5559' of github.com:sarutak/spark into SPARK-5559
1610ba2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
33357e3 [Kousuke Saruta] Changed "findFreePort" method in MQTTStreamSuite and FlumeStreamSuite so that it can choose valid random port
a9029fe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
9489ef9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
8212e42 [Kousuke Saruta] Modified default port used in FlumeStreamSuite from 23456 to random value

(cherry picked from commit 85cf063)
Signed-off-by: Sean Owen <[email protected]>
@asfgit asfgit closed this in 85cf063 Mar 24, 2015
asfgit pushed a commit that referenced this pull request Mar 24, 2015
…en running FlumeStreamSuite

When we run FlumeStreamSuite on Jenkins, sometimes we get error like as follows.

    sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 52 times over 10.094849836 seconds. Last failure message: Error connecting to localhost/127.0.0.1:23456.
	    at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
	   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	   at org.apache.spark.streaming.flume.FlumeStreamSuite.writeAndVerify(FlumeStreamSuite.scala:116)
           at org.apache.spark.streaming.flume.FlumeStreamSuite.org$apache$spark$streaming$flume$FlumeStreamSuite$$testFlumeStream(FlumeStreamSuite.scala:74)
	   at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply$mcV$sp(FlumeStreamSuite.scala:66)
	    at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
	    at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
	    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	    at org.scalatest.Transformer.apply(Transformer.scala:22)
	    at org.scalatest.Transformer.apply(Transformer.scala:20)
    	    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
	    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
	    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
	    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
	   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)

This error is caused by check-then-act logic  when it find free-port .

      /** Find a free port */
      private def findFreePort(): Int = {
        Utils.startServiceOnPort(23456, (trialPort: Int) => {
          val socket = new ServerSocket(trialPort)
          socket.close()
          (null, trialPort)
        }, conf)._2
      }

Removing the check-then-act is not easy but we can reduce the chance of having the error by choosing random value for initial port instead of 23456.

Author: Kousuke Saruta <[email protected]>

Closes #4337 from sarutak/SPARK-5559 and squashes the following commits:

16f109f [Kousuke Saruta] Added `require` to Utils#startServiceOnPort
c39d8b6 [Kousuke Saruta] Merge branch 'SPARK-5559' of github.com:sarutak/spark into SPARK-5559
1610ba2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
33357e3 [Kousuke Saruta] Changed "findFreePort" method in MQTTStreamSuite and FlumeStreamSuite so that it can choose valid random port
a9029fe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
9489ef9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
8212e42 [Kousuke Saruta] Modified default port used in FlumeStreamSuite from 23456 to random value

(cherry picked from commit 85cf063)
Signed-off-by: Sean Owen <[email protected]>
@andrewor14
Copy link
Contributor

Woohoo! Thanks @sarutak this has been flaky for a long time. Hopefully this fixes it.

@sarutak sarutak deleted the SPARK-5559 branch April 11, 2015 05:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants