1717package org .apache .spark .deploy .kubernetes .submit .v2
1818
1919import java .io .File
20+ import java .net .URI
2021import java .util .Collections
2122
2223import io .fabric8 .kubernetes .api .model .{ContainerBuilder , EnvVarBuilder , HasMetadata , OwnerReferenceBuilder , PodBuilder }
2324import scala .collection .JavaConverters ._
2425import scala .collection .mutable
2526
26- import org .apache .spark .{SecurityManager => SparkSecurityManager , SparkConf , SparkException }
27+ import org .apache .spark .{SparkConf , SparkException }
2728import org .apache .spark .deploy .kubernetes .config ._
2829import org .apache .spark .deploy .kubernetes .constants ._
2930import org .apache .spark .internal .Logging
@@ -50,7 +51,8 @@ private[spark] class Client(
5051 appArgs : Array [String ],
5152 mainAppResource : String ,
5253 kubernetesClientProvider : SubmissionKubernetesClientProvider ,
53- mountedDependencyManagerProvider : MountedDependencyManagerProvider ) extends Logging {
54+ submittedDependencyManagerProvider : SubmittedDependencyManagerProvider ,
55+ remoteDependencyManagerProvider : DownloadRemoteDependencyManagerProvider ) extends Logging {
5456
5557 private val namespace = sparkConf.get(KUBERNETES_NAMESPACE )
5658 private val master = resolveK8sMaster(sparkConf.get(" spark.master" ))
@@ -132,27 +134,33 @@ private[spark] class Client(
132134 .endSpec()
133135
134136 val nonDriverPodKubernetesResources = mutable.Buffer [HasMetadata ]()
137+
138+ // resolvedJars is all of the original Spark jars, but files on the local disk that
139+ // were uploaded to the resource staging server are converted to the paths they would
140+ // have been downloaded at. Similarly for resolvedFiles.
141+ // If the resource staging server isn't being used, then resolvedJars = spark.jars.
135142 val resolvedJars = mutable.Buffer [String ]()
136143 val resolvedFiles = mutable.Buffer [String ]()
137144 val driverPodWithMountedDeps = maybeStagingServerUri.map { stagingServerUri =>
138- val mountedDependencyManager = mountedDependencyManagerProvider.getMountedDependencyManager(
139- kubernetesAppId,
140- stagingServerUri,
141- allLabels,
142- namespace,
143- sparkJars,
144- sparkFiles)
145- val jarsResourceIdentifier = mountedDependencyManager.uploadJars()
146- val filesResourceIdentifier = mountedDependencyManager.uploadFiles()
147- val initContainerKubernetesSecret = mountedDependencyManager.buildInitContainerSecret(
145+ val submittedDependencyManager = submittedDependencyManagerProvider
146+ .getSubmittedDependencyManager(
147+ kubernetesAppId,
148+ stagingServerUri,
149+ allLabels,
150+ namespace,
151+ sparkJars,
152+ sparkFiles)
153+ val jarsResourceIdentifier = submittedDependencyManager.uploadJars()
154+ val filesResourceIdentifier = submittedDependencyManager.uploadFiles()
155+ val initContainerKubernetesSecret = submittedDependencyManager.buildInitContainerSecret(
148156 jarsResourceIdentifier.resourceSecret, filesResourceIdentifier.resourceSecret)
149- val initContainerConfigMap = mountedDependencyManager .buildInitContainerConfigMap(
157+ val initContainerConfigMap = submittedDependencyManager .buildInitContainerConfigMap(
150158 jarsResourceIdentifier.resourceId, filesResourceIdentifier.resourceId)
151- resolvedJars ++= mountedDependencyManager .resolveSparkJars()
152- resolvedFiles ++= mountedDependencyManager .resolveSparkFiles()
159+ resolvedJars ++= submittedDependencyManager .resolveSparkJars()
160+ resolvedFiles ++= submittedDependencyManager .resolveSparkFiles()
153161 nonDriverPodKubernetesResources += initContainerKubernetesSecret
154162 nonDriverPodKubernetesResources += initContainerConfigMap
155- mountedDependencyManager .configurePodToMountLocalDependencies(
163+ submittedDependencyManager .configurePodToMountLocalDependencies(
156164 driverContainer.getName, initContainerKubernetesSecret, initContainerConfigMap, basePod)
157165 }.getOrElse {
158166 sparkJars.map(Utils .resolveURI).foreach { jar =>
@@ -186,19 +194,35 @@ private[spark] class Client(
186194 resolvedSparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN ).foreach { _ =>
187195 resolvedSparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN .key, " <present_but_redacted>" )
188196 }
197+ val remoteDependencyManager = remoteDependencyManagerProvider
198+ .getDownloadRemoteDependencyManager(
199+ kubernetesAppId,
200+ resolvedJars,
201+ resolvedFiles)
202+ val downloadRemoteDependenciesConfigMap = remoteDependencyManager
203+ .buildInitContainerConfigMap()
204+ nonDriverPodKubernetesResources += downloadRemoteDependenciesConfigMap
205+ val driverPodWithMountedAndDownloadedDeps = remoteDependencyManager
206+ .configurePodToDownloadRemoteDependencies(
207+ downloadRemoteDependenciesConfigMap, driverContainer.getName, driverPodWithMountedDeps)
189208
190- val mountedClassPath = resolvedJars.map(Utils .resolveURI).filter { jarUri =>
191- val scheme = Option .apply(jarUri.getScheme).getOrElse(" file" )
192- scheme == " local" || scheme == " file"
193- }.map(_.getPath).mkString(File .pathSeparator)
209+ // The resolved local classpath should *only* contain local file URIs. It consists of the
210+ // driver's classpath (minus spark.driver.extraClassPath which was handled above) with the
211+ // assumption that the remote dependency manager has downloaded all of the remote
212+ // dependencies through its init-container, and thus replaces all the remote URIs with the
213+ // local paths they were downloaded to.
214+ val resolvedLocalClassPath = remoteDependencyManager.resolveLocalClasspath()
215+ resolvedLocalClassPath.foreach { classPathEntry =>
216+ require(Option (URI .create(classPathEntry).getScheme).isEmpty)
217+ }
194218 val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { case (confKey, confValue) =>
195219 s " -D $confKey= $confValue"
196220 }.mkString(" " ) + driverJavaOptions.map(" " + _).getOrElse(" " )
197- val resolvedDriverPod = driverPodWithMountedDeps .editSpec()
221+ val resolvedDriverPod = driverPodWithMountedAndDownloadedDeps .editSpec()
198222 .editMatchingContainer(new ContainerNameEqualityPredicate (driverContainer.getName))
199223 .addNewEnv()
200224 .withName(ENV_MOUNTED_CLASSPATH )
201- .withValue(mountedClassPath )
225+ .withValue(resolvedLocalClassPath.mkString( File .pathSeparator) )
202226 .endEnv()
203227 .addNewEnv()
204228 .withName(ENV_DRIVER_JAVA_OPTS )
0 commit comments