This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
8 changes: 7 additions & 1 deletion resource-managers/kubernetes/core/pom.xml
@@ -108,6 +108,8 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- End of shaded deps. -->

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
@@ -116,7 +118,11 @@
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</dependency>
<!-- End of shaded deps. -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v1
package org.apache.spark.deploy.kubernetes

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream, InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.google.common.io.Files
@@ -48,40 +48,7 @@ private[spark] object CompressionUtils extends Logging {
*/
def createTarGzip(paths: Iterable[String]): TarGzippedData = {
val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
Utils.tryWithResource(new GZIPOutputStream(raw)) { gzipping =>
Utils.tryWithResource(new TarArchiveOutputStream(
gzipping,
BLOCK_SIZE,
RECORD_SIZE,
ENCODING)) { tarStream =>
val usedFileNames = mutable.HashSet.empty[String]
for (path <- paths) {
val file = new File(path)
if (!file.isFile) {
throw new IllegalArgumentException(s"Cannot add $path to tarball; either does" +
s" not exist or is a directory.")
}
var resolvedFileName = file.getName
val extension = Files.getFileExtension(file.getName)
val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
var deduplicationCounter = 1
while (usedFileNames.contains(resolvedFileName)) {
val oldResolvedFileName = resolvedFileName
resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" +
s" with file name $resolvedFileName instead.")
deduplicationCounter += 1
}
usedFileNames += resolvedFileName
val tarEntry = new TarArchiveEntry(file, resolvedFileName)
tarStream.putArchiveEntry(tarEntry)
Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
IOUtils.copy(fileInput, tarStream)
}
tarStream.closeArchiveEntry()
}
}
}
writeTarGzipToStream(raw, paths)
raw
}
val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
@@ -93,6 +60,44 @@
)
}

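/**
* Writes the files at the given local paths as a gzipped tar archive to the provided
* output stream, de-duplicating colliding file names the same way createTarGzip does.
* The gzip and tar streams are closed before this method returns.
*/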
def writeTarGzipToStream(outputStream: OutputStream, paths: Iterable[String]): Unit = {
Utils.tryWithResource(new GZIPOutputStream(outputStream)) { gzipping =>
Utils.tryWithResource(new TarArchiveOutputStream(
gzipping,
BLOCK_SIZE,
RECORD_SIZE,
ENCODING)) { tarStream =>
val usedFileNames = mutable.HashSet.empty[String]
for (path <- paths) {
val file = new File(path)
if (!file.isFile) {
throw new IllegalArgumentException(s"Cannot add $path to tarball; either does" +
s" not exist or is a directory.")
}
var resolvedFileName = file.getName
val extension = Files.getFileExtension(file.getName)
val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
var deduplicationCounter = 1
while (usedFileNames.contains(resolvedFileName)) {
val oldResolvedFileName = resolvedFileName
resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" +
s" with file name $resolvedFileName instead.")
deduplicationCounter += 1
}
usedFileNames += resolvedFileName
val tarEntry = new TarArchiveEntry(resolvedFileName)
tarEntry.setSize(file.length())
tarStream.putArchiveEntry(tarEntry)
Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
IOUtils.copy(fileInput, tarStream)
}
tarStream.closeArchiveEntry()
}
}
}
}

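For context, a minimal usage sketch of the streaming variant above, assuming caller code inside the org.apache.spark package tree (the helpers are private[spark]) and made-up jar and archive paths:

// Sketch: stream a tarball of local jars straight to disk instead of
// buffering it in memory as createTarGzip does. Paths are illustrative.
import java.io.FileOutputStream
import org.apache.spark.util.Utils

val jarPaths = Seq("/opt/jars/app.jar", "/opt/jars/dep.jar")  // assumed inputs
Utils.tryWithResource(new FileOutputStream("/tmp/uploaded-jars.tgz")) { out =>
  CompressionUtils.writeTarGzipToStream(out, jarPaths)
}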
/**
* Decompresses the provided tar archive to a directory.
* @param compressedData In-memory representation of the compressed data, ideally created via
@@ -104,7 +109,6 @@ private[spark] object CompressionUtils extends Logging {
def unpackAndWriteCompressedFiles(
compressedData: TarGzippedData,
rootOutputDir: File): Seq[String] = {
val paths = mutable.Buffer.empty[String]
val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
if (!rootOutputDir.exists) {
if (!rootOutputDir.mkdirs) {
@@ -116,24 +120,39 @@
s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
}
Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
Utils.tryWithResource(new GZIPInputStream(compressedBytesStream)) { gzipped =>
Utils.tryWithResource(new TarArchiveInputStream(
gzipped,
compressedData.blockSize,
compressedData.recordSize,
compressedData.encoding)) { tarInputStream =>
var nextTarEntry = tarInputStream.getNextTarEntry
while (nextTarEntry != null) {
val outputFile = new File(rootOutputDir, nextTarEntry.getName)
Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
IOUtils.copy(tarInputStream, fileOutputStream)
}
paths += outputFile.getAbsolutePath
nextTarEntry = tarInputStream.getNextTarEntry
unpackTarStreamToDirectory(
compressedBytesStream,
rootOutputDir,
compressedData.blockSize,
compressedData.recordSize,
compressedData.encoding)
}
}

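/**
* Unpacks a gzipped tar stream into the given directory and returns the absolute
* paths of the files that were written. Block size, record size, and encoding
* default to the constants used when writing archives in this object.
*/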
def unpackTarStreamToDirectory(
inputStream: InputStream,
outputDir: File,
blockSize: Int = BLOCK_SIZE,
recordSize: Int = RECORD_SIZE,
encoding: String = ENCODING): Seq[String] = {
val paths = mutable.Buffer.empty[String]
Utils.tryWithResource(new GZIPInputStream(inputStream)) { gzipped =>
Utils.tryWithResource(new TarArchiveInputStream(
gzipped,
blockSize,
recordSize,
encoding)) { tarInputStream =>
var nextTarEntry = tarInputStream.getNextTarEntry
while (nextTarEntry != null) {
val outputFile = new File(outputDir, nextTarEntry.getName)
Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
IOUtils.copy(tarInputStream, fileOutputStream)
}
paths += outputFile.getAbsolutePath
nextTarEntry = tarInputStream.getNextTarEntry
}
}
}
paths.toSeq
paths
}
}
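A rough round-trip sketch of the two new streaming helpers together (paths are assumptions; the defaults on the unpack side match the constants used by the writer):

// Sketch: write an archive with one helper, then restore it with the other.
import java.io.{File, FileInputStream, FileOutputStream}
import org.apache.spark.util.Utils

val archive = new File("/tmp/deps.tgz")  // assumed scratch location
Utils.tryWithResource(new FileOutputStream(archive)) { out =>
  CompressionUtils.writeTarGzipToStream(out, Seq("/opt/jars/app.jar"))
}
val unpackDir = new File("/tmp/unpacked")
unpackDir.mkdirs()
val restored = Utils.tryWithResource(new FileInputStream(archive)) { in =>
  CompressionUtils.unpackTarStreamToDirectory(in, unpackDir)
}
// restored holds the absolute paths of the extracted files.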
@@ -19,11 +19,13 @@ package org.apache.spark.deploy.kubernetes
import java.util.concurrent.TimeUnit

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

package object config {
package object config extends Logging {

private[spark] val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
@@ -321,4 +323,107 @@ package object config {
.doc("File containing the key password for the Kubernetes dependency server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.enabled")
.doc("Whether or not to use SSL when communicating with the dependency server.")
.booleanConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStore")
.doc("File containing the trustStore to communicate with the Kubernetes dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStorePassword")
.doc("Password for the trustStore for talking to the dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStoreType")
.doc("Type of trustStore for communicating with the dependency server.")
.stringConf
.createOptional

// Driver and Init-Container parameters for submission v2
private[spark] val RESOURCE_STAGING_SERVER_URI =
ConfigBuilder("spark.kubernetes.resourceStagingServer.uri")
.doc("Base URI for the Spark resource staging server")
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsResourceIdentifier")
.doc("Identifier for the jars tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download jars.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH)

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesResourceIdentifier")
.doc("Identifier for the files tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download files.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH)

private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.driver.initcontainer.docker.image")
.doc("Image for the driver's init-container that downloads mounted dependencies.")
.stringConf
.createWithDefault(s"spark-driver-init:$sparkVersion")

private[spark] val DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.jarsDownloadDir")
.doc("Location to download local jars to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-jars")

private[spark] val DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.filesDownloadDir")
.doc("Location to download local files to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-files")

private[spark] val DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
.doc("Timeout before aborting the attempt to download and unpack local dependencies from" +
" the dependency staging server when initializing the driver pod.")
.timeConf(TimeUnit.MINUTES)
.createWithDefault(5)

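To make the staging-server settings above concrete, a client-side sketch of wiring them through SparkConf; the endpoint and trustStore path are invented for illustration:

// Sketch: pointing a submission at a staging server over SSL.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.resourceStagingServer.uri",
    "https://staging.example.com:10000")  // assumed endpoint
  .set("spark.ssl.kubernetes.resourceStagingServer.enabled", "true")
  .set("spark.ssl.kubernetes.resourceStagingServer.trustStore",
    "/etc/secrets/staging-trust.jks")  // assumed path
  .set("spark.ssl.kubernetes.resourceStagingServer.trustStorePassword", "changeit")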
private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
}
val masterWithoutK8sPrefix = rawMasterString.replaceFirst("k8s://", "")
if (masterWithoutK8sPrefix.startsWith("http://")
|| masterWithoutK8sPrefix.startsWith("https://")) {
masterWithoutK8sPrefix
} else {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logDebug(s"No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
s" URL is $resolvedURL")
resolvedURL
}
}
}
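The resolution rules above are easiest to see as examples (a sketch exercising the function as written; hosts are invented):

resolveK8sMaster("k8s://https://10.0.0.1:6443")  // => "https://10.0.0.1:6443"
resolveK8sMaster("k8s://http://localhost:8080")  // => "http://localhost:8080" (scheme kept)
resolveK8sMaster("k8s://master.example.com")     // => "https://master.example.com" (https assumed)
resolveK8sMaster("master.example.com")           // throws IllegalArgumentException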
@@ -30,9 +30,9 @@ package object constants {
private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret"
private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume"
private[spark] val SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME =
"spark-submission-server-key-password"
"spark-submission-server-key-password"
private[spark] val SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME =
"spark-submission-server-keystore-password"
"spark-submission-server-keystore-password"
private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore"
private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl"
private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
@@ -55,9 +55,9 @@
private[spark] val ENV_SUBMISSION_SERVER_PORT = "SPARK_SUBMISSION_SERVER_PORT"
private[spark] val ENV_SUBMISSION_KEYSTORE_FILE = "SPARK_SUBMISSION_KEYSTORE_FILE"
private[spark] val ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE =
"SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
"SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE =
"SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
"SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE"
private[spark] val ENV_SUBMISSION_KEY_PEM_FILE = "SPARK_SUBMISSION_KEY_PEM_FILE"
private[spark] val ENV_SUBMISSION_CERT_PEM_FILE = "SPARK_SUBMISSION_CERT_PEM_FILE"
@@ -70,17 +70,47 @@
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
private[spark] val ENV_UPLOADED_JARS_DIR = "SPARK_UPLOADED_JARS_DIR"
private[spark] val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH"
private[spark] val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"

// Annotation keys
private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =
"spark-job.alpha.apache.org/provideExternalUri"
"spark-job.alpha.apache.org/provideExternalUri"
private[spark] val ANNOTATION_RESOLVED_EXTERNAL_URI =
"spark-job.alpha.apache.org/resolvedExternalUri"
"spark-job.alpha.apache.org/resolvedExternalUri"

// Miscellaneous
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
private[spark] val DRIVER_SUBMIT_SSL_NAMESPACE = "kubernetes.driversubmitserver"
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
private[spark] val MEMORY_OVERHEAD_MIN = 384L

// V2 submission init container
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
Member:
The init-container annotation should not be necessary anymore as of k8s 1.6: init-containers are a first-class field (spec.initContainers). The annotation will be deprecated in future releases.

Author:
The fabric8 library doesn't support this field at the moment. Should we file an issue with the fabric8 project to support it?

Comment:
Something we haven't discussed much yet is the compatibility range the spark-k8s project has on different versions of k8s. Maybe we should make this a rolling window and support both the latest and previous releases, which currently would be 1.6 and 1.5. In that case we should use the beta annotation until k8s 1.7 is released.

foxish (Member, Apr 21, 2017):
I'll open up an issue against fabric8. That's a good point on compatibility. Supporting the current version and the previous one seems to be a good start (we should formalize and capture this somewhere in the documentation).

Member:
The beta annotation LGTM. I do want to take a closer look at the other k8s-specific parts of this PR. Can we hold off on merging till Monday morning?

Author:
Sure thing -- looking forward to your detailed review!

(A sketch of attaching the annotation through the fabric8 builder follows the constants below.)

private[spark] val INIT_CONTAINER_SECRETS_VOLUME_NAME = "dependency-secret"
private[spark] val INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH = "/mnt/secrets/spark-init"
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY = "downloadJarsSecret"
private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY = "downloadFilesSecret"
private[spark] val INIT_CONTAINER_TRUSTSTORE_SECRET_KEY = "trustStore"
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH =
s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY"
private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH =
s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY"
private[spark] val INIT_CONTAINER_TRUSTSTORE_PATH =
s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_TRUSTSTORE_SECRET_KEY"
private[spark] val INIT_CONTAINER_DOWNLOAD_CREDENTIALS_PATH =
"/mnt/secrets/kubernetes-credentials"
private[spark] val INIT_CONTAINER_CONFIG_MAP_KEY = "init-driver"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "init-container-properties"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH = "/etc/spark-init/"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_NAME = "init-driver.properties"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_PATH =
s"$INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
private[spark] val DOWNLOAD_JARS_VOLUME_NAME = "download-jars"
private[spark] val DOWNLOAD_FILES_VOLUME_NAME = "download-files"
}
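Picking up the annotation discussion from the review thread above: since fabric8 did not yet expose spec.initContainers as a first-class field, an init-container has to ride along as the beta annotation. A rough sketch of what that wiring could look like with the fabric8 PodBuilder; the container JSON, names, and image tags are assumptions, not the PR's exact code:

// Sketch: attach an init-container via the beta annotation using fabric8.
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import org.apache.spark.deploy.kubernetes.constants._

// Hand-rolled, abbreviated init-container spec; real code would serialize
// a container object rather than embed a JSON literal.
val initContainersJson =
  """[{"name": "spark-init", "image": "spark-driver-init:2.1.0",
    |   "volumeMounts": [{"name": "download-jars",
    |                     "mountPath": "/var/spark-data/spark-local-jars"}]}]""".stripMargin

val driverPod: Pod = new PodBuilder()
  .withNewMetadata()
    .withName("spark-driver")  // assumed pod name
    .addToAnnotations(INIT_CONTAINER_ANNOTATION, initContainersJson)
  .endMetadata()
  .withNewSpec()
    .addNewContainer()
      .withName(DRIVER_CONTAINER_NAME)
      .withImage("spark-driver:2.1.0")  // assumed driver image
    .endContainer()
  .endSpec()
  .build()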