1 change: 1 addition & 0 deletions pom.xml
@@ -2649,6 +2649,7 @@
<module>resource-managers/kubernetes/docker-minimal-bundle</module>
<module>resource-managers/kubernetes/integration-tests</module>
<module>resource-managers/kubernetes/integration-tests-spark-jobs</module>
<module>resource-managers/kubernetes/integration-tests-spark-jobs-helpers</module>
</modules>
</profile>

@@ -35,7 +35,7 @@ import scala.concurrent.duration.DurationInt
import scala.util.Success

import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -284,8 +284,8 @@ private[spark] class Client(
case other => RemoteAppResource(other)
}

val uploadDriverExtraClasspathBase64Contents = getFileContents(uploadedDriverExtraClasspath)
val uploadJarsBase64Contents = getFileContents(uploadedJars)
val uploadDriverExtraClasspathBase64Contents = compressJars(uploadedDriverExtraClasspath)
val uploadJarsBase64Contents = compressJars(uploadedJars)
KubernetesCreateSubmissionRequest(
appResource = resolvedAppResource,
mainClass = mainClass,
@@ -296,19 +296,10 @@
uploadedJarsBase64Contents = uploadJarsBase64Contents)
}

def getFileContents(maybeFilePaths: Option[String]): Array[(String, String)] = {
def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
maybeFilePaths
.map(_.split(",").map(filePath => {
val fileToUpload = new File(filePath)
if (!fileToUpload.isFile) {
throw new IllegalStateException("Provided file to upload for driver extra classpath" +
s" does not exist or is not a file: $filePath")
} else {
val fileBytes = Files.toByteArray(fileToUpload)
val fileBase64 = Base64.encodeBase64String(fileBytes)
(fileToUpload.getName, fileBase64)
}
})).getOrElse(Array.empty[(String, String)])
.map(_.split(","))
.map(CompressionUtils.createTarGzip(_))
}
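// Editorial sketch (not part of this change): given Some("a.jar,b.jar"), compressJars
// returns Some(CompressionUtils.createTarGzip(Seq("a.jar", "b.jar"))), i.e. a single
// in-memory gzipped tar holding both jars; a None input passes through as None.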

private def getDriverLauncherService(
@@ -27,14 +27,19 @@ case class KubernetesCreateSubmissionRequest(
val appArgs: Array[String],
val sparkProperties: Map[String, String],
val secret: String,
val uploadedDriverExtraClasspathBase64Contents: Array[(String, String)]
= Array.empty[(String, String)],
val uploadedJarsBase64Contents: Array[(String, String)]
= Array.empty[(String, String)]) extends SubmitRestProtocolRequest {
val uploadedDriverExtraClasspathBase64Contents: Option[TarGzippedData],
val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
message = "create"
clientSparkVersion = SPARK_VERSION
}

case class TarGzippedData(
val dataBase64: String,
val blockSize: Int = 10240,
val recordSize: Int = 512,
val encoding: String
)
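// Editorial sketch (assumption, not part of this change): a request that uploads jars
// would carry roughly
//   uploadedJarsBase64Contents = Some(TarGzippedData(
//     dataBase64 = "<base64 of gzipped tar>", blockSize = 10240,
//     recordSize = 512, encoding = "UTF-8"))
// while None means nothing was uploaded for that field.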

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.rest.kubernetes

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.google.common.io.Files
import org.apache.commons.codec.binary.Base64
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream, TarArchiveOutputStream}
import org.apache.commons.compress.utils.CharsetNames
import org.apache.commons.io.IOUtils
import scala.collection.mutable

import org.apache.spark.deploy.rest.TarGzippedData
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ByteBufferOutputStream, Utils}

private[spark] object CompressionUtils extends Logging {
// Defaults from TarArchiveOutputStream
private val BLOCK_SIZE = 10240
private val RECORD_SIZE = 512
private val ENCODING = CharsetNames.UTF_8

/**
* Compresses all of the given paths into a gzipped tar archive, returning the compressed data
* in memory as an instance of {@link TarGzippedData}. Files are added to the archive in a flat
* hierarchy, without regard to their original directory structure. Directories are not allowed,
* and duplicate file names are de-duplicated by appending a numeric suffix before the file
* extension. For example, if the paths a/b.txt and b/b.txt were provided, the files added to
* the tar archive would be b.txt and b-1.txt.
* @param paths A list of file paths to be archived.
* @return An in-memory representation of the compressed data.
*/
def createTarGzip(paths: Iterable[String]): TarGzippedData = {
val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
Utils.tryWithResource(new GZIPOutputStream(raw)) { gzipping =>
Utils.tryWithResource(new TarArchiveOutputStream(
gzipping,
BLOCK_SIZE,
RECORD_SIZE,
ENCODING)) { tarStream =>
val usedFileNames = mutable.HashSet.empty[String]
for (path <- paths) {
val file = new File(path)
if (!file.isFile) {
throw new IllegalArgumentException(s"Cannot add $path to tarball; it either does" +
" not exist or is a directory.")
}
var resolvedFileName = file.getName
val extension = Files.getFileExtension(file.getName)
val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
var deduplicationCounter = 1
while (usedFileNames.contains(resolvedFileName)) {
val oldResolvedFileName = resolvedFileName
resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
logWarning(s"File with name $oldResolvedFileName already exists. Trying to add with" +
s" file name $resolvedFileName instead.")
deduplicationCounter += 1
}
usedFileNames += resolvedFileName
val tarEntry = new TarArchiveEntry(file, resolvedFileName)
tarStream.putArchiveEntry(tarEntry)
Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
IOUtils.copy(fileInput, tarStream)
}
tarStream.closeArchiveEntry()
}
}
}
raw
}
val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
TarGzippedData(
dataBase64 = compressedAsBase64,
blockSize = BLOCK_SIZE,
recordSize = RECORD_SIZE,
encoding = ENCODING
)
}

/**
* Decompresses the provided tar archive to a directory.
* @param compressedData In-memory representation of the compressed data, ideally created via
* {@link createTarGzip}.
* @param rootOutputDir Directory to write the output files to. All files from the tarball
* are written here in a flat hierarchy.
* @return List of file paths for each file that was unpacked from the archive.
*/
def unpackAndWriteCompressedFiles(
compressedData: TarGzippedData,
rootOutputDir: File): Seq[String] = {
val paths = mutable.Buffer.empty[String]
val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
if (!rootOutputDir.exists) {
if (!rootOutputDir.mkdirs) {
throw new IllegalStateException(s"Failed to create output directory for unpacking" +
s" files at ${rootOutputDir.getAbsolutePath}")
}
} else if (rootOutputDir.isFile) {
throw new IllegalArgumentException(s"Root dir for writing decompressed files: " +
s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
}
Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
Utils.tryWithResource(new GZIPInputStream(compressedBytesStream)) { gzipped =>
Utils.tryWithResource(new TarArchiveInputStream(
gzipped,
compressedData.blockSize,
compressedData.recordSize,
compressedData.encoding)) { tarInputStream =>
var nextTarEntry = tarInputStream.getNextTarEntry
while (nextTarEntry != null) {
val outputFile = new File(rootOutputDir, nextTarEntry.getName)
Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
IOUtils.copy(tarInputStream, fileOutputStream)
}
paths += outputFile.getAbsolutePath
nextTarEntry = tarInputStream.getNextTarEntry
}
}
}
}
paths.toSeq
}
}
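
A minimal round-trip sketch of the new utility (editorial; the object name and file paths below are hypothetical, and the jars must exist on disk):

import java.io.File

import org.apache.spark.deploy.rest.kubernetes.CompressionUtils

object CompressionRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Compress two jars into a single in-memory, base64-encoded gzipped tar.
    val compressed = CompressionUtils.createTarGzip(Seq("/tmp/a.jar", "/tmp/b.jar"))
    // Unpack them into a flat output directory; duplicate names were already
    // de-duplicated when the archive was created.
    val unpacked = CompressionUtils.unpackAndWriteCompressedFiles(
      compressed, new File("/tmp/unpacked-jars"))
    unpacked.foreach(println)
  }
}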
@@ -217,30 +217,11 @@ private[spark] class KubernetesSparkRestServer(
}

private def writeBase64ContentsToFiles(
filesBase64Contents: Array[(String, String)],
maybeCompressedFiles: Option[TarGzippedData],
rootDir: File): Seq[String] = {
val resolvedFileNames = new scala.collection.mutable.HashSet[String]
val resolvedFilePaths = new ArrayBuffer[String]
for (file <- filesBase64Contents) {
var currentFileName = file._1
var deduplicationCounter = 1
while (resolvedFileNames.contains(currentFileName)) {
// Prepend the deduplication counter so as to not mess with the extension
currentFileName = s"$deduplicationCounter-$currentFileName"
deduplicationCounter += 1
}
val resolvedFile = new File(rootDir, currentFileName)
val resolvedFilePath = resolvedFile.getAbsolutePath
if (resolvedFile.createNewFile()) {
val fileContents = Base64.decodeBase64(file._2)
Files.write(fileContents, resolvedFile)
} else {
throw new IllegalStateException(s"Could not write jar file to $resolvedFilePath")
}
resolvedFileNames += currentFileName
resolvedFilePaths += resolvedFilePath
}
resolvedFilePaths.toSeq
maybeCompressedFiles.map { compressedFiles =>
CompressionUtils.unpackAndWriteCompressedFiles(compressedFiles, rootDir)
}.getOrElse(Seq.empty[String])
}
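// Editorial note: file-name de-duplication now happens on the client inside
// CompressionUtils.createTarGzip, so the counter-prefixing logic removed above
// is no longer needed on the server.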
}

@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Kubernetes Integration Tests Spark Jobs Helpers</name>

<dependencies>
</dependencies>
</project>
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest;

/**
* Primarily extracted so that a separate jar can be added as a dependency for the
* test Spark job.
*/
public class PiHelper {
public static int helpPi() {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
if (x*x + y*y < 1) {
return 1;
} else {
return 0;
}
}
}
@@ -29,6 +29,12 @@
<name>Spark Project Kubernetes Integration Tests Spark Jobs</name>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
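<!-- Editorial note (assumption): "provided" scope keeps the helper classes out of the
     test-jobs jar so the integration tests can ship the helpers jar separately through
     the new jar-upload path. -->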
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -16,8 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes.integrationtest.jobs

import scala.math.random

import org.apache.spark.deploy.kubernetes.integrationtest.PiHelper
import org.apache.spark.sql.SparkSession

// Equivalent to SparkPi except does not stop the Spark Context
@@ -32,10 +31,8 @@ private[spark] object SparkPiWithInfiniteWait {
.getOrCreate()
val slices = if (args.length > 0) args(0).toInt else 10
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
val count = spark.sparkContext.parallelize(1 until n, slices).map { _ =>
PiHelper.helpPi()
}.reduce(_ + _)
// scalastyle:off println
println("Pi is roughly " + 4.0 * count / (n - 1))
13 changes: 13 additions & 0 deletions resource-managers/kubernetes/integration-tests/pom.xml
@@ -48,6 +48,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-docker-minimal-bundle_${scala.binary.version}</artifactId>
@@ -123,6 +129,13 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/integration-tests-spark-jobs</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>jar</type>
<outputDirectory>${project.build.directory}/integration-tests-spark-jobs-helpers</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>