Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
d08c20c
tasks know which stageAttempt they belong to
squito May 7, 2015
89e8428
reproduce the failure
squito May 7, 2015
70a787b
ignore fetch failure from attempts that are already failed. only a p…
squito May 7, 2015
7fbcefb
ignore the test for now just to avoid swamping jenkins
squito May 7, 2015
2eebbf2
style
squito May 7, 2015
7142242
more rigorous test case
squito May 12, 2015
ccaa159
index file needs to handle cases when data file already exist, and th…
squito May 12, 2015
3585b96
pare down the unit test
squito May 12, 2015
de23530
SparkIllegalStateException if we ever have multiple concurrent attemp…
squito May 12, 2015
c91ee10
better unit test
squito May 12, 2015
05c72fd
handle more cases from bad ordering of task attempt completion
squito May 12, 2015
5dc5436
Merge branch 'master' into SPARK-7308_fix
squito May 13, 2015
37eece8
cleanup imports
squito May 13, 2015
31c21fa
style
squito May 13, 2015
a894be1
include all missing mapIds in error msg
squito May 13, 2015
93592b1
update existing test since we now do more resubmitting than before
squito May 13, 2015
ea2d972
style
squito May 13, 2015
de0a596
Merge branch 'master' into SPARK-7308_fix
squito May 21, 2015
6654c53
fixes from merge
squito May 21, 2015
dd2839d
better fix from merge
squito May 21, 2015
e684928
shuffle map output writes to a different file per attempt (main compi…
squito May 29, 2015
2523431
tests compile
squito May 29, 2015
4d976f4
avoid NPE in finally block
squito Jun 2, 2015
2b723fd
use case class for result of mapOutputTracker.getServerStatus
squito Jun 2, 2015
fd40a93
fix tests
squito Jun 2, 2015
b5d8ec5
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jun 2, 2015
9f01d7e
style
squito Jun 2, 2015
fae9c0c
style
squito Jun 2, 2015
06daceb
make ContextCleanerSuite pass ... though maybe the test is pointless
squito Jun 2, 2015
cd16ee8
fix tests
squito Jun 2, 2015
2006de8
ExternalShuffleService use stageAttemptId in shuffle block id
squito Jun 2, 2015
e905f6d
better error msg (just for debugging tests)
squito Jun 2, 2015
86e651c
fix ExternalShuffleService tests for new block id format (need to add…
squito Jun 3, 2015
b16e7f2
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jun 3, 2015
fdcc92d
ignore mima false positive
squito Jun 3, 2015
66d5bf5
fix mima exclude
squito Jun 3, 2015
289576d
undo some dagscheduler related changes which will go in another pr
squito Jun 4, 2015
9a06fe2
removing some unnecessary changes
squito Jun 4, 2015
9befe51
fault tolerance test in its own suite, since its more of an integrati…
squito Jun 4, 2015
87d7ddd
remove more unnecessary changes
squito Jun 4, 2015
1072a44
ShuffleId --> ShuffleIdAndAttempt (and a case class)
squito Jun 4, 2015
89a93ae
remove more unnecessary stuff
squito Jun 4, 2015
ece31ba
style
squito Jun 4, 2015
de62da0
explanation of the broadcast failure handling
squito Jun 4, 2015
a7f2d9a
make sure we dont drop the driver block manager (prevents the broadca…
squito Jun 5, 2015
4bfbf94
oops, ignore test again for jenkins!
squito Jun 5, 2015
f9a1a31
comment tweak
squito Jun 5, 2015
9bdfdc1
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jun 5, 2015
b762e22
fix mima
squito Jun 5, 2015
8bbda62
more tests around multiple attempts of ShuffleMapTasks
squito Jun 5, 2015
ff1870a
style
squito Jun 5, 2015
e2daa05
test transferTo; remove accidental addition
squito Jun 5, 2015
54948a8
have both attempts write the shuffle data simultaneously
squito Jun 6, 2015
c231221
style
squito Jun 6, 2015
52eba21
comments, names, style
squito Jun 8, 2015
64ead29
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jun 8, 2015
3b4159b
use CopyOnWriteArraySet for stageAttempts per shuffleId, its a more a…
squito Jun 9, 2015
7284589
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jun 9, 2015
fd81700
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jun 18, 2015
659cb45
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jun 29, 2015
f1d5c1c
fix merge
squito Jun 29, 2015
2720425
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jun 30, 2015
bcdbf54
actually use UnsafeShuffle in UnsafeShuffleSuite -- update mergeSpill…
squito Jul 1, 2015
b996802
try to clarify getWriter / clearStageAttemptsForShuffle; style
squito Jul 1, 2015
c29fa57
rename & small tweaks to use of ShuffleIdAndAttempt
squito Jul 1, 2015
d56f8d8
numNonEmptyBlocks is only needed in constructor, doesnt need to be a …
squito Jul 1, 2015
55a9bb1
style
squito Jul 1, 2015
7b465a7
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jul 16, 2015
9d1189f
fix merge
squito Jul 16, 2015
c297c78
get rid of printlns
squito Jul 16, 2015
e3c8df6
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jul 22, 2015
78d9614
oops, fix merge
squito Jul 22, 2015
529aa95
more fix merge
squito Jul 22, 2015
23af915
no longer need ServerAttemptSize after other changes in master
squito Jul 22, 2015
c288ff9
style
squito Jul 22, 2015
657b135
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jul 22, 2015
f392acc
use interpolation for logging
squito Jul 23, 2015
9cd9c75
style, comments
squito Jul 23, 2015
90ee54a
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Jul 23, 2015
26b6ea6
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Aug 5, 2015
c60c6d4
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Oct 7, 2015
5547611
fix merge
squito Oct 8, 2015
c7b3017
style
squito Oct 8, 2015
812aa0e
FetchFailed should include the stageAttemptId, so it has a full block id
squito Oct 8, 2015
26baad9
style
squito Oct 8, 2015
f37be91
mima
squito Oct 8, 2015
fac0f1c
comment formatting
squito Oct 8, 2015
a38d760
get rid of DAGSchedulerFailureRecoverySuite
squito Oct 9, 2015
37ac799
ExternalShuffleBlockResolver can handle blockIds w/out stageAttemptId
squito Oct 9, 2015
c9a9e08
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Oct 9, 2015
fbd129b
Merge branch 'master' into SPARK_8029_shuffleoutput_per_attempt
squito Nov 9, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final ShuffleWriteMetrics writeMetrics;
private final int shuffleId;
private final int mapId;
private final int stageAttemptId;
private final Serializer serializer;
private final IndexShuffleBlockResolver shuffleBlockResolver;

Expand All @@ -103,6 +104,7 @@ public BypassMergeSortShuffleWriter(
IndexShuffleBlockResolver shuffleBlockResolver,
BypassMergeSortShuffleHandle<K, V> handle,
int mapId,
int stageAttemptId,
TaskContext taskContext,
SparkConf conf) {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
Expand All @@ -111,6 +113,7 @@ public BypassMergeSortShuffleWriter(
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapId = mapId;
this.stageAttemptId = stageAttemptId;
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
Expand All @@ -125,8 +128,9 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, stageAttemptId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), stageAttemptId,
partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
Expand Down Expand Up @@ -156,9 +160,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}

partitionLengths =
writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId, stageAttemptId));
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, stageAttemptId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), stageAttemptId,
partitionLengths);
}

@VisibleForTesting
Expand Down Expand Up @@ -231,7 +236,7 @@ public Option<MapStatus> stop(boolean success) {
partitionWriters = null;
}
}
shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
shuffleBlockResolver.removeDataByMap(shuffleId, mapId, stageAttemptId);
return None$.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final ShuffleWriteMetrics writeMetrics;
private final int shuffleId;
private final int mapId;
private final int stageAttemptId;
private final TaskContext taskContext;
private final SparkConf sparkConf;
private final boolean transferToEnabled;
Expand All @@ -89,6 +90,11 @@ private static final class MyByteArrayOutputStream extends ByteArrayOutputStream

private MyByteArrayOutputStream serBuffer;
private SerializationStream serOutputStream;
/**
* This is just to allow tests to explore more code paths, without requiring too much complexity
* in the test cases. In normal usage, it will be true.
*/
private final boolean allowSpillMove;

/**
* Are we in the process of stopping? Because map tasks can call stop() with success = true
Expand All @@ -103,6 +109,7 @@ public UnsafeShuffleWriter(
TaskMemoryManager memoryManager,
SerializedShuffleHandle<K, V> handle,
int mapId,
int stageAttemptId,
TaskContext taskContext,
SparkConf sparkConf) throws IOException {
final int numPartitions = handle.dependency().partitioner().numPartitions();
Expand All @@ -115,6 +122,7 @@ public UnsafeShuffleWriter(
this.shuffleBlockResolver = shuffleBlockResolver;
this.memoryManager = memoryManager;
this.mapId = mapId;
this.stageAttemptId = stageAttemptId;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.shuffleId = dep.shuffleId();
this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
Expand All @@ -123,6 +131,7 @@ public UnsafeShuffleWriter(
taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.allowSpillMove = sparkConf.getBoolean("spark.shuffle.unsafe.testing.allowSpillMove", true);
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
open();
}
Expand Down Expand Up @@ -215,8 +224,9 @@ void closeAndWriteOutput() throws IOException {
}
}
}
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, stageAttemptId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), stageAttemptId, partitionLengths);
}

@VisibleForTesting
Expand Down Expand Up @@ -249,7 +259,7 @@ void forceSorterToSpill() throws IOException {
* @return the partition lengths in the merged file.
*/
private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId, stageAttemptId);
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
Expand All @@ -260,7 +270,7 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
if (spills.length == 0) {
new FileOutputStream(outputFile).close(); // Create an empty file
return new long[partitioner.numPartitions()];
} else if (spills.length == 1) {
} else if (spills.length == 1 && allowSpillMove) {
// Here, we don't need to perform any metrics updates because the bytes written to this
// output file would have already been counted as shuffle bytes written.
Files.move(spills[0].file, outputFile);
Expand Down Expand Up @@ -325,7 +335,7 @@ private long[] mergeSpillsWithFileStream(
SpillInfo[] spills,
File outputFile,
@Nullable CompressionCodec compressionCodec) throws IOException {
assert (spills.length >= 2);
assert (spills.length >= 2 || !allowSpillMove);
final int numPartitions = partitioner.numPartitions();
final long[] partitionLengths = new long[numPartitions];
final InputStream[] spillInputStreams = new FileInputStream[spills.length];
Expand Down Expand Up @@ -379,7 +389,7 @@ private long[] mergeSpillsWithFileStream(
* @return the partition lengths in the merged file.
*/
private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
assert (spills.length >= 2);
assert (spills.length >= 2 || !allowSpillMove);
final int numPartitions = partitioner.numPartitions();
final long[] partitionLengths = new long[numPartitions];
final FileChannel[] spillInputChannels = new FileChannel[spills.length];
Expand Down Expand Up @@ -463,7 +473,7 @@ public Option<MapStatus> stop(boolean success) {
return Option.apply(mapStatus);
} else {
// The map task failed, so delete our output data.
shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
shuffleBlockResolver.removeDataByMap(shuffleId, mapId, stageAttemptId);
return Option.apply(null);
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ private[spark] object MapOutputTracker extends Logging {
} else {
for (part <- startPartition until endPartition) {
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
((ShuffleBlockId(shuffleId, mapId, part, status.stageAttemptId),
status.getSizeForBlock(part)))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ private[spark] class TaskContextImpl(
@transient private val metricsSystem: MetricsSystem,
internalAccumulators: Seq[Accumulator[Long]],
val runningLocally: Boolean = false,
val stageAttemptId: Int = 0, // for testing
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
with Logging {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ case class FetchFailed(
shuffleId: Int,
mapId: Int,
reduceId: Int,
stageAttemptId: Int,
message: String)
extends TaskFailedReason {
override def toErrorString: String = {
val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
s"message=\n$message\n)"
s"stageAttemptId=$stageAttemptId, message=\n$message\n)"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ class DAGScheduler(
logInfo("Resubmitted " + task + ", so marking it as still running")
stage.pendingPartitions += task.partitionId

case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, stageAttemptId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
" STAGE_ID=" + taskEnd.stageId
stageLogInfo(taskEnd.stageId, taskStatus)
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, stageAttemptId, message) =>
taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId
Expand Down
34 changes: 25 additions & 9 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ private[spark] sealed trait MapStatus {
/** Location where this task was run. */
def location: BlockManagerId

/** stage attempt for the ShuffleMapTask */
def stageAttemptId: Int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MapStatus now needs to keep a stageAttemptId so the fetch side knows which stage attempt to read data from. This is a negligible increase to the size of the map status when we have just one stage attempt, since its just an extra int. But when there are multiple attempts, we now have one MapStatus per (location, stageAttempt). In pathalogical cases, this will result in many more MapStatus.

I needed to update the serialization of MapStatus, but I believe this should be covered by existing tests.


/**
* Estimated size for the reduce block, in bytes.
*
Expand All @@ -43,11 +46,11 @@ private[spark] sealed trait MapStatus {

private[spark] object MapStatus {

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
def apply(loc: BlockManagerId, stageAttemptId: Int, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > 2000) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
HighlyCompressedMapStatus(loc, stageAttemptId, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
new CompressedMapStatus(loc, stageAttemptId, uncompressedSizes)
}
}

Expand Down Expand Up @@ -90,29 +93,34 @@ private[spark] object MapStatus {
*/
private[spark] class CompressedMapStatus(
private[this] var loc: BlockManagerId,
private[this] var _stageAttemptId: Int,
private[this] var compressedSizes: Array[Byte])
extends MapStatus with Externalizable {

protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
protected def this() = this(null, 0, null.asInstanceOf[Array[Byte]]) // For deserialization only

def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
this(loc, uncompressedSizes.map(MapStatus.compressSize))
def this(loc: BlockManagerId, stageAttemptId: Int, uncompressedSizes: Array[Long]) {
this(loc, stageAttemptId, uncompressedSizes.map(MapStatus.compressSize))
}

override def location: BlockManagerId = loc

override def stageAttemptId: Int = _stageAttemptId

override def getSizeForBlock(reduceId: Int): Long = {
MapStatus.decompressSize(compressedSizes(reduceId))
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeInt(_stageAttemptId)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
_stageAttemptId = in.readInt()
val len = in.readInt()
compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
Expand All @@ -131,6 +139,7 @@ private[spark] class CompressedMapStatus(
*/
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var _stageAttemptId: Int,
private[this] var numNonEmptyBlocks: Int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not your code, but it doesn't look like there is any reason for this to be a var.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is required for the deserialization code in readExternal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh sorry, I wasn't paying close enough attention, you are right

private[this] var emptyBlocks: BitSet,
private[this] var avgSize: Long)
Expand All @@ -140,10 +149,12 @@ private[spark] class HighlyCompressedMapStatus private (
require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1) // For deserialization only
protected def this() = this(null, 0, -1, null, -1) // For deserialization only

override def location: BlockManagerId = loc

override def stageAttemptId: Int = _stageAttemptId

override def getSizeForBlock(reduceId: Int): Long = {
if (emptyBlocks.get(reduceId)) {
0
Expand All @@ -154,20 +165,25 @@ private[spark] class HighlyCompressedMapStatus private (

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeInt(_stageAttemptId)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
_stageAttemptId = in.readInt()
emptyBlocks = new BitSet
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
}

private[spark] object HighlyCompressedMapStatus {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
def apply(
loc: BlockManagerId,
stageAttemptId: Int,
uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
// We must keep track of which blocks are empty so that we don't report a zero-sized
// block as being non-empty (or vice-versa) when using the average block size.
var i = 0
Expand All @@ -193,6 +209,6 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
new HighlyCompressedMapStatus(loc, stageAttemptId, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private[spark] class ShuffleMapTask(
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, stageAttemptId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
Expand Down
15 changes: 8 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ private[spark] abstract class Task[T](
metricsSystem: MetricsSystem)
: (T, AccumulatorUpdates) = {
context = new TaskContextImpl(
stageId,
partitionId,
taskAttemptId,
attemptNumber,
taskMemoryManager,
metricsSystem,
internalAccumulators,
stageId = stageId,
stageAttemptId = stageAttemptId,
partitionId = partitionId,
taskAttemptId = taskAttemptId,
attemptNumber = attemptNumber,
taskMemoryManager = taskMemoryManager,
metricsSystem = metricsSystem,
internalAccumulators = internalAccumulators,
runningLocally = false)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,18 +615,22 @@ private[spark] class TaskSetManager(
val index = info.index
info.markSuccessful()
removeRunningTask(tid)
val task = tasks(index)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
task, Success, result.value(), result.accumUpdates, info, result.metrics)
if (!successful(index)) {
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
// include the partition here b/c on a stage retry, the partition is *not* necessarily
// the same as info.id
logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}}, " +
s"partition ${task.partitionId}) in ${info.duration} ms on executor ${info.executorId} " +
s"(${info.host}) ($tasksSuccessful/$numTasks)")
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
Expand Down
Loading