
Commit 178abc1

mccheah authored and ash211 committed

Use tar and gzip to compress+archive shipped jars (#2)

* Use tar and gzip to archive shipped jars.
* Address comments
* Move files to resolve merge

1 parent f9ddb63 · commit 178abc1

File tree: 11 files changed, +254 −48 lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -2638,6 +2638,7 @@
         <module>resource-managers/kubernetes/docker-minimal-bundle</module>
         <module>resource-managers/kubernetes/integration-tests</module>
         <module>resource-managers/kubernetes/integration-tests-spark-jobs</module>
+        <module>resource-managers/kubernetes/integration-tests-spark-jobs-helpers</module>
       </modules>
     </profile>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 6 additions & 15 deletions
@@ -35,7 +35,7 @@ import scala.concurrent.duration.DurationInt
 import scala.util.Success
 
 import org.apache.spark.{SPARK_VERSION, SparkConf}
-import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
+import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
 import org.apache.spark.deploy.rest.kubernetes._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
@@ -284,8 +284,8 @@ private[spark] class Client(
       case other => RemoteAppResource(other)
     }
 
-    val uploadDriverExtraClasspathBase64Contents = getFileContents(uploadedDriverExtraClasspath)
-    val uploadJarsBase64Contents = getFileContents(uploadedJars)
+    val uploadDriverExtraClasspathBase64Contents = compressJars(uploadedDriverExtraClasspath)
+    val uploadJarsBase64Contents = compressJars(uploadedJars)
     KubernetesCreateSubmissionRequest(
       appResource = resolvedAppResource,
       mainClass = mainClass,
@@ -296,19 +296,10 @@ private[spark] class Client(
       uploadedJarsBase64Contents = uploadJarsBase64Contents)
   }
 
-  def getFileContents(maybeFilePaths: Option[String]): Array[(String, String)] = {
+  def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
     maybeFilePaths
-      .map(_.split(",").map(filePath => {
-        val fileToUpload = new File(filePath)
-        if (!fileToUpload.isFile) {
-          throw new IllegalStateException("Provided file to upload for driver extra classpath" +
-            s" does not exist or is not a file: $filePath")
-        } else {
-          val fileBytes = Files.toByteArray(fileToUpload)
-          val fileBase64 = Base64.encodeBase64String(fileBytes)
-          (fileToUpload.getName, fileBase64)
-        }
-      })).getOrElse(Array.empty[(String, String)])
+      .map(_.split(","))
+      .map(CompressionUtils.createTarGzip(_))
   }
 
   private def getDriverLauncherService(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala

Lines changed: 9 additions & 4 deletions
@@ -27,14 +27,19 @@ case class KubernetesCreateSubmissionRequest(
     val appArgs: Array[String],
     val sparkProperties: Map[String, String],
     val secret: String,
-    val uploadedDriverExtraClasspathBase64Contents: Array[(String, String)]
-      = Array.empty[(String, String)],
-    val uploadedJarsBase64Contents: Array[(String, String)]
-      = Array.empty[(String, String)]) extends SubmitRestProtocolRequest {
+    val uploadedDriverExtraClasspathBase64Contents: Option[TarGzippedData],
+    val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
   message = "create"
   clientSparkVersion = SPARK_VERSION
 }
 
+case class TarGzippedData(
+  val dataBase64: String,
+  val blockSize: Int = 10240,
+  val recordSize: Int = 512,
+  val encoding: String
+)
+
 @JsonTypeInfo(
   use = JsonTypeInfo.Id.NAME,
   include = JsonTypeInfo.As.PROPERTY,
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala

Lines changed: 139 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.rest.kubernetes

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.google.common.io.Files
import org.apache.commons.codec.binary.Base64
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream, TarArchiveOutputStream}
import org.apache.commons.compress.utils.CharsetNames
import org.apache.commons.io.IOUtils
import scala.collection.mutable

import org.apache.spark.deploy.rest.TarGzippedData
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ByteBufferOutputStream, Utils}

private[spark] object CompressionUtils extends Logging {
  // Defaults from TarArchiveOutputStream
  private val BLOCK_SIZE = 10240
  private val RECORD_SIZE = 512
  private val ENCODING = CharsetNames.UTF_8

  /**
   * Compresses all of the given paths into a gzipped-tar archive, returning the compressed data
   * in memory as an instance of {@link TarGzippedData}. The files are taken without consideration
   * to their original folder structure, and are added to the tar archive in a flat hierarchy.
   * Directories are not allowed, and duplicate file names are de-duplicated by appending a
   * numeric suffix to the file name, before the file extension. For example, if paths a/b.txt and
   * b/b.txt were provided, then the files added to the tar archive would be b.txt and b-1.txt.
   * @param paths A list of file paths to be archived
   * @return An in-memory representation of the compressed data.
   */
  def createTarGzip(paths: Iterable[String]): TarGzippedData = {
    val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
      Utils.tryWithResource(new GZIPOutputStream(raw)) { gzipping =>
        Utils.tryWithResource(new TarArchiveOutputStream(
            gzipping,
            BLOCK_SIZE,
            RECORD_SIZE,
            ENCODING)) { tarStream =>
          val usedFileNames = mutable.HashSet.empty[String]
          for (path <- paths) {
            val file = new File(path)
            if (!file.isFile) {
              throw new IllegalArgumentException(s"Cannot add $path to tarball; either does" +
                s" not exist or is a directory.")
            }
            var resolvedFileName = file.getName
            val extension = Files.getFileExtension(file.getName)
            val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
            var deduplicationCounter = 1
            while (usedFileNames.contains(resolvedFileName)) {
              val oldResolvedFileName = resolvedFileName
              resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
              logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" +
                s" with file name $resolvedFileName instead.")
              deduplicationCounter += 1
            }
            usedFileNames += resolvedFileName
            val tarEntry = new TarArchiveEntry(file, resolvedFileName)
            tarStream.putArchiveEntry(tarEntry)
            Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
              IOUtils.copy(fileInput, tarStream)
            }
            tarStream.closeArchiveEntry()
          }
        }
      }
      raw
    }
    val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
    TarGzippedData(
      dataBase64 = compressedAsBase64,
      blockSize = BLOCK_SIZE,
      recordSize = RECORD_SIZE,
      encoding = ENCODING
    )
  }

  /**
   * Decompresses the provided tar archive to a directory.
   * @param compressedData In-memory representation of the compressed data, ideally created via
   *                       {@link createTarGzip}.
   * @param rootOutputDir Directory to write the output files to. All files from the tarball
   *                      are written here in a flat hierarchy.
   * @return List of file paths for each file that was unpacked from the archive.
   */
  def unpackAndWriteCompressedFiles(
      compressedData: TarGzippedData,
      rootOutputDir: File): Seq[String] = {
    val paths = mutable.Buffer.empty[String]
    val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
    if (!rootOutputDir.exists) {
      if (!rootOutputDir.mkdirs) {
        throw new IllegalStateException(s"Failed to create output directory for unpacking" +
          s" files at ${rootOutputDir.getAbsolutePath}")
      }
    } else if (rootOutputDir.isFile) {
      throw new IllegalArgumentException(s"Root dir for writing decompressed files: " +
        s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
    }
    Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
      Utils.tryWithResource(new GZIPInputStream(compressedBytesStream)) { gzipped =>
        Utils.tryWithResource(new TarArchiveInputStream(
            gzipped,
            compressedData.blockSize,
            compressedData.recordSize,
            compressedData.encoding)) { tarInputStream =>
          var nextTarEntry = tarInputStream.getNextTarEntry
          while (nextTarEntry != null) {
            val outputFile = new File(rootOutputDir, nextTarEntry.getName)
            Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
              IOUtils.copy(tarInputStream, fileOutputStream)
            }
            paths += outputFile.getAbsolutePath
            nextTarEntry = tarInputStream.getNextTarEntry
          }
        }
      }
    }
    paths.toSeq
  }
}
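
A minimal round-trip sketch of the new utility (illustrative only, not part of this commit): it is placed in the same package so it can call the private[spark] object, and the jar paths and temporary output directory are hypothetical.

package org.apache.spark.deploy.rest.kubernetes

import java.nio.file.Files

// Illustrative only: round-trips two hypothetical jars through the utility.
object CompressionRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Archive the jars into an in-memory, base64-encoded tar.gz payload.
    val archived = CompressionUtils.createTarGzip(
      Seq("/tmp/jars/app.jar", "/tmp/jars/helpers.jar"))

    // Unpack the payload into a flat output directory, as the REST server does
    // when it receives the submission request.
    val outputDir = Files.createTempDirectory("unpacked-jars").toFile
    val unpackedPaths = CompressionUtils.unpackAndWriteCompressedFiles(archived, outputDir)
    unpackedPaths.foreach(println)
  }
}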

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala

Lines changed: 4 additions & 23 deletions
@@ -217,30 +217,11 @@ private[spark] class KubernetesSparkRestServer(
     }
 
     private def writeBase64ContentsToFiles(
-        filesBase64Contents: Array[(String, String)],
+        maybeCompressedFiles: Option[TarGzippedData],
        rootDir: File): Seq[String] = {
-      val resolvedFileNames = new scala.collection.mutable.HashSet[String]
-      val resolvedFilePaths = new ArrayBuffer[String]
-      for (file <- filesBase64Contents) {
-        var currentFileName = file._1
-        var deduplicationCounter = 1
-        while (resolvedFileNames.contains(currentFileName)) {
-          // Prepend the deduplication counter so as to not mess with the extension
-          currentFileName = s"$deduplicationCounter-$currentFileName"
-          deduplicationCounter += 1
-        }
-        val resolvedFile = new File(rootDir, currentFileName)
-        val resolvedFilePath = resolvedFile.getAbsolutePath
-        if (resolvedFile.createNewFile()) {
-          val fileContents = Base64.decodeBase64(file._2)
-          Files.write(fileContents, resolvedFile)
-        } else {
-          throw new IllegalStateException(s"Could not write jar file to $resolvedFilePath")
-        }
-        resolvedFileNames += currentFileName
-        resolvedFilePaths += resolvedFilePath
-      }
-      resolvedFilePaths.toSeq
+      maybeCompressedFiles.map { compressedFiles =>
+        CompressionUtils.unpackAndWriteCompressedFiles(compressedFiles, rootDir)
+      }.getOrElse(Seq.empty[String])
     }
   }

resource-managers/kubernetes/integration-tests-spark-jobs-helpers/pom.xml

Lines changed: 33 additions & 0 deletions

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.2.0-SNAPSHOT</version>
    <relativePath>../../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Kubernetes Integration Tests Spark Jobs Helpers</name>

  <dependencies>
  </dependencies>
</project>
resource-managers/kubernetes/integration-tests-spark-jobs-helpers/src/main/java/org/apache/spark/deploy/kubernetes/integrationtest/PiHelper.java

Lines changed: 33 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes.integrationtest;

/**
 * Primarily extracted so that a separate jar can be added as a dependency for the
 * test Spark job.
 */
public class PiHelper {
  public static int helpPi() {
    double x = Math.random() * 2 - 1;
    double y = Math.random() * 2 - 1;
    if (x*x + y*y < 1) {
      return 1;
    } else {
      return 0;
    }
  }
}

resource-managers/kubernetes/integration-tests-spark-jobs/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -29,6 +29,12 @@
   <name>Spark Project Kubernetes Integration Tests Spark Jobs</name>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>

resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala

Lines changed: 3 additions & 6 deletions
@@ -16,8 +16,7 @@
  */
 package org.apache.spark.deploy.kubernetes.integrationtest.jobs
 
-import scala.math.random
-
+import org.apache.spark.deploy.kubernetes.integrationtest.PiHelper
 import org.apache.spark.sql.SparkSession
 
 // Equivalent to SparkPi except does not stop the Spark Context
@@ -32,10 +31,8 @@ private[spark] object SparkPiWithInfiniteWait {
       .getOrCreate()
     val slices = if (args.length > 0) args(0).toInt else 10
     val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
-    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
-      val x = random * 2 - 1
-      val y = random * 2 - 1
-      if (x*x + y*y < 1) 1 else 0
+    val count = spark.sparkContext.parallelize(1 until n, slices).map { _ =>
+      PiHelper.helpPi()
     }.reduce(_ + _)
     // scalastyle:off println
     println("Pi is roughly " + 4.0 * count / (n - 1))
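
As an aside, the helper's sampling logic can be sanity-checked without a SparkContext: each sample lands inside the unit circle with probability Pi/4, so 4 * hits / n approximates Pi. The LocalPiCheck object below is an illustrative sketch only, not part of this commit.

package org.apache.spark.deploy.kubernetes.integrationtest

// Illustrative only: estimates Pi locally by sampling PiHelper directly.
object LocalPiCheck {
  def main(args: Array[String]): Unit = {
    val n = 100000
    // helpPi() returns 1 when a random point in [-1, 1) x [-1, 1) falls inside
    // the unit circle, so the hit ratio approximates Pi / 4.
    val count = (1 to n).map(_ => PiHelper.helpPi()).sum
    println("Pi is roughly " + 4.0 * count / n)
  }
}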

resource-managers/kubernetes/integration-tests/pom.xml

Lines changed: 13 additions & 0 deletions
@@ -48,6 +48,12 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-docker-minimal-bundle_${scala.binary.version}</artifactId>
@@ -123,6 +129,13 @@
               <type>jar</type>
               <outputDirectory>${project.build.directory}/integration-tests-spark-jobs</outputDirectory>
             </artifactItem>
+            <artifactItem>
+              <groupId>org.apache.spark</groupId>
+              <artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
+              <version>${project.version}</version>
+              <type>jar</type>
+              <outputDirectory>${project.build.directory}/integration-tests-spark-jobs-helpers</outputDirectory>
+            </artifactItem>
           </artifactItems>
         </configuration>
       </execution>
