From 55f5ceecdbd95ec63b0b53260f5e83f03606d6b0 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 10 Oct 2025 17:05:18 -0700 Subject: [PATCH] [SPARK-53874] `SparkAppDriverConf` should respect `sparkVersion` of `SparkApplication` CRD --- .../k8s/operator/SparkAppDriverConf.java | 20 ++++++++++- .../operator/SparkAppSubmissionWorker.java | 4 +++ .../k8s/operator/SparkAppDriverConfTest.java | 36 +++++++++++++++++-- .../SparkAppSubmissionWorkerTest.java | 30 ++++++++-------- 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java index 15f3a02b..a4cb03a9 100644 --- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java +++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java @@ -19,6 +19,8 @@ package org.apache.spark.k8s.operator; +import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME; + import scala.Option; import org.apache.spark.SparkConf; @@ -30,14 +32,18 @@ /** Spark application driver configuration. */ public final class SparkAppDriverConf extends KubernetesDriverConf { + private final String sparkVersion; + private SparkAppDriverConf( SparkConf sparkConf, + String sparkVersion, String appId, MainAppResource mainAppResource, String mainClass, String[] appArgs, Option proxyUser) { super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, null); + this.sparkVersion = sparkVersion; } /** @@ -53,6 +59,7 @@ private SparkAppDriverConf( */ public static SparkAppDriverConf create( SparkConf sparkConf, + String sparkVersion, String appId, MainAppResource mainAppResource, String mainClass, @@ -61,7 +68,8 @@ public static SparkAppDriverConf create( // pre-create check only KubernetesVolumeUtils.parseVolumesWithPrefix( sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX()); - return new SparkAppDriverConf(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser); + return new SparkAppDriverConf( + sparkConf, sparkVersion, appId, mainAppResource, mainClass, appArgs, proxyUser); } /** @@ -74,6 +82,16 @@ public String resourceNamePrefix() { return appId(); } + /** + * Returns the driver label key and value map. + * + * @return The label key-value pair map. + */ + @Override + public scala.collection.immutable.Map labels() { + return super.labels().updated(LABEL_SPARK_VERSION_NAME, sparkVersion); + } + /** * Creates the name to be used by the driver config map. The name consists of `resourceNamePrefix` * and Spark instance type (driver). Operator proposes `resourceNamePrefix` with leaves naming diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java index 04beda0c..096f8fe2 100644 --- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java +++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java @@ -41,6 +41,7 @@ import org.apache.spark.k8s.operator.spec.ApplicationSpec; import org.apache.spark.k8s.operator.spec.ConfigMapSpec; import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec; +import org.apache.spark.k8s.operator.spec.RuntimeVersions; import org.apache.spark.k8s.operator.utils.ModelUtils; import org.apache.spark.k8s.operator.utils.StringUtils; @@ -158,8 +159,11 @@ protected SparkAppDriverConf buildDriverConf( sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT"); String appId = generateSparkAppId(app); effectiveSparkConf.setIfMissing("spark.app.id", appId); + RuntimeVersions versions = applicationSpec.getRuntimeVersions(); + String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN"; return SparkAppDriverConf.create( effectiveSparkConf, + sparkVersion, effectiveSparkConf.getAppId(), primaryResource, applicationSpec.getMainClass(), diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java index 69405150..fd94d3af 100644 --- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java @@ -33,6 +33,8 @@ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; class SparkAppDriverConfTest { + static final String VERSION = "dev"; + @Test void testResourceNamePrefix() { // Resource prefix shall be deterministic per SparkApp per attempt @@ -42,7 +44,13 @@ void testResourceNamePrefix() { String appId = UUID.randomUUID().toString(); SparkAppDriverConf sparkAppDriverConf = SparkAppDriverConf.create( - sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty()); + sparkConf, + VERSION, + appId, + mock(JavaMainAppResource.class), + "foo", + null, + Option.empty()); String resourcePrefix = sparkAppDriverConf.resourceNamePrefix(); assertEquals( resourcePrefix, @@ -65,10 +73,34 @@ void testConfigMapNameDriver() { String appId = "a".repeat(1000); SparkAppDriverConf sparkAppDriverConf = SparkAppDriverConf.create( - sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty()); + sparkConf, + VERSION, + appId, + mock(JavaMainAppResource.class), + "foo", + null, + Option.empty()); String configMapNameDriver = sparkAppDriverConf.configMapNameDriver(); assertTrue( configMapNameDriver.length() <= 253, "config map name length should always comply k8s DNS subdomain length"); } + + @Test + void testLabels() { + SparkConf sparkConf = new SparkConf(); + sparkConf.set("foo", "bar"); + sparkConf.set("spark.executor.instances", "1"); + String appId = "a".repeat(1000); + SparkAppDriverConf sparkAppDriverConf = + SparkAppDriverConf.create( + sparkConf, + VERSION, + appId, + mock(JavaMainAppResource.class), + "foo", + null, + Option.empty()); + assertEquals(VERSION, sparkAppDriverConf.labels().get("spark-version").get()); + } } diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java index a1abde1b..0dcc43b1 100644 --- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java @@ -76,7 +76,7 @@ void buildDriverConfShouldApplySpecAndPropertiesOverride() { SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker(); SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, overrides); - assertEquals(6, constructorArgs.get(conf).size()); + assertEquals(7, constructorArgs.get(conf).size()); // validate SparkConf with override assertInstanceOf(SparkConf.class, constructorArgs.get(conf).get(0)); @@ -90,14 +90,14 @@ void buildDriverConfShouldApplySpecAndPropertiesOverride() { "namespace from CR takes highest precedence"); // validate main resources - assertInstanceOf(JavaMainAppResource.class, constructorArgs.get(conf).get(2)); - JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(2); + assertInstanceOf(JavaMainAppResource.class, constructorArgs.get(conf).get(3)); + JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(3); assertTrue(mainResource.primaryResource().isEmpty()); - assertEquals("foo-class", constructorArgs.get(conf).get(3)); + assertEquals("foo-class", constructorArgs.get(conf).get(4)); - assertInstanceOf(String[].class, constructorArgs.get(conf).get(4)); - String[] capturedArgs = (String[]) constructorArgs.get(conf).get(4); + assertInstanceOf(String[].class, constructorArgs.get(conf).get(5)); + String[] capturedArgs = (String[]) constructorArgs.get(conf).get(5); assertEquals(2, capturedArgs.length); assertEquals("a", capturedArgs[0]); assertEquals("b", capturedArgs[1]); @@ -120,11 +120,11 @@ void buildDriverConfForPythonApp() { SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker(); SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap()); - assertEquals(6, constructorArgs.get(conf).size()); + assertEquals(7, constructorArgs.get(conf).size()); // validate main resources - assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2)); - PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2); + assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(3)); + PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(3); assertEquals("foo", mainResource.primaryResource()); } } @@ -146,13 +146,13 @@ void handlePyFiles() { SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker(); SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap()); - assertEquals(6, constructorArgs.get(conf).size()); + assertEquals(7, constructorArgs.get(conf).size()); assertEquals( "lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles")); // validate main resources - assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2)); - PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2); + assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(3)); + PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(3); assertEquals("main.py", mainResource.primaryResource()); } } @@ -173,11 +173,11 @@ void buildDriverConfForRApp() { SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker(); SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap()); - assertEquals(6, constructorArgs.get(conf).size()); + assertEquals(7, constructorArgs.get(conf).size()); // validate main resources - assertInstanceOf(RMainAppResource.class, constructorArgs.get(conf).get(2)); - RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(2); + assertInstanceOf(RMainAppResource.class, constructorArgs.get(conf).get(3)); + RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(3); assertEquals("foo", mainResource.primaryResource()); } }