
Conversation

@mccheah commented May 25, 2017

Closes #237.

Allows a resource staging server to monitor the API server for objects that have the given labels. When objects with the given labels are not found on the API server, the resource files are deleted. Additionally, to cover cases where the API object is never actually created, such as when the submission client fails for any reason before creating the driver pod, the resource staging server also checks whether the resource is ever accessed within some period after it is first created. If the resource bundle is not downloaded within that window, it is deleted.
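
For orientation, here is a rough sketch of the cleanup contract this adds. The trait and method names below are approximations of the PR's StagedResourcesExpirationManager, not the exact signatures in the diff.

```scala
// Rough sketch only; the real StagedResourcesExpirationManager may differ in
// names and parameters.
trait StagedResourcesExpirationManagerSketch {

  // Register an uploaded resource bundle along with the labels that identify
  // the pods expected to consume it.
  def monitorResourceForExpiration(
      resourceId: String,
      podNamespace: String,
      podLabels: Map[String, String]): Unit

  // Start the periodic task that removes bundles whose pods no longer exist,
  // or that were never downloaded within the initial access window.
  def startMonitoringForExpiredResources(): Unit
}
```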

@mccheah (Author) commented May 25, 2017

@foxish @aash @erikerlandson this is still a work in progress and tests still need to be written. The testing here will be a bit tricky.

But I would appreciate getting some perspectives on this approach early. The approach here might leave some corner cases open, and I'm open to suggestions, especially for StagedResourcesExpirationManagerImpl.

try {
  if (kubernetesClient
      .pods()
      .withLabels(monitoredResource.podLabels.asJava)
@mccheah (Author) May 25, 2017

SubmittedDependencyUploader is specifically sending the labels of the driver to monitor here. I think that's ok, but it might technically be more correct to send labels that match both the driver and the executors, since all of these pods will pull these resources. That being said, executors should only be restarting if there's a driver that is running or restarting.

override def eventReceived(action: Action, resource: Pod): Unit = {
  action match {
    case Action.ADDED | Action.MODIFIED =>
      expirationTask.cancel()
@mccheah (Author) May 25, 2017

An alternative way to do this would be to not watch the pods at all but rather to wait for something to attempt to download the resource itself.

@mccheah (Author):

I actually like this a little more. It should minimize the number of connections we need to keep open to Kubernetes. @aash @foxish @erikerlandson for thoughts but I'm going to go ahead and make this change.

Member:

Hmm.. that is interesting. I think fewer components that watch and need to talk to the APIServer is definitely a shift in the right direction. +1!

.createWithDefaultString("1s")

// Spark resource staging server.
private[spark] val RESOURCE_STAGING_SERVER_API_SERVER_URL =
Member:

What do you think of moving the staging server config out to a separate constants file?

@mccheah (Author):

Do we want both client and server config in this separate file?

Member:

We can open an issue to track the refactor effort and do it in a later PR.

.doc("Time for which resources should remain on the resource staging server before they" +
" cleaned up. Note that this is the time period after the resource staging server first" +
" detects that no pods are present that are using their mounted resources.")
.timeConf(TimeUnit.MILLISECONDS)
Member:

There is no value in specifying a very small TTL in milliseconds I imagine. Making it a minimum granularity of minutes might be a better idea perhaps?

@mccheah (Author):

Milliseconds here only serves to make it easier for the code itself to use this value, particularly when comparing against time values in milliseconds. The configuration itself is still specified as a string value, e.g. 1m.

@foxish (Member) commented May 25, 2017

The general approach seems reasonable to me; I like not watching the APIServer and simply using the file access as a sign of "use" with the TTL. Setting the TTL to a very low value, on the other hand, is a bit worrisome in that case. Defaulting to 5m, for example, seems low for a longer-running Spark job, where executors may spawn later and try to fetch files.

Cancellation is no longer instantaneous and we might clean up a little
later than the given TTL. However, the tradeoff is a simpler
implementation with clearer contracts about when things will and will
not be cleaned up.
@mccheah (Author) commented May 25, 2017

I changed direction considerably after some careful thought. It's actually hard to reason about the concurrent access patterns when we're trying to react instantly to pods recovering and requesting the resources again. Instead, the cleanup thread now just moves resources from active to expired, and then checks resources in the expired state to see if they should be cleaned up.

It's a less granular approach but it's easier to reason about. There's a convoluted case here where, supposing we had a cleanup interval of I:

  • Resource is marked for expiration at time T
  • At time T + I/3 the pod comes back
  • At time T + 2I/3 the pod disappears

In such a case the expiration should technically be reset, that is, pushed back. But I think this is acceptable behavior regardless.
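
For reference, a minimal sketch of that two-phase structure, assuming a single-threaded cleanup task; the types and the pod-existence check are simplified stand-ins, not the exact implementation in this PR.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Simplified stand-ins for the types used in the PR.
case class MonitoredResource(podNamespace: String, podLabels: Map[String, String])
case class ExpiringResource(expiresAtMs: Long, resource: MonitoredResource)

class TwoPhaseCleanupSketch(expiredResourceTtlMs: Long) {
  private val activeResources = TrieMap.empty[String, MonitoredResource]
  private val expiringResources = mutable.Map.empty[String, ExpiringResource]

  def register(resourceId: String, resource: MonitoredResource): Unit =
    activeResources(resourceId) = resource

  // Invoked once per cleanup interval, always on the same scheduled thread.
  def runOnce(
      nowMs: Long,
      podsExist: MonitoredResource => Boolean,
      deleteResource: String => Unit): Unit = {
    // Phase 1: active resources with no matching pods start their TTL countdown.
    for ((resourceId, resource) <- activeResources.toSeq if !podsExist(resource)) {
      activeResources.remove(resourceId)
      expiringResources(resourceId) = ExpiringResource(nowMs + expiredResourceTtlMs, resource)
    }
    // Phase 2: expired entries whose deadline has passed are deleted.
    for ((resourceId, expiring) <- expiringResources.toSeq if nowMs >= expiring.expiresAtMs) {
      expiringResources.remove(resourceId)
      deleteResource(resourceId)
    }
  }
}
```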

 * implementation methods directly as opposed to over HTTP, as well as check the
 * data written to the underlying disk.
 */
class ResourceStagingServiceImplSuite extends SparkFunSuite {
@mccheah (Author):

TODO - this will be moved to StagedResourcesStoreSuite.

@mccheah (Author):

Done

ConfigBuilder("spark.kubernetes.resourceStagingServer.apiServer.url")
.doc("URL for the Kubernetes API server. The resource staging server monitors the API" +
" server to check when pods no longer are using mounted resources. Note that this isn't" +
" to be used in Spark applications, as the API server URL should be set via spark.master.")
Reviewer:

are there ever times when the resource staging server might not have access to the apiserver? This is a new dependency and I'm not sure that event-based subscription should be a requirement.

Maybe the system could be that there's a resource bundle TTL that's reset on every access and deletes the resource when it runs out, as well as an optional watch on an owning k8s resource (not necessarily just a pod) so RSS can delete its file resources when the k8s resource is deleted/stopped. Key point being that the watch is optional.

@mccheah (Author):

It would be difficult to specify a concise API for a request to "watch this resource", I think. Do we provide an API specifying a resource name, resource type, and labels?

The problem with having only reset-on-access is that it doesn't cover the case where entities like pods re-run only after a long period of time, certainly longer than the expiration window for the resource. This is why I think the watch-based behavior is more or less mandatory to get the most consistent behavior.

" cleaned up. Note that this is the time period after the resource staging server first" +
" detects that no pods are present that are using their mounted resources.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30m")
Reviewer:

30m seems way too small. For applications that sit idle overnight while their users are inactive this would delete their resources far too quickly. Even a team lunch break could be enough time to lose the resources.

I'd suggest 24hr as a default.

@mccheah (Author):

This only purges the resources after it sees that there are no pods left in the cluster with the matching labels; only then does it wait the 30 minutes before cleaning up those resources.

@mccheah (Author):

So this should reflect the amount of time we think it would take for pods to restart upon failure and for pods to want to re-retrieve those resources after crashing, as opposed to the amount of time the resources should remain available in general.

ConfigBuilder("spark.kubernetes.resourceStagingServer.resourceCleanupInterval")
.doc("Time between inspections for resources that can be cleaned up.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")
Reviewer:

not sure this needs to be configurable -- I'd imagine this is doing a pretty tiny amount of work on every interval? Make it something like fixed 30sec?

Member:

+1. I think there is no need for this additional option for how often we check for expiration.

Reviewer:

@mccheah should we remove this option?

    resourceId: String,
    podNamespace: String,
    podLabels: Map[String, String],
    kubernetesCredentials: PodMonitoringCredentials)
Reviewer:

when do you expect each resource to need its own creds for monitoring? vs a service account that can monitor everything?

@mccheah (Author):

The goal here was to make it such that this doesn't have to use a service account and just uses the credentials that are provided by the users. But I'm not sure if this is the correct approach as it complicates the API.

@mccheah (Author):

This approach would reduce the maintenance burden for cluster administrators though. This way the administrator doesn't have to be concerned about the specific service account's permissions for this entity, and re-uses the policies for their Spark users which already need read-access for pods in their namespace in any scenario.

    resourceCleanupIntervalMs: Long)
  extends StagedResourcesExpirationManager {

  private val activeResources = TrieMap.empty[String, MonitoredResource]
Reviewer:

why is this a TrieMap vs another kind of Map?

@mccheah (Author):

It's Scala's best ConcurrentMap.
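
For context, a tiny illustration of the point: TrieMap implements scala.collection.concurrent.Map, so the expiration manager gets atomic updates and cheap consistent snapshots without external locking.

```scala
import scala.collection.concurrent.TrieMap

val active = TrieMap.empty[String, String]
active.putIfAbsent("resource-1", "bundle-path") // atomic insert-if-missing
active.remove("resource-1")                     // atomic removal
val snapshot = active.snapshot()                // constant-time consistent snapshot for iteration
```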

val podsWithLabels = kubernetesClient
  .pods()
  .withLabels(resource.podLabels.asJava)
  .list()
Reviewer:

can this be a watch instead of polling apiserver for all these things every interval?

@mccheah (Author):

Making this a watch is tricky because of the concurrency guarantees we have to satisfy here. Take a few scenarios, for example:

Suppose we had a policy that waits for pods to be deleted before starting to count down the expiration. Now suppose that before the watch is finished being established, the resources are deleted from the cluster. Then the watch never receives the delete event and thus never knows that resources should be cleaned up. In addition, watches don't give us the ability to know how many instances of a given resource are left, only that we have deleted a resource matching the label.

We could invert this by saying that the watch will immediately begin counting down until expiration, and the expiration is halted only when the watch receives an event like ADDED or MODIFIED. But in this case we have the inverse problem: if the pod is created before the watch is established, then we'll expire the resource prematurely.

One way we could make this all consistent is to enforce at the resource staging server that, when resources are first sent to the cluster for pods with a given label set, no pods with those labels are allowed to exist at the time the resources are uploaded. Then we would open the watch immediately and begin ticking down expiration, canceling expiration if resources are added, and re-adding expiration when resources are deleted. However, again it's concerning that watches don't give us the ability to know how many of a given resource are remaining.

One other concern about watches is that they keep open connections to Kubernetes. We don't want to keep too many of these connections open at once.


private class CleanupRunnable extends Runnable with Logging {

  private val expiringResources = mutable.Map.empty[String, ExpiringResource]
Reviewer:

why is this shared between runs? to prevent double-deletes? instead can we synchronize run, or otherwise if a run of the run method takes longer than the interval, skip the next run?

@mccheah (Author):

run() is guaranteed to be run only on one thread, or in other words, we can't have overlapping cleanups happening at once.

if (podsWithLabels.getItems.isEmpty) {
  activeResources.remove(resourceId)
  val expiresAt = clock.getTimeMillis() + expiredResourceTtlMs
  expiringResources(resourceId) = ExpiringResource(expiresAt, resource)
Reviewer:

if the owning pod isn't alive anymore, why wait out an expiration time instead of deleting the resource immediately? Did we decide that resources could be shared between Spark apps?

@mccheah (Author):

Because if pods can be restarted upon failure they will want to pull the resources again, which might result in the pods being removed from the API server and being added back again. However, @foxish may correct me if this is incorrect logic - the API server may only delete pods when they are truly gone forever.

@ash211 commented May 26, 2017

Quick notes from our in-person convo just now on the goal state:

  • want these "owner references" to be things other than just pods

  • for now, use an enum with one POD value (see the sketch after this list)

  • resource staging server ("RSS") resources are tied to the lifetime of the k8s resource (i.e. no reuse of RSS resources between jobs -- those are better distributed by baking them into the image)

  • this implies we will need non-pod owner references in the future, since with a replica set for the driver one pod can be destroyed and another created to maintain the target replica count. So the owner reference in that situation would need to be on the replica set, not on one of its pods

  • this also implies that when the owner reference dies, immediately delete the RSS resource

  • initial access timeout: if the RSS resource isn't accessed within N minutes (default 30?) then delete the resource. This doesn't use the "owner reference" because there's a potential race between owner reference creation and when RSS does its first existence check

  • general: log creation/deletion of resources and for deletion the reason

  • open question: should the creds that RSS uses to poll apiserver for owner reference existence be on a per-RSS resource basis or one service account that does all the watching?

  • future followup: use TPRs with owner references where each TPR is a staged resource and let k8s do the resource deletion via owner reference graph
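
A rough sketch of the owner model from that list; the field and type names approximate the PR's StagedResourcesOwner and are illustrative rather than authoritative.

```scala
// Illustrative only: owner references are typed, with POD as the sole type for now.
object StagedResourcesOwnerType extends Enumeration {
  val Pod = Value
}

case class StagedResourcesOwner(
    ownerNamespace: String,
    ownerLabels: Map[String, String],
    ownerType: StagedResourcesOwnerType.Value)
```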

mccheah added 2 commits May 26, 2017 18:24
- Delete resources immediately when owners do not exist
- Delete resources if after they are first uploaded, they are not
accessed for a certain period of time.
- Resource owners are more specifically defined and can have a type
(currently only uses pods)
…park-on-k8s/spark into resource-staging-server-cleanup
@mccheah (Author) commented May 30, 2017

@ash211 @foxish the most recent change reflects the discussion mentioned above. Please take a look.

  .withName(resource.stagedResourceOwner.ownerNamespace)
  .get()
if (namespace == null) {
  logInfo(s"Resource with id $resourceId is being removed. The owner's namespace" +
Reviewer:

is being removed -> No longer tracking resource with id $resourceId

At first read I thought it was being deleted out of k8s, rather than forgotten in RSS

@mccheah (Author):

The resource ID is the resource bundle ID and not the Kubernetes resource. In that sense it is being deleted though.

@mccheah (Author):

I changed the log wording to indicate we're removing the resource files and not the Kubernetes objects.

    .list()
    .getItems
    .isEmpty) {
  logInfo(s"Resource with id $resourceId is being removed. Owners of the resource" +
Reviewer:

No longer tracking like above

@mccheah (Author):

Changed the log wording to indicate we're removing the files and not Kubernetes objects.

@ash211 left a comment

LGTM -- any thoughts @foxish ?

    .newDaemonSingleThreadScheduledExecutor("resource-expiration"),
  clock = new SystemClock(),
  initialAccessExpirationMs = initialAccessExpirationMs,
  resourceCleanupIntervalMs = resourceCleanupIntervalMs)
Reviewer:

do you need the names for each of the variables here? adds a lot of visual clutter

@mccheah (Author):

I mainly didn't want to mix up the expiration time and the interval time here, since they're both the same type and the argument list is quite long.

  clientCertDataBase64 = clientCertBase64,
  oauthTokenBase64 = oauthTokenBase64)
val stagedResourcesOwner = StagedResourcesOwner(
  ownerNamespace = podNamespace,
Member:

I'm not sure I understand the design here. Why do each owner's namespace and credentials for monitoring need to be provided? Can't we assume that the credentials mounted into the submission server have RBAC turned on and the ability to read resources in all namespaces?

Member:

It seems odd to me that we would store multiple sets of privileged credentials in this one place for the purpose of watching resources.

@mccheah (Author):

I think this is a tradeoff. In this model, using the resource staging server doesn't require a cluster administrator to provision any extra users to be able to read the API objects of interest, in exchange for an API that's a bit more complex requiring the user to provide their credentials to handle the cleanup.

@foxish (Member) Jun 1, 2017

There will also be issues when we use expiring credentials like oauth tokens; it seems like in general, storing cert/token data long-term in a single component is going to be dangerous IMO. I think we ought to encourage creating a role/cluster-role, and a service account allowing read access to the pods in the cluster that need to be tracked for this component. That's more idiomatic and works more broadly.

@mccheah (Author):

Ok this makes sense, I'll make the appropriate changes.

mccheah added 2 commits June 1, 2017 17:01
Also refactors construction of Kubernetes Clients to unify the code paths.
…park-on-k8s/spark into resource-staging-server-cleanup
@mccheah (Author) commented Jun 2, 2017

@foxish @aash the latest change makes the resource staging server use a single set of credentials to monitor the API server. I also consolidated all of the places where we create Kubernetes clients into a single factory, which extracts values from a SparkConf based on any prefix with a unified set of suffixes. It's very similar in spirit to how Spark's SecurityManager builds SSLOptions objects with the SSL "namespace" construct. Please take a look.
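
To illustrate the prefix-plus-suffix idea, here is a loose sketch; the real SparkKubernetesClientFactory has a different signature, and only the general key pattern is shown.

```scala
import org.apache.spark.SparkConf

// Credential-related suffixes are resolved under a caller-supplied prefix, in
// the spirit of how SecurityManager groups SSL options under a namespace.
def k8sAuthOption(conf: SparkConf, prefix: String, suffix: String): Option[String] =
  conf.getOption(s"$prefix.$suffix")

val conf = new SparkConf()
// e.g. the resource staging server's monitoring credentials:
val prefix = "spark.kubernetes.authenticate.resourceStagingServer"
val oauthTokenFile = k8sAuthOption(conf, prefix, "oauthTokenFile")
val caCertFile = k8sAuthOption(conf, prefix, "caCertFile")
```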

@mccheah (Author) commented Jun 2, 2017

rerun unit tests please

@mccheah (Author) commented Jun 2, 2017

Looking into the integration test failure now.

@mccheah (Author) commented Jun 2, 2017

rerun integration tests please

Whether or not to use a service account token and a service account CA certificate when the resource staging server
authenticates to Kubernetes. If this is set, interactions with Kubernetes will authenticate using a token located at
<code>/var/run/secrets/kubernetes.io/serviceaccount/token</code> and the CA certificate located at
<code>/var/run/secrets/kubernetes.io/serviceaccount/ca.crt</code>. Note that if
Member:

The two options spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile and spark.kubernetes.authenticate.resourceStagingServer.caCertFile are already mentioned above in the table.

@mccheah (Author):

Yes, but here it's worth mentioning that those configurations take precedence over the service account credentials.

  .createOptional
private[spark] val APISERVER_AUTH_SUBMISSION_CONF_PREFIX =
  "spark.kubernetes.authenticate.submission"
private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
Member:

Are all of these prefixes still used?

@mccheah (Author):

Yes, they are now embedded into the code itself. See DriverPodKubernetesCredentialsMounterImpl for example.

// Namespace doesn't matter because we list resources from various namespaces
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
  apiServerUri,
  "default",
Member:

We should be able to create a non-namespaced client for use here. I made a change in #319 to facilitate that.

@foxish (Member) commented Jun 2, 2017

Basically LGTM; left minor comments, and the unit test failure needs fixing.
Thanks @mccheah for incorporating the changes.

@mccheah (Author) commented Jun 2, 2017

@foxish addressed comments and also fixed the merge conflict. The merge conflict resolution required some changes to the external shuffle service, so please take a look.

  new DefaultKubernetesClient(httpClientWithCustomDispatcher, withOptionalConfigurations)
}

private class OptionConfigurableConfigBuilder(configBuilder: ConfigBuilder) {
Member:

could this be done using an implicit class, so that it could operate directly on ConfigBuilder?

implicit class EnhanceForWithOption(cb: ConfigBuilder) {
  def withOption[T](o: Option[T])(c: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder =
    o.map { opt => c(opt, cb) }.getOrElse(cb)
}
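
For illustration, hypothetical usage with the suggested implicit in scope; ConfigBuilder here is fabric8's io.fabric8.kubernetes.client.ConfigBuilder, and the Option-valued settings are made-up examples.

```scala
import io.fabric8.kubernetes.client.ConfigBuilder

// Assumes EnhanceForWithOption from the suggestion above is in scope.
val maybeNamespace: Option[String] = Some("spark") // made-up example values
val maybeOauthToken: Option[String] = None

val config = new ConfigBuilder()
  .withMasterUrl("https://kubernetes.default.svc")
  .withOption(maybeNamespace) { (ns, builder) => builder.withNamespace(ns) }
  .withOption(maybeOauthToken) { (token, builder) => builder.withOauthToken(token) }
  .build()
```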

@mccheah (Author):

Done

@mccheah (Author):

I think this is one of the cases where implicit gives us nicer readability, so +1.

<td>(none)</td>
<td>
File containing the OAuth token to use when authenticating against the Kubernetes API server from the
resource staging server, when it monitors objects in determining when to clean up resource bundles. The resource
Reviewer:

can these two config values not be set at the same time? should mention that in the docs

  • spark.kubernetes.authenticate.resourceStagingServer.oauthToken
  • spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile

over the usage of the service account token file. Also, if
<code>spark.kubernetes.authenticate.resourceStagingServer.caCertFile</code> is set, it takes precedence over using
the service account's CA certificate file. This generally should be set to true (the default value) when the
resource staging server is deployed as a Kubernetes pod, but should be set to false if the resource staging server
Reviewer:

"on bare metal" -> "outside kubernetes"

newBlockHandler.start()
// TODO: figure out a better way of doing this.
// This is necessary because the constructor is not called
// when this class is initialized through ExternalShuffleService.
Reviewer:

how does the ExternalShuffleService instantiate the KubernetesExternalShuffleService without calling its constructor?

@mccheah (Author):

@foxish I was also confused here; I don't see this being created reflectively in the surrounding code.

@foxish (Member) Jun 3, 2017

I think the comment isn't clear there. The issue, I think, was that the ExternalShuffleService is extended by the KubernetesExternalShuffleService, but the ExternalShuffleService calls an overridden method within its own constructor:

  private val blockHandler = newShuffleBlockHandler(transportConf)

So, at that point, the constructor for KubernetesExternalShuffleService has not yet been called.
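
For readers unfamiliar with that pitfall, a minimal standalone illustration with stand-in classes, not the actual shuffle service code:

```scala
class Base {
  // The superclass constructor calls an overridable method...
  protected def newHandler(): String = "base handler"
  val handler: String = newHandler()
}

class Derived extends Base {
  // ...but this field is only assigned after Base's constructor has finished,
  // so the overridden newHandler() observes null during construction.
  private val handlerName = "kubernetes handler"
  override protected def newHandler(): String = handlerName
}

object InitOrderDemo extends App {
  println(new Derived().handler) // prints "null", not "kubernetes handler"
}
```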

import org.apache.spark.util.ThreadUtils

/**
 * Spark-opinionated builder for Kubernetes clients.
Reviewer:

mention that this reads from a prefix in a similar manner as Spark's SSL config options which are grouped with a prefix

.doc("Use a service account token and CA certificate in the resource staging server to" +
" watch the API server's objects.")
.booleanConf
.createWithDefault(true)
Reviewer:

what happens when this is false? no watching on resource destruction to clean up resources?

@mccheah (Author):

If this is false then the other credentials values need to be set. If none of them are set, then Kubernetes access will not have any authentication - likely resulting in a Kubernetes client error in many cases, but probably still a valid configuration in rare cases nonetheless.

ConfigBuilder("spark.kubernetes.resourceStagingServer.resourceCleanupInterval")
.doc("Time between inspections for resources that can be cleaned up.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mccheah should we remove this option?

} catch {
  case e: Throwable =>
    newBlockHandler.close()
    throw e
Reviewer:

where would this get caught, and does that place log this exception? definitely would want to know the exception details for debugging

@ash211 left a comment

LGTM! I'm ready to merge.

Any last comments @erikerlandson @foxish @kimoonkim ?

@foxish (Member) commented Jun 3, 2017

Nothing more from my end. LGTM

@erikerlandson (Member):
LGTM

@foxish merged commit e37b0cf into branch-2.1-kubernetes Jun 3, 2017
@foxish deleted the resource-staging-server-cleanup branch June 3, 2017 03:25
foxish pushed a commit that referenced this pull request Jul 24, 2017
* Clean up resources that are not used by pods.

* Make client side send correct credentials.

* Simplify cleanup logic.

Cancellation is no longer instantaneous and we might clean up a little
later than the given TTL. However, the tradeoff is a simpler
implementation with clearer contracts about when things will and will
not be cleaned up.

* Remove class

* Fix imports and line length.

* Remove import.

* Add a unit test for StagingResourcesStore.

* Revamp cleanup process.

- Delete resources immediately when owners do not exist
- Delete resources if after they are first uploaded, they are not
accessed for a certain period of time.
- Resource owners are more specifically defined and can have a type
(currently only uses pods)

* Clarify log messages

* Use a single set of credentials in resource staging server.

Also refactors construction of Kubernetes Clients to unify the code
paths.

* Fix unit test.

* Safe close if creating shuffle block handler fails

* Use implicit class.

* Address comments.

* Fix broken test.
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019