Extract constants and config into separate file. Launch => Submit. #65
Conversation
        .done())
    } catch {
      case throwable: Throwable =>
        logError("Failed to allocate executor pod.", throwable)
While testing I ran into problems where the exception here wasn't appearing in the logs; hence the change. There might be something in the calling code worth fixing instead of logging at this layer.
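For context, a minimal sketch of the pattern being discussed - log at the allocation layer and rethrow so the failure stays visible even when a caller drops the exception. The class and method names are invented for illustration; only logError and the fabric8 builder calls mirror the diff.

    import io.fabric8.kubernetes.api.model.PodBuilder
    import io.fabric8.kubernetes.client.KubernetesClient
    import org.apache.spark.internal.Logging

    // Hypothetical allocator living inside Spark's code base
    // (the Logging trait is private[spark]).
    private[spark] class ExecutorPodAllocator(client: KubernetesClient) extends Logging {
      def allocateExecutorPod(name: String, image: String): Unit = {
        try {
          val pod = new PodBuilder()
            .withNewMetadata().withName(name).endMetadata()
            .withNewSpec()
              .addNewContainer().withName("executor").withImage(image).endContainer()
            .endSpec()
            .build()
          client.pods().create(pod)
        } catch {
          case throwable: Throwable =>
            // Log before rethrowing: without this, exceptions raised inside
            // a Future were not appearing in the logs at all.
            logError("Failed to allocate executor pod.", throwable)
            throw throwable
        }
      }
    }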
      @Produces(Array(MediaType.APPLICATION_JSON))
      @Path("/create")
    - def create(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
    + def submitApplication(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
The path is still /create to be consistent with Standalone's REST submission server, although it's unclear if that's the correct precedent to follow.
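A hedged sketch of how the renamed endpoint might be declared while keeping the /create path. The trait name and the /v1/submissions prefix are assumptions here (the prefix matches Standalone's REST URL layout); the request/response types are the ones from the diff.

    import javax.ws.rs.{Consumes, POST, Path, Produces}
    import javax.ws.rs.core.MediaType

    @Path("/v1/submissions")
    trait KubernetesSparkRestApi {
      @POST
      @Consumes(Array(MediaType.APPLICATION_JSON))
      @Produces(Array(MediaType.APPLICATION_JSON))
      @Path("/create")
      def submitApplication(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse
    }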
    private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
    private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)

    private implicit val retryableExecutionContext = ExecutionContext
I know this is not part of this commit, but in the spirit of the refactoring I see a lot of here, perhaps you can make use of the Spark wrappers? Like so:

    private implicit val retryableExecutionContext = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures-%d"))
| "spark.kubernetes.driver.docker.image", s"spark-driver:$sparkVersion") | ||
| private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars") | ||
| private val kubernetesAppId = sparkConf | ||
| .get("spark.app.id", s"$appName-$launchTime").toLowerCase.replaceAll("\\.", "-") |
This isn't an equivalent change -- I think we wanted to add the launch time even to a user-specified spark.app.id -- @foxish?
There's some inconsistency between this app id and the application ID expected by KubernetesClusterSchedulerBackend before this change. We could propagate the kubernetesAppId to the Spark driver conf and use that in KubernetesClusterSchedulerBackend instead.
I think if we have two things as similarly named as "app id" and "application ID", they should be the same thing everywhere, rather than a Kubernetes app id and a Spark app id being really close in name but slightly different.
Does that mean we need to use the with-timestamp version everywhere?
I also think the semantics of the spark.app.id conf are such that it should be expected to be unique. It also doesn't appear to be set by users often; spark.app.id isn't documented on the Spark configuration page. It's perhaps reasonable to instead force what spark.app.id is and not give the user a say in the matter.
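As a sketch of that "force it" option, assuming the sparkConf and appName values from the diff above (the exact derivation rule is an assumption, not the PR's final behavior):

    import org.apache.spark.SparkConf

    // Always append the launch time, even to a user-supplied spark.app.id, so
    // the id is unique, DNS-safe, and shared with KubernetesClusterSchedulerBackend.
    def resolveKubernetesAppId(sparkConf: SparkConf, appName: String): String = {
      val launchTime = System.currentTimeMillis()
      val baseName = sparkConf.getOption("spark.app.id").getOrElse(appName)
      val appId = s"$baseName-$launchTime".toLowerCase.replaceAll("\\.", "-")
      sparkConf.set("spark.app.id", appId) // propagate to the driver/scheduler conf
      appId
    }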
    val driverKubernetesSelectors = (Map(
      DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue,
      SPARK_DRIVER_LABEL -> kubernetesAppId,
      SPARK_APP_ID_LABEL -> kubernetesAppId,
why have both of these?
The first label indicates that this is a Spark driver as opposed to a Spark executor. Thus when matching labels for the driver you want to pick up only the pods with the driver label (and not the executor label, for example).
On the other hand it's still useful to get labels to match everything (driver + executors) for a given app - which is what the second label is for.
Ah got it, just wasn't expecting to see much more than purely refactor changes here. This makes sense.
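To make the two labels' roles concrete, a sketch using the fabric8 client; the literal label keys below stand in for the SPARK_APP_ID_LABEL and SPARK_DRIVER_LABEL constants, and the id is a placeholder:

    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    val client = new DefaultKubernetesClient()
    val kubernetesAppId = "example-app-1487000000000" // placeholder id

    // All pods of one app, driver and executors alike:
    val appPods = client.pods().withLabel("spark-app-id", kubernetesAppId).list()

    // Only the driver pod of that app:
    val driverPods = client.pods()
      .withLabel("spark-app-id", kubernetesAppId)
      .withLabel("spark-driver", kubernetesAppId)
      .list()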
        .stringConf
        .createOptional

    // Note that while we set a default for this in practice, it's
Maybe instead of "in practice", you mean "during submission"? I was expecting a default here in the config object after reading the comment.
It's not "during submission" but rather at the time the scheduler is launched. See this.
Could adjust the comment to that effect.
Yes please
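A possible rewording, sketched against Spark's internal ConfigBuilder. The config key here is hypothetical, and the entry stays optional since the real value is injected before the scheduler backend starts:

    import org.apache.spark.internal.config.ConfigBuilder

    // Note that no default is set here: the submission client fills this in
    // before the scheduler backend is launched, which is where it is resolved.
    private[spark] val KUBERNETES_DRIVER_POD_NAME =
      ConfigBuilder("spark.kubernetes.driver.pod.name")
        .doc("Name of the driver pod; set by the submission client before the scheduler is launched.")
        .stringConf
        .createOptional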
    private[spark] val KUBERNETES_DRIVER_LABELS =
      ConfigBuilder("spark.kubernetes.driver.labels")
        .doc("""
          | Custom labels that will be added to the driver pod.
Should mention that these custom labels are in addition to the labels created through the submission process.
    private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
      ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
        .doc("""
          | Time to wait for the driver pod to be initially ready
Maybe instead of "initially ready", use "ready to receive a job launch request"?
Tricky - it's a timeout that waits for the entire driver launch process to complete. Not sure of the best way to word it. Maybe "Time to wait for the driver pod to start running"?
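One possible wording, sketched with Spark's time-typed config builder; the 60s default is an assumption for illustration:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.internal.config.ConfigBuilder

    private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
      ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
        .doc("Time to wait for the driver pod to start running before aborting the submission.")
        .timeConf(TimeUnit.SECONDS)
        .createWithDefaultString("60s")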
| .doc(""" | ||
| | KeyStore file for the driver submission server listening | ||
| | on SSL. Can be pre-mounted on the driver container | ||
| | or uploaded from the submitting client. |
I'm not really sure where these doc descriptions go -- should we mention container:// URLs here?
I don't think these are published anywhere by a script; this is probably solely developer documentation. I think going without container:// is fine in that context.
| .doc(""" | ||
| | KeyStore file for the driver submission server listening | ||
| | on SSL. Can be pre-mounted on the driver container | ||
| | or uploaded from the submitting client. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not really sure where these doc descriptions go -- should we mention container:// URLs here?
    private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
      ConfigBuilder("spark.kubernetes.driver.service.name")
        .doc("""
          | Kubernetes service that is in front of the driver
Maybe instead "exposes the driver pod to outside the k8s cluster"? The "expose" keyword seems more in line with the vocabulary the k8s project uses.
          .endContainer()
        .endSpec()
        .done())
    val requiredEnv = Seq(
+1 for functional approach
There are probably some helper methods to pull out here too - creating a KubernetesUtils class seems potentially useful.
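For instance, a hypothetical KubernetesUtils (the object and method names are invented here) could absorb the env-var boilerplate around requiredEnv:

    import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

    private[spark] object KubernetesUtils {
      /** Convert (name, value) pairs into fabric8 EnvVar objects. */
      def buildEnvVars(env: Seq[(String, String)]): Seq[EnvVar] =
        env.map { case (name, value) =>
          new EnvVarBuilder()
            .withName(name)
            .withValue(value)
            .build()
        }
    }

    // Usage against the diff above, e.g.:
    //   .addNewContainer().withEnv(KubernetesUtils.buildEnvVars(requiredEnv): _*)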
This LGTM.

@ash211 @iyanuobidele addressed the comments, anything else to add?
    -     .setDaemon(true)
    -     .setNameFormat("kubernetes-executor-requests-%d")
    -     .build))
    +   ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-executor-requests-%d"))
Any reason why you changed the executor service from a CachedThreadPool to a SingleThreadExecutor?
IIRC, ThreadUtils has a wrapper for newDaemonCachedThreadPool as well. Perhaps you want that instead?
This looks good to merge once the checks complete.

Merging with a +1 from @iyanuobidele and me, and green CI build.
* Extract constants and config into separate file. Launch => Submit.
* Address comments
* A small shorthand
* Refactor more ThreadUtils
* Fix scalastyle, use cached thread pool
* Tiny Scala style change
Closes #47