
Commit 68fb5c4

antonipp authored and dongjoon-hyun committed
[SPARK-40817][K8S][3.2] spark.files should preserve remote files
### What changes were proposed in this pull request?

Backport apache#38376 to `branch-3.2`.

You can find a detailed description of the issue and an example reproduction on the Jira card: https://issues.apache.org/jira/browse/SPARK-40817

The idea for this fix is to update the logic which uploads user-specified files (via `spark.jars`, `spark.files`, etc.) to `spark.kubernetes.file.upload.path`. After uploading local files, it used to overwrite the initial list of URIs passed by the user, thus erasing all remote URIs which were specified there.

A small example of this behaviour:
1. The user sets `spark.jars` to `s3a://some-bucket/my-application.jar,/tmp/some-local-jar.jar` when running `spark-submit` in cluster mode.
2. `BasicDriverFeatureStep.getAdditionalPodSystemProperties()` gets called at one point while running `spark-submit`.
3. This function sets `spark.jars` to a new value of `${SPARK_KUBERNETES_UPLOAD_PATH}/spark-upload-${RANDOM_STRING}/some-local-jar.jar`. Note that `s3a://some-bucket/my-application.jar` has been discarded.

With the logic proposed in this PR, the new value of `spark.jars` would be `s3a://some-bucket/my-application.jar,${SPARK_KUBERNETES_UPLOAD_PATH}/spark-upload-${RANDOM_STRING}/some-local-jar.jar`; in other words, remote URIs are no longer discarded.

### Why are the changes needed?

We encountered this issue in production when trying to launch Spark on Kubernetes jobs in cluster mode with a mix of local and remote dependencies.

### Does this PR introduce _any_ user-facing change?

Yes, see the description of the new behaviour above.

### How was this patch tested?

- Added a unit test for the new behaviour
- Added an integration test for the new behaviour
- Tried this patch in our Kubernetes environment with `SparkPi`:

```
spark-submit \
  --master k8s://https://$KUBERNETES_API_SERVER_URL:443 \
  --deploy-mode cluster \
  --name=spark-submit-test \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.jars=/opt/my-local-jar.jar,s3a://$BUCKET_NAME/my-remote-jar.jar \
  --conf spark.kubernetes.file.upload.path=s3a://$BUCKET_NAME/my-upload-path/ \
  [...]
  /opt/spark/examples/jars/spark-examples_2.12-3.1.3.jar
```

Before applying the patch, `s3a://$BUCKET_NAME/my-remote-jar.jar` was discarded from the final value of `spark.jars`. After applying the patch and launching the job again, I confirmed that `s3a://$BUCKET_NAME/my-remote-jar.jar` was no longer discarded by looking at the Spark config for the running job.

Closes apache#39670 from antonipp/spark-40817-branch-3.2.

Authored-by: Anton Ippolitov <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
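The gist of the fix can be shown as a minimal sketch outside of Spark internals. Here `isLocal` and `upload` are hypothetical stand-ins for `KubernetesUtils.isLocalAndResolvable` and `KubernetesUtils.uploadAndTransformFileUris`; the point is only how `partition` keeps the remote URIs that `filter` used to drop:

```scala
import java.net.URI

// Hypothetical stand-ins for KubernetesUtils.isLocalAndResolvable and
// KubernetesUtils.uploadAndTransformFileUris, to show the control flow only.
def isLocal(uri: String): Boolean = Option(new URI(uri).getScheme).forall(_ == "file")
def upload(uris: Seq[String]): Seq[String] =
  uris.map(u => "s3a://upload-path/spark-upload-1234/" + u.split('/').last)

val userJars = Seq("s3a://some-bucket/my-application.jar", "/tmp/some-local-jar.jar")

// Old behaviour: filter keeps only the local URIs, so the remote JAR is lost.
val before = upload(userJars.filter(isLocal))
// => Seq("s3a://upload-path/spark-upload-1234/some-local-jar.jar")

// New behaviour: partition, upload the local half, re-append the remote half.
val (localUris, remoteUris) = userJars.partition(isLocal)
val after = upload(localUris) ++ remoteUris
// => both the uploaded local JAR and the untouched remote JAR survive
```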
1 parent 60de4a5 commit 68fb5c4

File tree

3 files changed: +110 -10 lines changed


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 6 additions & 6 deletions
```diff
@@ -162,27 +162,27 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
     Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
-      val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
+      val (localUris, remoteUris) =
+        conf.get(key).partition(uri => KubernetesUtils.isLocalAndResolvable(uri))
       val value = {
         if (key == ARCHIVES) {
-          uris.map(UriBuilder.fromUri(_).fragment(null).build()).map(_.toString)
+          localUris.map(UriBuilder.fromUri(_).fragment(null).build()).map(_.toString)
         } else {
-          uris
+          localUris
         }
       }
       val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
       if (resolved.nonEmpty) {
         val resolvedValue = if (key == ARCHIVES) {
-          uris.zip(resolved).map { case (uri, r) =>
+          localUris.zip(resolved).map { case (uri, r) =>
             UriBuilder.fromUri(r).fragment(new java.net.URI(uri).getFragment).build().toString
           }
         } else {
           resolved
         }
-        additionalProps.put(key.key, resolvedValue.mkString(","))
+        additionalProps.put(key.key, (resolvedValue ++ remoteUris).mkString(","))
       }
     }
     additionalProps.toMap
   }
 }
-
```
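A note on the design choice in this hunk: `partition` preserves the relative order of elements within each half, and the remote URIs are appended after the uploaded local ones (`resolvedValue ++ remoteUris`), so the final property value is a simple concatenation of the two groups. A quick illustration of the `partition` semantics, using a simplified `startsWith` check in place of `isLocalAndResolvable`:

```scala
val uris = Seq("s3a://bucket/a.jar", "/tmp/b.jar", "s3a://bucket/c.jar", "/tmp/d.jar")
val (local, remote) = uris.partition(uri => !uri.startsWith("s3a://"))
// local  == Seq("/tmp/b.jar", "/tmp/d.jar")
// remote == Seq("s3a://bucket/a.jar", "s3a://bucket/c.jar")
```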

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala

Lines changed: 41 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder, Quantity}
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
@@ -232,6 +233,33 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(portMap2(BLOCK_MANAGER_PORT_NAME) === 1235)
   }
 
+  test("SPARK-40817: Check that remote JARs do not get discarded in spark.jars") {
+    val FILE_UPLOAD_PATH = "s3a://some-bucket/upload-path"
+    val REMOTE_JAR_URI = "s3a://some-bucket/my-application.jar"
+    val LOCAL_JAR_URI = "/tmp/some-local-jar.jar"
+
+    val sparkConf = new SparkConf()
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(JARS, Seq(REMOTE_JAR_URI, LOCAL_JAR_URI))
+      .set(KUBERNETES_FILE_UPLOAD_PATH, FILE_UPLOAD_PATH)
+      // Instead of using the real S3A Hadoop driver, use a fake local one
+      .set("spark.hadoop.fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
+      .set("spark.hadoop.fs.s3a.impl.disable.cache", "true")
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
+    val featureStep = new BasicDriverFeatureStep(kubernetesConf)
+
+    val sparkJars = featureStep.getAdditionalPodSystemProperties()(JARS.key).split(",")
+
+    // Both the remote and the local JAR should be there
+    assert(sparkJars.size == 2)
+    // The remote JAR path should have been left untouched
+    assert(sparkJars.contains(REMOTE_JAR_URI))
+    // The local JAR should have been uploaded to spark.kubernetes.file.upload.path
+    assert(!sparkJars.contains(LOCAL_JAR_URI))
+    assert(sparkJars.exists(path =>
+      path.startsWith(FILE_UPLOAD_PATH) && path.endsWith("some-local-jar.jar")))
+  }
+
   def containerPort(name: String, portNumber: Int): ContainerPort =
     new ContainerPortBuilder()
       .withName(name)
@@ -241,3 +269,16 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
   private def amountAndFormat(quantity: Quantity): String = quantity.getAmount + quantity.getFormat
 }
+
+/**
+ * No-op Hadoop FileSystem
+ */
+private class TestFileSystem extends LocalFileSystem {
+  override def copyFromLocalFile(
+      delSrc: Boolean,
+      overwrite: Boolean,
+      src: Path,
+      dst: Path): Unit = {}
+
+  override def mkdirs(path: Path): Boolean = true
+}
```
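The `TestFileSystem` stub works because Hadoop resolves the `FileSystem` implementation for a URI scheme from the `fs.<scheme>.impl` configuration property (Spark forwards `spark.hadoop.*` keys into the Hadoop configuration). A rough standalone sketch of the same mechanism, assuming the `TestFileSystem` class above is on the classpath:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = new Configuration()
// Point the s3a:// scheme at the no-op stub instead of the real S3A client.
hadoopConf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
// Bypass the FileSystem cache so a previously cached real client is not reused.
hadoopConf.set("fs.s3a.impl.disable.cache", "true")

val fs = FileSystem.get(new URI("s3a://some-bucket/upload-path"), hadoopConf)
// This "upload" is a no-op thanks to the overridden copyFromLocalFile.
fs.copyFromLocalFile(false, true, new Path("/tmp/some-local-jar.jar"),
  new Path("s3a://some-bucket/upload-path/some-local-jar.jar"))
```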

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala

Lines changed: 63 additions & 4 deletions
```diff
@@ -32,7 +32,7 @@ import org.scalatest.time.{Minutes, Span}
 
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH}
-import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, SPARK_PI_MAIN_CLASS, TIMEOUT}
 import org.apache.spark.deploy.k8s.integrationtest.Utils.getExamplesJarName
 import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
 import org.apache.spark.internal.config.{ARCHIVES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
@@ -167,6 +167,42 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     })
   }
 
+  test(
+    "SPARK-40817: Check that remote files do not get discarded in spark.files",
+    k8sTestTag,
+    MinikubeTag) {
+    tryDepsTest({
+      // Create a local file
+      val localFileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
+
+      // Create a remote file on S3
+      val remoteFileName = "some-remote-file.txt"
+      val remoteFileKey = s"some-path/${remoteFileName}"
+      createS3Object(remoteFileKey, "Some Content")
+      val remoteFileFullPath = s"s3a://${BUCKET}/${remoteFileKey}"
+
+      // Put both file paths in spark.files
+      sparkAppConf.set("spark.files", s"$HOST_PATH/$localFileName,${remoteFileFullPath}")
+      // Allows to properly read executor logs once the job is finished
+      sparkAppConf.set("spark.kubernetes.executor.deleteOnTermination", "false")
+
+      // Run SparkPi and make sure that both files have been properly downloaded on running pods
+      val examplesJar = Utils.getTestFileAbsolutePath(getExamplesJarName(), sparkHomeDir)
+      runSparkApplicationAndVerifyCompletion(
+        appResource = examplesJar,
+        mainClass = SPARK_PI_MAIN_CLASS,
+        appArgs = Array(),
+        expectedDriverLogOnCompletion = Seq("Pi is roughly 3"),
+        // We can check whether the Executor pod has successfully
+        // downloaded both the local and the remote file
+        expectedExecutorLogOnCompletion = Seq(localFileName, remoteFileName),
+        driverPodChecker = doBasicDriverPodCheck,
+        executorPodChecker = doBasicExecutorPodCheck,
+        isJVM = true
+      )
+    })
+  }
+
   test("SPARK-33615: Launcher client archives", k8sTestTag, MinikubeTag) {
     tryDepsTest {
       val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
@@ -259,12 +295,20 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
+  private def getS3Client(
+      endPoint: String,
+      accessKey: String = ACCESS_KEY,
+      secretKey: String = SECRET_KEY): AmazonS3Client = {
+    val credentials = new BasicAWSCredentials(accessKey, secretKey)
+    val s3client = new AmazonS3Client(credentials)
+    s3client.setEndpoint(endPoint)
+    s3client
+  }
+
   private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
     Eventually.eventually(TIMEOUT, INTERVAL) {
       try {
-        val credentials = new BasicAWSCredentials(accessKey, secretKey)
-        val s3client = new AmazonS3Client(credentials)
-        s3client.setEndpoint(endPoint)
+        val s3client = getS3Client(endPoint, accessKey, secretKey)
         s3client.createBucket(BUCKET)
       } catch {
         case e: Exception =>
@@ -273,6 +317,21 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
+  private def createS3Object(
+      objectKey: String,
+      objectContent: String,
+      endPoint: String = getServiceUrl(svcName)): Unit = {
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      try {
+        val s3client = getS3Client(endPoint)
+        s3client.putObject(BUCKET, objectKey, objectContent)
+      } catch {
+        case e: Exception =>
+          throw new SparkException(s"Failed to create object $BUCKET/$objectKey.", e)
+      }
+    }
+  }
+
   private def getServiceUrl(serviceName: String): String = {
     val fuzzyUrlMatcher = """^(.*?)([a-zA-Z]+://.*?)(\s*)$""".r
     Eventually.eventually(TIMEOUT, INTERVAL) {
```
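For context, the refactored helpers rely on the AWS SDK v1 client's ability to target a custom endpoint, since the suite runs against an in-cluster object store rather than real S3. A hedged usage sketch with placeholder credentials, endpoint, and bucket; the suite itself supplies `ACCESS_KEY`, `SECRET_KEY`, and `BUCKET`, and resolves the endpoint via `getServiceUrl(svcName)`:

```scala
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client

// Placeholder values; in the suite these come from the test constants.
val s3client = new AmazonS3Client(new BasicAWSCredentials("access-key", "secret-key"))
s3client.setEndpoint("http://localhost:9000")
s3client.putObject("some-bucket", "some-path/some-remote-file.txt", "Some Content")
```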
