Commit 3e409f5

Author: Fangshi Li

[SPARK-24287][Core] Spark --packages option should support classifier, no-transitive, and custom conf

1 parent 4ac8f9b commit 3e409f5

File tree: 2 files changed (+191, -50 lines)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 109 additions & 29 deletions
@@ -23,6 +23,7 @@ import java.net.URL
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
 import java.util.UUID
+import java.util.Collections
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -1000,33 +1001,81 @@ private[spark] object SparkSubmitUtils {
     "tags_", "unsafe_")
 
   /**
-   * Represents a Maven Coordinate
-   * @param groupId the groupId of the coordinate
-   * @param artifactId the artifactId of the coordinate
-   * @param version the version of the coordinate
+   * Represents an Artifact Coordinate to resolve
+   * @param groupId the groupId of the artifact
+   * @param artifactId the artifactId of the artifact
+   * @param version the version of the artifact
+   * @param extraParams the extra params to resolve the artifact
    */
-  private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
-    override def toString: String = s"$groupId:$artifactId:$version"
+  private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String,
+      extraParams: Map[String, String] = Map()) {
+
+    def params: String = if (extraParams.isEmpty) {
+      ""
+    } else {
+      "?" + extraParams.map { case (k, v) => k + "=" + v }.mkString(":")
+    }
+
+    override def toString: String = s"$groupId:$artifactId:$version$params"
   }
 
   /**
-   * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
-   * @param coordinates Comma-delimited string of maven coordinates
-   * @return Sequence of Maven coordinates
+   * Extracts artifact coordinates from a comma-delimited string. Coordinates should be provided
+   * in the format `groupId:artifactId:version?param1=value1\&param2=value2` or
+   * `groupId/artifactId:version?param1=value1\&param2=value2`.
+   *
+   * The param splitter & is the background-process character in the CLI, so when multiple
+   * params are used, either & should be escaped or the value of --packages should be
+   * enclosed in double quotes.
+   *
+   * Optional params are 'classifier', 'transitive', 'exclude', 'conf':
+   *   classifier: classifier of the artifact
+   *   transitive: whether to resolve transitive deps for the artifact
+   *   exclude: exclude list of transitive artifacts for this artifact (e.g. "a#b#c")
+   *   conf: the conf of the artifact
+   *
+   * @param coordinates Comma-delimited string of artifact coordinates
+   * @return Sequence of artifact coordinates
    */
   def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
     coordinates.split(",").map { p =>
-      val splits = p.replace("/", ":").split(":")
-      require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
-        s"'groupId:artifactId:version'. The coordinate provided is: $p")
-      require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
-        s"be whitespace. The groupId provided is: ${splits(0)}")
-      require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
-        s"be whitespace. The artifactId provided is: ${splits(1)}")
-      require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
-        s"be whitespace. The version provided is: ${splits(2)}")
-      new MavenCoordinate(splits(0), splits(1), splits(2))
+      val errMsg = s"Provided Artifact Coordinates must be in the form " +
+        s"'groupId:artifactId:version?param1=a\\&param2=b\\&param3=c'. Optional params are " +
+        s"'classifier', 'transitive', 'exclude', 'conf'. The coordinate provided is: $p"
+      // Split artifact coordinate and params
+      val parts = p.replace("/", ":").split("\\?")
+      require(parts.length == 1 || parts.length == 2, errMsg)
+      // Parse coordinate 'groupId:artifactId:version'
+      val coords = parts(0).split(":")
+      require(coords.length == 3, errMsg)
+      require(coords(0) != null && coords(0).trim.nonEmpty, s"The groupId cannot be null or " +
+        s"be whitespace. The groupId provided is: ${coords(0)}. ${errMsg}")
+      require(coords(1) != null && coords(1).trim.nonEmpty, s"The artifactId cannot be null or " +
+        s"be whitespace. The artifactId provided is: ${coords(1)}. ${errMsg}")
+      require(coords(2) != null && coords(2).trim.nonEmpty, s"The version cannot be null or " +
+        s"be whitespace. The version provided is: ${coords(2)}. ${errMsg}")
+      if (parts.length == 1) {
+        new MavenCoordinate(coords(0), coords(1), coords(2))
+      } else {
+        // Parse params 'param1=a\&param2=b\&param3=c'
+        val params = parts(1).split("\\&")
+        var paramMap = Map[String, String]()
+        for (i <- 0 until params.length) {
+          require(params(i) != null && params(i).trim.nonEmpty, errMsg)
+          val param = params(i).split("=")
+          require(param.length == 2, errMsg)
+          require(param(0) != null && param(0).trim.nonEmpty, s"The param key " +
+            s"cannot be null or be whitespace. The key provided is: ${param(0)}. ${errMsg}")
+          require(param(1) != null && param(1).trim.nonEmpty, s"The param value " +
+            s"cannot be null or be whitespace. The value provided is: ${param(1)}. ${errMsg}")
+
+          if (Set("classifier", "transitive", "conf", "exclude").contains(param(0))) {
+            paramMap += (param(0) -> param(1))
+          } else {
+            throw new RuntimeException(errMsg)
+          }
+        }
+        new MavenCoordinate(coords(0), coords(1), coords(2), paramMap)
+      }
     }
   }
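
For illustration, here is how the new parser is expected to behave on coordinates that use the added params. The group/artifact names below are hypothetical, and extractMavenCoordinates is package-private, so this sketch assumes it runs from within org.apache.spark.deploy. Note that the string Spark actually receives contains a plain '&'; the backslash escaping discussed in the scaladoc is only for the shell:

    // Hypothetical coordinates exercising the new '?' param syntax.
    val coords = SparkSubmitUtils.extractMavenCoordinates(
      "my.group:mylib:1.0," +
        "my.group:mylib:1.0?classifier=tests&transitive=false")
    coords(0).extraParams  // Map()
    coords(1).extraParams  // Map("classifier" -> "tests", "transitive" -> "false")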

@@ -1098,20 +1147,51 @@ private[spark] object SparkSubmitUtils {
       cacheDirectory: File): String = {
     artifacts.map { artifactInfo =>
       val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
+      val classifier = artifactInfo.asInstanceOf[Artifact].getId.getExtraAttribute("classifier")
+      val suffix = if (classifier == null) "" else s"-${classifier}"
       cacheDirectory.getAbsolutePath + File.separator +
-        s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
+        s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}${suffix}.jar"
     }.mkString(",")
   }
 
-  /** Adds the given maven coordinates to Ivy's module descriptor. */
+  /** Adds the given artifact coordinates to Ivy's module descriptor. */
   def addDependenciesToIvy(
       md: DefaultModuleDescriptor,
       artifacts: Seq[MavenCoordinate],
+      ivySettings: IvySettings,
       ivyConfName: String): Unit = {
-    artifacts.foreach { mvn =>
-      val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
-      val dd = new DefaultDependencyDescriptor(ri, false, false)
+    artifacts.foreach { art =>
+      val ri = ModuleRevisionId.newInstance(art.groupId, art.artifactId, art.version)
+      val dd = art.extraParams.get("transitive") match {
+        case Some(t) => new DefaultDependencyDescriptor(null, ri, false, false, t.toBoolean)
+        case None => new DefaultDependencyDescriptor(ri, false, false)
+      }
       dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)")
+
+      art.extraParams.foreach { case (param, pvalue) =>
+        param match {
+          // Exclude dependencies (names separated by #) for this artifact
+          case "exclude" => pvalue.split("#").foreach { ex =>
+            dd.addExcludeRule(ivyConfName,
+              createExclusion("*:*" + ex + "*:*", ivySettings, ivyConfName))}
+
+          // Add the artifact's ivy conf to the default conf, so it can be resolved with default
+          // e.g. ivy: <artifact name="shadow-jar" type="jar" ext="jar" conf="shadow" />
+          case "conf" => dd.addDependencyConfiguration(ivyConfName, pvalue)
+
+          // If this artifact has a classifier, add a descriptor with the classifier so it can be
+          // resolved, e.g. ivy: <artifact name="avro-mapred" type="jar" ext="jar" m:classifier="hadoop1"/>
+          case "classifier" =>
+            val dad = new DefaultDependencyArtifactDescriptor(dd, art.artifactId, "jar", "jar",
+              null, Collections.singletonMap(param, pvalue))
+            dad.addConfiguration(art.extraParams.getOrElse("conf", ""))
+            dd.addDependencyArtifact(ivyConfName, dad)
+
+          // Already handled above when constructing the descriptor; ignore
+          case "transitive" =>
+        }
+      }
+
       // scalastyle:off println
       printStream.println(s"${dd.getDependencyId} added as a dependency")
       // scalastyle:on println
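
As a note on the cache-path change at the top of this hunk: a classified artifact now resolves to a jar carrying a -<classifier> suffix. A minimal standalone sketch of the naming rule (organisation, name, and revision values are made up):

    // Mirrors the suffix logic above: a null classifier keeps the old file name.
    def cachedJarName(org: String, name: String, rev: String, classifier: String): String = {
      val suffix = if (classifier == null) "" else s"-${classifier}"
      s"${org}_${name}-${rev}${suffix}.jar"
    }

    cachedJarName("my.group", "mylib", "0.1", null)     // my.group_mylib-0.1.jar
    cachedJarName("my.group", "mylib", "0.1", "tests")  // my.group_mylib-0.1-tests.jar
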
@@ -1245,11 +1325,11 @@ private[spark] object SparkSubmitUtils {
   }
 
   /**
-   * Resolves any dependencies that were supplied through maven coordinates
-   * @param coordinates Comma-delimited string of maven coordinates
+   * Resolves any dependencies that were supplied through artifact coordinates
+   * @param coordinates Comma-delimited string of artifact coordinates
    * @param ivySettings An IvySettings containing resolvers to use
    * @param exclusions Exclusions to apply when resolving transitive dependencies
-   * @return The comma-delimited path to the jars of the given maven artifacts including their
+   * @return The comma-delimited path to the jars of the given artifacts including their
    *         transitive dependencies
    */
   def resolveMavenCoordinates(
@@ -1299,7 +1379,7 @@ private[spark] object SparkSubmitUtils {
       // Add exclusion rules for Spark and Scala Library
       addExclusionRules(ivySettings, ivyConfName, md)
       // add all supplied maven artifacts as dependencies
-      addDependenciesToIvy(md, artifacts, ivyConfName)
+      addDependenciesToIvy(md, artifacts, ivySettings, ivyConfName)
       exclusions.foreach { e =>
         md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
       }
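
Putting the changes together, a hedged end-to-end sketch of the updated call path. The repository URL and coordinate are placeholders, and these helpers are package-private to org.apache.spark.deploy:

    // Resolve a no-transitive artifact against a hypothetical repository.
    val settings = SparkSubmitUtils.buildIvySettings(
      Some("https://repo.example.com/maven"), None)
    val jarPaths = SparkSubmitUtils.resolveMavenCoordinates(
      "my.group:mylib:0.1?transitive=false", settings, isTest = false)
    // jarPaths is a comma-delimited list of cached jar paths; internally the coordinates
    // flow through addDependenciesToIvy(md, artifacts, ivySettings, ivyConfName).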

core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala

Lines changed: 82 additions & 21 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io.{File, OutputStream, PrintStream}
 import java.nio.charset.StandardCharsets
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, Map}
 
 import com.google.common.io.Files
 import org.apache.ivy.core.module.descriptor.MDArtifact
@@ -40,6 +40,28 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     def write(b: Int) = {}
   }
 
+  private def loadIvySettings(dummyIvyLocal: File): IvySettings = {
+    val settingsText =
+      s"""
+         |<ivysettings>
+         |  <caches defaultCacheDir="$tempIvyPath/cache"/>
+         |  <settings defaultResolver="local-ivy-settings-file-test"/>
+         |  <resolvers>
+         |    <filesystem name="local-ivy-settings-file-test">
+         |      <ivy pattern=
+         |        "$dummyIvyLocal/[organisation]/[module]/[revision]/[type]s/[artifact].[ext]"/>
+         |      <artifact pattern=
+         |        "$dummyIvyLocal/[organisation]/[module]/[revision]/[type]s/[artifact].[ext]"/>
+         |    </filesystem>
+         |  </resolvers>
+         |</ivysettings>
+         |""".stripMargin
+
+    val settingsFile = new File(tempIvyPath, "ivysettings.xml")
+    Files.write(settingsText, settingsFile, StandardCharsets.UTF_8)
+    SparkSubmitUtils.loadIvySettings(settingsFile.toString, None, None)
+  }
+
   /** Simple PrintStream that reads data into a buffer */
   private class BufferPrintStream extends PrintStream(noOpOutputStream) {
     var lineBuffer = ArrayBuffer[String]()
@@ -93,11 +115,13 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("add dependencies works correctly") {
+    val repos = "a/1,b/2,c/3"
+    val settings = SparkSubmitUtils.buildIvySettings(Option(repos), None)
     val md = SparkSubmitUtils.getModuleDescriptor
     val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.11:0.1," +
       "com.databricks:spark-avro_2.11:0.1")
 
-    SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
+    SparkSubmitUtils.addDependenciesToIvy(md, artifacts, settings, "default")
     assert(md.getDependencies.length === 2)
   }

@@ -225,25 +249,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
     val dep = "my.great.dep:mydep:0.5"
     val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
-    val settingsText =
-      s"""
-         |<ivysettings>
-         |  <caches defaultCacheDir="$tempIvyPath/cache"/>
-         |  <settings defaultResolver="local-ivy-settings-file-test"/>
-         |  <resolvers>
-         |    <filesystem name="local-ivy-settings-file-test">
-         |      <ivy pattern=
-         |        "$dummyIvyLocal/[organisation]/[module]/[revision]/[type]s/[artifact].[ext]"/>
-         |      <artifact pattern=
-         |        "$dummyIvyLocal/[organisation]/[module]/[revision]/[type]s/[artifact].[ext]"/>
-         |    </filesystem>
-         |  </resolvers>
-         |</ivysettings>
-         |""".stripMargin
-
-    val settingsFile = new File(tempIvyPath, "ivysettings.xml")
-    Files.write(settingsText, settingsFile, StandardCharsets.UTF_8)
-    val settings = SparkSubmitUtils.loadIvySettings(settingsFile.toString, None, None)
+    val settings = loadIvySettings(dummyIvyLocal)
     settings.setDefaultIvyUserDir(new File(tempIvyPath)) // NOTE - can't set this through file
 
     val testUtilSettings = new IvySettings
@@ -271,4 +277,59 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
         .exists(r.findFirstIn(_).isDefined), "resolution files should be cleaned")
     }
   }
+
+  test("test artifact with no transitive dependencies") {
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1", Map("transitive" -> "false"))
+    val dep = "my.great.dep:mydep:0.5"
+
+    IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+        main.toString,
+        SparkSubmitUtils.buildIvySettings(Some(repo), None),
+        isTest = true)
+      assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
+      assert(jarPath.indexOf("mydep") < 0, "should not find transitive dependency")
+    }
+  }
+
+  test("test artifact excluding specific dependencies") {
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1", Map("exclude" -> "mydep"))
+    val dep = "my.great.dep:mydep:0.5"
+
+    IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+        main.toString,
+        SparkSubmitUtils.buildIvySettings(Some(repo), None),
+        isTest = true)
+      assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
+      assert(jarPath.indexOf("mydep") < 0, "should not find excluded dependency")
+    }
+  }
+
+  test("test artifact with classifier") {
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1", Map("classifier" -> "test"))
+
+    IvyTestUtils.withRepository(main, None, None) { repo =>
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+        main.toString,
+        SparkSubmitUtils.buildIvySettings(Some(repo), None),
+        isTest = true)
+      assert(jarPath.indexOf("mylib-0.1-test") >= 0, "should find artifact with classifier")
+    }
+  }
+
+  test("test artifact with conf") {
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1", Map("conf" -> "master"))
+    val badMain = new MavenCoordinate("my.great.lib", "mylib", "0.1", Map("conf" -> "badconf"))
+
+    IvyTestUtils.withRepository(main, None, None) { repo =>
+      val settings = SparkSubmitUtils.buildIvySettings(Some(repo), None)
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, settings, isTest = true)
+      assert(jarPath.indexOf("mylib") >= 0, "should find artifact with good conf")
+      // an artifact with a bad conf should fail with a runtime exception:
+      // configuration 'badconf' not found
+      intercept[RuntimeException] {
+        SparkSubmitUtils.resolveMavenCoordinates(badMain.toString, settings, isTest = true)
+      }
+    }
+  }
 }
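
One path the suite above does not pin down is the fail-fast behavior for unknown params. A hypothetical companion test, not part of this commit, could assert it in the same style:

    test("test coordinate with unsupported param is rejected") {
      // extractMavenCoordinates throws for any param other than
      // classifier/transitive/exclude/conf.
      intercept[RuntimeException] {
        SparkSubmitUtils.extractMavenCoordinates("my.great.lib:mylib:0.1?badparam=x")
      }
    }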
