SparkJob resource support #126
Conversation
import org.apache.spark.SparkException
import org.apache.spark.util.ThreadUtils

/**
will take this comment out
spec:
  image: "driver-image"
  state: "completed"
  num-executors: 10
(no newline at end of file)
"/n"
kind: ThirdPartyResource
description: "A resource that manages a spark job"
versions:
  - name: v1
(no newline at end of file)
"/n"
protected val kubeToken: Option[String] = None


implicit val formats: Formats = DefaultFormats + JobStateSerDe
one extra "/n" to take out
case JobState.QUEUED => JString("QUEUED")
case JobState.RUNNING => JString("RUNNING")
})
)
(no newline at end of file)
"/n"
 * KILLED - A user manually killed this Spark Job
 */
val QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED = Value
}
(no newline at end of file)
ditto
}

def buildOkhttpClientFromWithinPod(client: BaseClient): OkHttpClient = {
  val field = classOf[BaseClient].getDeclaredField("httpClient")
why do we need this reflection to grab the httpClient out of the BaseClient? Is this what we have to do because the fabric8 client doesn't support TPR yet?
Yes, you're exactly right.
Can we move the Kubernetes client forward so that it supports third-party resources?
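For reference, a self-contained sketch of the reflection workaround under discussion, assuming fabric8's BaseClient keeps its OkHttp client in a private httpClient field, as the quoted diff suggests:

import io.fabric8.kubernetes.client.BaseClient
import okhttp3.OkHttpClient

def buildOkhttpClientFromWithinPod(client: BaseClient): OkHttpClient = {
  // Reach into the private field because the fabric8 client exposes no
  // public accessor and (at the time of this PR) no third-party-resource API.
  val field = classOf[BaseClient].getDeclaredField("httpClient")
  field.setAccessible(true)
  field.get(client).asInstanceOf[OkHttpClient]
}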
| case JString("RUNNING") => JobState.RUNNING | ||
| case JString("FINISHED") => JobState.FINISHED | ||
| case JString("KILLED") => JobState.KILLED | ||
| case JString("FAILED") => JobState.FAILED |
there's probably a nicer way to do this rather than the manual mapping -- I think Scala case classes have an apply method that creates the class you expect?
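Since JobState is a scala.Enumeration rather than a case class (see the QUEUED, SUBMITTED, ... = Value line quoted earlier), Enumeration.withName is one way to drop the manual mapping; a sketch:

import org.json4s._

object JobState extends Enumeration {
  val QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED = Value
}

// withName looks an Enumeration value up by its string name and throws
// NoSuchElementException for unrecognized names.
def deserializeState(json: JValue): JobState.Value = json match {
  case JString(name) => JobState.withName(name)
  case other => throw new MappingException(s"Expected a JSON string for JobState, got $other")
}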
def getJobObject(name: String): SparkJobState = {
  val request = completeRequest(new Request.Builder()
    .get()
    .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$name"))
pull these URLs out to statics. Maybe use Feign with the JAX-RS annotations instead?
Sounds good. Thanks for the quick review. I'll look into your comments.
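A sketch of what hoisting the endpoint into statics could look like; the path shape is an assumption following the usual TPR URL convention (/apis/&lt;group&gt;/&lt;version&gt;/namespaces/&lt;namespace&gt;/&lt;plural&gt;) and the apache.org/v1 apiVersion used elsewhere in this PR:

object TprConstants {
  val TPR_API_VERSION: String = "v1"
  // format args: API version, namespace
  val TPR_API_ENDPOINT: String = "apis/apache.org/%s/namespaces/%s/sparkjobs"
}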
 * This method relies on the assumption of one sparkjob per namespace
 */
def watchJobObject(): Future[WatchObject] = {
  val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build()
this should probably be a clone not newBuilder
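For what it's worth, in OkHttp 3.x newBuilder() already behaves much like a clone: the derived client shares the parent's connection pool and dispatcher, and only the overridden settings change. A sketch of the derived watch client:

import java.util.concurrent.TimeUnit
import okhttp3.OkHttpClient

def watchClientFrom(httpClient: OkHttpClient): OkHttpClient =
  httpClient.newBuilder()
    .readTimeout(0, TimeUnit.MILLISECONDS)  // 0 disables the read timeout, needed for long-lived watches
    .build()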
Thanks @iyanuobidele!

apiVersion: "apache.org/v1"  # apache.org, conforming with the project itself
kind: "SparkJob"
metadata:
  name: "spark-job-1"
status:
  image: "driver-image"
  state: "completed"
  numExecutors: 10  # camelCase, instead of num-executors
  ...
  # other status items

I think it previously was nested under spec.
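A hypothetical Scala mirror of the suggested status block, for illustration only; the field names follow the camelCase keys above, and everything else is an assumption:

case class SparkJobStatus(image: String, state: String, numExecutors: Int)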
mccheah left a comment
This review is at a first glance: much of this code assumes the Kubernetes client cannot support third-party resources directly. We should look into contributing to the Kubernetes client project. If that doesn't make sense for our use case, then we can move forward with this implementation.
import org.apache.spark.deploy.kubernetes.constants._

- private[spark] object KubernetesClientBuilder {
+ private[spark] object ClientBuilder {
Any particular reason for the name change?
private val protocol: String = "https://"

// we can also get the host from the environment variable
private val kubeHost: String = {
Use what we already have in constants.scala, which provides a hostname that can be DNS resolved within a pod.
case Failure(_) => None
}
host.map(h => h).getOrElse {
  // Log a warning just in case, but this should almost certainly never happen
Throwing an exception here would be best, but this shouldn't be necessary once we use the proper hostname.
// the port from the environment variable
private val kubeHostPort: String = {
  val port = Try(sys.env("KUBERNETES_PORT_443_TCP_PORT")) match {
I believe using the hostname with DNS resolution removes the need to specify the port.
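A sketch of the DNS-based alternative, assuming the standard in-cluster service name: "kubernetes.default.svc" resolves from inside a pod and the API server listens on the default HTTPS port, so no KUBERNETES_PORT_443_TCP_PORT lookup is needed.

val kubeMaster: String = "https://kubernetes.default.svc"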
protected implicit val ec: ThreadPoolExecutor = ThreadUtils
  .newDaemonCachedThreadPool("tpr-watcher-pool")

private def executeBlocking(cb: => WatchObject): Future[WatchObject] = {
This once again seems like something the kubernetes client should provide for us. Although, it's unclear how the kubernetes client would be able to support completely arbitrary resource types.
Thanks Matt for taking a look at this. This is mostly a port of work we did a couple of months ago. At the time, we considered making a contribution upstream, but we decided to go the CRUD way to save some cycles.
I'll spend some time looking into what it would take to get a contribution upstream, and maybe we can also talk about this at the next SIG.
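For context, a self-contained sketch of the executeBlocking pattern quoted above: run the blocking watch call on the daemon cached pool and surface it as a Future. WatchObject is replaced by a type parameter to keep the sketch standalone.

import java.util.concurrent.ThreadPoolExecutor
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.util.ThreadUtils

object WatcherPool {
  private val pool: ThreadPoolExecutor =
    ThreadUtils.newDaemonCachedThreadPool("tpr-watcher-pool")
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Evaluate the by-name argument on the pool, exposing it as a Future.
  def executeBlocking[T](cb: => T): Future[T] = Future(cb)
}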
Sure @foxish, I'll take note of that. I'll be making patches to this shortly.
Please rebase onto
@iyanuobidele, will you have time to pick this PR up again?
05d8885 to c198d9f (force-pushed)
Closing this and opening a new PR to cover this.
This will eventually close #111.
Initial implementation of the CRUD + Watch calls for the SparkJob resource. This will be the base I'll be making patches against.
/cc @foxish @mccheah @ash211 @erikerlandson @ssuchter