27 changes: 27 additions & 0 deletions conf/kubernetes-custom-resource.yaml
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

metadata:
name: spark-job.apache.org
labels:
resource: spark-job
object: spark
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "A resource that reports status of a spark job"
versions:
- name: v1
39 changes: 39 additions & 0 deletions docs/running-on-kubernetes.md
@@ -210,6 +210,45 @@ the command may then look like the following:
--conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \
local:///opt/spark/examples/jars/spark_examples_2.11-2.2.0.jar 10 400000 2

## ThirdPartyResources for visibility into state of deployed Spark job

In order to expose the state of a deployed Spark job to a Kubernetes administrator or user, via `kubectl` or the
Kubernetes dashboard, we have added a Kubernetes resource (of kind: `SparkJob`) that stores pertinent information
related to a specific Spark job.

Using this, we can view the current and all past (if not already cleaned up) deployed Spark apps within the
current namespace using `kubectl` like so:

kubectl get sparkjobs

Or via the kubernetes dashboard using the link as provided by:

kubectl cluster-info


### Prerequisites

Note that this resource is dependent on extending the kubernetes API using a
[ThirdPartyResource (TPR)](https://kubernetes.io/docs/tasks/access-kubernetes-api/extend-api-third-party-resource/).

TPRs are available in the Kubernetes API as of v1.5.

See `conf/kubernetes-custom-resource.yaml` for the recommended YAML file. From the Spark base directory,
we can create the recommended TPR like so:

kubectl create -f conf/kubernetes-custom-resource.yaml

### Important things to note

TPRs are an alpha feature that might not be available in every cluster.
Reviewer comment: put the version requirement information here. Starting in what version of Kubernetes will the provided YAML file work?

TPRs need to be manually cleaned up because garbage collection support does not exist for them yet.

### Future work

Kubernetes cluster administrators or users would be able to stop a Spark app running in their cluster by simply
deleting the attached resource.

Reviewer comment: "Kubernetes cluster administrators or users should be able to stop..."
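As a sketch of what that could look like (assuming, as at submission time, that the resource is named after the
Spark application ID):

    kubectl delete sparkjobs <spark-app-id>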


## Advanced

### Securing the Resource Staging Server with TLS
@@ -271,6 +271,21 @@ package object config extends Logging {
.longConf
.createWithDefault(1)

private[spark] val KUBERNETES_JOB_RESOURCE_NAME =
ConfigBuilder("spark.kubernetes.statusReporting.resourceName")
.doc("Name of SparkJob Resource attached to the said spark job. ")
.internal()
.stringConf
.createOptional

private[spark] val KUBERNETES_JOB_RESOURCE_ENABLED =
ConfigBuilder("spark.kubernetes.statusReporting.enabled")
.doc("This is set to true when creation of SparkJob resource is successful" +
" which directly means we need to keep the resource updated")
.internal()
.booleanConf
.createWithDefault(false)
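As a hedged illustration (not code from this PR), these two internal entries would typically be consulted
together before touching the resource; note that `createOptional` makes the name an `Option[String]`:

```scala
// Illustrative sketch only: gate updates on the internal flags defined
// above. `updateResource` is a hypothetical callback standing in for the
// real controller call.
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

def maybeUpdateJobResource(conf: SparkConf)(updateResource: String => Unit): Unit = {
  if (conf.get(KUBERNETES_JOB_RESOURCE_ENABLED)) {
    conf.get(KUBERNETES_JOB_RESOURCE_NAME).foreach(updateResource)
  }
}
```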

private[spark] val WAIT_FOR_APP_COMPLETION =
ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the" +
@@ -92,4 +92,14 @@ package object constants {
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
private[spark] val MEMORY_OVERHEAD_MIN = 384L

// TPR
private[spark] val TPR_API_GROUP = "apache.org"
private[spark] val TPR_API_VERSION = "v1"
private[spark] val TPR_KIND = "SparkJob"

// SparkJob Status
private[spark] val STATUS_NOT_AVAILABLE = "N/A"
private[spark] val STATUS_PENDING = "Pending"

}
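As a rough sketch of how these constants combine (an assumption based on the usual TPR naming convention, not
code from this PR), the kind `SparkJob` in group `apache.org` surfaces as the lowercase, pluralized resource
`sparkjobs`:

```scala
// Hedged sketch: the REST path a controller would address for a given
// SparkJob resource; "sparkjobs" follows the TPR convention of lowercasing
// and pluralizing the kind.
def sparkJobResourcePath(namespace: String, name: String): String =
  s"/apis/$TPR_API_GROUP/$TPR_API_VERSION/namespaces/$namespace/sparkjobs/$name"
```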
@@ -17,15 +17,18 @@
package org.apache.spark.deploy.kubernetes.submit

import java.io.File
import java.text.SimpleDateFormat
import java.util.{Collections, Date}

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.tpr.{sparkJobResourceController, sparkJobResourceControllerImpl, JobState, Status}
import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
@@ -53,7 +56,8 @@ private[spark] class Client(
kubernetesClientProvider: SubmissionKubernetesClientProvider,
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
    loggingPodStatusWatcher: LoggingPodStatusWatcher,
    sparkJobResourceController: sparkJobResourceController)
extends Logging {

private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
@@ -72,6 +76,31 @@
org.apache.spark.internal.config.DRIVER_CLASS_PATH)
private val driverJavaOptions = sparkConf.get(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
private val resourceTimeFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.S'Z'")

  // Create a resource of kind SparkJob representing the deployed Spark app
private val status = Status(
creationTimeStamp = resourceTimeFormat.format(new Date()),
completionTimeStamp = STATUS_NOT_AVAILABLE,
sparkDriver = kubernetesDriverPodName,
driverImage = driverDockerImage,
executorImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE),
jobState = JobState.QUEUED,
desiredExecutors = sparkConf.getInt("spark.executor.instances", 1),
currentExecutors = 0,
driverUi = STATUS_PENDING
)
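The `Status` case class is defined elsewhere in this change set; reconstructing it from the named arguments
above gives roughly the following hedged sketch, with types inferred from the values passed in:

```scala
// Inferred from the call site above; the actual definition in the PR may
// differ in field order, types, or defaults.
private[spark] case class Status(
    creationTimeStamp: String,
    completionTimeStamp: String,
    sparkDriver: String,
    driverImage: String,
    executorImage: String,
    jobState: JobState.JobState,
    desiredExecutors: Int,
    currentExecutors: Int,
    driverUi: String)
```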

  // Failure might be due to the TPR not existing, or to the 10 minute lag
  // before a newly registered TPR becomes usable
  // TODO: in the latter case we can attempt a retry depending on the rc
  // This also assumes that once we fail at creation, we won't bother trying
  // anything on the resource for the lifetime of the app
  Try(sparkJobResourceController.createJobObject(kubernetesAppId, status)) match {
    case Success(_) =>
      sparkConf.set(KUBERNETES_JOB_RESOURCE_ENABLED, true)
      sparkConf.set(KUBERNETES_JOB_RESOURCE_NAME, kubernetesAppId)
    case Failure(_) => // resource creation failed; leave status reporting disabled
  }
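The `sparkJobResourceController` passed into `Client` is also defined elsewhere in this change set; the call
above implies, at minimum, an interface like the following hedged reconstruction (the real trait likely
declares further update and delete operations not shown in this diff):

```scala
// Minimal interface implied by the createJobObject call above; the method
// set and parameter names are assumptions, not the PR's actual trait.
private[spark] trait sparkJobResourceController {
  def createJobObject(name: String, status: Status): Unit
}
```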

def run(): Unit = {
validateNoDuplicateFileNames(sparkJars)
@@ -277,6 +306,8 @@ private[spark] object Client {
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter( _ => waitForAppCompletion)
val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
val sparkJobResourceController = new sparkJobResourceControllerImpl(
kubernetesClientProvider.get)
new Client(
appName,
kubernetesAppId,
@@ -289,6 +320,7 @@
kubernetesClientProvider,
initContainerComponentsProvider,
kubernetesCredentialsMounterProvider,
      loggingPodStatusWatcher,
      sparkJobResourceController).run()
}
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes.tpr

private[spark] object JobState extends Enumeration {
type JobState = Value

  /*
   * QUEUED - Spark Job has been queued to run
   * SUBMITTED - Driver Pod deployed but tasks are not yet scheduled on worker pod(s)
   * RUNNING - Task(s) have been allocated to worker pod(s) to run and Spark Job is now running
   * FINISHED - Spark Job ran and exited cleanly, i.e., worker pod(s) and driver pod were
   *            gracefully deleted
   * FAILED - Spark Job failed due to an error
   * KILLED - A user manually killed this Spark Job
   */
  val QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED = Value
}

Reviewer comment (on KILLED): what if the cluster admin killed the pods via kubectl -- what would that show up as?
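As a usage sketch (illustrative, not part of this PR), code reacting to state changes could treat FINISHED,
FAILED, and KILLED as terminal:

```scala
import org.apache.spark.deploy.kubernetes.tpr.JobState
import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState

// Hedged helper: once a job reaches a terminal state, the SparkJob resource
// no longer needs updating (only manual cleanup, per the docs above).
def isTerminal(state: JobState): Boolean = state match {
  case JobState.FINISHED | JobState.FAILED | JobState.KILLED => true
  case _ => false
}
```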
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes.tpr

import org.json4s.{CustomSerializer, JString}
import org.json4s.JsonAST.JNull

import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState

/**
* JobState Serializer and Deserializer
*/
private[spark] object JobStateSerDe extends CustomSerializer[JobState](_ =>
  ({
    case JString("SUBMITTED") => JobState.SUBMITTED
    case JString("QUEUED") => JobState.QUEUED
    case JString("RUNNING") => JobState.RUNNING
    case JString("FINISHED") => JobState.FINISHED
    case JString("KILLED") => JobState.KILLED
    case JString("FAILED") => JobState.FAILED
    case JNull =>
      throw new UnsupportedOperationException("No JobState Specified")
  }, {
    case JobState.FAILED => JString("FAILED")
    case JobState.SUBMITTED => JString("SUBMITTED")
    case JobState.KILLED => JString("KILLED")
    case JobState.FINISHED => JString("FINISHED")
    case JobState.QUEUED => JString("QUEUED")
    case JobState.RUNNING => JString("RUNNING")
  })
)

Reviewer comment: Instead of using this, look at what is done here: https://github.com/apache-spark-on-k8s/spark/pull/305/files#diff-801d4c840d0e60f5521c12f9389598b9R30
Essentially we can create a subclass of TypeReference for the enumeration's type, and then use @JsonScalaEnumeration wherever we are embedding an instance of the enumeration as a field of another class.
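A hedged sketch of the alternative the reviewer points to, assuming jackson-module-scala is on the classpath
(the `SparkJobSummary` name is illustrative, not from this PR):

```scala
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration

import org.apache.spark.deploy.kubernetes.tpr.JobState

// A TypeReference subclass capturing the enumeration's type...
class JobStateType extends TypeReference[JobState.type]

// ...then annotate any field embedding the enumeration so Jackson knows how
// to serialize and deserialize it.
case class SparkJobSummary(
    name: String,
    @JsonScalaEnumeration(classOf[JobStateType]) jobState: JobState.JobState)
```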