From b6ef038a04e87e4a0d42e714bf37bcdf3172336a Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Fri, 12 Jun 2015 12:09:18 -0700
Subject: [PATCH 1/5] [SPARK-8095] Resolve dependencies of Spark Packages in local ivy cache

---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 14 +--
 .../apache/spark/deploy/IvyTestUtils.scala         | 89 ++++++++++++++++---
 .../spark/deploy/SparkSubmitUtilsSuite.scala       | 22 +++--
 3 files changed, 99 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b8978e25a02d2..dd774385370a9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -35,7 +35,8 @@ import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
-import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
+import org.apache.ivy.plugins.repository.file.FileRepository
+import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -736,7 +737,7 @@ private[spark] object SparkSubmitUtils {
 
   /** Path of the local Maven cache. */
   private[spark] def m2Path: File = new File(System.getProperty("user.home"),
-    ".m2" + File.separator + "repository" + File.separator)
+    ".m2" + File.separator + "repository")
 
   /**
    * Extracts maven coordinates from a comma-delimited string
@@ -756,12 +757,13 @@ private[spark] object SparkSubmitUtils {
     localM2.setName("local-m2-cache")
     cr.add(localM2)
 
-    val localIvy = new IBiblioResolver
-    localIvy.setRoot(new File(ivySettings.getDefaultIvyUserDir,
-      "local" + File.separator).toURI.toString)
+    val localIvy = new FileSystemResolver
+    val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
+    localIvy.setLocal(true)
+    localIvy.setRepository(new FileRepository(localIvyRoot))
     val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
       "[artifact](-[classifier]).[ext]").mkString(File.separator)
-    localIvy.setPattern(ivyPattern)
+    localIvy.addIvyPattern(localIvyRoot.getAbsolutePath + File.separator + ivyPattern)
     localIvy.setName("local-ivy-cache")
     cr.add(localIvy)
 
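Why the resolver swap above matters: the old IBiblioResolver pointed at ~/.ivy2/local but assumed a Maven directory layout, so packages published there by `sbt publishLocal` (which uses Ivy's `[organisation]/[module]/[revision]/[type]s/...` layout) were never found. The sketch below shows how such a FileSystemResolver can be wired up in isolation. It is a minimal illustration rather than part of the patch, and it registers an artifact pattern alongside the ivy pattern, which a standalone resolver needs in order to locate the jars themselves:

    import java.io.File

    import org.apache.ivy.core.settings.IvySettings
    import org.apache.ivy.plugins.repository.file.FileRepository
    import org.apache.ivy.plugins.resolver.FileSystemResolver

    object LocalIvyResolverSketch {
      def main(args: Array[String]): Unit = {
        val settings = new IvySettings
        // Root of the local ivy repository, typically ~/.ivy2/local
        val root = new File(settings.getDefaultIvyUserDir, "local")
        val resolver = new FileSystemResolver
        resolver.setLocal(true)
        resolver.setName("local-ivy-cache")
        resolver.setRepository(new FileRepository(root))
        // Matches `sbt publishLocal` output, e.g.
        //   ~/.ivy2/local/my.group/mylib/0.1/ivys/ivy.xml
        //   ~/.ivy2/local/my.group/mylib/0.1/jars/mylib.jar
        val pattern = Seq(root.getAbsolutePath, "[organisation]", "[module]", "[revision]",
          "[type]s", "[artifact](-[classifier]).[ext]").mkString(File.separator)
        resolver.addIvyPattern(pattern)
        resolver.addArtifactPattern(pattern)
        settings.addResolver(resolver)
        settings.setDefaultResolver(resolver.getName)
        println(s"resolver '${resolver.getName}' configured over $root")
      }
    }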
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index 7d39984424842..cd618528380f2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -24,6 +24,8 @@ import com.google.common.io.{Files, ByteStreams}
 
 import org.apache.commons.io.FileUtils
 
+import org.apache.ivy.core.settings.IvySettings
+
 import org.apache.spark.TestUtils.{createCompiledClass, JavaSourceFromString}
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
 
@@ -44,13 +46,20 @@ private[deploy] object IvyTestUtils {
       if (!useIvyLayout) {
         Seq(groupDirs, artifactDirs, artifact.version).mkString(File.separator)
       } else {
-        Seq(groupDirs, artifactDirs, artifact.version, ext + "s").mkString(File.separator)
+        Seq(artifact.groupId, artifactDirs, artifact.version, ext + "s").mkString(File.separator)
       }
     new File(prefix, artifactPath)
   }
 
-  private def artifactName(artifact: MavenCoordinate, ext: String = ".jar"): String = {
-    s"${artifact.artifactId}-${artifact.version}$ext"
+  private def artifactName(
+      artifact: MavenCoordinate,
+      useIvyLayout: Boolean,
+      ext: String = ".jar"): String = {
+    if (!useIvyLayout) {
+      s"${artifact.artifactId}-${artifact.version}$ext"
+    } else {
+      s"${artifact.artifactId}$ext"
+    }
   }
 
   /** Write the contents to a file to the supplied directory. */
@@ -92,6 +101,22 @@ private[deploy] object IvyTestUtils {
     createCompiledClass(className, dir, sourceFile, Seq.empty)
   }
 
+  private def createDescriptor(
+      tempPath: File,
+      artifact: MavenCoordinate,
+      dependencies: Option[Seq[MavenCoordinate]],
+      useIvyLayout: Boolean): File = {
+    if (useIvyLayout) {
+      val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
+      Files.createParentDirs(new File(ivyXmlPath, "dummy"))
+      createIvyDescriptor(ivyXmlPath, artifact, dependencies)
+    } else {
+      val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
+      Files.createParentDirs(new File(pomPath, "dummy"))
+      createPom(pomPath, artifact, dependencies)
+    }
+  }
+
   /** Helper method to write artifact information in the pom. */
   private def pomArtifactWriter(artifact: MavenCoordinate, tabCount: Int = 1): String = {
     var result = "\n" + "  " * tabCount + s"<groupId>${artifact.groupId}</groupId>"
@@ -121,15 +146,55 @@ private[deploy] object IvyTestUtils {
       "\n  <dependencies>" + inside + "\n  </dependencies>"
     }.getOrElse("")
     content += "\n</project>"
-    writeFile(dir, artifactName(artifact, ".pom"), content.trim)
+    writeFile(dir, artifactName(artifact, false, ".pom"), content.trim)
+  }
+
+  /** Helper method to write artifact information in the ivy.xml. */
+  private def ivyArtifactWriter(artifact: MavenCoordinate): String = {
+    s"""<dependency org="${artifact.groupId}" name="${artifact.artifactId}"
+       |      rev="${artifact.version}" force="true"
+       |      conf="compile->compile(*),master(*);runtime->runtime(*)"/>""".stripMargin
+  }
+
+  /** Create an ivy.xml descriptor for this artifact. */
+  private def createIvyDescriptor(
+      dir: File,
+      artifact: MavenCoordinate,
+      dependencies: Option[Seq[MavenCoordinate]]): File = {
+    var content = s"""
+        |<?xml version="1.0" encoding="UTF-8"?>
+        |<ivy-module version="2.0" xmlns:m="http://ant.apache.org/ivy/maven">
+        |  <info organisation="${artifact.groupId}"
+        |        module="${artifact.artifactId}"
+        |        revision="${artifact.version}"
+        |        status="release" publication="20150405222456"/>
+        |  <configurations>
+        |    <conf name="default" visibility="public" description="" extends="runtime,master"/>
+        |    <conf name="compile" visibility="public" description=""/>
+        |    <conf name="master" visibility="public" description=""/>
+        |    <conf name="runtime" visibility="public" description="" extends="compile"/>
+        |    <conf name="pom" visibility="public" description=""/>
+        |  </configurations>
+        |  <publications>
+        |    <artifact name="${artifact.artifactId}" type="jar" ext="jar" conf="master"/>
+        |  </publications>
+      """.stripMargin.trim
+    content += dependencies.map { deps =>
+      val inside = deps.map(ivyArtifactWriter).mkString("\n")
+      "\n  <dependencies>" + inside + "\n  </dependencies>"
+    }.getOrElse("")
+    content += "\n</ivy-module>"
+    writeFile(dir, "ivy.xml", content.trim)
   }
 
   /** Create the jar for the given maven coordinate, using the supplied files. */
   private def packJar(
       dir: File,
       artifact: MavenCoordinate,
-      files: Seq[(String, File)]): File = {
-    val jarFile = new File(dir, artifactName(artifact))
+      files: Seq[(String, File)],
+      useIvyLayout: Boolean): File = {
+    val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
     val jarFileStream = new FileOutputStream(jarFile)
     val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
 
@@ -187,12 +252,10 @@ private[deploy] object IvyTestUtils {
       } else {
         Seq(javaFile)
       }
-      val jarFile = packJar(jarPath, artifact, allFiles)
+      val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout)
       assert(jarFile.exists(), "Problem creating Jar file")
-      val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
-      Files.createParentDirs(new File(pomPath, "dummy"))
-      val pomFile = createPom(pomPath, artifact, dependencies)
-      assert(pomFile.exists(), "Problem creating Pom file")
+      val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout)
+      assert(descriptor.exists(), "Problem creating descriptor file")
     } finally {
       FileUtils.deleteDirectory(root)
     }
@@ -237,7 +300,8 @@ private[deploy] object IvyTestUtils {
       dependencies: Option[String],
       rootDir: Option[File],
       useIvyLayout: Boolean = false,
-      withPython: Boolean = false)(f: String => Unit): Unit = {
+      withPython: Boolean = false,
+      ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
     val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
       withPython)
     try {
@@ -256,6 +320,7 @@ private[deploy] object IvyTestUtils {
       } else {
         FileUtils.deleteDirectory(repo)
       }
+      FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, artifact.groupId))
     }
   }
 }
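With createDescriptor in place, the repositories these utilities generate differ only in layout. A simplified, self-contained re-statement of the path logic above (the coordinate is hypothetical) makes the two shapes easy to compare:

    import java.io.File

    object LayoutSketch {
      // Mirrors pathFromCoordinate + artifactName above, for the jar artifact.
      def jarPath(groupId: String, artifactId: String, version: String,
          useIvyLayout: Boolean): String = {
        if (!useIvyLayout) {
          // Maven layout: dotted groupId becomes nested dirs, file name carries the version.
          val groupDirs = groupId.replace(".", File.separator)
          Seq(groupDirs, artifactId, version,
            s"$artifactId-$version.jar").mkString(File.separator)
        } else {
          // Ivy layout: groupId stays a single directory, artifacts are bucketed by type.
          Seq(groupId, artifactId, version, "jars",
            s"$artifactId.jar").mkString(File.separator)
        }
      }

      def main(args: Array[String]): Unit = {
        println(jarPath("my.great.lib", "mylib", "0.1", useIvyLayout = false))
        // my/great/lib/mylib/0.1/mylib-0.1.jar
        println(jarPath("my.great.lib", "mylib", "0.1", useIvyLayout = true))
        // my.great.lib/mylib/0.1/jars/mylib.jar
      }
    }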
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 07d261cc428c4..3672f41383baf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.ivy.core.module.descriptor.MDArtifact
 import org.apache.ivy.core.settings.IvySettings
-import org.apache.ivy.plugins.resolver.IBiblioResolver
+import org.apache.ivy.plugins.resolver.{AbstractResolver, FileSystemResolver, IBiblioResolver}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
@@ -68,7 +68,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     // should have central and spark-packages by default
     assert(res1.getResolvers.size() === 4)
     assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "local-m2-cache")
-    assert(res1.getResolvers.get(1).asInstanceOf[IBiblioResolver].getName === "local-ivy-cache")
+    assert(res1.getResolvers.get(1).asInstanceOf[FileSystemResolver].getName === "local-ivy-cache")
     assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "central")
     assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "spark-packages")
 
@@ -76,10 +76,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos), settings)
     assert(resolver2.getResolvers.size() === 7)
     val expected = repos.split(",").map(r => s"$r/")
-    resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: IBiblioResolver, i) =>
+    resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
       if (i > 3) {
         assert(resolver.getName === s"repo-${i - 3}")
-        assert(resolver.getRoot === expected(i - 4))
+        assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 4))
       }
     }
   }
@@ -112,26 +112,32 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("search for artifact at local repositories") {
-    val main = new MavenCoordinate("my.awesome.lib", "mylib", "0.1")
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val dep = "my.great.dep:mydep:0.5"
     // Local M2 repository
-    IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) { repo =>
+    IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
       val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, true)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
+      assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
     }
     // Local Ivy Repository
     val settings = new IvySettings
     val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
-    IvyTestUtils.withRepository(main, None, Some(ivyLocal), true) { repo =>
+    IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
       val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, true)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
+      assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
     }
     // Local ivy repository with modified home
     val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
-    IvyTestUtils.withRepository(main, None, Some(dummyIvyLocal), true) { repo =>
+    settings.setDefaultIvyUserDir(new File(tempIvyPath))
+    IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
+      ivySettings = settings) { repo =>
       val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
         Some(tempIvyPath), true)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
       assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
+      assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
     }
   }
 
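End to end, the patch means `--packages` coordinates can now be served from `~/.ivy2/local`. A trimmed usage sketch of the entry point the suite exercises (the coordinate is hypothetical, and SparkSubmitUtils is package-private, hence the package declaration):

    package org.apache.spark.deploy

    object ResolveSketch {
      def main(args: Array[String]): Unit = {
        // Resolves against the chain built by createRepoResolvers:
        // local-m2-cache, local-ivy-cache, central, spark-packages.
        val classpath = SparkSubmitUtils.resolveMavenCoordinates(
          "my.great.lib:mylib:0.1", // groupId:artifactId:version
          None,                     // no additional remote repositories
          None,                     // default ivy home (~/.ivy2)
          true)                     // isTest, as passed throughout the suite above
        // Comma-separated paths of the resolved jars (artifact plus dependencies).
        println(classpath)
      }
    }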
From f60772cde8efcdd1d171890e5997f78fa6d09158 Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Tue, 16 Jun 2015 11:25:50 -0700
Subject: [PATCH 2/5] use different folder for m2 cache during testing

---
 .../scala/org/apache/spark/deploy/SparkSubmit.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index dd774385370a9..dcdfa337c946e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -736,8 +736,14 @@ private[spark] object SparkSubmitUtils {
   }
 
   /** Path of the local Maven cache. */
-  private[spark] def m2Path: File = new File(System.getProperty("user.home"),
-    ".m2" + File.separator + "repository")
+  private[spark] def m2Path: File = {
+    if (Utils.isTesting) {
+      // test builds delete the maven cache, and this can cause flakiness
+      new File(Utils.createTempDir("mvn2"), ".m2" + File.separator + "repository")
+    } else {
+      new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
+    }
+  }
 
   /**
    * Extracts maven coordinates from a comma-delimited string
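The guard above keys everything off Utils.isTesting so test runs never touch the real ~/.m2. For context, the check amounts to the following paraphrase of org.apache.spark.util.Utils from this era (the real definition lives there, not in this patch):

    // True when the JVM was launched by Spark's test harness.
    def isTesting: Boolean =
      sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")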
From a69e3e656542b98ba60e4621407b0101d48a0d67 Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Tue, 16 Jun 2015 14:16:18 -0700
Subject: [PATCH 3/5] delete cache before test as well

---
 .../src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index cd618528380f2..fb474dcbaa1b5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -302,6 +302,10 @@ private[deploy] object IvyTestUtils {
       useIvyLayout: Boolean = false,
       withPython: Boolean = false,
       ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
+    try {
+      // delete the artifact from the cache as well if it already exists
+      FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, artifact.groupId))
+    }
     val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
       withPython)
     try {
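Patch 4 below folds this bare `try` block (note it has no `catch` or `finally`, so it is only a wrapper around the delete) into a purgeLocalIvyCache helper that also covers the declared dependencies. The cleanup idea in isolation, with hypothetical test groups:

    import java.io.File

    import org.apache.commons.io.FileUtils
    import org.apache.ivy.core.settings.IvySettings

    object CachePurgeSketch {
      // Cache entries for a group live under <defaultCache>/<groupId>;
      // FileUtils.deleteDirectory is a no-op when the directory does not exist.
      def purge(groupIds: Seq[String], settings: IvySettings): Unit = {
        groupIds.foreach { groupId =>
          FileUtils.deleteDirectory(new File(settings.getDefaultCache, groupId))
        }
      }

      def main(args: Array[String]): Unit = {
        purge(Seq("my.great.lib", "my.great.dep"), new IvySettings)
      }
    }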
From 48cc64830e9a141c21ba95d6d3381f754bfc35cc Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Tue, 16 Jun 2015 20:05:58 -0700
Subject: [PATCH 4/5] improve deletion

---
 .../apache/spark/deploy/IvyTestUtils.scala | 41 ++++++++++++++-----
 1 file changed, 30 insertions(+), 11 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index fb474dcbaa1b5..823050b0aabbe 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -51,6 +51,7 @@ private[deploy] object IvyTestUtils {
     new File(prefix, artifactPath)
   }
 
+  /** Returns the artifact naming based on standard ivy or maven format. */
   private def artifactName(
       artifact: MavenCoordinate,
       useIvyLayout: Boolean,
@@ -62,6 +63,15 @@ private[deploy] object IvyTestUtils {
     }
   }
 
+  /** Returns the directory for the given groupId based on standard ivy or maven format. */
+  private def getBaseGroupDirectory(artifact: MavenCoordinate, useIvyLayout: Boolean): String = {
+    if (!useIvyLayout) {
+      artifact.groupId.replace(".", File.separator)
+    } else {
+      artifact.groupId
+    }
+  }
+
   /** Write the contents to a file to the supplied directory. */
   private def writeFile(dir: File, fileName: String, contents: String): File = {
     val outputFile = new File(dir, fileName)
@@ -302,10 +312,8 @@ private[deploy] object IvyTestUtils {
       useIvyLayout: Boolean = false,
       withPython: Boolean = false,
       ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
-    try {
-      // delete the artifact from the cache as well if it already exists
-      FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, artifact.groupId))
-    }
+    val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
+    purgeLocalIvyCache(artifact, deps, ivySettings)
     val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
       withPython)
     try {
@@ -313,18 +321,29 @@ private[deploy] object IvyTestUtils {
     } finally {
       // Clean up
       if (repo.toString.contains(".m2") || repo.toString.contains(".ivy2")) {
-        FileUtils.deleteDirectory(new File(repo,
-          artifact.groupId.replace(".", File.separator) + File.separator + artifact.artifactId))
-        dependencies.map(SparkSubmitUtils.extractMavenCoordinates).foreach { seq =>
-          seq.foreach { dep =>
-            FileUtils.deleteDirectory(new File(repo,
-              dep.artifactId.replace(".", File.separator)))
+        val groupDir = getBaseGroupDirectory(artifact, useIvyLayout)
+        FileUtils.deleteDirectory(new File(repo, groupDir + File.separator + artifact.artifactId))
+        deps.foreach { _.foreach { dep =>
+            FileUtils.deleteDirectory(new File(repo, getBaseGroupDirectory(dep, useIvyLayout)))
           }
         }
       } else {
         FileUtils.deleteDirectory(repo)
       }
-      FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, artifact.groupId))
+      purgeLocalIvyCache(artifact, deps, ivySettings)
     }
   }
+
+  /** Deletes the test packages from the ivy cache. */
+  private def purgeLocalIvyCache(
+      artifact: MavenCoordinate,
+      dependencies: Option[Seq[MavenCoordinate]],
+      ivySettings: IvySettings): Unit = {
+    // delete the artifact from the cache as well if it already exists
+    FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, artifact.groupId))
+    dependencies.foreach { _.foreach { dep =>
+        FileUtils.deleteDirectory(new File(ivySettings.getDefaultCache, dep.groupId))
+      }
+    }
+  }
 }

From 2875bf49a4fea8027d2c8224d6d4b2bed09893d5 Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Tue, 16 Jun 2015 22:01:58 -0700
Subject: [PATCH 5/5] fix temp dir bug

---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index f23a5ef142da9..abf222757a95b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -739,7 +739,7 @@ private[spark] object SparkSubmitUtils {
   private[spark] def m2Path: File = {
     if (Utils.isTesting) {
       // test builds delete the maven cache, and this can cause flakiness
-      new File(Utils.createTempDir("mvn2"), ".m2" + File.separator + "repository")
+      new File("dummy", ".m2" + File.separator + "repository")
     } else {
       new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
     }
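The final one-liner addresses a subtle misuse introduced in patch 2: Utils.createTempDir takes the *root* directory as its first parameter (the name prefix is the second), so createTempDir("mvn2") created directories under a relative mvn2/ folder in the working directory rather than a prefixed directory under the system temp location; hence the "fix temp dir bug" subject. This reading is based on the Utils.createTempDir signature of the time and is offered as context, not as part of the diff. The replacement keeps tests away from the user's real Maven cache with a stable relative placeholder:

    import java.io.File

    // Before (patch 2): "mvn2" was interpreted as createTempDir's root, leaving
    // stray ./mvn2/spark-*/ directories behind after each test run.
    // After (patch 5): a fixed relative path, cheap to construct and side-effect free.
    val testM2 = new File("dummy", ".m2" + File.separator + "repository")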