
Commit 462b334

Author: Davies Liu (committed)
Merge branch 'master' of github.com:apache/spark into python_udf
2 parents: 7bccc3b + 6aed719

File tree: 22 files changed, +693 -31 lines


bin/utils.sh

Lines changed: 3 additions & 3 deletions
@@ -26,14 +26,14 @@ function gatherSparkSubmitOpts() {
     exit 1
   fi
 
-  # NOTE: If you add or remove spark-sumbmit options,
+  # NOTE: If you add or remove spark-submit options,
   # modify NOT ONLY this script but also SparkSubmitArgument.scala
   SUBMISSION_OPTS=()
   APPLICATION_OPTS=()
   while (($#)); do
     case "$1" in
-      --master | --deploy-mode | --class | --name | --jars | --py-files | --files | \
-      --conf | --properties-file | --driver-memory | --driver-java-options | \
+      --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
+      --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
       --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
       --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
         if [[ $# -lt 2 ]]; then

bin/windows-utils.cmd

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p
 SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
 SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
 SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
-SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"
+SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
 
 echo %1 | findstr %opts% >nul
 if %ERRORLEVEL% equ 0 (

core/pom.xml

Lines changed: 11 additions & 0 deletions
@@ -241,6 +241,17 @@
       <artifactId>derby</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ivy</groupId>
+      <artifactId>ivy</artifactId>
+      <version>${ivy.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>oro</groupId>
+      <!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
+      <artifactId>oro</artifactId>
+      <version>${oro.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.tachyonproject</groupId>
       <artifactId>tachyon-client</artifactId>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 21 additions & 2 deletions
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
+   * '''Note:''' We ensure that the byte array for each record in the resulting RDD
+   * has the provided record length.
+   *
    * @param path Directory to the input data files
    * @param recordLength The length at which to split the records
    * @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       classOf[LongWritable],
       classOf[BytesWritable],
       conf=conf)
-    val data = br.map{ case (k, v) => v.getBytes}
+    val data = br.map { case (k, v) =>
+      val bytes = v.getBytes
+      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      bytes
+    }
     data
   }
 
@@ -1224,7 +1231,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
               null
             }
           } else {
-            env.httpFileServer.addJar(new File(uri.getPath))
+            try {
+              env.httpFileServer.addJar(new File(uri.getPath))
+            } catch {
+              case exc: FileNotFoundException =>
+                logError(s"Jar not found at $path")
+                null
+              case e: Exception =>
+                // For now just log an error but allow to go through so spark examples work.
+                // The spark examples don't really need the jar distributed since its also
+                // the app jar.
+                logError("Error adding jar (" + e + "), was the --addJars option used?")
+                null
+            }
           }
         // A JAR file which exists locally on every worker node
         case "local" =>

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 215 additions & 2 deletions
@@ -25,6 +25,17 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.ivy.Ivy
+import org.apache.ivy.core.LogOptions
+import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
+import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
+import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
+import org.apache.ivy.core.retrieve.RetrieveOptions
+import org.apache.ivy.core.settings.IvySettings
+import org.apache.ivy.plugins.matcher.GlobPatternMatcher
+import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
+
 import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.util.Utils
 
@@ -194,6 +205,18 @@ object SparkSubmit {
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
+    // Resolve maven dependencies if there are any and add classpath to jars
+    val resolvedMavenCoordinates =
+      SparkSubmitUtils.resolveMavenCoordinates(
+        args.packages, Option(args.repositories), Option(args.ivyRepoPath))
+    if (!resolvedMavenCoordinates.trim.isEmpty) {
+      if (args.jars == null || args.jars.trim.isEmpty) {
+        args.jars = resolvedMavenCoordinates
+      } else {
+        args.jars += s",$resolvedMavenCoordinates"
+      }
+    }
+
     // A list of rules to map each argument to system properties or command-line options in
     // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
@@ -202,6 +225,7 @@
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
       OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -213,6 +237,7 @@
 
       // Standalone cluster only
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
       OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
 
@@ -384,8 +409,8 @@
       case e: ClassNotFoundException =>
         e.printStackTrace(printStream)
         if (childMainClass.contains("thriftserver")) {
-          println(s"Failed to load main class $childMainClass.")
-          println("You need to build Spark with -Phive and -Phive-thriftserver.")
+          printStream.println(s"Failed to load main class $childMainClass.")
+          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
         }
         System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
     }
@@ -475,6 +500,194 @@
   }
 }
 
+/** Provides utility functions to be used inside SparkSubmit. */
+private[spark] object SparkSubmitUtils {
+
+  // Exposed for testing
+  private[spark] var printStream = SparkSubmit.printStream
+
+  /**
+   * Represents a Maven Coordinate
+   * @param groupId the groupId of the coordinate
+   * @param artifactId the artifactId of the coordinate
+   * @param version the version of the coordinate
+   */
+  private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)
+
+  /**
+   * Extracts maven coordinates from a comma-delimited string
+   * @param coordinates Comma-delimited string of maven coordinates
+   * @return Sequence of Maven coordinates
+   */
+  private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
+    coordinates.split(",").map { p =>
+      val splits = p.split(":")
+      require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
+        s"'groupId:artifactId:version'. The coordinate provided is: $p")
+      require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
+        s"be whitespace. The groupId provided is: ${splits(0)}")
+      require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
+        s"be whitespace. The artifactId provided is: ${splits(1)}")
+      require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
+        s"be whitespace. The version provided is: ${splits(2)}")
+      new MavenCoordinate(splits(0), splits(1), splits(2))
+    }
+  }
+
+  /**
+   * Extracts maven coordinates from a comma-delimited string
+   * @param remoteRepos Comma-delimited string of remote repositories
+   * @return A ChainResolver used by Ivy to search for and resolve dependencies.
+   */
+  private[spark] def createRepoResolvers(remoteRepos: Option[String]): ChainResolver = {
+    // We need a chain resolver if we want to check multiple repositories
+    val cr = new ChainResolver
+    cr.setName("list")
+
+    // the biblio resolver resolves POM declared dependencies
+    val br: IBiblioResolver = new IBiblioResolver
+    br.setM2compatible(true)
+    br.setUsepoms(true)
+    br.setName("central")
+    cr.add(br)
+
+    val repositoryList = remoteRepos.getOrElse("")
+    // add any other remote repositories other than maven central
+    if (repositoryList.trim.nonEmpty) {
+      repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
+        val brr: IBiblioResolver = new IBiblioResolver
+        brr.setM2compatible(true)
+        brr.setUsepoms(true)
+        brr.setRoot(repo)
+        brr.setName(s"repo-${i + 1}")
+        cr.add(brr)
+        printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
+      }
+    }
+    cr
+  }
+
+  /**
+   * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
+   * (will append to jars in SparkSubmit). The name of the jar is given
+   * after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
+   * @param artifacts Sequence of dependencies that were resolved and retrieved
+   * @param cacheDirectory directory where jars are cached
+   * @return a comma-delimited list of paths for the dependencies
+   */
+  private[spark] def resolveDependencyPaths(
+      artifacts: Array[AnyRef],
+      cacheDirectory: File): String = {
+    artifacts.map { artifactInfo =>
+      val artifactString = artifactInfo.toString
+      val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
+      cacheDirectory.getAbsolutePath + File.separator +
+        jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
+    }.mkString(",")
+  }
+
+  /** Adds the given maven coordinates to Ivy's module descriptor. */
+  private[spark] def addDependenciesToIvy(
+      md: DefaultModuleDescriptor,
+      artifacts: Seq[MavenCoordinate],
+      ivyConfName: String): Unit = {
+    artifacts.foreach { mvn =>
+      val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
+      val dd = new DefaultDependencyDescriptor(ri, false, false)
+      dd.addDependencyConfiguration(ivyConfName, ivyConfName)
+      printStream.println(s"${dd.getDependencyId} added as a dependency")
+      md.addDependency(dd)
+    }
+  }
+
+  /** A nice function to use in tests as well. Values are dummy strings. */
+  private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
+    ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
+
+  /**
+   * Resolves any dependencies that were supplied through maven coordinates
+   * @param coordinates Comma-delimited string of maven coordinates
+   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+   * @param ivyPath The path to the local ivy repository
+   * @return The comma-delimited path to the jars of the given maven artifacts including their
+   *         transitive dependencies
+   */
+  private[spark] def resolveMavenCoordinates(
+      coordinates: String,
+      remoteRepos: Option[String],
+      ivyPath: Option[String],
+      isTest: Boolean = false): String = {
+    if (coordinates == null || coordinates.trim.isEmpty) {
+      ""
+    } else {
+      val artifacts = extractMavenCoordinates(coordinates)
+      // Default configuration name for ivy
+      val ivyConfName = "default"
+      // set ivy settings for location of cache
+      val ivySettings: IvySettings = new IvySettings
+      // Directories for caching downloads through ivy and storing the jars when maven coordinates
+      // are supplied to spark-submit
+      val alternateIvyCache = ivyPath.getOrElse("")
+      val packagesDirectory: File =
+        if (alternateIvyCache.trim.isEmpty) {
+          new File(ivySettings.getDefaultIvyUserDir, "jars")
+        } else {
+          ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
+          new File(alternateIvyCache, "jars")
+        }
+      printStream.println(
+        s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
+      printStream.println(s"The jars for the packages stored in: $packagesDirectory")
+      // create a pattern matcher
+      ivySettings.addMatcher(new GlobPatternMatcher)
+      // create the dependency resolvers
+      val repoResolver = createRepoResolvers(remoteRepos)
+      ivySettings.addResolver(repoResolver)
+      ivySettings.setDefaultResolver(repoResolver.getName)
+
+      val ivy = Ivy.newInstance(ivySettings)
+      // Set resolve options to download transitive dependencies as well
+      val resolveOptions = new ResolveOptions
+      resolveOptions.setTransitive(true)
+      val retrieveOptions = new RetrieveOptions
+      // Turn downloading and logging off for testing
+      if (isTest) {
+        resolveOptions.setDownload(false)
+        resolveOptions.setLog(LogOptions.LOG_QUIET)
+        retrieveOptions.setLog(LogOptions.LOG_QUIET)
+      } else {
+        resolveOptions.setDownload(true)
+      }
+
+      // A Module descriptor must be specified. Entries are dummy strings
+      val md = getModuleDescriptor
+      md.setDefaultConf(ivyConfName)
+
+      // Add an exclusion rule for Spark
+      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
+      md.addExcludeRule(sparkDependencyExcludeRule)
+      addDependenciesToIvy(md, artifacts, ivyConfName)
+
+      // resolve dependencies
+      val rr: ResolveReport = ivy.resolve(md, resolveOptions)
+      if (rr.hasError) {
+        throw new RuntimeException(rr.getAllProblemMessages.toString)
+      }
+      // retrieve all resolved dependencies
+      ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
+        packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
+        retrieveOptions.setConfs(Array(ivyConfName)))
+
+      resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+    }
+  }
+}
+
 /**
  * Provides an indirection layer for passing arguments as system properties or flags to
  * the user's driver program or to downstream launcher tools.
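Taken together, the new SparkSubmitUtils object turns a --packages string into a comma-delimited list of local jar paths that SparkSubmit then appends to args.jars. Below is a minimal sketch of that flow, written as if from inside the org.apache.spark.deploy package since the helpers are private[spark]; the coordinate string is hypothetical, and isTest = true disables downloading and quiets Ivy logging, mirroring how the unit tests exercise this path. Resolution can still fail with a RuntimeException if the coordinate cannot be located in the configured repositories.

package org.apache.spark.deploy

object PackagesResolutionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical coordinate in the required groupId:artifactId:version form.
    val coordinates = "com.example:example-lib_2.10:1.0.0"

    // Step 1: parse and validate the comma-delimited coordinate string.
    val parsed = SparkSubmitUtils.extractMavenCoordinates(coordinates)
    parsed.foreach(c => println(s"group=${c.groupId} artifact=${c.artifactId} version=${c.version}"))

    // Step 2: resolve through Ivy. With isTest = true, nothing is downloaded, so this
    // only exercises the resolution plumbing (module descriptor, resolvers, exclusions).
    val jarPaths = SparkSubmitUtils.resolveMavenCoordinates(
      coordinates, remoteRepos = None, ivyPath = None, isTest = true)

    // SparkSubmit appends this comma-delimited list to args.jars (spark.jars).
    println(s"resolved classpath entries: $jarPaths")
  }
}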
