Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
import org.apache.spark.util.{ConfiguredTimeout, AkkaUtils, RpcUtils, Utils}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._

Expand Down Expand Up @@ -223,9 +223,9 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
}

protected def handleKill(submissionId: String): KillSubmissionResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
DeployMessages.RequestKillDriver(submissionId), masterActor, confAskTimeout)
val k = new KillSubmissionResponse
k.serverSparkVersion = sparkVersion
k.message = response.message
Expand Down Expand Up @@ -257,9 +257,9 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
}

protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
DeployMessages.RequestDriverStatus(submissionId), masterActor, confAskTimeout)
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
val d = new SubmissionStatusResponse
d.serverSparkVersion = sparkVersion
Expand Down Expand Up @@ -321,10 +321,10 @@ private[rest] class SubmitRequestServlet(
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
val askTimeout = RpcUtils.askTimeout(conf)
val confAskTimeout = ConfiguredTimeout.createAskTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, confAskTimeout)
val submitResponse = new CreateSubmissionResponse
submitResponse.serverSparkVersion = sparkVersion
submitResponse.message = response.message
Expand Down
75 changes: 70 additions & 5 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.spark.util

import java.util.concurrent.TimeoutException

import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
Expand All @@ -29,6 +33,61 @@ import org.apache.log4j.{Level, Logger}

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}

/**
* Associates a timeout with a configuration property so that a TimeoutException can be
* traced back to the controlling property. The main constructor takes a generic timeout
* and description while the auxilary constructor uses a specific property defined in the
* given configuration.
* @param duration timeout duration in milliseconds
* @param description description to be displayed in a timeout exception
*/
class ConfiguredTimeout(duration: FiniteDuration, description: String) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this need to be scoped at least to private[spark], possibly even tighter. Same goes for other classes & methods. We can wait till the rest of the design shakes out a little more before worrying about that too much, though


/**
* Lookup the timeout property in the configuration and construct
* a FiniteDuration timeout with the property key as the description.
* Throws a NoSuchElementException if it's not set
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
*/
def this(conf: SparkConf, timeoutProp: String) = {
this(conf.getTimeAsSeconds(timeoutProp) seconds,
"This timeout is controlled by " + timeoutProp)
}

/**
* Lookup the timeout property in the configuration and construct
* a FiniteDuration timeout with the property key as the description.
* Uses the given default value if property is not set
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
*/
def this(conf: SparkConf, timeoutProp: String, defaultValue: String) = {
this(conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds,
"This timeout is controlled by " + timeoutProp)
}

val timeout = duration
val desc = description
}

object ConfiguredTimeout {

def apply(conf: SparkConf, timeoutProp: String): ConfiguredTimeout =
new ConfiguredTimeout(conf, timeoutProp)

def apply(duration: FiniteDuration, description: String): ConfiguredTimeout =
new ConfiguredTimeout(duration, description)

/** Create a ConfiguredTimeout using standard property for askTimeouts */
def createAskTimeout(conf: SparkConf): ConfiguredTimeout = {
val askTimeoutKey = "spark.rpc.askTimeout"
val fd = { conf.getTimeAsSeconds(askTimeoutKey,
conf.get("spark.network.timeout", "120s")) seconds }
new ConfiguredTimeout(fd, "This timeout is controlled by " + askTimeoutKey)
}
}

/**
* Various utility classes for working with Akka.
*/
Expand Down Expand Up @@ -147,8 +206,8 @@ private[spark] object AkkaUtils extends Logging {
def askWithReply[T](
message: Any,
actor: ActorRef,
timeout: FiniteDuration): T = {
askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout)
confTimeout: ConfiguredTimeout): T = {
askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, confTimeout)
}

/**
Expand All @@ -160,7 +219,7 @@ private[spark] object AkkaUtils extends Logging {
actor: ActorRef,
maxAttempts: Int,
retryInterval: Long,
timeout: FiniteDuration): T = {
confTimeout: ConfiguredTimeout): T = {
// TODO: Consider removing multiple attempts
if (actor == null) {
throw new SparkException(s"Error sending message [message = $message]" +
Expand All @@ -171,18 +230,24 @@ private[spark] object AkkaUtils extends Logging {
while (attempts < maxAttempts) {
attempts += 1
try {
val future = actor.ask(message)(timeout)
val result = Await.result(future, timeout)
val future = actor.ask(message)(confTimeout.timeout)
val result = Await.result(future, confTimeout.timeout)
if (result == null) {
throw new SparkException("Actor returned null")
}
return result.asInstanceOf[T]
} catch {
case ie: InterruptedException => throw ie
case te: TimeoutException =>
val msg = te.toString() + " " + confTimeout.desc
lastException = new TimeoutException(msg)
logWarning(s"Error sending message [message = $message] in $attempts attempts",
lastException)
case e: Exception =>
lastException = e
logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
}

Thread.sleep(retryInterval)
}

Expand Down
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,26 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro
slaveRpcEnv.shutdown()
}


test("construction of ConfiguredTimeout with properties") {
val conf = new SparkConf

val testProp = "spark.ask.test.timeout"
val testDurationSeconds = 30

conf.set(testProp, testDurationSeconds.toString + "s")

val ct = ConfiguredTimeout(conf, testProp)
assert( testDurationSeconds === ct.timeout.toSeconds )

var wasRaised = false
try {
ConfiguredTimeout(conf, "spark.ask.invalid.timeout")
} catch {
case _: Exception =>
wasRaised = true
}
assert(wasRaised)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming

import akka.actor.{Actor, Props}
import org.apache.spark._
import org.apache.spark.util.{AkkaUtils, ConfiguredTimeout}
import org.scalatest.concurrent.Timeouts
import org.scalatest.{BeforeAndAfter, FunSuite}


class EchoActor extends Actor {
def receive: Receive = {
case msg =>
Thread.sleep(1200)
sender() ! msg
}
}


class ActorSystemSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging {

val master = "local[2]"
val appName = this.getClass.getSimpleName
val batchDuration = Milliseconds(500)
val sparkHome = "someDir"
val envPair = "key" -> "value"

var ssc: StreamingContext = null

test("actor askWithReply using ConfiguredTimeout") {
val conf = new SparkConf()

val shortProp = "spark.ask.short.timeout"

conf.set(shortProp, "1s")

ssc = new StreamingContext(master, appName, batchDuration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need to make a streamingContext for this test at all, it just involves akka actors. In fact, I think this test should get moved to AkkaUtilsSuite as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, in my initial tests I created the Actors from the ActorSystem, so I thought that was required. I'll rework this into AkkaUtilsSuite and shorten the timeouts as you mentioned below.


val actorSystem = AkkaUtils.createActorSystem("EchoActors", "127.0.0.1", 9999, conf = conf,
securityManager = new SecurityManager(conf))._1

val askingActor = actorSystem.actorOf(Props[EchoActor], "AskingActor")

AkkaUtils.askWithReply[String]("this should echo", askingActor, 1, 0,
ConfiguredTimeout.createAskTimeout(conf))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could get around the fact that the conf timeouts have a minimum time of seconds by directly creating your timeout here: new ConfiguredTimeout(20 milliseconds, "This timeout is configured by foobar"). 1 second for a test is not the end of the world, but would be nice to get it shorter. I'd also be sure to have one test where the reply is received within the timeout, so that we know we don't always incorrectly throw the exception.


try {
AkkaUtils.askWithReply[String]("this should timeout", askingActor, 1, 0,
ConfiguredTimeout(conf, shortProp))
throw new TestException("required exception not raised during AkkaUtils.askWithReply")
} catch {
case se: SparkException =>
assert(se.getCause().getMessage().contains(shortProp))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be simplified to:

val exc = intercept[SparkException] {
  AkkaUtils.askWithReply[String]("this should timeout", askingActor, 1, 0,
        ConfiguredTimeout(conf, shortProp))
}
assert(exc.getCause().getMessage().contains("..."))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a nice trick, thanks!

}
}

}