From cc2df53caa26dbf0b023cbffec8982edc2946652 Mon Sep 17 00:00:00 2001
From: Hong Zhiguo
Date: Tue, 4 Jul 2017 16:02:03 +0800
Subject: [PATCH 1/2] set RestartPolicy=Never for executor

In the current implementation the RestartPolicy of the executor pod is not
set, so the default value "Always" takes effect. This causes a problem: if
an executor terminates unexpectedly, for example by exiting on a
java.lang.OutOfMemoryError, it is restarted by Kubernetes with the same
executor ID. When the new executor tries to fetch a block held by the
previous executor, ShuffleBlockFetcherIterator.splitLocalRemoteBlocks()
treats it as a **local** block and tries to read it from its own local dir.
But the executor's local dir has changed, because a randomly generated ID
is part of the local dir path. A FetchFailedException is raised and the
stage fails.

The recurring error message:

17/06/29 01:54:56 WARN KubernetesTaskSetManager: Lost task 0.1 in stage 2.0 (TID 7, 172.16.75.92, executor 1): FetchFailed(BlockManagerId(1, 172.16.75.92, 40539, None), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: /data2/spark/blockmgr-0e228d3c-8727-422e-aa97-2841a877c42a/32/shuffle_2_0_0.index (No such file or directory)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
---
 .../cluster/kubernetes/KubernetesClusterSchedulerBackend.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index d880cee315c0d..9ef717eaa87eb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -432,6 +432,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       .endMetadata()
       .withNewSpec()
       .withHostname(hostname)
+      .withRestartPolicy("Never")
       .addNewContainer()
       .withName(s"executor")
       .withImage(executorDockerImage)

From 7ee7b0c1e109ec0e8d28e5e61b3d0030cbc80623 Mon Sep 17 00:00:00 2001
From: Anirudh Ramanathan
Date: Wed, 19 Jul 2017 11:36:14 -0700
Subject: [PATCH 2/2] Update KubernetesClusterSchedulerBackend.scala

---
 .../KubernetesClusterSchedulerBackend.scala | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 0df88e2e94325..a0753728f8cfd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -456,19 +456,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
       .withNewSpec()
       .withHostname(hostname)
       .withRestartPolicy("Never")
       .withNodeSelector(nodeSelector.asJava)
-      .addNewContainer()
-      .withName(s"executor")
-      .withImage(executorDockerImage)
-      .withImagePullPolicy(dockerImagePullPolicy)
-      .withNewResources()
-      .addToRequests("memory", executorMemoryQuantity)
-      .addToLimits("memory", executorMemoryLimitQuantity)
-      .addToRequests("cpu", executorCpuQuantity)
-      .endResources()
-      .addAllToEnv(requiredEnv.asJava)
-      .addToEnv(executorExtraClasspathEnv.toSeq: _*)
-      .withPorts(requiredPorts.asJava)
-      .endContainer()
       .endSpec()
       .build()
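
For readers without the surrounding file, below is a minimal, self-contained
sketch of the fabric8 kubernetes-client builder chain these two commits
touch. The object name, method parameters, and placeholder values are
illustrative assumptions, not Spark's actual code; only
.withRestartPolicy("Never") reflects the change itself.

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

object ExecutorPodSketch {
  // Builds an executor pod whose containers are never restarted in place.
  // executorId, hostname, and image are hypothetical parameters standing in
  // for the values KubernetesClusterSchedulerBackend computes.
  def buildExecutorPod(executorId: String, hostname: String, image: String): Pod =
    new PodBuilder()
      .withNewMetadata()
      .withName(s"spark-executor-$executorId")
      .endMetadata()
      .withNewSpec()
      .withHostname(hostname)
      // The fix from PATCH 1/2: with RestartPolicy "Never", a failed
      // executor is not revived under its old identity; any replacement
      // gets a fresh executor ID, so its shuffle blocks are treated as
      // remote instead of being looked up in a stale local dir.
      .withRestartPolicy("Never")
      .addNewContainer()
      .withName("executor")
      .withImage(image)
      .endContainer()
      .endSpec()
      .build()
}

With RestartPolicy "Never", recovery is left to the scheduler backend, which
can allocate replacement executors under new IDs rather than reusing the ID
of a crashed pod.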