
Commit fd85253

Authored and committed by Andrew Or
Revert "SPARK-2624 add datanucleus jars to the container in yarn-cluster"
This reverts commit a975dc3.
1 parent: 87437df

File tree

3 files changed: +0 additions, −157 deletions

docs/running-on-yarn.md

Lines changed: 0 additions & 15 deletions
@@ -139,21 +139,6 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   The maximum number of threads to use in the application master for launching executor containers.
   </td>
 </tr>
-<tr>
-  <td><code>spark.yarn.datanucleus.dir</code></td>
-  <td>$SPARK_HOME/lib</td>
-  <td>
-    The location of the DataNucleus jars, in case overriding the default location is desired.
-    By default, Spark on YARN will use the DataNucleus jars installed at
-    <code>$SPARK_HOME/lib</code>, but the jars can also be in a world-readable location on HDFS.
-    This allows YARN to cache them on nodes so that they don't need to be distributed each time
-    an application runs. To point to a directory on HDFS, for example, set this configuration to
-    "hdfs:///some/path".
-
-    This is required because the datanucleus jars cannot be packaged into the
-    assembly jar due to metadata conflicts (involving <code>plugin.xml</code>).
-  </td>
-</tr>
 </table>

 # Launching Spark on YARN
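
For reference, the documentation removed above described usage along these lines. A minimal sketch, assuming the reverted feature were still present; the app name is hypothetical, while the key "spark.yarn.datanucleus.dir" and the "hdfs:///some/path" example come from the deleted docs:

    import org.apache.spark.SparkConf

    // Point Spark on YARN at DataNucleus jars staged in a world-readable
    // HDFS directory instead of the default $SPARK_HOME/lib, so YARN can
    // cache them on each node. After this revert the key is ignored.
    val conf = new SparkConf()
      .setAppName("hive-on-yarn")  // hypothetical app name
      .set("spark.yarn.datanucleus.dir", "hdfs:///some/path")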

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 0 additions & 66 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy.yarn

 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
-import java.io.{File, FilenameFilter}

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -224,48 +223,10 @@ private[spark] trait ClientBase extends Logging {
         }
       }
     }
-
     if (cachedSecondaryJarLinks.nonEmpty) {
       sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     }

-    /**
-     * Do the same for datanucleus jars, if they exist in spark home. Find all datanucleus-* jars,
-     * copy them to the remote fs, and add them to the class path.
-     *
-     * This is necessary because the datanucleus jars cannot be included in the assembly jar due
-     * to metadata conflicts involving plugin.xml. At the time of writing, these are the only
-     * jars that cannot be distributed with the uber jar and have to be treated differently.
-     *
-     * For more details, see SPARK-2624, and https://github.com/apache/spark/pull/3238
-     */
-    for (libsDir <- dataNucleusJarsDir(sparkConf)) {
-      val libsURI = new URI(libsDir)
-      val jarLinks = ListBuffer.empty[String]
-      if (libsURI.getScheme != LOCAL_SCHEME) {
-        val localURI = getQualifiedLocalPath(libsURI).toUri()
-        val jars = FileSystem.get(localURI, hadoopConf).listFiles(new Path(localURI.getPath), false)
-        while (jars.hasNext) {
-          val jar = jars.next()
-          val name = jar.getPath.getName
-          if (name.startsWith("datanucleus-")) {
-            // copy to remote and add to classpath
-            val src = jar.getPath
-            val destPath = copyFileToRemote(dst, src, replication)
-            distCacheMgr.addResource(fs, hadoopConf, destPath,
-              localResources, LocalResourceType.FILE, name, statCache)
-            jarLinks += name
-          }
-        }
-      } else {
-        jarLinks += libsURI.toString + Path.SEPARATOR + "*"
-      }
-
-      if (jarLinks.nonEmpty) {
-        sparkConf.set(CONF_SPARK_DATANUCLEUS_JARS, jarLinks.mkString(","))
-      }
-    }
-
     localResources
   }

@@ -590,13 +551,6 @@ private[spark] object ClientBase extends Logging {
   // Internal config to propagate the location of the user's jar to the driver/executors
   val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"

-  // Location of the datanucleus jars
-  val CONF_SPARK_DATANUCLEUS_DIR = "spark.yarn.datanucleus.dir"
-
-  // Internal config to propagate the locations of datanucleus jars found to add to the
-  // classpath of the executors. Value should be a comma-separated list of paths to each jar.
-  val CONF_SPARK_DATANUCLEUS_JARS = "spark.yarn.datanucleus.jars"
-
   // Internal config to propagate the locations of any extra jars to add to the classpath
   // of the executors
   val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
@@ -629,19 +583,6 @@ private[spark] object ClientBase extends Logging {
     }
   }

-  /**
-   * Find the user-defined provided jars directory if configured, or return SPARK_HOME/lib if not.
-   *
-   * This method first looks for $CONF_SPARK_DATANUCLEUS_DIR inside the SparkConf, then looks for
-   * Spark home inside the SparkConf and the user environment.
-   */
-  private def dataNucleusJarsDir(conf: SparkConf): Option[String] = {
-    conf.getOption(CONF_SPARK_DATANUCLEUS_DIR).orElse {
-      val sparkHome = conf.getOption("spark.home").orElse(sys.env.get("SPARK_HOME"))
-      sparkHome.map(path => path + Path.SEPARATOR + "lib")
-    }
-  }
-
   /**
    * Return the path to the given application's staging directory.
    */
@@ -743,13 +684,6 @@ private[spark] object ClientBase extends Logging {
       addUserClasspath(args, sparkConf, env)
     }

-    // Add datanucleus jars to classpath
-    for (entries <- sparkConf.getOption(CONF_SPARK_DATANUCLEUS_JARS)) {
-      entries.split(",").filter(_.nonEmpty).foreach { entry =>
-        addFileToClasspath(entry, null, env)
-      }
-    }
-
     // Append all jar files under the working directory to the classpath.
     addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
   }
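
The heart of the reverted prepareLocalResources block is: resolve a jars directory, list it non-recursively, and keep only datanucleus-* jar names. A standalone sketch of that pattern, using only Hadoop and Spark APIs the removed code itself used; the object and method names are hypothetical:

    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    import org.apache.spark.SparkConf

    import scala.collection.mutable.ListBuffer

    // Hypothetical helper distilled from the block removed above.
    object DataNucleusJarFinder {

      // Resolution order of the removed dataNucleusJarsDir: prefer the explicit
      // spark.yarn.datanucleus.dir setting, else fall back to $SPARK_HOME/lib.
      def dataNucleusJarsDir(conf: SparkConf): Option[String] =
        conf.getOption("spark.yarn.datanucleus.dir").orElse {
          conf.getOption("spark.home").orElse(sys.env.get("SPARK_HOME")).map(_ + "/lib")
        }

      // List a directory non-recursively and keep only datanucleus-* jar names.
      // The removed code additionally copied each match to the remote fs and
      // registered it with the YARN distributed cache.
      def listDataNucleusJars(libsDir: URI, hadoopConf: Configuration): Seq[String] = {
        val found = ListBuffer.empty[String]
        val files = FileSystem.get(libsDir, hadoopConf).listFiles(new Path(libsDir.getPath), false)
        while (files.hasNext) {
          val name = files.next().getPath.getName
          if (name.startsWith("datanucleus-")) {
            found += name
          }
        }
        found.toSeq
      }
    }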

yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala

Lines changed: 0 additions & 76 deletions
@@ -21,7 +21,6 @@ import java.io.File
 import java.net.URI

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -105,81 +104,6 @@ class ClientBaseSuite extends FunSuite with Matchers {
     cp should not contain (ClientBase.APP_JAR)
   }

-  test("DataNucleus in classpath") {
-    val dnJars = "local:/dn/core.jar,/dn/api.jar"
-    val conf = new Configuration()
-    val sparkConf = new SparkConf()
-      .set(ClientBase.CONF_SPARK_JAR, SPARK)
-      .set(ClientBase.CONF_SPARK_DATANUCLEUS_JARS, dnJars)
-    val env = new MutableHashMap[String, String]()
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
-    ClientBase.populateClasspath(args, conf, sparkConf, env)
-
-    val cp = env("CLASSPATH").split(File.pathSeparator)
-    s"$dnJars".split(",").foreach({ entry =>
-      val uri = new URI(entry)
-      if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
-        cp should contain (uri.getPath())
-      } else {
-        cp should not contain (uri.getPath())
-      }
-    })
-  }
-
-  test("DataNucleus using local:") {
-    val dnDir = "local:/datanucleus"
-    val conf = new Configuration()
-    val sparkConf = new SparkConf()
-      .set(ClientBase.CONF_SPARK_JAR, SPARK)
-      .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir)
-    val yarnConf = new YarnConfiguration()
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
-    val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
-    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort(), anyBoolean())
-
-    val tempDir = Utils.createTempDir()
-    try {
-      client.prepareLocalResources(tempDir.getAbsolutePath())
-      val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",")
-      val uri = new URI(dnDir)
-      jars should contain (uri.toString + Path.SEPARATOR + "*")
-    } finally {
-      Utils.deleteRecursively(tempDir)
-    }
-  }
-
-  test("DataNucleus using file:") {
-    val dnDir = Utils.createTempDir()
-    val tempDir = Utils.createTempDir()
-
-    try {
-      // create mock datanucleus jar
-      val tempJar = File.createTempFile("datanucleus-", null, dnDir)
-
-      val conf = new Configuration()
-      val sparkConf = new SparkConf()
-        .set(ClientBase.CONF_SPARK_JAR, SPARK)
-        .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir.toURI.toString)
-      val yarnConf = new YarnConfiguration()
-      val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
-      val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
-      doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-        any(classOf[Path]), anyShort(), anyBoolean())
-
-      client.prepareLocalResources(tempDir.getAbsolutePath())
-
-      val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",")
-      jars should contain (tempJar.getName)
-    } finally {
-      Utils.deleteRecursively(dnDir)
-      Utils.deleteRecursively(tempDir)
-    }
-  }
-
   test("Jar path propagation through SparkConf") {
     val conf = new Configuration()
     val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
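
The deleted "DataNucleus using local:" test pins down the branch the removed client code took for local: URIs: nothing is uploaded, and the directory is recorded as a classpath glob. A small sketch of that scheme check; the object and method names are hypothetical, while the "local" scheme and the trailing "/*" behavior come from the removed code and test:

    import java.net.URI

    object DataNucleusDirResolution {
      val LocalScheme = "local"  // mirrors ClientBase.LOCAL_SCHEME in the removed code

      // Decide whether a datanucleus directory needs uploading. A "local:" URI
      // means the jars are already present on every node, so the removed code
      // only recorded a classpath glob such as "local:/datanucleus/*"; any
      // other scheme made it copy each jar to the remote filesystem instead.
      def classpathEntryOrUpload(libsDir: String): Either[String, String] = {
        val uri = new URI(libsDir)
        if (uri.getScheme == LocalScheme) {
          Right(uri.toString + "/*")  // use in place, e.g. "local:/datanucleus/*"
        } else {
          Left(libsDir)               // caller must upload the jars found here
        }
      }
    }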
