Merged
Changes from all commits
21 commits
91530b0
[SPARK-11327][MESOS] Backport dispatcher does not respect all args f…
jayv Apr 4, 2016
285cb9c
[SPARK-14368][PYSPARK] Support python.spark.worker.memory with upper-…
yongtang Apr 5, 2016
cfe9f02
[SPARK-14243][CORE][BACKPORT-1.6] update task metrics when removing b…
jeanlyn Apr 5, 2016
dca0d9a
[SPARK-14322][MLLIB] Use treeAggregate instead of reduce in OnlineLDA…
hhbyyh Apr 6, 2016
8a94a59
[DOCS][MINOR] Remove sentence about Mesos not supporting cluster mode.
Apr 8, 2016
77ebae3
[SPARK-14468] Always enable OutputCommitCoordinator
Apr 8, 2016
7a02c44
[SPARK-14357][CORE] Properly handle the root cause being a commit den…
jasonmoore2k Apr 10, 2016
baf2985
[SPARK-14290] [SPARK-13352] [CORE] [BACKPORT-1.6] avoid significant m…
liyezhang556520 Apr 11, 2016
c12db0d
[SPARK-14454] [1.6] Better exception handling while marking tasks as …
sameeragarwal Apr 11, 2016
12f640c
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Apr 11, 2016
f4110cd
[BUILD][HOTFIX] Download Maven from regular mirror network rather tha…
JoshRosen Apr 11, 2016
5bfd31d
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Apr 11, 2016
05dbc28
[SPARK-14298][ML][MLLIB] LDA should support disable checkpoint
yanboliang Apr 8, 2016
663a492
[SPARK-14242][CORE][NETWORK] avoid copy in compositeBuffer for frame …
liyezhang556520 Apr 1, 2016
2554c35
[SPARK-14563][ML] use a random table name instead of __THIS__ in SQLT…
mengxr Apr 12, 2016
582ed8a
[SPARK-14544] [SQL] improve performance of SQL UI tab
Apr 12, 2016
413d060
[SPARK-14363] Fix executor OOM due to memory leak in the Sorter
Apr 12, 2016
88aebb7
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Apr 13, 2016
93c9a63
[SPARK-14618][ML][DOC] Updated RegressionEvaluator.metricName param doc
jkbradley Apr 14, 2016
58dfba6
[SPARK-14665][ML][PYTHON] Fixed bug with StopWordsRemover default sto…
jkbradley Apr 15, 2016
5e8618b
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Apr 18, 2016
2 changes: 1 addition & 1 deletion build/mvn
@@ -72,7 +72,7 @@ install_mvn() {
local MVN_VERSION="3.3.3"

install_app \
"http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
"https://www.apache.org/dyn/closer.lua?action=download&filename=/maven/maven-3/${MVN_VERSION}/binaries" \
"apache-maven-${MVN_VERSION}-bin.tar.gz" \
"apache-maven-${MVN_VERSION}/bin/mvn"

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -215,8 +215,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
}
}

inMemSorter.reset();

if (!isLastFile) { // i.e. this is a spill file
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
@@ -255,6 +253,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {

writeSortedFile(false);
final long spillSize = freeMemory();
inMemSorter.reset();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
// we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize;
}
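
The comment above captures the ordering that SPARK-14363 fixes: the pointer array can only be re-allocated once the record pages have been returned to the memory manager, otherwise a task already at its memory limit has nothing left to allocate from. A toy, self-contained sketch of that constraint (the Pool class and sizes below are hypothetical, not Spark's MemoryConsumer API):

    object SpillOrderingSketch {
      // Toy pool standing in for the task's memory budget; not Spark's TaskMemoryManager.
      final class Pool(var free: Long) {
        def allocate(bytes: Long): Boolean =
          if (bytes <= free) { free -= bytes; true } else false
        def release(bytes: Long): Unit = free += bytes
      }

      def main(args: Array[String]): Unit = {
        val pool = new Pool(free = 0L)       // the task has already used its whole budget
        val recordPages = 64L << 20          // pages holding the spilled records
        val pointerArray = 8L << 20          // size of the fresh pointer array

        // reset() before freeMemory(): the allocation fails and memory appears "leaked".
        assert(!pool.allocate(pointerArray))

        // freeMemory() first, then reset(): the allocation fits.
        pool.release(recordPages)
        assert(pool.allocate(pointerArray))
        println(s"free after spill + reset: ${pool.free} bytes")
      }
    }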
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -49,9 +49,12 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
*/
private int pos = 0;

private int initialSize;

public ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
this.consumer = consumer;
assert (initialSize > 0);
this.initialSize = initialSize;
this.array = consumer.allocateArray(initialSize);
this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
}
@@ -68,6 +71,10 @@ public int numRecords() {
}

public void reset() {
if (consumer != null) {
consumer.freeArray(array);
this.array = consumer.allocateArray(initialSize);
}
pos = 0;
}

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -192,14 +192,17 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
}
spillWriter.close();

inMemSorter.reset();
}

final long spillSize = freeMemory();
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
// pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array.
inMemSorter.reset();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
// we might not be able to get memory for the pointer array.

taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);

return spillSize;
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -80,6 +80,8 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
*/
private int pos = 0;

private long initialSize;

public UnsafeInMemorySorter(
final MemoryConsumer consumer,
final TaskMemoryManager memoryManager,
@@ -98,6 +100,7 @@ public UnsafeInMemorySorter(
LongArray array) {
this.consumer = consumer;
this.memoryManager = memoryManager;
this.initialSize = array.size();
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
this.array = array;
@@ -114,6 +117,10 @@ public void free() {
}

public void reset() {
if (consumer != null) {
consumer.freeArray(array);
this.array = consumer.allocateArray(initialSize);
}
pos = 0;
}
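
In both in-memory sorters the new reset() gives the (possibly grown) pointer array back and allocates a fresh one at the constructor's initial size, so repeated spills no longer pin the array at its peak size. A rough stand-alone illustration of that pattern (plain JVM arrays instead of consumer.allocateArray, and a doubling growth factor assumed for the sketch):

    final class ResettableBuffer(initialSize: Int) {
      private var array = new Array[Long](initialSize)
      private var pos = 0

      def insert(value: Long): Unit = {
        if (pos == array.length) {                       // grow when full
          val bigger = new Array[Long](array.length * 2)
          System.arraycopy(array, 0, bigger, 0, pos)
          array = bigger
        }
        array(pos) = value
        pos += 1
      }

      def capacity: Int = array.length

      def reset(): Unit = {
        array = new Array[Long](initialSize)             // drop the grown array, start small again
        pos = 0
      }
    }

    object ResettableBufferDemo extends App {
      val buf = new ResettableBuffer(initialSize = 4)
      (1 to 100).foreach(i => buf.insert(i.toLong))
      println(s"capacity before reset: ${buf.capacity}")  // grew to 128
      buf.reset()
      println(s"capacity after reset:  ${buf.capacity}")  // back to 4
    }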

8 changes: 5 additions & 3 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -106,21 +106,22 @@ pre {
line-height: 18px;
padding: 6px;
margin: 0;
word-break: break-word;
border-radius: 3px;
}

.stage-details {
max-height: 100px;
overflow-y: auto;
margin: 0;
display: block;
transition: max-height 0.25s ease-out, padding 0.25s ease-out;
}

.stage-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
display: none;
}

.description-input {
@@ -143,14 +144,15 @@ pre {
max-height: 300px;
overflow-y: auto;
margin: 0;
display: block;
transition: max-height 0.25s ease-out, padding 0.25s ease-out;
}

.stacktrace-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
display: none;
}

span.expand-additional-metrics, span.expand-dag-viz {
core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -287,7 +287,7 @@ private[spark] class Executor(
logInfo(s"Executor killed $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

case cDE: CommitDeniedException =>
case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -81,11 +81,8 @@ object SparkHadoopMapRedUtil extends Logging {
* the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
* details).
*
* Output commit coordinator is only contacted when the following two configurations are both set
* to `true`:
*
* - `spark.speculation`
* - `spark.hadoop.outputCommitCoordination.enabled`
* Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
* is set to true (which is the default).
*/
def commitTask(
committer: MapReduceOutputCommitter,
@@ -112,11 +109,10 @@ if (committer.needsTaskCommit(mrTaskContext)) {
if (committer.needsTaskCommit(mrTaskContext)) {
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
// We only need to coordinate with the driver if there are multiple concurrent task
// attempts, which should only occur if speculation is enabled
val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
// We only need to coordinate with the driver if there are concurrent task attempts.
// Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
}

if (shouldCoordinateWithDriver) {
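With SPARK-14468 the coordinator is consulted whenever spark.hadoop.outputCommitCoordination.enabled is true, which is now the default. A minimal sketch of flipping the escape hatch off explicitly; the application name and output path below are made up for illustration:

    import org.apache.spark.{SparkConf, SparkContext}

    object CommitCoordinationConfigSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("commit-coordination-sketch")
          // Enabled by default after SPARK-14468; only disable it as a last-resort
          // escape hatch if the coordination code itself misbehaves.
          .set("spark.hadoop.outputCommitCoordination.enabled", "false")

        val sc = new SparkContext(conf)
        try {
          sc.parallelize(1 to 10).map(i => (i, i * i)).saveAsTextFile("/tmp/commit-coordination-sketch")
        } finally {
          sc.stop()
        }
      }
    }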
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1117,9 +1117,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} {
writer.close(hadoopContext)
}
}(finallyBlock = writer.close(hadoopContext))
committer.commitTask(hadoopContext)
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
outputMetrics.setRecordsWritten(recordsWritten)
@@ -1203,9 +1201,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} {
writer.close()
}
}(finallyBlock = writer.close())
writer.commit()
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
outputMetrics.setRecordsWritten(recordsWritten)
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,10 +87,16 @@ private[spark] abstract class Task[T](
}
try {
(runTask(context), context.collectAccumulators())
} catch { case e: Throwable =>
// Catch all errors; run task failure callbacks, and rethrow the exception.
context.markTaskFailed(e)
throw e
} catch {
case e: Throwable =>
// Catch all errors; run task failure callbacks, and rethrow the exception.
try {
context.markTaskFailed(e)
} catch {
case t: Throwable =>
e.addSuppressed(t)
}
throw e
} finally {
// Call the task completion callbacks.
context.markTaskCompleted()
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -422,6 +422,12 @@ private[spark] class MesosClusterScheduler(
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")

val replicatedOptionsBlacklist = Set(
"spark.jars", // Avoids duplicate classes in classpath
"spark.submit.deployMode", // this would be set to `cluster`, but we need client
"spark.master" // this contains the address of the dispatcher, not master
)

// Assume empty main class means we're running python
if (!desc.command.mainClass.equals("")) {
options ++= Seq("--class", desc.command.mainClass)
@@ -439,9 +445,29 @@ private[spark] class MesosClusterScheduler(
.mkString(",")
options ++= Seq("--py-files", formattedFiles)
}
desc.schedulerProperties
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
.foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
options
}

/**
* Escape args for Unix-like shells, unless already quoted by the user.
* Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
* and http://www.grymoire.com/Unix/Quote.html
* @param value argument
* @return escaped argument
*/
private[scheduler] def shellEscape(value: String): String = {
val WrappedInQuotes = """^(".+"|'.+')$""".r
val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r
value match {
case WrappedInQuotes(c) => value // The user quoted his args, don't touch it!
case ShellSpecialChars(c) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\""
case _: String => value // Don't touch harmless strings
}
}

private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
override def toString(): String = {
s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
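The escaping above only quotes arguments that actually contain shell metacharacters and leaves user-quoted values alone. A rough sketch of the expected behaviour; shellEscape is private[scheduler], so the demo uses a local copy of the same body:

    object ShellEscapeDemo {
      // Local copy of MesosClusterScheduler.shellEscape so the demo compiles on its own.
      private def shellEscape(value: String): String = {
        val WrappedInQuotes = """^(".+"|'.+')$""".r
        val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r
        value match {
          case WrappedInQuotes(c) => value
          case ShellSpecialChars(c) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\""
          case _: String => value
        }
      }

      def main(args: Array[String]): Unit = {
        println(shellEscape("spark.executor.memory=2g"))  // printed unchanged: no special characters
        println(shellEscape("-Dx=a b c"))                  // wrapped in double quotes because of the spaces
        println(shellEscape("""pwd is $HOME"""))           // quoted, and the $ is backslash-escaped
        println(shellEscape(""""already quoted""""))       // left exactly as the user wrote it
      }
    }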
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1120,10 +1120,15 @@ private[spark] class BlockManager(
"the disk, memory, or external block store")
}
blockInfo.remove(blockId)
val status = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
val status = getCurrentBlockStatus(blockId, info)
reportBlockStatus(blockId, info, status)
}
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
}
}
} else {
// The block has already been removed; do nothing.
36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/util/CausedBy.scala
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

/**
* Extractor Object for pulling out the root cause of an error.
* If the error contains no cause, it will return the error itself.
*
* Usage:
* try {
* ...
* } catch {
* case CausedBy(ex: CommitDeniedException) => ...
* }
*/
private[spark] object CausedBy {

def unapply(e: Throwable): Option[Throwable] = {
Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e))
}
}
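
A small self-contained illustration of the extractor's behaviour: it walks getCause down to the root, so a deeply wrapped exception is still matched. This is a local copy of the object above, exercised against plain JDK exceptions rather than CommitDeniedException:

    object CausedByDemo {
      // Same body as the extractor above, copied here so the demo compiles on its own.
      object CausedBy {
        def unapply(e: Throwable): Option[Throwable] = {
          Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e))
        }
      }

      def main(args: Array[String]): Unit = {
        val wrapped = new RuntimeException(
          "outer", new RuntimeException("middle", new IllegalStateException("root")))

        wrapped match {
          case CausedBy(root: IllegalStateException) =>
            println(s"root cause is an IllegalStateException: ${root.getMessage}")  // prints "root"
          case CausedBy(other) =>
            println(s"root cause was something else: $other")
        }
      }
    }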
29 changes: 19 additions & 10 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1259,26 +1259,35 @@ private[spark] object Utils extends Logging {
}

/**
* Execute a block of code, call the failure callbacks before finally block if there is any
* exceptions happen. But if exceptions happen in the finally block, do not suppress the original
* exception.
* Execute a block of code and call the failure callbacks in the catch block. If exceptions occur
* in either the catch or the finally block, they are appended to the list of suppressed
* exceptions in original exception which is then rethrown.
*
* This is primarily an issue with `finally { out.close() }` blocks, where
* close needs to be called to clean up `out`, but if an exception happened
* in `out.write`, it's likely `out` may be corrupted and `out.close` will
* This is primarily an issue with `catch { abort() }` or `finally { out.close() }` blocks,
* where the abort/close needs to be called to clean up `out`, but if an exception happened
* in `out.write`, it's likely `out` may be corrupted and `abort` or `out.close` will
* fail as well. This would then suppress the original/likely more meaningful
* exception from the original `out.write` call.
*/
def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(finallyBlock: => Unit): T = {
def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
(catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
var originalThrowable: Throwable = null
try {
block
} catch {
case t: Throwable =>
case cause: Throwable =>
// Purposefully not using NonFatal, because even fatal exceptions
// we don't want to have our finallyBlock suppress
originalThrowable = t
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(t)
originalThrowable = cause
try {
logError("Aborting task", originalThrowable)
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
catchBlock
} catch {
case t: Throwable =>
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
}
throw originalThrowable
} finally {
try {
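The new signature takes the cleanup in two by-name blocks, catchBlock (run on failure, e.g. writer.abort()) and finallyBlock (always run, e.g. writer.close()), and anything they throw is attached to the original error instead of replacing it. The helper itself is private[spark], so the sketch below re-creates the same pattern with a stand-alone function to show the resulting exception chain:

    object SafeFinallyDemo {
      // Stand-alone analogue of Utils.tryWithSafeFinallyAndFailureCallbacks (without the
      // task failure callbacks): rethrow the original error, suppress cleanup errors.
      def tryWithCleanup[T](block: => T)(catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
        var original: Throwable = null
        try {
          block
        } catch {
          case cause: Throwable =>
            original = cause
            try catchBlock catch { case t: Throwable => original.addSuppressed(t) }
            throw original
        } finally {
          try finallyBlock catch {
            case t: Throwable =>
              if (original != null) original.addSuppressed(t) else throw t
          }
        }
      }

      def main(args: Array[String]): Unit = {
        try {
          tryWithCleanup[Unit] {
            throw new RuntimeException("write failed")                 // the meaningful error
          }(catchBlock = throw new IllegalStateException("abort failed"),
            finallyBlock = throw new IllegalStateException("close failed"))
        } catch {
          case e: Throwable =>
            println(e.getMessage)                                      // write failed
            e.getSuppressed.foreach(s => println("suppressed: " + s.getMessage))
        }
      }
    }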
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -38,7 +38,7 @@ class OutputCommitCoordinatorIntegrationSuite
super.beforeAll()
val conf = new SparkConf()
.set("master", "local[2,4]")
.set("spark.speculation", "true")
.set("spark.hadoop.outputCommitCoordination.enabled", "true")
.set("spark.hadoop.mapred.output.committer.class",
classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
sc = new SparkContext("local[2, 4]", "test", conf)
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -78,7 +78,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
.set("spark.speculation", "true")
.set("spark.hadoop.outputCommitCoordination.enabled", "true")
sc = new SparkContext(conf) {
override private[spark] def createSparkEnv(
conf: SparkConf,