Skip to content

Commit fa6ed82

Browse files
committed
[SPARK-6980] Had to increase timeout on positive test case because a processor slowdown could trigger an Future TimeoutException
1 parent b05d449 commit fa6ed82

File tree

1 file changed

+11
-9
lines changed

1 file changed

+11
-9
lines changed

core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,32 +73,34 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
7373
val echoActor = system.actorOf(Props(new EchoActor(0)), name = "echo")
7474
val sleepyActor = system.actorOf(Props(new EchoActor(50)), name = "sleepy")
7575

76+
val longProp = "spark.rpc.long.timeout"
77+
val longTimeout = new RpcTimeout(1 second, longProp)
7678
val shortProp = "spark.rpc.short.timeout"
77-
val timeout = new RpcTimeout(10 millis, shortProp)
79+
val shortTimeout = new RpcTimeout(10 millis, shortProp)
7880

7981
try {
8082

8183
// Ask with immediate response
82-
var fut = echoActor.ask("hello")(timeout.duration).mapTo[String].
83-
recover(timeout.addMessageIfTimeout)
84+
var fut = echoActor.ask("hello")(longTimeout.duration).mapTo[String].
85+
recover(longTimeout.addMessageIfTimeout)
8486

8587
// This should complete successfully
86-
val result = timeout.awaitResult(fut)
88+
val result = longTimeout.awaitResult(fut)
8789

8890
assert(result.nonEmpty)
8991

9092
// Ask with a delayed response and wait for response immediately that should timeout
91-
fut = sleepyActor.ask("doh")(timeout.duration).mapTo[String]
93+
fut = sleepyActor.ask("doh")(shortTimeout.duration).mapTo[String]
9294
val msg1 =
9395
intercept[RpcTimeoutException] {
94-
timeout.awaitResult(fut)
96+
shortTimeout.awaitResult(fut)
9597
}.getMessage()
9698

9799
assert(msg1.contains(shortProp))
98100

99101
// Ask with delayed response using addMessageIfTimeout in recover callback
100-
fut = sleepyActor.ask("goodbye")(timeout.duration).mapTo[String].
101-
recover(timeout.addMessageIfTimeout)
102+
fut = sleepyActor.ask("goodbye")(shortTimeout.duration).mapTo[String].
103+
recover(shortTimeout.addMessageIfTimeout)
102104

103105
// Allow future to complete with failure using plain Await.result, this will return
104106
// once the future is complete to verify addMessageIfTimeout was invoked
@@ -113,7 +115,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
113115
// RpcTimeoutException, the same exception should be thrown
114116
val msg3 =
115117
intercept[RpcTimeoutException] {
116-
timeout.awaitResult(fut)
118+
shortTimeout.awaitResult(fut)
117119
}.getMessage()
118120

119121
// Ensure description is not in message twice after addMessageIfTimeout and awaitResult

0 commit comments

Comments
 (0)