From 2c2e1189aa100c45424c36ec2419eccdcbaa6e77 Mon Sep 17 00:00:00 2001 From: Bernardo Gomez Palacio Date: Thu, 17 Apr 2014 09:33:55 -0700 Subject: [PATCH] [SPARK-1522]: YARN ClientBase throws a NPE if there is no YARN Application specific CP The current implementation of ClientBase.getDefaultYarnApplicationClasspath inspects the MRJobConfig class for the field DEFAULT_YARN_APPLICATION_CLASSPATH when it should be really looking into YarnConfiguration. If the Application Configuration has no yarn.application.classpath defined a NPE exception will be thrown. Public API Changes =========================== * YARN ClientBase getDefault*ApplicationClasspath returns Option[Seq[String]] This commit depicts how the `ClientBase` API could change the `getDefaultYarnApplicationClasspath` and `getDefaultMRApplicationClasspath` to return a `Option[Seq[String]]` while recovering from `NoSuchFieldException`. Both methods that return the default application's *classpath*, for both *YARN* as well as *Map Reduce (MR)*, use reflection and per the Java API documentation they can throw the following exceptions: * Class:getField(String name): * NoSuchFieldException - if a field with the specified name is not found. * NullPointerException - if name is null * SecurityException - If a security manager, s, is present and any of the following conditions is met: 1. Invocation of s.checkMemberAccess(this, Member.PUBLIC) denies access to the field. 2. The caller's class loader is not the same as or an ancestor of the class loader for the current class and invocation of `s.checkPackageAccess()` denies access to the package of this class. * Field:Object get(Object obj): * IllegalAccessException - if this Field object is enforcing Java language access control and the underlying field is inaccessible. * IllegalArgumentException - if the specified object is not an instance of the class or interface declaring the underlying field (or a subclass or implementor thereof). * NullPointerException - if the specified object is null and the field is an instance field. * ExceptionInInitializerError - if the initialization provoked by this method fails. **NOTE**: The above is based on the *Java API for JDK 1.7* An interesting thing to notice is that the official JDK doesn't mention the occurrence of the `NoSuchFieldError`. This is completely acceptable per the JDK spec. The reason is that it is an *Error* and as described by the Java Language Specification and depicted in the *Error Class* documentation. An `Error` "indicates serious problems that a reasonable application should not try to catch." While An `Exception` "indicates conditions that a reasonable application might want to catch." If we actually dig deeper according to the *JVM SE7 Specification* "While Loading, Linking, and Initializing, if an error occurs during resolution of a symbolic reference, then an instance of IncompatibleClassChangeError (or a subclass) must be thrown..." "If an attempt by the Java Virtual Machine to resolve a symbolic reference fails because an error is thrown that is an instance of LinkageError (or a subclass), then subsequent attempts to resolve the reference always fail with the same error that was thrown as a result of the initial resolution attempt." Now `NoSuchFieldError` extends `LinkageError` which in turn is a `IncompatibleClassChangeError` and according to its documentation, the *LinkageError Class*, "indicates that a class has some dependency on another class; however, the latter class has incompatibly changed after the compilation of the former class." Why all these is important and how it relates with a couple of lines of code? Well, the original approach catches the two most probable problems you might encounter if you access, using reflection, a field that you are almost sure that if it exist it will be of _public_ access but you are not sure it will always be there. Interesting enough the original implementation addresses one of the _exceptions_ as well as a potential _linkage error_ but as mentioned neglects a documented _security exception_, probably due its unlikeliness to occur. Fact is that if an error _bubbles_ up the Spark YARN Client doesn't handle it will terminate, in a probably obscure fashion. The current call stack is as follows. Client >> run >> runApp >> ClientBase.setupLaunchEnv >> populateClasspath In my opinion it is questionable to let an exception escape of this context, the _ClientBase Object_. In my opinion such _ClientBase Object_ should fail gracefully by handling the potential _exceptions_ and _linkage error_ while providing enough logging to let a user know and identify what happened. Yet again, in my opinion the implementation in this commit handles it in a better, more resilient, manner than the previous implementation while adding logging that will help clarify the issues in case of an _exception_. Additional Changes include: =========================== * Test Suite for ClientBase added * Coding Style: * [Spark Style Guidelines](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) * [Scala Official Style Guidelines](http://docs.scala-lang.org/style/) * [Scalariform](https://github.com/mdr/scalariform) * Code refactoring and cleanup per review by andrewor14 Ref. "JVM SE7 Specification" http://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4 "Java API for JDK 1.7" http://docs.oracle.com/javase/7/docs/api/ [ticket: SPARK-1522] : https://issues.apache.org/jira/browse/SPARK-1522 Author : berngp Reviewer : andrewor14, tgravescs Testing : ? --- .../apache/spark/deploy/yarn/ClientBase.scala | 89 ++++++++------ .../spark/deploy/yarn/ClientBaseSuite.scala | 112 ++++++++++++++++++ 2 files changed, 167 insertions(+), 34 deletions(-) create mode 100644 yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index aeb3f0062df3b..4b5e0efdde92d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -23,6 +23,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, ListBuffer, Map} +import scala.util.{Try, Success, Failure} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ @@ -378,7 +379,7 @@ trait ClientBase extends Logging { } } -object ClientBase { +object ClientBase extends Logging { val SPARK_JAR: String = "__spark__.jar" val APP_JAR: String = "__app__.jar" val LOG4J_PROP: String = "log4j.properties" @@ -388,37 +389,47 @@ object ClientBase { def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head) - // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps - def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) { - val classpathEntries = Option(conf.getStrings( - YarnConfiguration.YARN_APPLICATION_CLASSPATH)).getOrElse( - getDefaultYarnApplicationClasspath()) - if (classpathEntries != null) { - for (c <- classpathEntries) { - YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim, - File.pathSeparator) - } + def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = { + val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) + for (c <- classPathElementsToAdd.flatten) { + YarnSparkHadoopUtil.addToEnvironment( + env, + Environment.CLASSPATH.name, + c.trim, + File.pathSeparator) } + classPathElementsToAdd + } - val mrClasspathEntries = Option(conf.getStrings( - "mapreduce.application.classpath")).getOrElse( - getDefaultMRApplicationClasspath()) - if (mrClasspathEntries != null) { - for (c <- mrClasspathEntries) { - YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim, - File.pathSeparator) - } - } + private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] = + Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match { + case Some(s) => Some(s.toSeq) + case None => getDefaultYarnApplicationClasspath } - def getDefaultYarnApplicationClasspath(): Array[String] = { - try { - val field = classOf[MRJobConfig].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") - field.get(null).asInstanceOf[Array[String]] - } catch { - case err: NoSuchFieldError => null - case err: NoSuchFieldException => null + private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] = + Option(conf.getStrings("mapreduce.application.classpath")) match { + case Some(s) => Some(s.toSeq) + case None => getDefaultMRApplicationClasspath + } + + def getDefaultYarnApplicationClasspath: Option[Seq[String]] = { + val triedDefault = Try[Seq[String]] { + val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") + val value = field.get(null).asInstanceOf[Array[String]] + value.toSeq + } recoverWith { + case e: NoSuchFieldException => Success(Seq.empty[String]) } + + triedDefault match { + case f: Failure[_] => + logError("Unable to obtain the default YARN Application classpath.", f.exception) + case s: Success[_] => + logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}") + } + + triedDefault.toOption } /** @@ -426,20 +437,30 @@ object ClientBase { * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String. * So we need to use reflection to retrieve it. */ - def getDefaultMRApplicationClasspath(): Array[String] = { - try { + def getDefaultMRApplicationClasspath: Option[Seq[String]] = { + val triedDefault = Try[Seq[String]] { val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH") - if (field.getType == classOf[String]) { - StringUtils.getStrings(field.get(null).asInstanceOf[String]) + val value = if (field.getType == classOf[String]) { + StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray } else { field.get(null).asInstanceOf[Array[String]] } - } catch { - case err: NoSuchFieldError => null - case err: NoSuchFieldException => null + value.toSeq + } recoverWith { + case e: NoSuchFieldException => Success(Seq.empty[String]) } + + triedDefault match { + case f: Failure[_] => + logError("Unable to obtain the default MR Application classpath.", f.exception) + case s: Success[_] => + logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}") + } + + triedDefault.toOption } + /** * Returns the java command line argument for setting up log4j. If there is a log4j.properties * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala new file mode 100644 index 0000000000000..608c6e92624c6 --- /dev/null +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.MRJobConfig +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers._ + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ HashMap => MutableHashMap } +import scala.util.Try + + +class ClientBaseSuite extends FunSuite { + + test("default Yarn application classpath") { + ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) + } + + test("default MR application classpath") { + ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) + } + + test("resultant classpath for an application that defines a classpath for YARN") { + withAppConf(Fixtures.mapYARNAppConf) { conf => + val env = newEnv + ClientBase.populateHadoopClasspath(conf, env) + classpath(env) should be( + flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath)) + } + } + + test("resultant classpath for an application that defines a classpath for MR") { + withAppConf(Fixtures.mapMRAppConf) { conf => + val env = newEnv + ClientBase.populateHadoopClasspath(conf, env) + classpath(env) should be( + flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) + } + } + + test("resultant classpath for an application that defines both classpaths, YARN and MR") { + withAppConf(Fixtures.mapAppConf) { conf => + val env = newEnv + ClientBase.populateHadoopClasspath(conf, env) + classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) + } + } + + object Fixtures { + + val knownDefYarnAppCP: Seq[String] = + getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration], + "DEFAULT_YARN_APPLICATION_CLASSPATH", + Seq[String]())(a => a.toSeq) + + + val knownDefMRAppCP: Seq[String] = + getFieldValue[String, Seq[String]](classOf[MRJobConfig], + "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH", + Seq[String]())(a => a.split(",")) + + val knownYARNAppCP = Some(Seq("/known/yarn/path")) + + val knownMRAppCP = Some(Seq("/known/mr/path")) + + val mapMRAppConf = + Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get) + + val mapYARNAppConf = + Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get) + + val mapAppConf = mapYARNAppConf ++ mapMRAppConf + } + + def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) { + val conf = new Configuration + m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") } + testCode(conf) + } + + def newEnv = MutableHashMap[String, String]() + + def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;") + + def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray + + def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = + Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults) + +}