4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1542,8 +1542,8 @@ class SparkContext(config: SparkConf) extends Logging {
val schemeCorrectedURI = uri.getScheme match {
case null => new File(path).getCanonicalFile.toURI
case "local" =>
logWarning("File with 'local' scheme is not supported to add to file server, since " +
"it is already available on every node.")
logWarning(s"File with 'local' scheme $path is not supported to add to file server, " +
s"since it is already available on every node.")
return
case _ => uri
}
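For context, a minimal sketch of the call path that triggers this warning, using a hypothetical path and a throwaway application (not part of the patch):

import org.apache.spark.{SparkConf, SparkContext}

object LocalSchemeWarningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("local-scheme-demo"))
    // A 'local:' URI refers to a file already present on every node, so addFile skips the
    // file server and only logs the warning, which now includes the offending path.
    sc.addFile("local:///opt/data/lookup.csv")  // hypothetical path
    sc.stop()
  }
}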
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -391,6 +391,7 @@ private[spark] class SparkSubmit extends Logging {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
args.files = renameResourcesToLocalFS(args.files, localFiles)
args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles)
}
}
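With this line, args.pyFiles gets the same renaming already applied to args.files: each resource URI is swapped for the freshly downloaded local copy. A rough illustration of that kind of renaming, using a hypothetical helper rather than the actual renameResourcesToLocalFS implementation:

import java.io.File
import java.net.URI

object RenameToLocalSketch {
  // Hypothetical stand-in: pair each original resource with the downloaded copy that shares
  // its file name, and keep the original URI when no local copy was produced for it.
  def renameToLocalCopies(resources: String, localResources: String): String = {
    if (resources == null || localResources == null) return resources
    val localByName = localResources.split(",").map { local =>
      new File(new URI(local).getPath).getName -> local
    }.toMap
    resources.split(",")
      .map(r => localByName.getOrElse(new File(new URI(r).getPath).getName, r))
      .mkString(",")
  }
}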

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -261,12 +261,19 @@ private[spark] object KubernetesUtils extends Logging {
isLocalDependency(Utils.resolveURI(resource))
}

def renameMainAppResource(resource: String, conf: SparkConf): String = {
def renameMainAppResource(
resource: String,
conf: Option[SparkConf] = None,
shouldUploadLocal: Boolean): String = {
if (isLocalAndResolvable(resource)) {
SparkLauncher.NO_RESOURCE
if (shouldUploadLocal) {
uploadFileUri(resource, conf)
} else {
SparkLauncher.NO_RESOURCE
}
} else {
resource
}
}
}
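A brief sketch of how the two modes of the new signature are meant to be called, mirroring the DriverCommandFeatureStep call sites further down in this diff (mainRes and sparkConf are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesUtils

object RenameMainAppResourceSketch {
  // JVM apps: only resolve the name; no upload here, since SparkSubmit already ships the
  // app jar via spark.jars.
  def resolveJvmResource(mainRes: String): String =
    KubernetesUtils.renameMainAppResource(resource = mainRes, shouldUploadLocal = false)

  // PySpark apps: upload the client-local primary resource and rewrite it to its remote URI.
  def resolvePythonResource(mainRes: String, sparkConf: SparkConf): String =
    KubernetesUtils.renameMainAppResource(mainRes, Option(sparkConf), true)
}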

def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -159,7 +159,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
// try upload local, resolvable files to a hadoop compatible file system
Seq(JARS, FILES).foreach { key =>
Seq(JARS, FILES, SUBMIT_PYTHON_FILES).foreach { key =>
val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
if (resolved.nonEmpty) {
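Adding SUBMIT_PYTHON_FILES here means client-local python dependencies are uploaded to the configured Hadoop-compatible storage just like jars and files. A hedged sketch of the user-facing configuration that exercises this path (bucket and file paths are made up):

import org.apache.spark.SparkConf

object PyDepsUploadConfSketch {
  val conf: SparkConf = new SparkConf()
    .set("spark.kubernetes.file.upload.path", "s3a://my-bucket/spark-upload") // hypothetical bucket
    .set("spark.submit.pyFiles", "/opt/app/deps.zip")                         // client-local python deps
    .set("spark.files", "/opt/app/lookup.csv")                                // client-local data file
}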
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -62,7 +62,11 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
}

private def configureForJava(pod: SparkPod, res: String): SparkPod = {
val driverContainer = baseDriverContainer(pod, res).build()
// re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
// no uploading takes place here
val newResName = KubernetesUtils
.renameMainAppResource(resource = res, shouldUploadLocal = false)
val driverContainer = baseDriverContainer(pod, newResName).build()
SparkPod(pod.pod, driverContainer)
}

@@ -73,7 +77,10 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
.withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
.build())

val pythonContainer = baseDriverContainer(pod, res)
// re-write primary resource to be the remote one and upload the related file
val newResName = KubernetesUtils
.renameMainAppResource(res, Option(conf.sparkConf), true)
val pythonContainer = baseDriverContainer(pod, newResName)
.addAllToEnv(pythonEnvs.asJava)
.build()

@@ -88,7 +95,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
// re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
val resolvedResource = if (conf.mainAppResource.isInstanceOf[JavaMainAppResource]) {
KubernetesUtils.renameMainAppResource(resource, conf.sparkConf)
KubernetesUtils.renameMainAppResource(resource, Option(conf.sparkConf), false)
} else {
resource
}
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.time.{Minutes, Span}
import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH}
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
import org.apache.spark.deploy.k8s.integrationtest.Utils.getExamplesJarName
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube

private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
@@ -120,16 +121,18 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.endSpec()
.build()

kubernetesTestComponents
// try until the service from a previous test is deleted
Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
.kubernetesClient
.services()
.create(minioService)
.create(minioService))

kubernetesTestComponents
// try until the stateful set of a previous test is deleted
Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
.kubernetesClient
.apps()
.statefulSets()
.create(minioStatefulSet)
.create(minioStatefulSet))
}
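The wrapper above is ScalaTest's Eventually: it simply re-runs the block until it stops throwing or the patience window runs out, which covers the case where the Minio service or stateful set from a previous test is still being deleted. A self-contained sketch of the pattern (timings are arbitrary):

import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.scalatest.time.{Seconds, Span}

object RetryUntilDeletedSketch {
  def main(args: Array[String]): Unit = {
    val readyAt = System.currentTimeMillis() + 3000  // stands in for an old resource still terminating
    eventually(timeout(Span(30, Seconds)), interval(Span(1, Seconds))) {
      // Throws (and is retried) until the simulated leftover resource is gone.
      require(System.currentTimeMillis() >= readyAt, "previous test's resources still terminating")
    }
    println("resources created")
  }
}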

private def deleteMinioStorage(): Unit = {
@@ -138,47 +141,52 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.apps()
.statefulSets()
.withName(cName)
.withGracePeriod(0)
.delete()

kubernetesTestComponents
.kubernetesClient
.services()
.withName(svcName)
.withGracePeriod(0)
.delete()
}

test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
val packages = if (Utils.isHadoop3) {
"org.apache.hadoop:hadoop-aws:3.2.0"
} else {
"com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6"
}
val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
try {
setupMinioStorage()
val minioUrlStr = getServiceUrl(svcName)
val minioUrl = new URL(minioUrlStr)
val minioHost = minioUrl.getHost
val minioPort = minioUrl.getPort
val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
sparkAppConf
.set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
.set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
.set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
.set("spark.files", s"$HOST_PATH/$fileName")
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("spark.jars.packages", packages)
.set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
tryDepsTest({
val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
sparkAppConf.set("spark.files", s"$HOST_PATH/$fileName")
val examplesJar = Utils.getTestFileAbsolutePath(getExamplesJarName(), sparkHomeDir)
runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
appArgs = Array(fileName),
timeout = Option(DEPS_TIMEOUT))
} finally {
// make sure this always runs
deleteMinioStorage()
}
})
}

test("Launcher python client dependencies using a zip file", k8sTestTag, MinikubeTag) {
val inDepsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
val outDepsFile = s"${inDepsFile.substring(0, inDepsFile.lastIndexOf("."))}.zip"
Utils.createZipFile(inDepsFile, outDepsFile)
testPythonDeps(outDepsFile)
}

private def testPythonDeps(depsFile: String): Unit = {
tryDepsTest({
val pySparkFiles = Utils.getTestFileAbsolutePath("pyfiles.py", sparkHomeDir)
setPythonSparkConfProperties(sparkAppConf)
runSparkApplicationAndVerifyCompletion(
appResource = pySparkFiles,
mainClass = "",
expectedLogOnCompletion = Seq(
"Python runtime version check is: True",
"Python environment version check is: True",
"Python runtime version check for executor is: True"),
appArgs = Array("python3"),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false,
pyFiles = Option(depsFile)) })
}

private def extractS3Key(data: String, key: String): String = {
@@ -222,6 +230,48 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
url
}
}

private def getServiceHostAndPort(minioUrlStr : String) : (String, Int) = {
val minioUrl = new URL(minioUrlStr)
(minioUrl.getHost, minioUrl.getPort)
}

private def setCommonSparkConfPropertiesForS3Access(
conf: SparkAppConf,
minioUrlStr: String): Unit = {
val (minioHost, minioPort) = getServiceHostAndPort(minioUrlStr)
val packages = if (Utils.isHadoop3) {
"org.apache.hadoop:hadoop-aws:3.2.0"
} else {
"com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6"
}
conf.set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
.set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
.set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("spark.jars.packages", packages)
.set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
}

private def setPythonSparkConfProperties(conf: SparkAppConf): Unit = {
sparkAppConf.set("spark.kubernetes.container.image", pyImage)
.set("spark.kubernetes.pyspark.pythonVersion", "3")
}

private def tryDepsTest(runTest: => Unit): Unit = {
try {
setupMinioStorage()
val minioUrlStr = getServiceUrl(svcName)
createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
setCommonSparkConfPropertiesForS3Access(sparkAppConf, minioUrlStr)
runTest
} finally {
// make sure this always runs
deleteMinioStorage()
}
}
}

private[spark] object DepsTestsSuite {
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -16,15 +16,17 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import java.io.{Closeable, File, PrintWriter}
import java.io.{Closeable, File, FileInputStream, FileOutputStream, PrintWriter}
import java.nio.file.{Files, Path}
import java.util.concurrent.CountDownLatch
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.util.Try

import io.fabric8.kubernetes.client.dsl.ExecListener
import okhttp3.Response
import org.apache.commons.compress.utils.IOUtils
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.hadoop.util.VersionInfo

@@ -114,28 +116,39 @@ object Utils extends Logging {
filename
}

def getExamplesJarAbsolutePath(sparkHomeDir: Path): String = {
val jarName = getExamplesJarName()
val jarPathsFound = Files
def getTestFileAbsolutePath(fileName: String, sparkHomeDir: Path): String = {
val filePathsFound = Files
.walk(sparkHomeDir)
.filter(Files.isRegularFile(_))
.filter((f: Path) => {f.toFile.getName == jarName})
.filter((f: Path) => {f.toFile.getName == fileName})
// we should not have more than one here under current test build dir
// we only need one though
val jarPath = jarPathsFound
val filePath = filePathsFound
.iterator()
.asScala
.map(_.toAbsolutePath.toString)
.toArray
.headOption
jarPath match {
case Some(jar) => jar
case _ => throw new SparkException(s"No valid $jarName file was found " +
filePath match {
case Some(file) => file
case _ => throw new SparkException(s"No valid $fileName file was found " +
s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!")
}
}

def isHadoop3(): Boolean = {
VersionInfo.getVersion.startsWith("3")
}

def createZipFile(inFile: String, outFile: String): Unit = {
val fileToZip = new File(inFile)
Member commented on the line above: indentation?
val fis = new FileInputStream(fileToZip)
val fos = new FileOutputStream(outFile)
val zipOut = new ZipOutputStream(fos)
val zipEntry = new ZipEntry(fileToZip.getName)
zipOut.putNextEntry(zipEntry)
IOUtils.copy(fis, zipOut)
IOUtils.closeQuietly(fis)
IOUtils.closeQuietly(zipOut)
}
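A short usage sketch for the new helper, mirroring the zip-file test above and assuming code in the same test package (paths are hypothetical):

// Packages a single file into a zip whose only entry is named after the input file.
val in = "/tmp/py_container_checks.py"
val out = s"${in.substring(0, in.lastIndexOf("."))}.zip"  // -> /tmp/py_container_checks.zip
Utils.createZipFile(in, out)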
}