
Commit 995d196

[SPARK-6980] Cleaned up import ordering, comments, spacing from PR feedback
1 parent 7774d56 commit 995d196

5 files changed: +20 -29 lines changed

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 3 additions & 7 deletions
@@ -20,9 +20,8 @@ package org.apache.spark.rpc
 import java.net.URI
 import java.util.concurrent.TimeoutException
 
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.duration._
 import scala.concurrent.{Awaitable, Await, Future}
+import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.spark.{SecurityManager, SparkConf}
@@ -229,7 +228,8 @@ private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
   }
 
   /**
-   * Waits for a completed result to catch and amend a TimeoutException message
+   * Wait for the completed result and return it. If the result is not available within this
+   * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
    * @param awaitable the `Awaitable` to be awaited
    * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
    *         is still not ready
@@ -242,10 +242,6 @@ private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
 }
 
 
-/**
- * Create an RpcTimeout using a configuration property that controls the timeout duration so when
- * a TimeoutException is thrown, the property key will be indicated in the message.
- */
 object RpcTimeout {
 
   private[this] val messagePrefix = "This timeout is controlled by "
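
For reference, the reworded scaladoc describes the intended call pattern: wrap a blocking wait in awaitResult so a bare TimeoutException surfaces as an RpcTimeoutException that names the controlling configuration property. A minimal sketch of that contract, not part of this diff; the helper name, the "ping" message, and the caller-supplied conf and endpointRef are illustrative:

  import scala.concurrent.Future
  import org.apache.spark.SparkConf
  import org.apache.spark.rpc.RpcEndpointRef
  import org.apache.spark.util.RpcUtils

  // Hypothetical helper, only to illustrate the awaitResult contract described in the scaladoc.
  def illustrateAskTimeout(conf: SparkConf, endpointRef: RpcEndpointRef): String = {
    val timeout = RpcUtils.askTimeout(conf)  // backed by spark.rpc.askTimeout / spark.network.timeout
    // Ask with an explicit RpcTimeout, matching the ask(message, timeout) signature in the next file.
    val future: Future[String] = endpointRef.ask[String]("ping", timeout)
    // Blocks for up to timeout.duration; on timeout, rethrows as an RpcTimeoutException whose
    // message names the configuration property that controls the timeout.
    timeout.awaitResult(future)
  }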

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 11 additions & 11 deletions
@@ -299,17 +299,17 @@ private[akka] class AkkaRpcEndpointRef(
 
   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
     actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
-    // The function will run in the calling thread, so it should be short and never block.
-    case msg @ AkkaMessage(message, reply) =>
-      if (reply) {
-        logError(s"Receive $msg but the sender cannot reply")
-        Future.failed(new SparkException(s"Receive $msg but the sender cannot reply"))
-      } else {
-        Future.successful(message)
-      }
-    case AkkaFailure(e) =>
-      Future.failed(e)
-  }(ThreadUtils.sameThread).mapTo[T].
+      // The function will run in the calling thread, so it should be short and never block.
+      case msg @ AkkaMessage(message, reply) =>
+        if (reply) {
+          logError(s"Receive $msg but the sender cannot reply")
+          Future.failed(new SparkException(s"Receive $msg but the sender cannot reply"))
+        } else {
+          Future.successful(message)
+        }
+      case AkkaFailure(e) =>
+        Future.failed(e)
+    }(ThreadUtils.sameThread).mapTo[T].
     recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
   }
 

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 4 additions & 5 deletions
@@ -17,18 +17,17 @@
 
 package org.apache.spark.util
 
-import org.apache.spark.rpc.RpcTimeout
-
-import scala.collection.JavaConversions.mapAsJavaMap
-
 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
 
 import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
 
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
 
+import scala.collection.JavaConversions.mapAsJavaMap
+
 /**
  * Various utility classes for working with Akka.
 */

core/src/main/scala/org/apache/spark/util/RpcUtils.scala

Lines changed: 2 additions & 5 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.util
 
 import scala.language.postfixOps
-import scala.concurrent.duration._
 
 import org.apache.spark.{SparkEnv, SparkConf}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
@@ -48,13 +47,11 @@ object RpcUtils {
 
   /** Returns the default Spark timeout to use for RPC ask operations. */
   def askTimeout(conf: SparkConf): RpcTimeout = {
-    RpcTimeout(conf, Seq("spark.rpc.askTimeout",
-      "spark.network.timeout"), "120s")
+    RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
   }
 
   /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
   def lookupTimeout(conf: SparkConf): RpcTimeout = {
-    RpcTimeout(conf, Seq("spark.rpc.lookupTimeout",
-      "spark.network.timeout"), "120s")
+    RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
   }
 }
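
The consolidated one-liners above also make the lookup order visible: each helper passes a prioritized Seq of properties plus a "120s" default to the RpcTimeout factory, which, per the Seq ordering, appears to use the first property that is set. A small configuration sketch; the "30s" override is purely illustrative:

  import org.apache.spark.SparkConf
  import org.apache.spark.util.RpcUtils

  // Illustrative override: only the ask timeout is set explicitly here.
  val conf = new SparkConf().set("spark.rpc.askTimeout", "30s")
  val askTimeout = RpcUtils.askTimeout(conf)        // resolved from spark.rpc.askTimeout
  val lookupTimeout = RpcUtils.lookupTimeout(conf)  // tries spark.rpc.lookupTimeout, then
                                                    //   spark.network.timeout, then the "120s" default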

core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala

Lines changed: 0 additions & 1 deletion
@@ -28,7 +28,6 @@ import akka.pattern.ask
 import org.apache.spark.rpc._
 import org.apache.spark.{SecurityManager, SparkConf}
 
-
 class AkkaRpcEnvSuite extends RpcEnvSuite {
 
   override def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv = {

0 commit comments
