Commit 32fe5ae

Merge branch 'master' into external_shuffle_service_NM_restart

2 parents: d7450f0 + 5eb89f6

File tree

93 files changed, with 3151 additions and 698 deletions


core/pom.xml

Lines changed: 0 additions & 20 deletions
@@ -46,30 +46,10 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.ow2.asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.ow2.asm</groupId>
-          <artifactId>asm-commons</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill-java</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.ow2.asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.ow2.asm</groupId>
-          <artifactId>asm-commons</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 1 addition & 1 deletion
@@ -428,7 +428,7 @@ public void insertKVRecord(

   public UnsafeSorterIterator getSortedIterator() throws IOException {
     assert(inMemSorter != null);
-    final UnsafeSorterIterator inMemoryIterator = inMemSorter.getSortedIterator();
+    final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator();
     int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
     if (spillWriters.isEmpty()) {
       return inMemoryIterator;

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 3 additions & 3 deletions
@@ -133,7 +133,7 @@ public void insertRecord(long recordPointer, long keyPrefix) {
     pointerArrayInsertPosition++;
   }

-  private static final class SortedIterator extends UnsafeSorterIterator {
+  public static final class SortedIterator extends UnsafeSorterIterator {

     private final TaskMemoryManager memoryManager;
     private final int sortBufferInsertPosition;
@@ -144,7 +144,7 @@ private static final class SortedIterator extends UnsafeSorterIterator {
     private long keyPrefix;
     private int recordLength;

-    SortedIterator(
+    private SortedIterator(
         TaskMemoryManager memoryManager,
         int sortBufferInsertPosition,
         long[] sortBuffer) {
@@ -186,7 +186,7 @@ public void loadNext() {
    * Return an iterator over record pointers in sorted order. For efficiency, all calls to
    * {@code next()} will return the same mutable object.
    */
-  public UnsafeSorterIterator getSortedIterator() {
+  public SortedIterator getSortedIterator() {
     sorter.sort(pointerArray, 0, pointerArrayInsertPosition / 2, sortComparator);
     return new SortedIterator(memoryManager, pointerArrayInsertPosition, pointerArray);
   }

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 8 additions & 0 deletions
@@ -224,3 +224,11 @@ span.additional-metric-title {
 a.expandbutton {
   cursor: pointer;
 }
+
+.executor-thread {
+  background: #E6E6E6;
+}
+
+.non-executor-thread {
+  background: #FAFAFA;
+}

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 14 additions & 15 deletions
@@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction

 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ivy.Ivy
@@ -37,6 +38,7 @@ import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
+
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
@@ -275,21 +277,18 @@ object SparkSubmit {

     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
-    val resolvedMavenCoordinates =
-      SparkSubmitUtils.resolveMavenCoordinates(
-        args.packages, Option(args.repositories), Option(args.ivyRepoPath))
-    if (!resolvedMavenCoordinates.trim.isEmpty) {
-      if (args.jars == null || args.jars.trim.isEmpty) {
-        args.jars = resolvedMavenCoordinates
+    val exclusions: Seq[String] =
+      if (!StringUtils.isBlank(args.packagesExclusions)) {
+        args.packagesExclusions.split(",")
       } else {
-        args.jars += s",$resolvedMavenCoordinates"
+        Nil
       }
+    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
+      Some(args.repositories), Some(args.ivyRepoPath), exclusions = exclusions)
+    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
       if (args.isPython) {
-        if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
-          args.pyFiles = resolvedMavenCoordinates
-        } else {
-          args.pyFiles += s",$resolvedMavenCoordinates"
-        }
+        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
       }
     }

@@ -736,7 +735,7 @@
    * no files, into a single comma-separated string.
    */
   private def mergeFileLists(lists: String*): String = {
-    val merged = lists.filter(_ != null)
+    val merged = lists.filterNot(StringUtils.isBlank)
       .flatMap(_.split(","))
       .mkString(",")
     if (merged == "") null else merged
@@ -938,7 +937,7 @@ private[spark] object SparkSubmitUtils {
     // are supplied to spark-submit
     val alternateIvyCache = ivyPath.getOrElse("")
     val packagesDirectory: File =
-      if (alternateIvyCache.trim.isEmpty) {
+      if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
         new File(ivySettings.getDefaultIvyUserDir, "jars")
       } else {
         ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
@@ -1010,7 +1009,7 @@
       }
     }

-  private def createExclusion(
+  private[deploy] def createExclusion(
       coords: String,
       ivySettings: IvySettings,
       ivyConfName: String): ExcludeRule = {
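Below is a minimal, self-contained Scala sketch (not part of this commit; the package coordinates are made up) of the exclusion parsing added to SparkSubmit above: a null or blank --exclude-packages value yields no exclusions, otherwise the comma-separated list is split as-is.

    import org.apache.commons.lang3.StringUtils

    object ExclusionParsingSketch {
      // Mirrors the new logic: blank/null input means "no exclusions",
      // otherwise split the comma-separated groupId:artifactId list.
      def parseExclusions(packagesExclusions: String): Seq[String] =
        if (!StringUtils.isBlank(packagesExclusions)) {
          packagesExclusions.split(",").toSeq
        } else {
          Nil
        }

      def main(args: Array[String]): Unit = {
        println(parseExclusions("my.group:dep-a,other.group:dep-b")) // List(my.group:dep-a, other.group:dep-b)
        println(parseExclusions(null))                               // List()
      }
    }

The resulting sequence is what SparkSubmit then hands to SparkSubmitUtils.resolveMavenCoordinates through the new exclusions parameter.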

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 11 additions & 0 deletions
@@ -59,6 +59,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var packages: String = null
   var repositories: String = null
   var ivyRepoPath: String = null
+  var packagesExclusions: String = null
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
@@ -172,6 +173,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
     ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
+    packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
+    packagesExclusions = Option(packagesExclusions)
+      .orElse(sparkProperties.get("spark.jars.excludes")).orNull
     deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
@@ -299,6 +303,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     |  childArgs           [${childArgs.mkString(" ")}]
     |  jars                $jars
     |  packages            $packages
+    |  packagesExclusions  $packagesExclusions
     |  repositories        $repositories
     |  verbose             $verbose
     |
@@ -391,6 +396,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PACKAGES =>
         packages = value

+      case PACKAGES_EXCLUDE =>
+        packagesExclusions = value
+
       case REPOSITORIES =>
         repositories = value

@@ -482,6 +490,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |                          maven repo, then maven central and any additional remote
        |                          repositories given by --repositories. The format for the
        |                          coordinates should be groupId:artifactId:version.
+        |  --exclude-packages     Comma-separated list of groupId:artifactId, to exclude while
+        |                          resolving the dependencies provided in --packages to avoid
+        |                          dependency conflicts.
        |  --repositories          Comma-separated list of additional remote repositories to
        |                          search for the maven coordinates given with --packages.
        |  --py-files PY_FILES     Comma-separated list of .zip, .egg, or .py files to place
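For example (with made-up coordinates), a job could now be submitted with --packages my.group:mylib:1.0 --exclude-packages my.group.dep:mydep to pull in mylib while dropping one of its transitive dependencies; the equivalent programmatic path is exercised by the new "exclude dependencies end to end" test in SparkSubmitUtilsSuite below.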

core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala

Lines changed: 21 additions & 3 deletions
@@ -49,11 +49,29 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
     val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

     val content = maybeThreadDump.map { threadDump =>
-      val dumpRows = threadDump.map { thread =>
+      val dumpRows = threadDump.sortWith {
+        case (threadTrace1, threadTrace2) => {
+          val v1 = if (threadTrace1.threadName.contains("Executor task launch")) 1 else 0
+          val v2 = if (threadTrace2.threadName.contains("Executor task launch")) 1 else 0
+          if (v1 == v2) {
+            threadTrace1.threadName.toLowerCase < threadTrace2.threadName.toLowerCase
+          } else {
+            v1 > v2
+          }
+        }
+      }.map { thread =>
+        val threadName = thread.threadName
+        val className = "accordion-heading " + {
+          if (threadName.contains("Executor task launch")) {
+            "executor-thread"
+          } else {
+            "non-executor-thread"
+          }
+        }
         <div class="accordion-group">
-          <div class="accordion-heading" onclick="$(this).next().toggleClass('hidden')">
+          <div class={className} onclick="$(this).next().toggleClass('hidden')">
             <a class="accordion-toggle">
-              Thread {thread.threadId}: {thread.threadName} ({thread.threadState})
+              Thread {thread.threadId}: {threadName} ({thread.threadState})
             </a>
           </div>
           <div class="accordion-body hidden">
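The sortWith block above orders "Executor task launch" threads ahead of everything else and breaks ties case-insensitively by name. A small, self-contained sketch of the same comparator (not from the commit; the thread names are made up):

    object ThreadOrderingSketch {
      def main(args: Array[String]): Unit = {
        val names = Seq(
          "Signal Dispatcher",
          "Executor task launch worker-1",
          "shuffle-client-0",
          "Executor task launch worker-0")
        // Executor task-launch threads sort first; all other threads follow
        // in case-insensitive alphabetical order.
        val sorted = names.sortWith { (n1, n2) =>
          val v1 = if (n1.contains("Executor task launch")) 1 else 0
          val v2 = if (n2.contains("Executor task launch")) 1 else 0
          if (v1 == v2) n1.toLowerCase < n2.toLowerCase else v1 > v2
        }
        sorted.foreach(println)
        // Executor task launch worker-0
        // Executor task launch worker-1
        // shuffle-client-0
        // Signal Dispatcher
      }
    }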

core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala

Lines changed: 30 additions & 0 deletions
@@ -95,6 +95,25 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(md.getDependencies.length === 2)
   }

+  test("excludes works correctly") {
+    val md = SparkSubmitUtils.getModuleDescriptor
+    val excludes = Seq("a:b", "c:d")
+    excludes.foreach { e =>
+      md.addExcludeRule(SparkSubmitUtils.createExclusion(e + ":*", new IvySettings, "default"))
+    }
+    val rules = md.getAllExcludeRules
+    assert(rules.length === 2)
+    val rule1 = rules(0).getId.getModuleId
+    assert(rule1.getOrganisation === "a")
+    assert(rule1.getName === "b")
+    val rule2 = rules(1).getId.getModuleId
+    assert(rule2.getOrganisation === "c")
+    assert(rule2.getName === "d")
+    intercept[IllegalArgumentException] {
+      SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
+    }
+  }
+
   test("ivy path works correctly") {
     val md = SparkSubmitUtils.getModuleDescriptor
     val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
@@ -168,4 +187,15 @@
       assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
     }
   }
+
+  test("exclude dependencies end to end") {
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val dep = "my.great.dep:mydep:0.5"
+    IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
+      val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
+        Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
+      assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
+      assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
+    }
+  }
 }

dev/run-tests.py

Lines changed: 4 additions & 3 deletions
@@ -273,6 +273,7 @@ def get_hadoop_profiles(hadoop_version):
         "hadoop2.0": ["-Phadoop-1", "-Dhadoop.version=2.0.0-mr1-cdh4.1.1"],
         "hadoop2.2": ["-Pyarn", "-Phadoop-2.2"],
         "hadoop2.3": ["-Pyarn", "-Phadoop-2.3", "-Dhadoop.version=2.3.0"],
+        "hadoop2.6": ["-Pyarn", "-Phadoop-2.6"],
     }

     if hadoop_version in sbt_maven_hadoop_profiles:
@@ -289,7 +290,7 @@ def build_spark_maven(hadoop_version):
     mvn_goals = ["clean", "package", "-DskipTests"]
     profiles_and_goals = build_profiles + mvn_goals

-    print("[info] Building Spark (w/Hive 0.13.1) using Maven with these arguments: ",
+    print("[info] Building Spark (w/Hive 1.2.1) using Maven with these arguments: ",
           " ".join(profiles_and_goals))

     exec_maven(profiles_and_goals)
@@ -305,14 +306,14 @@ def build_spark_sbt(hadoop_version):
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals

-    print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",
+    print("[info] Building Spark (w/Hive 1.2.1) using SBT with these arguments: ",
           " ".join(profiles_and_goals))

     exec_sbt(profiles_and_goals)


 def build_apache_spark(build_tool, hadoop_version):
-    """Will build Spark against Hive v0.13.1 given the passed in build tool (either `sbt` or
+    """Will build Spark against Hive v1.2.1 given the passed in build tool (either `sbt` or
     `maven`). Defaults to using `sbt`."""

     set_title_and_block("Building Spark", "BLOCK_BUILD")

docs/README.md

Lines changed: 10 additions & 0 deletions
@@ -8,6 +8,16 @@ Read on to learn more about viewing documentation in plain text (i.e., markdown)
 documentation yourself. Why build it yourself? So that you have the docs that corresponds to
 whichever version of Spark you currently have checked out of revision control.

+## Prerequisites
+The Spark documenation build uses a number of tools to build HTML docs and API docs in Scala, Python
+and R. To get started you can run the following commands
+
+    $ sudo gem install jekyll
+    $ sudo gem install jekyll-redirect-from
+    $ sudo pip install Pygments
+    $ Rscript -e 'install.packages(c("knitr", "devtools"), repos="http://cran.stat.ucla.edu/")'
+
+
 ## Generating the Documentation HTML

 We include the Spark documentation as part of the source (as opposed to using a hosted wiki, such as
