Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
322295c
Start of Scala + Armada work
ClifHouck Dec 2, 2024
e1b2dc1
Skeletonized code and at least one unit test!
ClifHouck Jan 15, 2025
3834f94
Remove code that is causing warnings to be spewed by the test runner
ClifHouck Jan 15, 2025
cede366
Add a README
ClifHouck Jan 15, 2025
20f344b
Update core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
ClifHouck Jan 28, 2025
30ee857
Update assembly/pom.xml
ClifHouck Jan 28, 2025
1eefe16
Update resource-managers/armada/core/README.md
ClifHouck Jan 28, 2025
60038cf
Update resource-managers/armada/core/README.md
ClifHouck Jan 28, 2025
b214857
Update resource-managers/armada/core/README.md
ClifHouck Jan 28, 2025
cfc7c0d
Update resource-managers/armada/core/README.md
ClifHouck Jan 28, 2025
b36b3f9
Add armada project to project/SparkBuild.scala
EnricoMi Jan 29, 2025
91b0c2b
Fix code style issues
EnricoMi Jan 29, 2025
d46a6c6
Merge remote-tracking branch 'upstream/master' into clif/armada_integ…
EnricoMi Jan 29, 2025
8eed374
Add integration test to Github CI
EnricoMi Jan 27, 2025
ddaa53a
Add some sleeps
EnricoMi Jan 27, 2025
7bd73fc
Add armada spark job yamls
EnricoMi Jan 28, 2025
ef9af65
More sleep
EnricoMi Jan 28, 2025
644cbe6
Use mvn rather than sbt to copy jar into assembly
EnricoMi Jan 29, 2025
e3e3b3b
Use sbt instead of mvn
EnricoMi Jan 29, 2025
7144daa
Add integration test to CI
EnricoMi Jan 29, 2025
abbf1f6
working
Jan 31, 2025
9ad6d65
cleanup
Jan 31, 2025
22ffae9
cleanup
Jan 31, 2025
12a9ba4
cleanup
Jan 31, 2025
9624c06
run script
Jan 31, 2025
f3dd404
cleanup
Jan 31, 2025
f8257a3
cleanup
Jan 31, 2025
36ee2ec
cleanup
Jan 31, 2025
3834812
cleanup entrypoint.sh
Jan 31, 2025
66f009b
restored old yaml files
Jan 31, 2025
6623dd2
cleanup
Jan 31, 2025
5f9caa0
cleanup
Feb 1, 2025
c4f9c64
cleanup
Feb 1, 2025
3b53d36
Merge pull request #37 from GeorgeJahad/addingSparkPi
ClifHouck Feb 3, 2025
6b1d11b
first attempt
Feb 5, 2025
01cab78
updated for latest client
Feb 5, 2025
e0f00b2
cleanup
Feb 5, 2025
e24ff41
cleanup
Feb 5, 2025
a1b86f9
cleanup
Feb 6, 2025
5d1fff1
Merge pull request #38 from GeorgeJahad/addingArmadaClient
ClifHouck Feb 7, 2025
b518e8b
Spark submit works with an armada URL and actually submits a driver t…
ClifHouck Feb 7, 2025
6f5f535
spark driver now loads the right class to execute
ClifHouck Feb 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions .github/workflows/armada.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# CI workflow: build Spark with the Armada and Kubernetes profiles, load the
# resulting image into a kind-based Armada cluster (via armada-operator), and
# submit the example driver/executor jobs end-to-end.
name: Armada

on:
  pull_request:

jobs:
  armada:
    name: Armada integration
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          path: spark-armada
      - uses: actions/checkout@v4
        with:
          repository: armadaproject/armada-operator
          path: armada-operator
      - run: |
          cd spark-armada
          ./build/sbt package -Parmada -Pkubernetes
          ./bin/docker-image-tool.sh -t testing build
          docker image save -o ../spark_testing.tar spark:testing
          cd ..

          cd armada-operator
          make kind-all
          ./bin/tooling/kind load image-archive ../spark_testing.tar --name armada

          # sleep a bit, or we see: create queue request failed: rpc error: code = DeadlineExceeded
          sleep 60

          ./bin/app/armadactl create queue test

          # sleep a bit, or we see: rpc error: code = PermissionDenied desc = could not find queue "test"
          sleep 60

          ./bin/app/armadactl submit ../spark-armada/examples/spark-driver-job.yaml
          ./bin/app/armadactl submit ../spark-armada/examples/spark-executor-job.yaml

          # wait for the jobs to start
          sleep 60

          # inspect jobs: dump the logs of every armada-managed pod
          kubectl get pods
          for pod in $(kubectl get pods | grep armada | cut -d " " -f 1)
          do
            echo "$pod"
            kubectl logs "pod/$pod"
            echo
          done

10 changes: 10 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>armada</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-armada_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
Expand Down
27 changes: 25 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,10 @@ private[spark] class SparkSubmit extends Logging {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("armada") => ARMADA
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, k8s, or local")
error("Master must either be yarn or start with spark, k8s, armada, or local")
-1
}
case None => LOCAL // default master or remote mode.
Expand Down Expand Up @@ -296,6 +297,15 @@ private[spark] class SparkSubmit extends Logging {
}
}

if (clusterManager == ARMADA) {
printMessage(s"Armada selected as cluster manager.")
if (!Utils.classIsLoadable(ARMADA_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error(
s"Could not load ARMADA class \"${ARMADA_CLUSTER_SUBMIT_CLASS}\". " +
"This copy of Spark may not have been compiled with ARMADA support.")
}
}

// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
Expand Down Expand Up @@ -329,6 +339,8 @@ private[spark] class SparkSubmit extends Logging {
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
// TODO: does client/cluster mode matter here?
val isArmada = clusterManager == ARMADA
val isCustomClasspathInClusterModeDisallowed =
!sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) &&
args.proxyUser != null &&
Expand Down Expand Up @@ -416,6 +428,7 @@ private[spark] class SparkSubmit extends Logging {
downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull

// TODO: May have to do the same/similar for Armada
if (isKubernetesClusterModeDriver) {
// SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
// in cluster mode, the archives should be available in the driver's current working
Expand Down Expand Up @@ -670,6 +683,7 @@ private[spark] class SparkSubmit extends Logging {
confKey = KEYTAB.key),
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

// TODO: Add Armada where appropriate.
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | KUBERNETES,
CLUSTER, confKey = JAR_PACKAGES.key),
Expand Down Expand Up @@ -864,6 +878,12 @@ private[spark] class SparkSubmit extends Logging {
}
}

if (isArmada) {
// FIXME: Make sure we populate what we need here!
childMainClass = ARMADA_CLUSTER_SUBMIT_CLASS
childArgs ++= Array("--class", args.mainClass)
}

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
Expand Down Expand Up @@ -1071,7 +1091,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val LOCAL = 8
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | LOCAL | KUBERNETES
private val ARMADA = 32
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | LOCAL | KUBERNETES | ARMADA

// Deploy modes
private val CLIENT = 1
Expand All @@ -1095,6 +1116,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
private[deploy] val ARMADA_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.armada.submit.ArmadaClientApplication"

override def main(args: Array[String]): Unit = {
Option(System.getenv("SPARK_PREFER_IPV6"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
s"""
|Options:
| --master MASTER_URL spark://host:port, yarn,
| k8s://https://host:port, or local (Default: local[*]).
| k8s://https://host:port, armada://host:port,
| or local (Default: local[*]).
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
| on one of the worker machines inside the cluster ("cluster")
| (Default: client).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,10 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
executorStageSummaryWrappers.foreach { exec =>
// only the first executor is expected to be excluded
val expectedExcludedFlag = exec.executorId == execIds.head
assert(exec.info.isBlacklistedForStage === expectedExcludedFlag)
assert(exec.info.isExcludedForStage === expectedExcludedFlag)
}

check[ExecutorSummaryWrapper](execIds.head) { exec =>
assert(exec.info.blacklistedInStages === Set(stages.head.stageId))
assert(exec.info.excludedInStages === Set(stages.head.stageId))

}
Expand All @@ -306,7 +304,6 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
assert(executorStageSummaryWrappersForNode.nonEmpty)
executorStageSummaryWrappersForNode.foreach { exec =>
// both executor is expected to be excluded
assert(exec.info.isBlacklistedForStage)
assert(exec.info.isExcludedForStage)

}
Expand Down Expand Up @@ -467,7 +464,6 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
}

check[ExecutorSummaryWrapper](execIds.head) { exec =>
assert(exec.info.blacklistedInStages === Set())
assert(exec.info.excludedInStages === Set())
}

Expand Down Expand Up @@ -495,7 +491,6 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
stageAttemptId = stages.last.attemptNumber()))

check[ExecutorSummaryWrapper](execIds.head) { exec =>
assert(exec.info.blacklistedInStages === Set(stages.last.stageId))
assert(exec.info.excludedInStages === Set(stages.last.stageId))
}

Expand Down Expand Up @@ -652,29 +647,25 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
time += 1
listener.onExecutorExcluded(SparkListenerExecutorExcluded(time, "1", 42))
check[ExecutorSummaryWrapper]("1") { exec =>
assert(exec.info.isBlacklisted)
assert(exec.info.isExcluded)
}

time += 1
listener.onExecutorUnexcluded(SparkListenerExecutorUnexcluded(time, "1"))
check[ExecutorSummaryWrapper]("1") { exec =>
assert(!exec.info.isBlacklisted)
assert(!exec.info.isExcluded)
}

// Exclude a node.
time += 1
listener.onNodeExcluded(SparkListenerNodeExcluded(time, "1.example.com", 2))
check[ExecutorSummaryWrapper]("1") { exec =>
assert(exec.info.isBlacklisted)
assert(exec.info.isExcluded)
}

time += 1
listener.onNodeUnexcluded(SparkListenerNodeUnexcluded(time, "1.example.com"))
check[ExecutorSummaryWrapper]("1") { exec =>
assert(!exec.info.isBlacklisted)
assert(!exec.info.isExcluded)
}

Expand Down
28 changes: 28 additions & 0 deletions examples/runSparkPi.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash

# Start up the SparkPi driver on Armada, discover its pod IP address,
# then start the executor pointed at that driver.
set -e

echo
echo starting SparkPi driver
armadactl submit examples/spark-pi-driver.yaml > /tmp/jobid.txt
# armadactl prints the job id as the fifth whitespace-separated field.
JOB_ID=$(awk '{print $5}' /tmp/jobid.txt)
cat /tmp/jobid.txt
echo


echo waiting for SparkPi driver to start
# NOTE(review): fixed sleep is racy — consider polling the pod phase instead.
sleep 20

echo
echo SparkPi driver ip addr:
IP_ADDR=$(kubectl get pod "armada-$JOB_ID-0" -o jsonpath='{.status.podIP}')
echo "$IP_ADDR"
echo

echo "passing driver's ip addr to executor and starting it"
# envsubst substitutes ${IP_ADDR} occurrences in the executor yaml template.
IP_ADDR="$IP_ADDR" envsubst < examples/spark-pi-executor.yaml > /tmp/ex.yaml
armadactl submit /tmp/ex.yaml
echo

echo SparkPi driver/executor started
31 changes: 31 additions & 0 deletions examples/spark-driver-job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Example Armada job spec: launches a Spark driver container that runs the
# LocalPi example via the image's /opt/entrypoint.sh.
queue: test
jobSetId: job-set-1
jobs:
  - namespace: default
    priority: 0
    podSpec:
      terminationGracePeriodSeconds: 0
      restartPolicy: Never
      containers:
        - name: spark-driver
          image: spark:testing
          env:
            - name: SPARK_DRIVER_BIND_ADDRESS
              value: "0.0.0.0:1234"
          command:
            - /opt/entrypoint.sh
          args:
            - driver
            - --verbose
            - --class
            - org.apache.spark.examples.LocalPi
            - --master
            # NOTE(review): hard-coded Armada gRPC endpoint — replace with the
            # address of your own Armada server before submitting.
            - armada://192.168.1.167:50051
            - submit
          resources:
            limits:
              memory: 1Gi
              cpu: 1
            requests:
              memory: 1Gi
              cpu: 1
Loading
Loading