Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1756,6 +1756,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
SparkEnv.set(null)
}
// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be super paranoid, I'd do this before the previous line.

SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
}
Expand Down
30 changes: 15 additions & 15 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,27 +385,27 @@ class SparkHadoopUtil extends Logging {

object SparkHadoopUtil {

private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
}
private lazy val hadoop = new SparkHadoopUtil
private lazy val yarn = try {
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}

val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"

val SPARK_YARN_CREDS_COUNTER_DELIM = "-"

def get: SparkHadoopUtil = {
hadoop
// Check each time to support changing to/from YARN
val yarnMode = java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
yarn
} else {
hadoop
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ private[spark] class Client(
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)

def stop(): Unit = yarnClient.stop()
def stop(): Unit = {
yarnClient.stop()
// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
}

/**
* Submit an application running our ApplicationMaster to the ResourceManager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.scalatest.Matchers
import org.apache.hadoop.yarn.api.records.ApplicationAccessType

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils


Expand Down Expand Up @@ -233,4 +234,15 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
}

test("check different hadoop utils based on env variable") {
try {
System.setProperty("SPARK_YARN_MODE", "true")
assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil])
System.setProperty("SPARK_YARN_MODE", "false")
assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil])
} finally {
System.clearProperty("SPARK_YARN_MODE")
}
}
}