10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
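Presumably, as with the neighbouring assembly profiles (e.g. hive), this new profile would be activated at build time with Maven's -Pkubernetes flag so that the spark-kubernetes module is bundled into the distribution.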
KubernetesExternalShuffleClient.java (new file)
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle.kubernetes;

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.kubernetes.ApplicationComplete;
import org.apache.spark.network.util.TransportConf;

import java.io.IOException;
import java.nio.ByteBuffer;

public class KubernetesExternalShuffleClient extends ExternalShuffleClient {
Review comment:
Where is this being used? And can you comment on why we need a new K8s shuffle client?

Author (collaborator) reply:
I need to document the fuller context somewhere about how security with the external shuffle service works via daemon sets and secrets, but this particular shuffle service client is used when an application completes and needs to clean up the secret being used to communicate with the shuffle service.

Review comment:
I understand; however, I don't see it being used in this PR. Will another PR use it?

Author (collaborator) reply:
It should already be used in KubernetesClusterSchedulerBackend: https://github.com/foxish/spark/pull/7/files#diff-857cd4a3ee24d6110c51756c8a3f051fR475 (a usage sketch follows the end of this file's diff).


public KubernetesExternalShuffleClient(
TransportConf conf,
SecretKeyHolder secretKeyHolder,
boolean saslEnabled,
boolean saslEncryptionEnabled) {
super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
}

public void sendApplicationComplete(String host, int port) throws IOException {
checkInit();
ByteBuffer applicationComplete = new ApplicationComplete(appId).toByteBuffer();
TransportClient client = clientFactory.createClient(host, port);
try {
client.send(applicationComplete);
} finally {
if (client != null) {
client.close();
}
}
}
}
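As discussed in the review thread above, this client is intended to be driven from KubernetesClusterSchedulerBackend when the application finishes. A minimal sketch of such a call site, assuming the transport configuration, secret holder, shuffle-service host/port, and application id are already in hand (these surrounding names are assumptions, not part of this patch):

```scala
import org.apache.spark.network.sasl.SecretKeyHolder
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
import org.apache.spark.network.util.TransportConf

object ShuffleServiceCleanupSketch {
  // Hypothetical helper: tell the external shuffle service that the application is
  // done so it can drop per-application state (e.g. the shared SASL secret).
  def notifyApplicationComplete(
      conf: TransportConf,
      secretKeyHolder: SecretKeyHolder,
      appId: String,
      shuffleServiceHost: String,
      shuffleServicePort: Int): Unit = {
    val client = new KubernetesExternalShuffleClient(
      conf,
      secretKeyHolder,
      true,  // saslEnabled: authenticate with the per-application secret
      false) // saslEncryptionEnabled
    client.init(appId) // registers the appId that sendApplicationComplete will report
    try {
      client.sendApplicationComplete(shuffleServiceHost, shuffleServicePort)
    } finally {
      client.close()
    }
  }
}
```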
BlockTransferMessage.java
@@ -23,6 +23,7 @@
import io.netty.buffer.Unpooled;

import org.apache.spark.network.protocol.Encodable;
import org.apache.spark.network.shuffle.protocol.kubernetes.ApplicationComplete;
import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;

@@ -42,7 +43,7 @@ public abstract class BlockTransferMessage implements Encodable {
/** Preceding every serialized message is its type, which allows us to deserialize it. */
public enum Type {
OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
HEARTBEAT(5);
HEARTBEAT(5), APPLICATION_COMPLETE(6);

private final byte id;

@@ -67,6 +68,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
case 3: return StreamHandle.decode(buf);
case 4: return RegisterDriver.decode(buf);
case 5: return ShuffleServiceHeartbeat.decode(buf);
case 6: return ApplicationComplete.decode(buf);
default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
ApplicationComplete.java (new file)
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle.protocol.kubernetes;

import io.netty.buffer.ByteBuf;
import org.apache.spark.network.protocol.Encoders;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;

import java.util.Objects;

public class ApplicationComplete extends BlockTransferMessage {

private final String appId;

public ApplicationComplete(String appId) {
this.appId = appId;
}

@Override
protected Type type() {
return Type.APPLICATION_COMPLETE;
}

@Override
public int encodedLength() {
return Encoders.Strings.encodedLength(appId);
}

@Override
public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
}

@Override
public boolean equals(Object other) {
if (!(other instanceof ApplicationComplete)) {
return false;
}
return Objects.equals(appId, ((ApplicationComplete) other).appId);
}

@Override
public int hashCode() {
return Objects.hashCode(appId);
}

public String getAppId() {
return appId;
}

public static ApplicationComplete decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
return new ApplicationComplete(appId);
}
}
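A quick sketch of how the new message round-trips through the framing shown above; it assumes the fromByteBuffer method from the BlockTransferMessage hunk sits on the usual Decoder nested class, as in upstream Spark:

```scala
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
import org.apache.spark.network.shuffle.protocol.kubernetes.ApplicationComplete

object ApplicationCompleteRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val original = new ApplicationComplete("spark-application-1234") // appId is invented
    // toByteBuffer prepends the one-byte type tag (APPLICATION_COMPLETE = 6)
    // before the Encoders.Strings-encoded appId.
    val bytes = original.toByteBuffer
    // The decoder reads the tag back and dispatches to ApplicationComplete.decode.
    val decoded = BlockTransferMessage.Decoder.fromByteBuffer(bytes)
      .asInstanceOf[ApplicationComplete]
    assert(decoded == original && decoded.getAppId == "spark-application-1234")
  }
}
```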
48 changes: 47 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -70,7 +70,8 @@ object SparkSubmit {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

// Deploy modes
private val CLIENT = 1
@@ -237,6 +238,7 @@ object SparkSubmit {
printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("kubernetes") => KUBERNETES
Review comment:
We have been using k8s in the branch.

case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
@@ -276,6 +278,14 @@ object SparkSubmit {
}
}

if (clusterManager == KUBERNETES
&& !Utils.classIsLoadable("org.apache.spark.deploy.kubernetes.Client")
&& !Utils.isTesting) {
printErrorAndExit(
"Could not load Kubernetes classes. " +
"This copy of Spark may not have been compiled with Kubernetes support.")
}

// Update args.deployMode if it is null. It will be passed down as a Spark property later.
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
@@ -347,6 +357,12 @@ object SparkSubmit {
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, CLUSTER) if args.isPython =>
printErrorAndExit("Python is currently not supported for Kubernetes.")
case (KUBERNETES, CLUSTER) if args.isR =>
printErrorAndExit("R is currently not supported for Kubernetes.")
case _ =>
}

@@ -466,6 +482,26 @@ object SparkSubmit {
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),

// Kubernetes only
OptionAssigner(args.kubernetesAppName, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.app.name"),
OptionAssigner(args.kubernetesAppNamespace, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.namespace"),
OptionAssigner(args.kubernetesCaCertFile, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.ca.cert.file"),
OptionAssigner(args.kubernetesClientCertFile, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.client.cert.file"),
OptionAssigner(args.kubernetesClientKeyFile, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.client.key.file"),
OptionAssigner(args.kubernetesMaster, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.master"),
OptionAssigner(args.customExecutorSpecFile, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.executor.custom.spec.file"),
OptionAssigner(args.customExecutorSpecContainerName, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.executor.custom.spec.container.name"),
OptionAssigner(args.executorDockerImage, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.executor.docker.image"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
@@ -486,6 +522,16 @@
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
)

if (args.isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.kubernetes.Client"
for ((portName, portValue) <- args.exposeDriverPorts) {
childArgs += ("--expose-driver-port", s"$portName=$portValue")
}
args.childArgs.foreach(arg => childArgs += ("--arg", arg))
childArgs += ("--class", args.mainClass)
childArgs += ("--driver-docker-image", args.driverDockerImage)
}

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
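For illustration only, the if (args.isKubernetesCluster) block above would translate a submission's Kubernetes options into child arguments for org.apache.spark.deploy.kubernetes.Client along these lines; the port name, application class, argument, and image tag below are invented:

```scala
import scala.collection.mutable.ArrayBuffer

object KubernetesChildArgsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values showing the shape of the childArgs built above.
    val childArgs = ArrayBuffer[String]()
    childArgs += ("--expose-driver-port", "blockmanager=7079")    // one pair per exposed driver port
    childArgs += ("--arg", "1000")                                // one pair per application argument
    childArgs += ("--class", "org.example.SparkPi")               // the application's main class
    childArgs += ("--driver-docker-image", "spark-driver:latest") // image used to launch the driver
    println(childArgs.mkString(" "))
  }
}
```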
SparkSubmitArguments.scala
@@ -78,6 +78,19 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var submissionToRequestStatusFor: String = null
var useRest: Boolean = true // used internally

// Kubernetes only
var kubernetesAppName: String = null
val exposeDriverPorts: HashMap[String, Int] = new HashMap[String, Int]()
var driverDockerImage: String = null
var executorDockerImage: String = null
var customExecutorSpecFile: String = null
var customExecutorSpecContainerName: String = null
var kubernetesMaster: String = null
Review comment:
Ideally as little of this as possible should go into SparkSubmit; passing these through configurations seems preferable, since otherwise SparkSubmit will grow very large.

Author (collaborator) reply:
YARN allows specifying some configurations via spark-submit arguments (e.g. queues), but it's unclear which settings belong as spark-submit arguments and which belong as Spark configurations.

var kubernetesAppNamespace: String = null
var kubernetesClientCertFile: String = null
var kubernetesClientKeyFile: String = null
var kubernetesCaCertFile: String = null

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
Expand Down Expand Up @@ -287,6 +300,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
master.startsWith("spark://") && deployMode == "cluster"
}

def isKubernetesCluster: Boolean = {
master.startsWith("kubernetes") && deployMode == "cluster"
}

override def toString: String = {
s"""Parsed arguments:
| master $master
Expand Down Expand Up @@ -438,6 +455,43 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case USAGE_ERROR =>
printUsageAndExit(1)

// Kubernetes only
case KUBERNETES_APP_NAME =>
kubernetesAppName = value

case KUBERNETES_APP_NAMESPACE =>
kubernetesAppNamespace = value

case KUBERNETES_CA_CERT_FILE =>
kubernetesCaCertFile = value

case KUBERNETES_CLIENT_CERT_FILE =>
kubernetesClientCertFile = value

case KUBERNETES_CLIENT_KEY_FILE =>
kubernetesClientKeyFile = value

case KUBERNETES_CUSTOM_EXECUTOR_SPEC_CONTAINER_NAME =>
customExecutorSpecContainerName = value

case KUBERNETES_CUSTOM_EXECUTOR_SPEC_FILE =>
customExecutorSpecFile = value

case KUBERNETES_DRIVER_DOCKER_IMAGE =>
driverDockerImage = value

case KUBERNETES_EXECUTOR_DOCKER_IMAGE =>
executorDockerImage = value

case KUBERNETES_EXPOSE_DRIVER_PORT =>
value.split("=", 2).toSeq match {
case Seq(k, v) => exposeDriverPorts(k) = v.toInt
case _ => SparkSubmit.printErrorAndExit(s"Driver port specified without '=': $value")
}

case KUBERNETES_MASTER =>
kubernetesMaster = value

case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
}
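A tiny sketch of the value format the KUBERNETES_EXPOSE_DRIVER_PORT handler above expects; the port name and number are invented, and the map mirrors exposeDriverPorts:

```scala
import scala.collection.mutable.HashMap

object ExposeDriverPortParsingSketch {
  def main(args: Array[String]): Unit = {
    val exposeDriverPorts = new HashMap[String, Int]()
    // As in the handler above, a value such as "blockmanager=7079" splits on the
    // first '=' into a port name and an integer port number.
    "blockmanager=7079".split("=", 2).toSeq match {
      case Seq(k, v) => exposeDriverPorts(k) = v.toInt
      case _ => sys.error("Driver port specified without '='")
    }
    println(exposeDriverPorts) // Map(blockmanager -> 7079)
  }
}
```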
CoarseGrainedExecutorBackend.scala
@@ -200,7 +200,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
new SecurityManager(executorConf),
clientMode = true)
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps(executorId)) ++
Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()

CoarseGrainedClusterMessages.scala
@@ -28,10 +28,10 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

private[spark] object CoarseGrainedClusterMessages {

case object RetrieveSparkProps extends CoarseGrainedClusterMessage

case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage

case class RetrieveSparkProps(executorId: String) extends CoarseGrainedClusterMessage

// Driver to executors
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

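The last two hunks turn RetrieveSparkProps from a case object into a case class carrying the executor id, so an executor now identifies itself when fetching its Spark properties from the driver. A conceptual sketch of the driver-side dispatch (these messages are Spark-internal, so real code lives inside Spark's own packages; the reply callback and property list here stand in for the actual RPC machinery):

```scala
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveSparkProps

object RetrieveSparkPropsSketch {
  // Hypothetical dispatch: with the change above, the executor id travels with the
  // request, so a cluster-specific backend (e.g. the Kubernetes scheduler backend)
  // can associate the request with the pod that sent it before replying.
  def handler(
      sparkProperties: Seq[(String, String)],
      reply: Any => Unit): PartialFunction[Any, Unit] = {
    case RetrieveSparkProps(executorId) =>
      // executorId is available here for logging or per-executor bookkeeping.
      reply(sparkProperties)
  }
}
```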