Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ private[spark] class Client(
// Value of "spark.kubernetes.driver.docker.image" from sparkConf; when the key is
// not set it falls back to "spark-driver:" followed by SPARK_VERSION.
private val driverDockerImage = sparkConf.get(
"spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION")
// Optional value of "spark.kubernetes.driver.uploads.jars".
// NOTE(review): the expected format of this value is not visible in this excerpt.
private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars")
// Driver launch timeout read from "spark.kubernetes.driverLaunchTimeout" via
// getTimeAsSeconds, defaulting to DEFAULT_LAUNCH_TIMEOUT_SECONDS expressed as an
// "<n>s" string. It is passed (with TimeUnit.SECONDS) to submitCompletedFuture.get
// and is interpolated into the timeout error messages.
private val driverLaunchTimeoutSecs = sparkConf.getTimeAsSeconds(
"spark.kubernetes.driverLaunchTimeout", s"${DEFAULT_LAUNCH_TIMEOUT_SECONDS}s")

private val secretBase64String = {
val secretBytes = new Array[Byte](128)
Expand Down Expand Up @@ -218,25 +220,25 @@ private[spark] class Client(
.done()
var submitSucceeded = false
try {
submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
submitCompletedFuture.get(driverLaunchTimeoutSecs, TimeUnit.SECONDS)
submitSucceeded = true
} catch {
case e: TimeoutException =>
val driverPod = try {
kubernetesClient.pods().withName(kubernetesAppId).get()
} catch {
case throwable: Throwable =>
logError(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS seconds for the" +
logError(s"Timed out while waiting $driverLaunchTimeoutSecs seconds for the" +
" driver pod to start, but an error occurred while fetching the driver" +
" pod's details.", throwable)
throw new SparkException(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS" +
throw new SparkException(s"Timed out while waiting $driverLaunchTimeoutSecs" +
" seconds for the driver pod to start. Unfortunately, in attempting to fetch" +
" the latest state of the pod, another error was thrown. Check the logs for" +
" the error that was thrown in looking up the driver pod.", e)
}
val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" +
s" $LAUNCH_TIMEOUT_SECONDS seconds."
s" $driverLaunchTimeoutSecs seconds."
val podStatusPhase = if (driverPod.getStatus.getPhase != null) {
s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
} else {
Expand Down Expand Up @@ -424,7 +426,7 @@ private[spark] object Client extends Logging {
private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher"
private val SECURE_RANDOM = new SecureRandom()
private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission"
private val LAUNCH_TIMEOUT_SECONDS = 30
private val DEFAULT_LAUNCH_TIMEOUT_SECONDS = 60
private val SPARK_APP_NAME_LABEL = "spark-app-name"

def main(args: Array[String]): Unit = {
Expand Down