diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 2d6b8d4204795..d56641ff78139 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -32,7 +32,7 @@ import org.json4s._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion} -import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils} +import org.apache.spark.util.{ConfiguredTimeout, AkkaUtils, RpcUtils, Utils} import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} import org.apache.spark.deploy.ClientArguments._ @@ -223,9 +223,9 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf) } protected def handleKill(submissionId: String): KillSubmissionResponse = { - val askTimeout = RpcUtils.askTimeout(conf) + val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf) val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse]( - DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout) + DeployMessages.RequestKillDriver(submissionId), masterActor, confAskTimeout) val k = new KillSubmissionResponse k.serverSparkVersion = sparkVersion k.message = response.message @@ -257,9 +257,9 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf) } protected def handleStatus(submissionId: String): SubmissionStatusResponse = { - val askTimeout = RpcUtils.askTimeout(conf) + val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf) val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse]( - DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout) + DeployMessages.RequestDriverStatus(submissionId), masterActor, confAskTimeout) val message = response.exception.map { s"Exception from the cluster:\n" 
+ formatException(_) } val d = new SubmissionStatusResponse d.serverSparkVersion = sparkVersion @@ -321,10 +321,10 @@ private[rest] class SubmitRequestServlet( responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { requestMessage match { case submitRequest: CreateSubmissionRequest => - val askTimeout = RpcUtils.askTimeout(conf) + val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf) val driverDescription = buildDriverDescription(submitRequest) val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse]( - DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout) + DeployMessages.RequestSubmitDriver(driverDescription), masterActor, confAskTimeout) val submitResponse = new CreateSubmissionResponse submitResponse.serverSparkVersion = sparkVersion submitResponse.message = response.message diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index b725df3b44596..691faf194e4b4 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,9 +17,13 @@ package org.apache.spark.util +import java.util.concurrent.TimeoutException + import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.Await import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ +import scala.language.postfixOps import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import akka.pattern.ask @@ -29,6 +33,61 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} +/** + * Associates a timeout with a configuration property so that a TimeoutException can be + * traced back to the controlling property. The main constructor takes a generic timeout + * and description while the auxiliary constructors use a specific property defined in the + * given configuration. 
+ * @param duration timeout duration + * @param description description to be displayed in a timeout exception + */ +class ConfiguredTimeout(duration: FiniteDuration, description: String) { + + /** + * Look up the timeout property in the configuration and construct + * a FiniteDuration timeout with the property key as the description. + * Throws a NoSuchElementException if it's not set + * @param conf configuration properties containing the timeout + * @param timeoutProp property key for the timeout in seconds + */ + def this(conf: SparkConf, timeoutProp: String) = { + this(conf.getTimeAsSeconds(timeoutProp) seconds, + "This timeout is controlled by " + timeoutProp) + } + + /** + * Look up the timeout property in the configuration and construct + * a FiniteDuration timeout with the property key as the description. + * Uses the given default value if property is not set + * @param conf configuration properties containing the timeout + * @param timeoutProp property key for the timeout in seconds + */ + def this(conf: SparkConf, timeoutProp: String, defaultValue: String) = { + this(conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds, + "This timeout is controlled by " + timeoutProp) + } + + val timeout = duration + val desc = description +} + +object ConfiguredTimeout { + + def apply(conf: SparkConf, timeoutProp: String): ConfiguredTimeout = + new ConfiguredTimeout(conf, timeoutProp) + + def apply(duration: FiniteDuration, description: String): ConfiguredTimeout = + new ConfiguredTimeout(duration, description) + + /** Create a ConfiguredTimeout using standard property for askTimeouts */ + def createAskTimeout(conf: SparkConf): ConfiguredTimeout = { + val askTimeoutKey = "spark.rpc.askTimeout" + val fd = { conf.getTimeAsSeconds(askTimeoutKey, + conf.get("spark.network.timeout", "120s")) seconds } + new ConfiguredTimeout(fd, "This timeout is controlled by " + askTimeoutKey) + } +} + /** * Various utility classes for working with Akka. 
*/ @@ -147,8 +206,8 @@ private[spark] object AkkaUtils extends Logging { def askWithReply[T]( message: Any, actor: ActorRef, - timeout: FiniteDuration): T = { - askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout) + confTimeout: ConfiguredTimeout): T = { + askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, confTimeout) } /** @@ -160,7 +219,7 @@ private[spark] object AkkaUtils extends Logging { actor: ActorRef, maxAttempts: Int, retryInterval: Long, - timeout: FiniteDuration): T = { + confTimeout: ConfiguredTimeout): T = { // TODO: Consider removing multiple attempts if (actor == null) { throw new SparkException(s"Error sending message [message = $message]" + @@ -171,18 +230,24 @@ private[spark] object AkkaUtils extends Logging { while (attempts < maxAttempts) { attempts += 1 try { - val future = actor.ask(message)(timeout) - val result = Await.result(future, timeout) + val future = actor.ask(message)(confTimeout.timeout) + val result = Await.result(future, confTimeout.timeout) if (result == null) { throw new SparkException("Actor returned null") } return result.asInstanceOf[T] } catch { case ie: InterruptedException => throw ie + case te: TimeoutException => + val msg = te.toString() + " " + confTimeout.desc + lastException = new TimeoutException(msg) + logWarning(s"Error sending message [message = $message] in $attempts attempts", + lastException) case e: Exception => lastException = e logWarning(s"Error sending message [message = $message] in $attempts attempts", e) } + Thread.sleep(retryInterval) } diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index bec79fc4dc8f7..255450f3ec048 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -353,4 +353,26 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with 
ResetSystemPro slaveRpcEnv.shutdown() } + + test("construction of ConfiguredTimeout with properties") { + val conf = new SparkConf + + val testProp = "spark.ask.test.timeout" + val testDurationSeconds = 30 + + conf.set(testProp, testDurationSeconds.toString + "s") + + val ct = ConfiguredTimeout(conf, testProp) + assert( testDurationSeconds === ct.timeout.toSeconds ) + + var wasRaised = false + try { + ConfiguredTimeout(conf, "spark.ask.invalid.timeout") + } catch { + case _: Exception => + wasRaised = true + } + assert(wasRaised) + } + } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ActorSystemSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ActorSystemSuite.scala new file mode 100644 index 0000000000000..e599a03c89230 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/ActorSystemSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming + +import akka.actor.{Actor, Props} +import org.apache.spark._ +import org.apache.spark.util.{AkkaUtils, ConfiguredTimeout} +import org.scalatest.concurrent.Timeouts +import org.scalatest.{BeforeAndAfter, FunSuite} + + +class EchoActor extends Actor { + def receive: Receive = { + case msg => + Thread.sleep(1200) + sender() ! msg + } +} + + +class ActorSystemSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging { + + val master = "local[2]" + val appName = this.getClass.getSimpleName + val batchDuration = Milliseconds(500) + val sparkHome = "someDir" + val envPair = "key" -> "value" + + var ssc: StreamingContext = null + + test("actor askWithReply using ConfiguredTimeout") { + val conf = new SparkConf() + + val shortProp = "spark.ask.short.timeout" + + conf.set(shortProp, "1s") + + ssc = new StreamingContext(master, appName, batchDuration) + + val actorSystem = AkkaUtils.createActorSystem("EchoActors", "127.0.0.1", 9999, conf = conf, + securityManager = new SecurityManager(conf))._1 + + val askingActor = actorSystem.actorOf(Props[EchoActor], "AskingActor") + + AkkaUtils.askWithReply[String]("this should echo", askingActor, 1, 0, + ConfiguredTimeout.createAskTimeout(conf)) + + try { + AkkaUtils.askWithReply[String]("this should timeout", askingActor, 1, 0, + ConfiguredTimeout(conf, shortProp)) + throw new TestException("required exception not raised during AkkaUtils.askWithReply") + } catch { + case se: SparkException => + assert(se.getCause().getMessage().contains(shortProp)) + } + } + +}