From 41267020701be4877be352e1678113bd9870ec12 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Fri, 24 Mar 2017 21:17:54 +0000
Subject: [PATCH] Initial attempt to add classifier, packaging support. Ivy
 integration still not quite right.

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 75 +++++++++++++------
 .../apache/spark/deploy/IvyTestUtils.scala    | 34 ++++++---
 .../spark/deploy/RPackageUtilsSuite.scala     | 10 +--
 .../spark/deploy/SparkSubmitSuite.scala       |  6 +-
 .../spark/deploy/SparkSubmitUtilsSuite.scala  | 19 ++++-
 5 files changed, 100 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 77005aa9040b..d6f6abf3424c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,6 +22,7 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
 import java.net.URL
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
+import java.util.Collections
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -834,33 +835,56 @@ private[spark] object SparkSubmitUtils {
   var printStream = SparkSubmit.printStream
 
   /**
-   * Represents a Maven Coordinate
+   * Represents a Maven Coordinate. Refer to https://maven.apache.org/pom.html#Maven_Coordinates
+   * for more information. Standard ordering for a full coordinate is
+   * `groupId:artifactId:packaging:classifier:version`, although packaging and classifier
+   * are optional.
+   *
    * @param groupId the groupId of the coordinate
    * @param artifactId the artifactId of the coordinate
+   * @param packaging Maven packaging type (e.g. jar), if any
+   * @param classifier Maven classifier, if any
    * @param version the version of the coordinate
    */
-  private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
-    override def toString: String = s"$groupId:$artifactId:$version"
+  private[deploy] case class MavenCoordinate(
+      groupId: String,
+      artifactId: String,
+      packaging: Option[String],
+      classifier: Option[String],
+      version: String) {
+
+    def this(groupId: String, artifactId: String, version: String) =
+      this(groupId, artifactId, None, None, version)
+
+    override def toString: String = {
+      (Seq(groupId, artifactId) ++ packaging ++ classifier ++ Seq(version)).mkString(":")
+    }
   }
 
   /**
    * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
-   * @param coordinates Comma-delimited string of maven coordinates
+   * in the format `groupId:artifactId:version`, `groupId:artifactId:packaging:version`, or
+   * `groupId:artifactId:packaging:classifier:version`. '/' can be used as a separator instead
+   * of ':'.
+   *
+   * @param coordinates Comma-delimited string of Maven coordinates
    * @return Sequence of Maven coordinates
    */
   def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
-    coordinates.split(",").map { p =>
-      val splits = p.replace("/", ":").split(":")
-      require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
-        s"'groupId:artifactId:version'. The coordinate provided is: $p")
-      require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
-        s"be whitespace. The groupId provided is: ${splits(0)}")
-      require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
-        s"be whitespace. The artifactId provided is: ${splits(1)}")
-      require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
-        s"be whitespace. The version provided is: ${splits(2)}")
-      new MavenCoordinate(splits(0), splits(1), splits(2))
+    coordinates.split(",").map { coordinate =>
+      val splits = coordinate.split("[:/]")
+      require(splits.forall(split => split != null && split.trim.nonEmpty),
+        s"All elements of a coordinate must be non-null and non-blank; got $coordinate")
+      splits match {
+        case Array(groupId, artifactId, version) =>
+          new MavenCoordinate(groupId, artifactId, version)
+        case Array(groupId, artifactId, packaging, version) =>
+          new MavenCoordinate(groupId, artifactId, Some(packaging), None, version)
+        case Array(groupId, artifactId, packaging, classifier, version) =>
+          new MavenCoordinate(groupId, artifactId, Some(packaging), Some(classifier), version)
+        case _ => throw new IllegalArgumentException("Coordinates must be of the form " +
+          s"groupId:artifactId[:packaging[:classifier]]:version; got $coordinate")
+      }
     }
   }
 
@@ -928,12 +952,14 @@ private[spark] object SparkSubmitUtils {
    * @return a comma-delimited list of paths for the dependencies
    */
   def resolveDependencyPaths(
-      artifacts: Array[AnyRef],
+      artifacts: Array[Artifact],
       cacheDirectory: File): String = {
     artifacts.map { artifactInfo =>
-      val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
+      val artifact = artifactInfo.getModuleRevisionId
+      val classifier =
+        Option(artifactInfo.getExtraAttribute("classifier")).map("-" + _).getOrElse("")
       cacheDirectory.getAbsolutePath + File.separator +
-        s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
+        s"${artifact.getOrganisation}_${artifact.getName}${classifier}-${artifact.getRevision}.jar"
     }.mkString(",")
   }
 
@@ -950,6 +976,12 @@ private[spark] object SparkSubmitUtils {
       printStream.println(s"${dd.getDependencyId} added as a dependency")
       // scalastyle:on println
       md.addDependency(dd)
+      if (mvn.classifier.isDefined) {
+        val typeExt = mvn.packaging.getOrElse("jar")
+        dd.addDependencyArtifact(ivyConfName, new DefaultDependencyArtifactDescriptor(
+          dd, mvn.artifactId, typeExt, typeExt, null,
+          Collections.singletonMap("classifier", mvn.classifier.get)))
+      }
     }
   }
 
@@ -1133,9 +1165,10 @@ private[spark] object SparkSubmitUtils {
         // retrieve all resolved dependencies
         ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
           packagesDirectory.getAbsolutePath + File.separator +
-            "[organization]_[artifact]-[revision].[ext]",
+            "[organization]_[artifact](-[classifier])-[revision].[ext]",
           retrieveOptions.setConfs(Array(ivyConfName)))
-        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+        resolveDependencyPaths(
+          rr.getArtifacts.toArray.map(_.asInstanceOf[Artifact]), packagesDirectory)
       } finally {
         System.setOut(sysOut)
       }
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index f50cb38311db..a1d5c45d8053 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -34,13 +34,13 @@ import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
 private[deploy] object IvyTestUtils {
 
   /**
-   * Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
-   * or `pom`.
+   * Create the path for the jar and pom from the maven coordinate. `extOverride`, if defined,
+   * should be `jar` or `pom`; when `None`, the artifact's packaging (default `jar`) is used.
    */
   private[deploy] def pathFromCoordinate(
       artifact: MavenCoordinate,
       prefix: File,
-      ext: String,
+      extOverride: Option[String],
       useIvyLayout: Boolean): File = {
     val groupDirs = artifact.groupId.replace(".", File.separator)
     val artifactDirs = artifact.artifactId
@@ -48,6 +48,7 @@ private[deploy] object IvyTestUtils {
       if (!useIvyLayout) {
         Seq(groupDirs, artifactDirs, artifact.version).mkString(File.separator)
       } else {
+        val ext = extOverride.getOrElse(artifact.packaging.getOrElse("jar"))
         Seq(artifact.groupId, artifactDirs, artifact.version, ext + "s").mkString(File.separator)
       }
     new File(prefix, artifactPath)
@@ -57,9 +58,11 @@ private[deploy] object IvyTestUtils {
   private[deploy] def artifactName(
       artifact: MavenCoordinate,
       useIvyLayout: Boolean,
-      ext: String = ".jar"): String = {
+      extOverride: Option[String]): String = {
+    val classifier = artifact.classifier.map("-" + _).getOrElse("")
+    val ext = extOverride.getOrElse("." + artifact.packaging.getOrElse("jar"))
     if (!useIvyLayout) {
-      s"${artifact.artifactId}-${artifact.version}$ext"
+      s"${artifact.artifactId}-${artifact.version}$classifier$ext"
     } else {
       s"${artifact.artifactId}$ext"
     }
@@ -152,11 +155,11 @@ private[deploy] object IvyTestUtils {
       dependencies: Option[Seq[MavenCoordinate]],
       useIvyLayout: Boolean): File = {
     if (useIvyLayout) {
-      val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
+      val ivyXmlPath = pathFromCoordinate(artifact, tempPath, Some("ivy"), useIvyLayout)
       Files.createParentDirs(new File(ivyXmlPath, "dummy"))
       createIvyDescriptor(ivyXmlPath, artifact, dependencies)
     } else {
-      val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
+      val pomPath = pathFromCoordinate(artifact, tempPath, Some("pom"), useIvyLayout)
       Files.createParentDirs(new File(pomPath, "dummy"))
       createPom(pomPath, artifact, dependencies)
     }
@@ -167,6 +170,12 @@ private[deploy] object IvyTestUtils {
     var result = "\n" + "  " * tabCount + s"<groupId>${artifact.groupId}</groupId>"
     result += "\n" + "  " * tabCount + s"<artifactId>${artifact.artifactId}</artifactId>"
     result += "\n" + "  " * tabCount + s"<version>${artifact.version}</version>"
+    if (artifact.classifier.isDefined) {
+      result += "\n" + "  " * tabCount + s"<classifier>${artifact.classifier.get}</classifier>"
+    }
+    if (artifact.packaging.isDefined) {
+      result += "\n" + "  " * tabCount + s"<type>${artifact.packaging.get}</type>"
+    }
     result
   }
@@ -191,7 +200,7 @@ private[deploy] object IvyTestUtils {
       "\n  <dependencies>\n" + inside + "\n  </dependencies>"
     }.getOrElse("")
     content += "\n</project>"
-    writeFile(dir, artifactName(artifact, false, ".pom"), content.trim)
+    writeFile(dir, artifactName(artifact, false, Some(".pom")), content.trim)
   }
 
   /** Helper method to write artifact information in the ivy.xml. */
@@ -206,6 +215,8 @@ private[deploy] object IvyTestUtils {
       dir: File,
       artifact: MavenCoordinate,
       dependencies: Option[Seq[MavenCoordinate]]): File = {
+    val typeExt = artifact.packaging.getOrElse("jar")
+    val classifier = artifact.classifier.map(c => "m:classifier=\"" + c + "\"").getOrElse("")
     var content = s"""
        |<?xml version="1.0" encoding="UTF-8"?>
        |<ivy-module version="1.0" xmlns:m="http://ant.apache.org/ivy/maven">
@@ -221,7 +232,8 @@ private[deploy] object IvyTestUtils {
        |    <conf name="pom" visibility="public" description=""/>
        |  </configurations>
        |  <publications>
-        |     <artifact name="${artifactName(artifact, true, "")}" type="jar" ext="jar"
+        |     <artifact name="${artifactName(artifact, true, Some(""))}"
+        |               type="$typeExt" ext="$typeExt" $classifier
        |               conf="master"/>
        |  </publications>
      """.stripMargin.trim
@@ -241,7 +253,7 @@ private[deploy] object IvyTestUtils {
       useIvyLayout: Boolean,
       withR: Boolean,
       withManifest: Option[Manifest] = None): File = {
-    val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
+    val jarFile = new File(dir, artifactName(artifact, useIvyLayout, None))
     val jarFileStream = new FileOutputStream(jarFile)
     val manifest = withManifest.getOrElse {
       val mani = new Manifest()
@@ -295,7 +307,7 @@ private[deploy] object IvyTestUtils {
     val root = new File(tempPath, tempPath.hashCode().toString)
     Files.createParentDirs(new File(root, "dummy"))
     try {
-      val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout)
+      val jarPath = pathFromCoordinate(artifact, tempPath, None, useIvyLayout)
       Files.createParentDirs(new File(jarPath, "dummy"))
 
       val className = "MyLib"
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 005587051b6a..2fa473cfab27 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -40,13 +40,13 @@ class RPackageUtilsSuite
     with BeforeAndAfterEach
     with ResetSystemProperties {
 
-  private val main = MavenCoordinate("a", "b", "c")
-  private val dep1 = MavenCoordinate("a", "dep1", "c")
-  private val dep2 = MavenCoordinate("a", "dep2", "d")
+  private val main = new MavenCoordinate("a", "b", "c")
+  private val dep1 = new MavenCoordinate("a", "dep1", "c")
+  private val dep2 = new MavenCoordinate("a", "dep2", "d")
 
   private def getJarPath(coord: MavenCoordinate, repo: File): File = {
-    new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false),
-      IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar"))
+    new File(IvyTestUtils.pathFromCoordinate(coord, repo, None, useIvyLayout = false),
+      IvyTestUtils.artifactName(coord, useIvyLayout = false, None))
   }
 
   private val lineBuffer = ArrayBuffer[String]()
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 7c2ec01a03d0..5d6b91ba5c77 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -423,8 +423,8 @@ class SparkSubmitSuite
   // SPARK-7287
   test("includes jars passed in through --packages") {
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
-    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
-    val dep = MavenCoordinate("my.great.dep", "mylib", "0.1")
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val dep = new MavenCoordinate("my.great.dep", "mylib", "0.1")
     IvyTestUtils.withRepository(main, Some(dep.toString), None) { repo =>
       val args = Seq(
         "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
@@ -446,7 +446,7 @@ class SparkSubmitSuite
     assume(RUtils.isRInstalled, "R isn't installed on this machine.")
     // Check if the SparkR package is installed
     assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
-    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
MavenCoordinate("my.great.lib", "mylib", "0.1") + val main = new MavenCoordinate("my.great.lib", "mylib", "0.1") val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val rScriptDir = Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 266c9d33b5a9..2356acf24518 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -58,7 +58,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { } test("incorrect maven coordinate throws error") { - val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a") + val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a", + "a:b:c:d:e:f") for (coordinate <- coordinates) { intercept[IllegalArgumentException] { SparkSubmitUtils.extractMavenCoordinates(coordinate) @@ -91,6 +92,15 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { } } + test("extractMavenCoordinates parses correctly") { + for (s <- Seq("g:a:v,g:a:p:v,g:a:p:c:v", "g/a/v,g/a/p/v,g/a/p/c/v")) { + val Seq(gav, gapv, gapcv) = SparkSubmitUtils.extractMavenCoordinates(s) + assert(new MavenCoordinate("g", "a", "v") === gav) + assert(new MavenCoordinate("g", "a", Some("p"), None, "v") === gapv) + assert(new MavenCoordinate("g", "a", Some("p"), Some("c"), "v") === gapcv) + } + } + test("add dependencies works correctly") { val md = SparkSubmitUtils.getModuleDescriptor val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," + @@ -115,7 +125,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { assert(rule2.getOrganisation === "c") assert(rule2.getName === "d") intercept[IllegalArgumentException] { - SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default") + SparkSubmitUtils.createExclusion("e:f:g:h:i:j", new IvySettings, "default") } } @@ -128,7 +138,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { assert(index >= 0) jPaths = jPaths.substring(index + tempIvyPath.length) } - val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1") + val main = new MavenCoordinate("my.awesome.lib", "mylib", "0.1") IvyTestUtils.withRepository(main, None, None) { repo => // end to end val jarPath = SparkSubmitUtils.resolveMavenCoordinates( @@ -199,7 +209,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(None, None), isTest = true) assert(path === "", "should return empty path") - val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0") + val main = new MavenCoordinate( + "org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0") IvyTestUtils.withRepository(main, None, None) { repo => val files = SparkSubmitUtils.resolveMavenCoordinates( coordinates + "," + main.toString,