TPR Support #284
Conversation
@foxish putting this out there, still hacking through it. Feel free to jump in to suggest immediate changes
Force-pushed fbf8d94 to 8450333
mccheah left a comment
Seeing that this is a work in progress, I took a preliminary scan over the changes so far and made some early suggestions.
We likely want this class and the class below to be in separate files.
Can we be more specific with this variable name?
We usually use

    try {
      ...
    } catch {
      case e: SparkException =>
    }

I don't think the Spark codebase uses Scala's Try(...) match idiom. However, I actually think the Try(...) match idiom makes sense here, so if there are other examples of this being done in the code then it's also appropriate to do so here.
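For readers unfamiliar with the two idioms being compared, here is a hedged, self-contained sketch; parsePort and the fallback value are illustrative inventions, not code from this PR.

```scala
import scala.util.{Failure, Success, Try}

// The Try(...) match idiom discussed above.
def parsePortTryMatch(s: String): Int = Try(s.toInt) match {
  case Success(port) => port
  case Failure(_)    => 4040 // illustrative fallback
}

// The equivalent try/catch style more common in the Spark codebase.
def parsePortTryCatch(s: String): Int =
  try {
    s.toInt
  } catch {
    case _: NumberFormatException => 4040
  }
```

Both behave the same; the Try form avoids a mutable escape from the catch block, which is why it can read better when the failure case produces a value.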
Likely should be marked private[spark].
Indentation seems off here. Look at v2/Client.scala to see how we handle long argument lists.
Remove whitespace.
This class seems particularly important, so some Javadoc on it would be good. I would also lean away from using an abstract class here. If we need these fields across multiple implementations, consider making this a concrete class that packages up the common operations and fields, and having different implementations contain an instance of that concrete class. In other words, let's try to have a design that favors composition over inheritance.
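A minimal sketch of the composition-over-inheritance suggestion above; the names ResourceCommon and HttpSparkJobResourceController are hypothetical, not from the PR.

```scala
// Shared fields and operations live in a concrete helper...
class ResourceCommon(val namespace: String, val apiVersion: String) {
  def resourcePath(name: String): String =
    s"/apis/$apiVersion/namespaces/$namespace/sparkjobs/$name"
}

// ...and each implementation holds an instance of the helper
// instead of extending an abstract base class.
class HttpSparkJobResourceController(common: ResourceCommon) {
  def requestPath(jobName: String): String = common.resourcePath(jobName)
}
```

This keeps the common state testable on its own and lets implementations swap it out without touching an inheritance hierarchy.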
Thanks for looking at this, I'll address your comments.
conf/kubernetes-resource.yaml
Outdated
spark-job.apache.org
apache.org
We need to put URLs here for the driver and executor UIs as accessible via the API server proxy. I'd suggest relative URLs of the form /api/v1/..../spark-driver:port, because that way we can get to them from the kubectl dashboard.
sounds good.
Force-pushed 1a5231e to b519c3e
Removed the watch (because I think we can leave it out for now, considering that TPRs themselves are considered alpha) TODO:
rerun unit tests please
I'll add the doc changes and some small changes to make the update portion take one or multiple requests at once.
Force-pushed 6766ca2 to 17ee380
rerun unit tests please
Force-pushed 17ee380 to a3bbf6a
@varunkatta @kimoonkim Jenkins fell over again, I think
rerun all tests please
Force-pushed 093f5f5 to 97ca241
rerun unit tests please
Force-pushed ad1daf7 to 91ffdd1
rerun unit tests please
@mccheah could you please help take a quick look at this unit test failure? I'm finding it hard to understand why the "run with dependency uploader" test passes and the "Run without dependency uploader" test fails on the verify step.
@iyanuobidele rebased and fixed conflicts with the other PR that just merged. No other changes.
docs/running-on-kubernetes.md
Outdated
Prerequisites
We probably want to use a specific formatter here.
Calendar is a really old class that's no longer good practice to use. Can we use joda classes or the Java 8 time classes instead?
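A minimal sketch of what the java.time replacement could look like; the helper name currentTimestamp is illustrative, not from the PR.

```scala
import java.time.Instant
import java.time.format.DateTimeFormatter

// ISO-8601 UTC timestamp via java.time, replacing Calendar-based formatting.
def currentTimestamp(): String =
  DateTimeFormatter.ISO_INSTANT.format(Instant.now())
```

ISO_INSTANT is thread-safe and needs no explicit pattern or time zone handling, unlike Calendar with SimpleDateFormat.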
Move sparkConf.set(...) down one line.
Instead of using this, look at what is done here: https://github.com/apache-spark-on-k8s/spark/pull/305/files#diff-801d4c840d0e60f5521c12f9389598b9R30
Essentially we can create a subclass of TypeReference for the enumeration's type, and then use @JsonScalaEnumeration wherever we are embedding an instance of the enumeration as a field of another class.
Inject the trait instead of the impl.
the naming here between the object and the class doesn't match. I like the name SparkJobResourceController better than TPRCrudCalls. Maybe you can rename the class?
Does the fact that this variable is not localized to this method mean that when we watch multiple objects we can lose the watch source? What about if two callers concurrently invoke this method?
The resolved value of this should be in the Kubernetes client object.
The controller class itself should probably be logging its success or failure.
Yep, and that might eliminate this method entirely by moving everything into the updateJobObject call
Name the Scala file after the trait and not the impl.
Based on some of the usages of this, it would be nice to have a "batch update" method that can update multiple fields at once. In doing so, we can encode updates with a POJO that's more expressive than just String fields:

    case class SparkJobPatch(field1: Option[String] = None, field2: Option[String] = None)

Since the fields are all optional with default values, we can be as selective as we want in what items to update. We can accomplish something similar with an updateJobObject method signature that takes in all of the possible updatable fields with default values, but I like encapsulating this in a single variadic argument type, so to speak.
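A hypothetical expansion of the SparkJobPatch idea: only the fields set to Some(...) become update operations. The field names here are illustrative, not the PR's actual schema.

```scala
case class SparkJobPatch(
    completionTime: Option[String] = None,
    numExecutors: Option[Int] = None,
    sparkUiUrl: Option[String] = None)

// Collapse a patch into (fieldName, value) pairs; unset fields are skipped.
def toFieldUpdates(patch: SparkJobPatch): Seq[(String, String)] =
  Seq(
    patch.completionTime.map("completionTime" -> _),
    patch.numExecutors.map(n => "numExecutors" -> n.toString),
    patch.sparkUiUrl.map("sparkUiUrl" -> _)
  ).flatten
```

A caller can then do toFieldUpdates(SparkJobPatch(numExecutors = Some(3))) and send all resulting operations in one request.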
I agree
Can we use a more specific type than a map? If we always want the same fields, use a case class with the specific fields enumerated.
Can we define the status as a case class as opposed to an arbitrary Map?
ash211 left a comment
Good work @iyanuobidele ! Did a thorough review and left a bunch of comments.
One of the bigger questions is around the purpose of TPR. Right now it's mainly doing status updating (timestamps, executor counts, Spark UI URL). Do you imagine growing this to have more reporting here as well?
As written it seems like a "status updater" that happens to report to a TPR, but could go to other places as well (see e.g. the existing Spark event log). Possibly even TPR is one of many implementations of this.
This also ties into how similar it is to the existing Spark events infrastructure, which is how Spark reports status through the driver back to a user application, and also how it writes logs to the Spark event log for post-application diagnosis.
If you registered a SparkListenerInterface could you listen to all the events (and lay the groundwork for future task/stage/metrics updating) instead of injecting method reporting calls throughout the main logic?
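A hedged sketch of the listener-driven design suggested above: status reporting reacts to events rather than being injected into the main logic. The event and sink types below are illustrative stand-ins defined locally, not Spark's SparkListenerInterface API.

```scala
sealed trait JobEvent
case class ExecutorsChanged(count: Int) extends JobEvent
case class JobEnded(succeeded: Boolean) extends JobEvent

// A sink could be a TPR updater, the event log, or anything else.
trait StatusSink {
  def update(field: String, value: String): Unit
}

class StatusReportingListener(sink: StatusSink) {
  def onEvent(event: JobEvent): Unit = event match {
    case ExecutorsChanged(n) => sink.update("numExecutors", n.toString)
    case JobEnded(ok)        => sink.update("state", if (ok) "FINISHED" else "FAILED")
  }
}
```

With this shape, a TPR-backed sink becomes one of several interchangeable implementations, which matches the "status updater that happens to report to a TPR" observation above.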
docs/running-on-kubernetes.md
Outdated
put the version requirement information here. starting in what version of k8s will the provided yaml file work?
Calendar is a really old class that's no longer good practice to use. Can we use joda classes or the Java 8 time classes instead?
what's the 10min lag you refer to here? is this something intrinsic about TPRs? can you link to something in comments?
That's my bad. It's 10s. Here's a comment on an issue opened on TPRs
Make these two config items entries in config.scala with the other kubernetes-relevant config
I also don't think the current naming reflects what this does. Right now it describes the state it expects (the job resource has been set on the cluster) rather than what actions the code takes when the flag is set (create a TPR and report status to it as the job progresses). Maybe name something like spark.kubernetes.statusReporting.enabled=true and spark.kubernetes.statusReporting.resourceName=<asdf> ?
what if the cluster admin killed the pods via kubectl -- what would that show up as?
does this assume the user is accessing the SparkUI via a kubectl proxy on localhost:8001? Can we support ingress-based exposure of the Spark UI too?
the naming here between the object and the class doesn't match. I like the name SparkJobResourceController better than TPRCrudCalls. Maybe you can rename the class?
for updateJobObject it seems like there's a caller that expects this to throw a SparkException in a certain way.
Can you document that API on this method in the trait? Same for other methods if they're expected to throw specific exceptions in specific situations.
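A hypothetical sketch of documenting the exception contract on the trait, as requested above. SparkException here is a local stand-in for org.apache.spark.SparkException, and the method signature is illustrative.

```scala
class SparkException(message: String) extends Exception(message)

trait SparkJobResourceController {
  /**
   * Updates one field of the job resource.
   *
   * @throws SparkException if the resource does not exist or the API server
   *                        rejects the update.
   */
  def updateJobObject(name: String, value: String, fieldPath: String): Unit
}
```

Callers that rely on catching SparkException then have the contract stated in one place instead of inferring it from one implementation.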
Yep, and that might eliminate this method entirely by moving everything into the updateJobObject call
this } looks odd to my eyes -- should it be moved to the previous line?
Force-pushed 0fe9b15 to f762f85
### Future work

Kube administrators or users would be able to stop a spark app running in their cluster by simply
Kubernetes cluster administrators or users should be able to stop...
    response,
    Option(Seq(tprObjectName, response.message(), request.toString)))

    response.body().close()
Put close() in finally blocks
    val msg =
      s"Failed to delete resource. ${x.getMessage}."
    logError(msg)
    response.close()
close() should be in a finally block.
Actually, since ResponseBody implements Closeable, look into how we can use Utils.tryWithResource(...) {...}.
      .build()

    logDebug(s"Get Request: $request")
    var response: Response = null
You can use

    val response = try {
      httpClient.newCall(request).execute()
    }
Just found out, as I've mentioned above, that ResponseBody implements Closeable, so we can do this:

    Utils.tryWithResource(httpClient.newCall(request).execute()) { responseBody =>
      // operation with responseBody
    }
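For context, a minimal standalone sketch of the tryWithResource pattern referenced above (Spark provides Utils.tryWithResource; this local version just shows the shape and is not the actual Spark implementation):

```scala
// Run f on a freshly created resource and always close it, even on exceptions.
def tryWithResource[R <: AutoCloseable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try {
    f(resource)
  } finally {
    resource.close()
  }
}
```

Wrapping the OkHttp call this way removes the need for a nullable var response and makes the NullPointerException-on-failed-create problem mentioned below impossible, since close() only runs on a resource that was actually created.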
    val msg =
      s"Failed to get resource $name. ${x.getMessage}."
    logError(msg)
    response.close()
If you close here and the call to create failed, then you'll get a NullPointerException. If you use the paradigm as I've commented a few lines above however then this problem can be avoided.
    logDebug(s"Update Request: $request")
    var response: Response = null
    try {
    val response = try {...

Repeat this for all similar blocks.
    def extractHttpClientFromK8sClient(client: BaseClient): OkHttpClient = {
      val field = classOf[BaseClient].getDeclaredField("httpClient")
      try {
        field.setAccessible(true)
I'm looking at BaseClient and it has a getHttpClient method. Is it therefore necessary to use reflection here?
https://github.com/fabric8io/kubernetes-client/blob/master/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/BaseClient.java#L113
        additionalInfo: Option[Seq[String]] = None): Unit = {

    if (!response.isSuccessful) {
      response.body().close()
Don't close in methods that didn't open the response body.
@iyanuobidele is this PR still active? I'm unsure of the current state of TPRs (since renamed?) so I don't know how much we can reuse from this PR with the new APIs in upcoming Kubernetes releases.
Closing for inactivity -- please feel free to reopen when conflicts are merged and this is ready for more review!
…apache [NOSQUASH] Resync from Apache
Following from the discussions from the SIG to keep the TPR manipulation implementations in this project until we make upstream changes to the k8s client.
This supports the crud+watch on the SparkJobResource using the suggested schema