
Conversation

@jiangzho
Contributor

@jiangzho jiangzho commented Apr 27, 2024

What changes were proposed in this pull request?

This is a breakdown PR of #2 - adding a submission worker implementation for SparkApplication.

Why are the changes needed?

Spark Operator needs a submission worker to convert its abstraction (the SparkApplication API) into a k8s resource spec.
This is a lightweight implementation based on the native k8s integration.

As of now, it's based on Spark 4.0.0-preview1, but it's expected to serve all Spark LTS versions. This is feasible because the worker covers only spec generation; the Spark core jars are still brought in by the application images. E2E tests will be set up with the operator later to ensure that.

Per the SPIP doc, future operator version(s) may add more submission worker implementations based on different Spark versions to become 100% version agnostic, at the cost of keeping multiple workers on standby.
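For illustration only, here is a minimal sketch of the kind of spec generation a submission worker performs, using the fabric8 builders this project already depends on. The class, method, and label names below are made up for the example; the actual worker delegates to the spark-kubernetes feature steps rather than building pods by hand.

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

public class DriverPodSketch {
  // Build a minimal driver pod spec for a given app id and image.
  // The Spark core jars come from the application image, not the operator.
  public static Pod driverPod(String appId, String image) {
    return new PodBuilder()
        .withNewMetadata()
            .withName(appId + "-driver")
            .addToLabels("spark-app-id", appId)
        .endMetadata()
        .withNewSpec()
            .addNewContainer()
                .withName("spark-kubernetes-driver")
                .withImage(image)
            .endContainer()
        .endSpec()
        .build();
  }
}
```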

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit test coverage.

Was this patch authored or co-authored using generative AI tooling?

No


fabric8Version=6.12.1
# Caution: fabric8 version should be aligned with Spark dependency
fabric8Version=6.7.2
Contributor Author

This is caused by the coexisting dependency on Spark 3.5.1: fabric8 6.7.2 and 6.12.1 are not binary compatible, and it complains even if we exclude fabric8 from the Spark dependency.

A reminder has been added here to keep the fabric8 version aligned between Spark and the operator. I'll update this when the 4.0.0 RCs become available.

Contributor Author

The incompatibility was caught by unit tests.

Member
@dongjoon-hyun dongjoon-hyun left a comment

Thank you for making a PR.

As we observe here, I'm not sure if the AS-IS design is sufficient or robust across multiple Spark versions. If we cannot find a way to be robust, it will bite us again and again in production.

It seems that we need a shim layer or multiple modules like Iceberg

@sunchao sunchao changed the title [SPARK-48017]Add Spark application submission worker for operator [SPARK-48017] Add Spark application submission worker for operator Apr 30, 2024
@jiangzho
Contributor Author

jiangzho commented Apr 30, 2024

It seems that we need a shim layer or multiple modules like Iceberg

Yes, that's one mid-term goal that we will target for operator v1.0, in order to become fully version agnostic.

This PR proposes a single submission worker based on the latest spark-kubernetes module. Considering its history, we tested compatibility with Spark 3.2, 3.3, 3.4, and 3.5; we can do the same for 4.0 to ensure no breaking change is introduced. This is the pattern adopted by most operator solutions, like the Flink operator and the Google Spark operator. I'm not saying this is absolutely the right way to go for the longer term, but it could enable the first batch of evaluations on operator 0.1 while we work on the multi-submission-worker mode.

The challenges of a multi-version submission worker mode include:

  • the operator image can be heavy (packaging multiple Spark jars)
  • runtime resource consumption can be higher, because we need multiple containers (one per Spark version) to avoid jar conflicts on the classpath.
  • deployment (Helm chart) of the operator can be a bit more complex once users are more familiar with it, i.e., users might want to deploy the operator in single-submission-worker mode, with a selection of Spark versions, or with all known versions, depending on their needs.

Given this, can we start with this PR for v0.1?

@dongjoon-hyun
Member

Thanks. Let me consider more.

@dongjoon-hyun
Member

Thank you for renaming the package and updating LICENSE.

Member
@dongjoon-hyun dongjoon-hyun left a comment

Apache Spark 4.0.0-preview will arrive next Monday. Let's resume this PR with that as the first Spark version until we have version-agnostic capability.

lombokVersion=1.18.32

#Spark
scalaVersion=2.12
Member

As you know, there is no Scala 2.12 build of Apache Spark 4.0.0-preview.

So, I guess we need to use Scala 2.13 only in the Spark Operator for both Scala 2.12 and 2.13 submissions.

Contributor Author

yup! upgrading to 2.13 along with 4.0.0-preview1

dependencies {
implementation project(":spark-operator-api")

implementation("org.apache.spark:spark-kubernetes_$scalaVersion:$sparkVersion")
Member

Let me try to define the used classes as DeveloperApi for this work. That will protect spark-kubernetes-operator from any accidental changes from 4.0.0 onward.

For now, Apache Spark guarantees only the following.

$ git grep '@DeveloperApi' resource-managers/kubernetes/
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala:@DeveloperApi
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala:@DeveloperApi
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala:@DeveloperApi
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesDriverCustomFeatureConfigStep.scala:@DeveloperApi
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesExecutorCustomFeatureConfigStep.scala:@DeveloperApi
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala:@DeveloperApi
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala:@DeveloperApi
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala:@DeveloperApi
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala:@DeveloperApi

import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils;
import org.apache.spark.deploy.k8s.submit.MainAppResource;

public class SparkAppDriverConf extends KubernetesDriverConf {
Member
@dongjoon-hyun dongjoon-hyun May 1, 2024

Since KubernetesDriverConf is a private class, there has always been the possibility of a breaking change to KubernetesDriverConf, like the following.

Container containerWithVolume =
new ContainerBuilder(pod.container())
.addNewEnv()
.withName(org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR())
Member

Do you need the full package name when we have an import statement? Maybe the following?

- .withName(org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR())
+ .withName(Constants.ENV_SPARK_CONF_DIR())

new ContainerBuilder(pod.container())
.addNewEnv()
.withName(org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR())
.withValue(org.apache.spark.deploy.k8s.Constants.SPARK_CONF_DIR_INTERNAL())
Member

ditto.

.withValue(org.apache.spark.deploy.k8s.Constants.SPARK_CONF_DIR_INTERNAL())
.endEnv()
.addNewVolumeMount()
.withName(org.apache.spark.deploy.k8s.Constants.SPARK_CONF_VOLUME_DRIVER())
Member

ditto.

.endEnv()
.addNewVolumeMount()
.withName(org.apache.spark.deploy.k8s.Constants.SPARK_CONF_VOLUME_DRIVER())
.withMountPath(org.apache.spark.deploy.k8s.Constants.SPARK_CONF_DIR_INTERNAL())
Member

ditto.

dongjoon-hyun added a commit to apache/spark that referenced this pull request May 2, 2024
### What changes were proposed in this pull request?

This PR aims to promote `KubernetesVolumeUtils` to `DeveloperApi` from Apache Spark 4.0.0 for Apache Spark Kubernetes Operator.

### Why are the changes needed?

This API was added by the following at `Apache Spark 3.0.0` and has been stable.
- #22959

Since `Apache Spark Kubernetes Operator` requires this, we had better maintain it as a developer API officially from `Apache Spark 4.0.0`.
- apache/spark-kubernetes-operator#10

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46326 from dongjoon-hyun/SPARK-48076.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun added a commit to apache/spark that referenced this pull request May 2, 2024
### What changes were proposed in this pull request?

This PR aims to promote `KubernetesClientUtils` to `DeveloperApi`.

### Why are the changes needed?

Since `Apache Spark Kubernetes Operator` requires this, we had better maintain it as a developer API officially from `Apache Spark 4.0.0`.
- apache/spark-kubernetes-operator#10

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46327 from dongjoon-hyun/SPARK-48077.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun added a commit to apache/spark that referenced this pull request May 2, 2024
### What changes were proposed in this pull request?

This PR aims to promote `org.apache.spark.deploy.k8s.Constants` to `DeveloperApi`

### Why are the changes needed?

Since `Apache Spark Kubernetes Operator` depends on this, we had better maintain it as a developer API officially from `Apache Spark 4.0.0`.
- apache/spark-kubernetes-operator#10

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46329 from dongjoon-hyun/SPARK-48078.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun added a commit to apache/spark that referenced this pull request May 2, 2024
… `DeveloperApi`

### What changes were proposed in this pull request?

This PR aims to promote `*MainAppResource` and `NonJVMResource` to `DeveloperApi`.

### Why are the changes needed?

Since `Apache Spark Kubernetes Operator` depends on these traits and classes, we had better maintain it as a developer API officially from `Apache Spark 4.0.0`.
- apache/spark-kubernetes-operator#10

Since there are no changes after `3.0.0`, these are defined as `Stable`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46332 from dongjoon-hyun/SPARK-48080.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
build.gradle Outdated
repositories {
mavenCentral()
// This is a workaround to resolve Spark 4.0.0-preview-1
// To be removed for official release
Member

Apache Spark recommends using IDed TODOs. Please file a JIRA issue and reference it like the following.

- // To be removed for official release
+ // TODO(SPARK-XXXXX) Use Apache Spark 4.0.0-preview1 when it's ready

commonsIOVersion=2.16.1
lombokVersion=1.18.32

#Spark
Member

We need a space.

  • #Spark -> # Spark.

&& applicationSpec.getExecutorSpec().getPodTemplateSpec() != null;
}

public static long getAttemptId(final SparkApplication app) {
Member

Thank you for adding final.

return appId();
}

public String configMapNameDriver() {
Member

When we create a new K8s resource name, we should guarantee that it complies with the K8s naming length limit.

Could you add a method description stating the possible string length range of this method's return value?
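For reference, a rough sketch of the constraint being discussed (not the operator's actual implementation): a generated name can be capped at the k8s object-name limit, which is 253 characters for most resources and 63 for Services. The suffix below is illustrative.

```java
// Illustrative sketch only: cap a generated resource name at the k8s
// object-name limit (253 chars for most resources; Services allow only 63).
static String capToK8sNameLimit(String name, int limit) {
  return name.length() <= limit ? name : name.substring(0, limit);
}

// e.g. capToK8sNameLimit(resourceNamePrefix + "-driver-conf-map", 253)
```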

public class SparkAppSubmissionWorker {
// Default length limit for generated app id. Generated id is used as resource-prefix when
// user-provided id is too long for this purpose. This applied to all resources associated with
// the Spark app (including k8s service which has different naming length limit). This we
Member

This we?

Contributor Author

Thanks for the catch! Fixed the typo.

primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
}
effectiveSparkConf.setIfMissing(
"spark.master", "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
Member

To @jiangzho and @aaruna, as we know, Apache Spark's ExternalClusterManager allows a custom K8s-based external cluster manager. So, it would be great if the Spark K8s Operator had the capability from the beginning to allow a custom prefix in addition to k8s.

https://github.com/apache/spark/blob/51623785c38c9b17a6d91cb8e7f686459bd4803e/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L33

def canCreate(masterURL: String): Boolean

Contributor Author

Yes - it should be possible to use a custom cluster manager by setting spark.master. When the master is not explicitly set in SparkConf, this would automatically generate the master URL based on the environment in which the operator is deployed.

To make it more handy (e.g., still supporting automatic master URL generation but with a different prefix), a new property has been introduced. Added Javadoc in getResourceSpec to clarify the behavior.
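A hedged sketch of that behavior follows; the property name spark.operator.masterUrlPrefix is purely hypothetical and only illustrates the idea of a configurable scheme prefix.

```java
import org.apache.spark.SparkConf;

public class MasterUrlSketch {
  // If spark.master is absent, fall back to the in-cluster API server address,
  // using a configurable scheme prefix so custom cluster managers can match it.
  public static void applyDefaultMaster(SparkConf conf) {
    String prefix = conf.get("spark.operator.masterUrlPrefix", "k8s"); // hypothetical property
    conf.setIfMissing(
        "spark.master",
        prefix + "://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
  }
}
```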

String preferredIdPrefix = preferredId.substring(0, preferredIdPrefixLength);
return generateHashBasedId(
preferredIdPrefix,
app.getMetadata().getNamespace(),
Member

This line seems to make an assumption about the K8s namespace length.

  • Can we support the maximum K8s namespace length here?
  • Please make a test case for generateSparkAppId for that maximum K8s namespace length.

Contributor Author

The namespace name and app name are used as values to generate a hash, which has a final length limit applied. Added Javadoc and one more test case to clarify this behavior.
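A minimal sketch of that idea (not the operator's actual generateHashBasedId), assuming the goal is to hash all identifiers so that arbitrarily long namespace or app names still produce an id under a fixed length limit:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public final class HashIdSketch {
  // Join all identifiers, hash them, and truncate so the result length is
  // bounded regardless of how long the namespace or app name is.
  public static String hashBasedId(String prefix, int lengthLimit, String... identifiers) {
    String joined = String.join("/", identifiers);
    String hash = UUID.nameUUIDFromBytes(joined.getBytes(StandardCharsets.UTF_8))
        .toString().replace("-", "");
    String candidate = prefix + "-" + hash;
    return candidate.length() <= lengthLimit ? candidate : candidate.substring(0, lengthLimit);
  }
}
```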

sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
String resourcePrefix = sparkAppDriverConf.resourceNamePrefix();
Assertions.assertEquals(resourcePrefix, appId);
Assertions.assertTrue(sparkAppDriverConf.configMapNameDriver().contains(resourcePrefix));
Member

Please add more test cases separately, at least for all public methods. For example, it would be great if configMapNameDriver had a new test case.


SparkAppResourceSpec appResourceSpec = new SparkAppResourceSpec(mockConf, mockSpec);

Assertions.assertEquals(2, appResourceSpec.getDriverResources().size());
Member

Shall we import Assertions.assertEquals?

appProps.put("spark.kubernetes.namespace", "ns2");
Map<String, String> overrides = new HashMap<>();
overrides.put("spark.executor.instances", "5");
overrides.put("spark.kubernetes.namespace", "ns3");
Member

Please add new test coverage with a long namespace.

.withMountPath(Constants.SPARK_CONF_DIR_INTERNAL())
.endVolumeMount()
.build();
Pod podWithVolume =
Member

It would be great to have a more specific name like podWithConfigMapVolume.

appId,
primaryResource,
applicationSpec.getMainClass(),
applicationSpec.getDriverArgs().toArray(new String[0]),
Member

Please define a final constant in order to reuse this new String[0].
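One way to apply the suggestion (class and constant names below are illustrative, not the operator's actual code):

```java
public final class ArrayConstants {
  // Reused in place of repeated `new String[0]` allocations at call sites, e.g.
  // applicationSpec.getDriverArgs().toArray(ArrayConstants.EMPTY_STRING_ARRAY)
  public static final String[] EMPTY_STRING_ARRAY = new String[0];

  private ArrayConstants() {}
}
```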

prefix, DEFAULT_ENCODE_BASE, DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT, identifiers);
}

public static String generateHashBasedId(
Member

Please add a method description because this is a public static method, especially about the guaranteed return string length.

import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.k8s.operator.status.AttemptInfo;

class SparkAppSubmissionWorkerTest {
Member

Please add more test coverage. For example, a corner case like generateHashBasedId with very long input identifiers.
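A hedged sketch of such a corner-case test, written against the HashIdSketch helper sketched earlier in this thread rather than the operator's actual API:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class HashIdSketchTest {
  @Test
  void longIdentifiersProduceBoundedAndStableIds() {
    String longNamespace = "n".repeat(300); // far beyond any k8s name limit
    String id1 = HashIdSketch.hashBasedId("prefix", 63, longNamespace, "my-app");
    String id2 = HashIdSketch.hashBasedId("prefix", 63, longNamespace, "my-app");
    assertTrue(id1.length() <= 63);            // bounded length
    assertEquals(id1, id2);                    // same inputs hash to the same id
  }
}
```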

Member
@dongjoon-hyun dongjoon-hyun left a comment

Hi, @jiangzho. I finished another round of reviews. Could you address them?

Member
@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Thank you for addressing all comments (with IDed TODOs).
Merged to main.

@dongjoon-hyun
Member

I wrote the current status summary here.

@jiangzho jiangzho deleted the worker branch July 23, 2024 23:04
dongjoon-hyun added a commit that referenced this pull request Oct 11, 2025
…SparkApplication` CRD

### What changes were proposed in this pull request?

This PR aims to fix `SparkAppDriverConf` to respect `sparkVersion` of `SparkApplication` CRD.

### Why are the changes needed?

This is a long standing bug from the initial implementation.
- #10

Since the Apache Spark K8s Operator can launch various Spark versions, the `spark-version` label should come from the `SparkApplication` CRD's `sparkVersion` field.

However, currently the Spark version of the compile dependency is used for `Driver` resources (like the `Driver Pod` and `Driver Service`). We should override this.

### Does this PR introduce _any_ user-facing change?

Yes, this is a bug fix to use a correct version information.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #385 from dongjoon-hyun/SPARK-53874.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>