75 changes: 54 additions & 21 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,6 +22,7 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
import java.net.URL
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.Collections

import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -834,33 +835,56 @@ private[spark] object SparkSubmitUtils {
var printStream = SparkSubmit.printStream

/**
* Represents a Maven Coordinate
* Represents a Maven Coordinate. Refer to https://maven.apache.org/pom.html#Maven_Coordinates
* for more information. Standard ordering for a full coordinate is
* `groupId:artifactId:packaging:classifier:version` although packaging and classifier
* are optional.
*
* @param groupId the groupId of the coordinate
* @param artifactId the artifactId of the coordinate
* @param packaging Maven packaging type (e.g. jar), if any
* @param classifier Maven classifier, if any
* @param version the version of the coordinate
*/
private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
override def toString: String = s"$groupId:$artifactId:$version"
private[deploy] case class MavenCoordinate(
groupId: String,
artifactId: String,
packaging: Option[String],
classifier: Option[String],
version: String) {

def this(groupId: String, artifactId: String, version: String) =
this(groupId, artifactId, None, None, version)

override def toString: String = {
(Seq(groupId, artifactId) ++ packaging ++ classifier ++ Seq(version)).mkString(":")
}
}
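A quick illustration of the new toString ordering (illustrative values only; the class is private[deploy], so this assumes code inside org.apache.spark.deploy):

// Illustrative only: packaging and classifier sit between artifactId and version.
val full = MavenCoordinate("com.example", "lib", Some("jar"), Some("tests"), "1.0")
assert(full.toString == "com.example:lib:jar:tests:1.0")
// The three-argument auxiliary constructor leaves both optional fields empty,
// so toString collapses to the familiar groupId:artifactId:version form.
val short = new MavenCoordinate("com.example", "lib", "1.0")
assert(short.toString == "com.example:lib:1.0")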

/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
* @param coordinates Comma-delimited string of maven coordinates
* in the format `groupId:artifactId:version`, `groupId:artifactId:packaging:version`, or
* `groupId:artifactId:packaging:classifier:version`. '/' can be used as a separator instead
* of ':'.
*
* @param coordinates Comma-delimited string of Maven coordinates
* @return Sequence of Maven coordinates
*/
def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
val splits = p.replace("/", ":").split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
s"be whitespace. The groupId provided is: ${splits(0)}")
require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
s"be whitespace. The artifactId provided is: ${splits(1)}")
require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
s"be whitespace. The version provided is: ${splits(2)}")
new MavenCoordinate(splits(0), splits(1), splits(2))
coordinates.split(",").map { coordinate =>
val splits = coordinate.split("[:/]")
require(splits.forall(split => split != null && split.trim.nonEmpty),
s"All elements of coordinate must be non-null and not whitespace; got $coordinate")
splits match {
case Array(groupId, artifactId, version) =>
new MavenCoordinate(groupId, artifactId, version)
case Array(groupId, artifactId, packaging, version) =>
new MavenCoordinate(groupId, artifactId, Some(packaging), None, version)
Contributor: can a classifier be given without the packaging?

Member (Author): It doesn't seem like it, given https://maven.apache.org/pom.html#Maven_Coordinates . If there are four dimensions, the third is packaging.

case Array(groupId, artifactId, packaging, classifier, version) =>
new MavenCoordinate(groupId, artifactId, Some(packaging), Some(classifier), version)
case _ => throw new IllegalArgumentException("Coordinates must be of form " +
s"groupId:artifactId[:packaging[:classifier]]:version; got $coordinate")
}
}
}
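For example (illustrative; this mirrors the new "extractMavenCoordinates parses correctly" test added below):

// Illustrative only: the three accepted shapes, with '/' usable in place of ':'.
val Seq(gav, gapv, gapcv) =
  SparkSubmitUtils.extractMavenCoordinates("g:a:v,g:a:p:v,g/a/p/c/v")
// gav   == MavenCoordinate("g", "a", None, None, "v")
// gapv  == MavenCoordinate("g", "a", Some("p"), None, "v")
// gapcv == MavenCoordinate("g", "a", Some("p"), Some("c"), "v")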

@@ -928,12 +952,14 @@ private[spark] object SparkSubmitUtils {
* @return a comma-delimited list of paths for the dependencies
*/
def resolveDependencyPaths(
artifacts: Array[AnyRef],
artifacts: Array[Artifact],
cacheDirectory: File): String = {
artifacts.map { artifactInfo =>
val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
val artifact = artifactInfo.getModuleRevisionId
val classifier =
Option(artifactInfo.getExtraAttribute("classifier")).map("-" + _).getOrElse("")
cacheDirectory.getAbsolutePath + File.separator +
s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
s"${artifact.getOrganisation}_${artifact.getName}${classifier}-${artifact.getRevision}.jar"
}.mkString(",")
}

@@ -950,6 +976,12 @@
printStream.println(s"${dd.getDependencyId} added as a dependency")
// scalastyle:on println
md.addDependency(dd)
if (mvn.classifier.isDefined) {
val typeExt = mvn.packaging.getOrElse("jar")
dd.addDependencyArtifact(ivyConfName, new DefaultDependencyArtifactDescriptor(
dd, mvn.artifactId, typeExt, typeExt, null,
Collections.singletonMap("classifier", mvn.classifier.get)))
}
}
}
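Note that mvn.packaging.getOrElse("jar") also covers a coordinate constructed programmatically with a classifier but no packaging, a combination the coordinate string syntax cannot express (see the review thread above):

// Illustrative only: no "g:a::c:v"-style string parses to this, but the primary
// constructor allows it; the artifact type and extension then default to "jar".
val mvn = MavenCoordinate("g", "a", None, Some("tests"), "1.0")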

@@ -1133,9 +1165,10 @@ private[spark] object SparkSubmitUtils {
// retrieve all resolved dependencies
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator +
"[organization]_[artifact]-[revision].[ext]",
"[organization]_[artifact](-[classifier])-[revision].[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
resolveDependencyPaths(
rr.getArtifacts.toArray.map(_.asInstanceOf[Artifact]), packagesDirectory)
} finally {
System.setOut(sysOut)
}
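A note on the new retrieve pattern: parentheses are Ivy's optional-token syntax, so (-[classifier]) is emitted only when the artifact actually has a classifier, and unclassified artifacts keep their previous file names. Illustrative results for hypothetical artifacts:

// Illustrative only: file names produced under
// "[organization]_[artifact](-[classifier])-[revision].[ext]".
val unclassified = "com.example_lib-1.0.jar"      // optional token dropped
val classified = "com.example_lib-tests-1.0.jar"  // classifier "tests"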
34 changes: 23 additions & 11 deletions core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -34,20 +34,21 @@ import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
private[deploy] object IvyTestUtils {

/**
* Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
* or `pom`.
* Create the path for the jar and pom from the maven coordinate. `extOverride` should
* be `jar` or `pom`.
*/
private[deploy] def pathFromCoordinate(
artifact: MavenCoordinate,
prefix: File,
ext: String,
extOverride: Option[String],
useIvyLayout: Boolean): File = {
val groupDirs = artifact.groupId.replace(".", File.separator)
val artifactDirs = artifact.artifactId
val artifactPath =
if (!useIvyLayout) {
Seq(groupDirs, artifactDirs, artifact.version).mkString(File.separator)
} else {
val ext = extOverride.getOrElse(artifact.packaging.getOrElse("jar"))
Seq(artifact.groupId, artifactDirs, artifact.version, ext + "s").mkString(File.separator)
}
new File(prefix, artifactPath)
@@ -57,9 +58,11 @@ private[deploy] object IvyTestUtils {
private[deploy] def artifactName(
artifact: MavenCoordinate,
useIvyLayout: Boolean,
ext: String = ".jar"): String = {
extOverride: Option[String]): String = {
val classifier = artifact.classifier.map("-" + _).getOrElse("")
val ext = extOverride.getOrElse("." + artifact.packaging.getOrElse("jar"))
if (!useIvyLayout) {
s"${artifact.artifactId}-${artifact.version}$ext"
s"${artifact.artifactId}-${artifact.version}$classifier$ext"
} else {
s"${artifact.artifactId}$ext"
}
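Illustrative outputs of the updated helper for a hypothetical coordinate:

// Illustrative only.
val coord = new MavenCoordinate("g", "a", Some("jar"), Some("tests"), "1.0")
artifactName(coord, useIvyLayout = false, extOverride = None)         // "a-1.0-tests.jar"
artifactName(coord, useIvyLayout = false, extOverride = Some(".pom")) // "a-1.0-tests.pom"
artifactName(coord, useIvyLayout = true, extOverride = None)          // "a.jar" (ivy layout drops the classifier)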
@@ -152,11 +155,11 @@
dependencies: Option[Seq[MavenCoordinate]],
useIvyLayout: Boolean): File = {
if (useIvyLayout) {
val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
val ivyXmlPath = pathFromCoordinate(artifact, tempPath, Some("ivy"), useIvyLayout)
Files.createParentDirs(new File(ivyXmlPath, "dummy"))
createIvyDescriptor(ivyXmlPath, artifact, dependencies)
} else {
val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
val pomPath = pathFromCoordinate(artifact, tempPath, Some("pom"), useIvyLayout)
Files.createParentDirs(new File(pomPath, "dummy"))
createPom(pomPath, artifact, dependencies)
}
@@ -167,6 +170,12 @@
var result = "\n" + " " * tabCount + s"<groupId>${artifact.groupId}</groupId>"
result += "\n" + " " * tabCount + s"<artifactId>${artifact.artifactId}</artifactId>"
result += "\n" + " " * tabCount + s"<version>${artifact.version}</version>"
if (artifact.classifier.isDefined) {
result += "\n" + " " * tabCount + s"<classifier>${artifact.classifier.get}</classifier>"
}
if (artifact.packaging.isDefined) {
result += "\n" + " " * tabCount + s"<type>${artifact.packaging.get}</type>"
}
result
}

@@ -191,7 +200,7 @@
"\n <dependencies>\n" + inside + "\n </dependencies>"
}.getOrElse("")
content += "\n</project>"
writeFile(dir, artifactName(artifact, false, ".pom"), content.trim)
writeFile(dir, artifactName(artifact, false, Some(".pom")), content.trim)
}

/** Helper method to write artifact information in the ivy.xml. */
@@ -206,6 +215,8 @@
dir: File,
artifact: MavenCoordinate,
dependencies: Option[Seq[MavenCoordinate]]): File = {
val typeExt = artifact.packaging.getOrElse("jar")
val classifier = artifact.classifier.map(c => s"""m:classifier="$c"""").getOrElse("")
var content = s"""
|<?xml version="1.0" encoding="UTF-8"?>
|<ivy-module version="2.0" xmlns:m="http://ant.apache.org/ivy/maven">
@@ -221,7 +232,8 @@
| <conf name="pom" visibility="public" description=""/>
| </configurations>
| <publications>
| <artifact name="${artifactName(artifact, true, "")}" type="jar" ext="jar"
| <artifact name="${artifactName(artifact, true, Some(""))}"
| type="$typeExt" ext="$typeExt" $classifier
| conf="master"/>
| </publications>
""".stripMargin.trim
@@ -241,7 +253,7 @@
useIvyLayout: Boolean,
withR: Boolean,
withManifest: Option[Manifest] = None): File = {
val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
val jarFile = new File(dir, artifactName(artifact, useIvyLayout, None))
val jarFileStream = new FileOutputStream(jarFile)
val manifest = withManifest.getOrElse {
val mani = new Manifest()
@@ -295,7 +307,7 @@
val root = new File(tempPath, tempPath.hashCode().toString)
Files.createParentDirs(new File(root, "dummy"))
try {
val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout)
val jarPath = pathFromCoordinate(artifact, tempPath, None, useIvyLayout)
Files.createParentDirs(new File(jarPath, "dummy"))
val className = "MyLib"

core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -40,13 +40,13 @@ class RPackageUtilsSuite
with BeforeAndAfterEach
with ResetSystemProperties {

private val main = MavenCoordinate("a", "b", "c")
private val dep1 = MavenCoordinate("a", "dep1", "c")
private val dep2 = MavenCoordinate("a", "dep2", "d")
private val main = new MavenCoordinate("a", "b", "c")
private val dep1 = new MavenCoordinate("a", "dep1", "c")
private val dep2 = new MavenCoordinate("a", "dep2", "d")

private def getJarPath(coord: MavenCoordinate, repo: File): File = {
new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false),
IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar"))
new File(IvyTestUtils.pathFromCoordinate(coord, repo, None, useIvyLayout = false),
IvyTestUtils.artifactName(coord, useIvyLayout = false, None))
}

private val lineBuffer = ArrayBuffer[String]()
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -423,8 +423,8 @@ class SparkSubmitSuite
// SPARK-7287
test("includes jars passed in through --packages") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = MavenCoordinate("my.great.dep", "mylib", "0.1")
val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = new MavenCoordinate("my.great.dep", "mylib", "0.1")
IvyTestUtils.withRepository(main, Some(dep.toString), None) { repo =>
val args = Seq(
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
@@ -446,7 +446,7 @@ class SparkSubmitSuite
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
// Check if the SparkR package is installed
assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val rScriptDir =
Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator)
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -58,7 +58,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
}

test("incorrect maven coordinate throws error") {
val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a")
val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a",
"a:b:c:d:e:f")
for (coordinate <- coordinates) {
intercept[IllegalArgumentException] {
SparkSubmitUtils.extractMavenCoordinates(coordinate)
@@ -91,6 +92,15 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}

test("extractMavenCoordinates parses correctly") {
for (s <- Seq("g:a:v,g:a:p:v,g:a:p:c:v", "g/a/v,g/a/p/v,g/a/p/c/v")) {
val Seq(gav, gapv, gapcv) = SparkSubmitUtils.extractMavenCoordinates(s)
assert(new MavenCoordinate("g", "a", "v") === gav)
assert(new MavenCoordinate("g", "a", Some("p"), None, "v") === gapv)
assert(new MavenCoordinate("g", "a", Some("p"), Some("c"), "v") === gapcv)
}
}

test("add dependencies works correctly") {
val md = SparkSubmitUtils.getModuleDescriptor
val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
@@ -115,7 +125,7 @@
assert(rule2.getOrganisation === "c")
assert(rule2.getName === "d")
intercept[IllegalArgumentException] {
SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
SparkSubmitUtils.createExclusion("e:f:g:h:i:j", new IvySettings, "default")
}
}

@@ -128,7 +138,7 @@
assert(index >= 0)
jPaths = jPaths.substring(index + tempIvyPath.length)
}
val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
val main = new MavenCoordinate("my.awesome.lib", "mylib", "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
@@ -199,7 +209,8 @@
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
val main = new MavenCoordinate(
"org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(
coordinates + "," + main.toString,