From 9368e480c469eb0f0d36fbc32af6c9896034d01b Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 28 Apr 2015 00:41:54 -0700 Subject: [PATCH 1/2] [SPARK-6980] Akka timeout exceptions indicate which conf controls them --- .../org/apache/spark/util/AkkaUtils.scala | 60 +++++++++++++++++-- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index b725df3b44596..e4eca2be31d5c 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.util +import java.util.concurrent.TimeoutException + import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.Await import scala.concurrent.duration.FiniteDuration @@ -29,6 +31,48 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} +/** + * Binds a timeout to a configuration property so that a thrown akka timeout exception can be + * traced back to the originating value. The main constructor takes a generic timeout and + * description while the auxilary constructor uses a specific property defined in the + * configuration. + * @param timeout_duration timeout duration in milliseconds + * @param timeout_description description to be displayed in a timeout exception + */ +class ConfiguredTimeout(timeout_duration: FiniteDuration, timeout_description: String = null) { + + /** + * Specialized constructor to lookup the timeout property in the configuration and construct + * a FiniteDuration timeout with the property key as the description + * @param conf configuration properties containing the timeout + * @param timeout_prop property key for the timeout + */ + def this(conf: SparkConf, timeout_prop: String) = { + this(FiniteDuration(conf.getInt(timeout_prop, -1), "millis"), timeout_prop) + require(timeout_duration.toMillis >= 0, "invalid property string: " + timeout_prop) + } + + val timeout = timeout_duration + val description = timeout_description +} + +object ConfiguredTimeout { + + /** + * Implicit conversion to allow for a simple FiniteDuration timeout to be used instead of a + * ConfiguredTimeout when the description is not needed. + * @param timeout_duration timeout duration in milliseconds + * @return ConfiguredTimeout object + */ + implicit def toConfiguredTimeout(timeout_duration: FiniteDuration): ConfiguredTimeout = + new ConfiguredTimeout(timeout_duration) + + def apply(conf: SparkConf, timeout_prop: String): ConfiguredTimeout = + new ConfiguredTimeout(conf, timeout_prop) + def apply(timeout_duration: FiniteDuration, timeout_description: String): ConfiguredTimeout = + new ConfiguredTimeout(timeout_duration, timeout_description) +} + /** * Various utility classes for working with Akka. */ @@ -147,8 +191,8 @@ private[spark] object AkkaUtils extends Logging { def askWithReply[T]( message: Any, actor: ActorRef, - timeout: FiniteDuration): T = { - askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout) + confTimeout: ConfiguredTimeout): T = { + askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, confTimeout) } /** @@ -160,7 +204,7 @@ private[spark] object AkkaUtils extends Logging { actor: ActorRef, maxAttempts: Int, retryInterval: Long, - timeout: FiniteDuration): T = { + confTimeout: ConfiguredTimeout): T = { // TODO: Consider removing multiple attempts if (actor == null) { throw new SparkException(s"Error sending message [message = $message]" + @@ -171,14 +215,20 @@ private[spark] object AkkaUtils extends Logging { while (attempts < maxAttempts) { attempts += 1 try { - val future = actor.ask(message)(timeout) - val result = Await.result(future, timeout) + val future = actor.ask(message)(confTimeout.timeout) + val result = Await.result(future, confTimeout.timeout) if (result == null) { throw new SparkException("Actor returned null") } return result.asInstanceOf[T] } catch { case ie: InterruptedException => throw ie + case te: TimeoutException => + var msg = te.toString() + if (confTimeout.description != null) { + msg += " with [" + confTimeout.description + "]" + } + lastException = new TimeoutException(msg) case e: Exception => lastException = e logWarning(s"Error sending message [message = $message] in $attempts attempts", e) From d4bb0e9309df6d681eb7a809590cf859e7930701 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Sun, 3 May 2015 01:04:40 -0700 Subject: [PATCH 2/2] [SPARK-6980] Updated with comments from PR, added test in AkkaUtilsSuite and new test suite ActorSystemSuite --- .../deploy/rest/StandaloneRestServer.scala | 14 ++-- .../org/apache/spark/util/AkkaUtils.scala | 77 +++++++++++-------- .../apache/spark/util/AkkaUtilsSuite.scala | 22 ++++++ .../spark/streaming/ActorSystemSuite.scala | 73 ++++++++++++++++++ 4 files changed, 148 insertions(+), 38 deletions(-) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/ActorSystemSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 2d6b8d4204795..d56641ff78139 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -32,7 +32,7 @@ import org.json4s._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion} -import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils} +import org.apache.spark.util.{ConfiguredTimeout, AkkaUtils, RpcUtils, Utils} import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} import org.apache.spark.deploy.ClientArguments._ @@ -223,9 +223,9 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf) } protected def handleKill(submissionId: String): KillSubmissionResponse = { - val askTimeout = RpcUtils.askTimeout(conf) + val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf) val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse]( - DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout) + DeployMessages.RequestKillDriver(submissionId), masterActor, confAskTimeout) val k = new KillSubmissionResponse k.serverSparkVersion = sparkVersion k.message = response.message @@ -257,9 +257,9 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf) } protected def handleStatus(submissionId: String): SubmissionStatusResponse = { - val askTimeout = RpcUtils.askTimeout(conf) + val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf) val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse]( - DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout) + DeployMessages.RequestDriverStatus(submissionId), masterActor, confAskTimeout) val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) } val d = new SubmissionStatusResponse d.serverSparkVersion = sparkVersion @@ -321,10 +321,10 @@ private[rest] class SubmitRequestServlet( responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { requestMessage match { case submitRequest: CreateSubmissionRequest => - val askTimeout = RpcUtils.askTimeout(conf) + val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf) val driverDescription = buildDriverDescription(submitRequest) val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse]( - DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout) + DeployMessages.RequestSubmitDriver(driverDescription), masterActor, confAskTimeout) val submitResponse = new CreateSubmissionResponse submitResponse.serverSparkVersion = sparkVersion submitResponse.message = response.message diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index e4eca2be31d5c..691faf194e4b4 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -22,6 +22,8 @@ import java.util.concurrent.TimeoutException import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.Await import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ +import scala.language.postfixOps import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import akka.pattern.ask @@ -32,45 +34,58 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} /** - * Binds a timeout to a configuration property so that a thrown akka timeout exception can be - * traced back to the originating value. The main constructor takes a generic timeout and - * description while the auxilary constructor uses a specific property defined in the - * configuration. - * @param timeout_duration timeout duration in milliseconds - * @param timeout_description description to be displayed in a timeout exception + * Associates a timeout with a configuration property so that a TimeoutException can be + * traced back to the controlling property. The main constructor takes a generic timeout + * and description while the auxilary constructor uses a specific property defined in the + * given configuration. + * @param duration timeout duration in milliseconds + * @param description description to be displayed in a timeout exception */ -class ConfiguredTimeout(timeout_duration: FiniteDuration, timeout_description: String = null) { +class ConfiguredTimeout(duration: FiniteDuration, description: String) { /** - * Specialized constructor to lookup the timeout property in the configuration and construct - * a FiniteDuration timeout with the property key as the description + * Lookup the timeout property in the configuration and construct + * a FiniteDuration timeout with the property key as the description. + * Throws a NoSuchElementException if it's not set * @param conf configuration properties containing the timeout - * @param timeout_prop property key for the timeout + * @param timeoutProp property key for the timeout in seconds */ - def this(conf: SparkConf, timeout_prop: String) = { - this(FiniteDuration(conf.getInt(timeout_prop, -1), "millis"), timeout_prop) - require(timeout_duration.toMillis >= 0, "invalid property string: " + timeout_prop) + def this(conf: SparkConf, timeoutProp: String) = { + this(conf.getTimeAsSeconds(timeoutProp) seconds, + "This timeout is controlled by " + timeoutProp) } - val timeout = timeout_duration - val description = timeout_description + /** + * Lookup the timeout property in the configuration and construct + * a FiniteDuration timeout with the property key as the description. + * Uses the given default value if property is not set + * @param conf configuration properties containing the timeout + * @param timeoutProp property key for the timeout in seconds + */ + def this(conf: SparkConf, timeoutProp: String, defaultValue: String) = { + this(conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds, + "This timeout is controlled by " + timeoutProp) + } + + val timeout = duration + val desc = description } object ConfiguredTimeout { - /** - * Implicit conversion to allow for a simple FiniteDuration timeout to be used instead of a - * ConfiguredTimeout when the description is not needed. - * @param timeout_duration timeout duration in milliseconds - * @return ConfiguredTimeout object - */ - implicit def toConfiguredTimeout(timeout_duration: FiniteDuration): ConfiguredTimeout = - new ConfiguredTimeout(timeout_duration) + def apply(conf: SparkConf, timeoutProp: String): ConfiguredTimeout = + new ConfiguredTimeout(conf, timeoutProp) + + def apply(duration: FiniteDuration, description: String): ConfiguredTimeout = + new ConfiguredTimeout(duration, description) - def apply(conf: SparkConf, timeout_prop: String): ConfiguredTimeout = - new ConfiguredTimeout(conf, timeout_prop) - def apply(timeout_duration: FiniteDuration, timeout_description: String): ConfiguredTimeout = - new ConfiguredTimeout(timeout_duration, timeout_description) + /** Create a ConfiguredTimeout using standard property for askTimeouts */ + def createAskTimeout(conf: SparkConf): ConfiguredTimeout = { + val askTimeoutKey = "spark.rpc.askTimeout" + val fd = { conf.getTimeAsSeconds(askTimeoutKey, + conf.get("spark.network.timeout", "120s")) seconds } + new ConfiguredTimeout(fd, "This timeout is controlled by " + askTimeoutKey) + } } /** @@ -224,15 +239,15 @@ private[spark] object AkkaUtils extends Logging { } catch { case ie: InterruptedException => throw ie case te: TimeoutException => - var msg = te.toString() - if (confTimeout.description != null) { - msg += " with [" + confTimeout.description + "]" - } + val msg = te.toString() + " " + confTimeout.desc lastException = new TimeoutException(msg) + logWarning(s"Error sending message [message = $message] in $attempts attempts", + lastException) case e: Exception => lastException = e logWarning(s"Error sending message [message = $message] in $attempts attempts", e) } + Thread.sleep(retryInterval) } diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index bec79fc4dc8f7..255450f3ec048 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -353,4 +353,26 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro slaveRpcEnv.shutdown() } + + test("construction of ConfiguredTimeout with properties") { + val conf = new SparkConf + + val testProp = "spark.ask.test.timeout" + val testDurationSeconds = 30 + + conf.set(testProp, testDurationSeconds.toString + "s") + + val ct = ConfiguredTimeout(conf, testProp) + assert( testDurationSeconds === ct.timeout.toSeconds ) + + var wasRaised = false + try { + ConfiguredTimeout(conf, "spark.ask.invalid.timeout") + } catch { + case _: Exception => + wasRaised = true + } + assert(wasRaised) + } + } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ActorSystemSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ActorSystemSuite.scala new file mode 100644 index 0000000000000..e599a03c89230 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/ActorSystemSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import akka.actor.{Actor, Props} +import org.apache.spark._ +import org.apache.spark.util.{AkkaUtils, ConfiguredTimeout} +import org.scalatest.concurrent.Timeouts +import org.scalatest.{BeforeAndAfter, FunSuite} + + +class EchoActor extends Actor { + def receive: Receive = { + case msg => + Thread.sleep(1200) + sender() ! msg + } +} + + +class ActorSystemSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging { + + val master = "local[2]" + val appName = this.getClass.getSimpleName + val batchDuration = Milliseconds(500) + val sparkHome = "someDir" + val envPair = "key" -> "value" + + var ssc: StreamingContext = null + + test("actor askWithReply using ConfiguredTimeout") { + val conf = new SparkConf() + + val shortProp = "spark.ask.short.timeout" + + conf.set(shortProp, "1s") + + ssc = new StreamingContext(master, appName, batchDuration) + + val actorSystem = AkkaUtils.createActorSystem("EchoActors", "127.0.0.1", 9999, conf = conf, + securityManager = new SecurityManager(conf))._1 + + val askingActor = actorSystem.actorOf(Props[EchoActor], "AskingActor") + + AkkaUtils.askWithReply[String]("this should echo", askingActor, 1, 0, + ConfiguredTimeout.createAskTimeout(conf)) + + try { + AkkaUtils.askWithReply[String]("this should timeout", askingActor, 1, 0, + ConfiguredTimeout(conf, shortProp)) + throw new TestException("required exception not raised during AkkaUtils.askWithReply") + } catch { + case se: SparkException => + assert(se.getCause().getMessage().contains(shortProp)) + } + } + +}