@@ -19,6 +19,8 @@

package org.apache.spark.k8s.operator;

import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;

import scala.Option;

import org.apache.spark.SparkConf;
@@ -30,14 +32,18 @@

/** Spark application driver configuration. */
public final class SparkAppDriverConf extends KubernetesDriverConf {
private final String sparkVersion;

private SparkAppDriverConf(
SparkConf sparkConf,
String sparkVersion,
String appId,
MainAppResource mainAppResource,
String mainClass,
String[] appArgs,
Option<String> proxyUser) {
super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, null);
this.sparkVersion = sparkVersion;
}

/**
@@ -53,6 +59,7 @@ private SparkAppDriverConf(
*/
public static SparkAppDriverConf create(
SparkConf sparkConf,
String sparkVersion,
String appId,
MainAppResource mainAppResource,
String mainClass,
@@ -61,7 +68,8 @@ public static SparkAppDriverConf create(
// pre-create check only
KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX());
return new SparkAppDriverConf(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser);
return new SparkAppDriverConf(
sparkConf, sparkVersion, appId, mainAppResource, mainClass, appArgs, proxyUser);
}

/**
@@ -74,6 +82,16 @@ public String resourceNamePrefix() {
return appId();
}

/**
* Returns the driver label key and value map.
*
* @return The label key-value pair map.
*/
@Override
public scala.collection.immutable.Map<String, String> labels() {
return super.labels().updated(LABEL_SPARK_VERSION_NAME, sparkVersion);
[Review comment from the PR author] This overrides the underlying Spark version (from the compile dependency), 4.1.0-preview2.

}
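
For illustration, a minimal standalone sketch (not part of this change) of how the new constructor argument surfaces through labels(). The version string, app id, and main class are assumptions, and JavaMainAppResource is constructed directly here where the tests below use a mock:

package org.apache.spark.k8s.operator;

import scala.Option;

import org.apache.spark.SparkConf;
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;

/** Sketch only: shows the CR-declared Spark version landing in the driver labels. */
public final class LabelsSketch {
  public static void main(String[] args) {
    SparkAppDriverConf conf =
        SparkAppDriverConf.create(
            new SparkConf(),
            "4.0.0", // hypothetical sparkVersion propagated from the CR's runtimeVersions
            "spark-app-id", // hypothetical application id
            new JavaMainAppResource(Option.empty()), // constructed directly; tests use a mock
            "org.example.Main", // hypothetical main class
            new String[0],
            Option.empty());
    // With the override above, the label carries the CR-declared version rather than
    // the operator's compile-time Spark dependency (4.1.0-preview2 per the review comment).
    System.out.println(conf.labels().get(Constants.LABEL_SPARK_VERSION_NAME)); // Some(4.0.0)
  }
}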

/**
* Creates the name to be used by the driver config map. The name consists of `resourceNamePrefix`
* and Spark instance type (driver). Operator proposes `resourceNamePrefix` with leaves naming
@@ -41,6 +41,7 @@
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
import org.apache.spark.k8s.operator.spec.ConfigMapSpec;
import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.utils.ModelUtils;
import org.apache.spark.k8s.operator.utils.StringUtils;

@@ -158,8 +159,11 @@ protected SparkAppDriverConf buildDriverConf(
sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
String appId = generateSparkAppId(app);
effectiveSparkConf.setIfMissing("spark.app.id", appId);
RuntimeVersions versions = applicationSpec.getRuntimeVersions();
String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
return SparkAppDriverConf.create(
effectiveSparkConf,
sparkVersion,
effectiveSparkConf.getAppId(),
primaryResource,
applicationSpec.getMainClass(),
@@ -33,6 +33,8 @@
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;

class SparkAppDriverConfTest {
static final String VERSION = "dev";

@Test
void testResourceNamePrefix() {
// Resource prefix shall be deterministic per SparkApp per attempt
@@ -42,7 +44,13 @@ void testResourceNamePrefix() {
String appId = UUID.randomUUID().toString();
SparkAppDriverConf sparkAppDriverConf =
SparkAppDriverConf.create(
sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
sparkConf,
VERSION,
appId,
mock(JavaMainAppResource.class),
"foo",
null,
Option.empty());
String resourcePrefix = sparkAppDriverConf.resourceNamePrefix();
assertEquals(
resourcePrefix,
@@ -65,10 +73,34 @@ void testConfigMapNameDriver() {
String appId = "a".repeat(1000);
SparkAppDriverConf sparkAppDriverConf =
SparkAppDriverConf.create(
sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
sparkConf,
VERSION,
appId,
mock(JavaMainAppResource.class),
"foo",
null,
Option.empty());
String configMapNameDriver = sparkAppDriverConf.configMapNameDriver();
assertTrue(
configMapNameDriver.length() <= 253,
"config map name length should always comply k8s DNS subdomain length");
}
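
As context for the assertion above: 253 characters is the Kubernetes DNS-subdomain (RFC 1123) length limit that config map names must satisfy. A hypothetical sketch of that kind of capping, not the operator's actual configMapNameDriver() logic (whose naming scheme is collapsed in this diff):

  final class DnsNameSketch {
    static final int MAX_DNS_SUBDOMAIN_LENGTH = 253; // k8s object-name limit asserted above

    // Hypothetical helper for illustration; name and truncation strategy are assumptions.
    static String capToDnsSubdomainLength(String proposedName) {
      return proposedName.length() <= MAX_DNS_SUBDOMAIN_LENGTH
          ? proposedName
          : proposedName.substring(0, MAX_DNS_SUBDOMAIN_LENGTH);
    }
  }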

@Test
void testLabels() {
SparkConf sparkConf = new SparkConf();
sparkConf.set("foo", "bar");
sparkConf.set("spark.executor.instances", "1");
String appId = "a".repeat(1000);
SparkAppDriverConf sparkAppDriverConf =
SparkAppDriverConf.create(
sparkConf,
VERSION,
appId,
mock(JavaMainAppResource.class),
"foo",
null,
Option.empty());
assertEquals(VERSION, sparkAppDriverConf.labels().get("spark-version").get());
}
}
@@ -76,7 +76,7 @@ void buildDriverConfShouldApplySpecAndPropertiesOverride() {

SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, overrides);
assertEquals(6, constructorArgs.get(conf).size());
assertEquals(7, constructorArgs.get(conf).size());

// validate SparkConf with override
assertInstanceOf(SparkConf.class, constructorArgs.get(conf).get(0));
@@ -90,14 +90,14 @@
"namespace from CR takes highest precedence");

// validate main resources
assertInstanceOf(JavaMainAppResource.class, constructorArgs.get(conf).get(2));
JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(2);
assertInstanceOf(JavaMainAppResource.class, constructorArgs.get(conf).get(3));
JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(3);
assertTrue(mainResource.primaryResource().isEmpty());

assertEquals("foo-class", constructorArgs.get(conf).get(3));
assertEquals("foo-class", constructorArgs.get(conf).get(4));

assertInstanceOf(String[].class, constructorArgs.get(conf).get(4));
String[] capturedArgs = (String[]) constructorArgs.get(conf).get(4);
assertInstanceOf(String[].class, constructorArgs.get(conf).get(5));
String[] capturedArgs = (String[]) constructorArgs.get(conf).get(5);
assertEquals(2, capturedArgs.length);
assertEquals("a", capturedArgs[0]);
assertEquals("b", capturedArgs[1]);
@@ -120,11 +120,11 @@ void buildDriverConfForPythonApp() {

SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
assertEquals(6, constructorArgs.get(conf).size());
assertEquals(7, constructorArgs.get(conf).size());

// validate main resources
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(3));
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(3);
assertEquals("foo", mainResource.primaryResource());
}
}
@@ -146,13 +146,13 @@ void handlePyFiles() {

SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
assertEquals(6, constructorArgs.get(conf).size());
assertEquals(7, constructorArgs.get(conf).size());
assertEquals(
"lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));

// validate main resources
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(3));
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(3);
assertEquals("main.py", mainResource.primaryResource());
}
}
@@ -173,11 +173,11 @@ void buildDriverConfForRApp() {

SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
assertEquals(6, constructorArgs.get(conf).size());
assertEquals(7, constructorArgs.get(conf).size());

// validate main resources
assertInstanceOf(RMainAppResource.class, constructorArgs.get(conf).get(2));
RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(2);
assertInstanceOf(RMainAppResource.class, constructorArgs.get(conf).get(3));
RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(3);
assertEquals("foo", mainResource.primaryResource());
}
}