diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index ee82d679935c0..8242bb91811db 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -83,6 +83,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { private final ShuffleWriteMetrics writeMetrics; private final int shuffleId; private final int mapId; + private final int stageAttemptId; private final Serializer serializer; private final IndexShuffleBlockResolver shuffleBlockResolver; @@ -103,6 +104,7 @@ public BypassMergeSortShuffleWriter( IndexShuffleBlockResolver shuffleBlockResolver, BypassMergeSortShuffleHandle handle, int mapId, + int stageAttemptId, TaskContext taskContext, SparkConf conf) { // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided @@ -111,6 +113,7 @@ public BypassMergeSortShuffleWriter( this.blockManager = blockManager; final ShuffleDependency dep = handle.dependency(); this.mapId = mapId; + this.stageAttemptId = stageAttemptId; this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); @@ -125,8 +128,9 @@ public void write(Iterator> records) throws IOException { assert (partitionWriters == null); if (!records.hasNext()) { partitionLengths = new long[numPartitions]; - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); + shuffleBlockResolver.writeIndexFile(shuffleId, mapId, stageAttemptId, partitionLengths); + mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), stageAttemptId, + partitionLengths); return; } final SerializerInstance serInstance = serializer.newInstance(); @@ -156,9 +160,10 @@ public void write(Iterator> records) throws IOException { } partitionLengths = - writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId)); - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); + writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId, stageAttemptId)); + shuffleBlockResolver.writeIndexFile(shuffleId, mapId, stageAttemptId, partitionLengths); + mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), stageAttemptId, + partitionLengths); } @VisibleForTesting @@ -231,7 +236,7 @@ public Option stop(boolean success) { partitionWriters = null; } } - shuffleBlockResolver.removeDataByMap(shuffleId, mapId); + shuffleBlockResolver.removeDataByMap(shuffleId, mapId, stageAttemptId); return None$.empty(); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 6a0a89e81c321..f7fd7a416a10b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -73,6 +73,7 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private final ShuffleWriteMetrics writeMetrics; private final int shuffleId; private final int mapId; + private final int stageAttemptId; private final TaskContext taskContext; private final SparkConf sparkConf; private final boolean 
transferToEnabled; @@ -89,6 +90,11 @@ private static final class MyByteArrayOutputStream extends ByteArrayOutputStream private MyByteArrayOutputStream serBuffer; private SerializationStream serOutputStream; + /** + * This is just to allow tests to explore more code paths, without requiring too much complexity + * in the test cases. In normal usage, it will be true. + */ + private final boolean allowSpillMove; /** * Are we in the process of stopping? Because map tasks can call stop() with success = true @@ -103,6 +109,7 @@ public UnsafeShuffleWriter( TaskMemoryManager memoryManager, SerializedShuffleHandle handle, int mapId, + int stageAttemptId, TaskContext taskContext, SparkConf sparkConf) throws IOException { final int numPartitions = handle.dependency().partitioner().numPartitions(); @@ -115,6 +122,7 @@ public UnsafeShuffleWriter( this.shuffleBlockResolver = shuffleBlockResolver; this.memoryManager = memoryManager; this.mapId = mapId; + this.stageAttemptId = stageAttemptId; final ShuffleDependency dep = handle.dependency(); this.shuffleId = dep.shuffleId(); this.serializer = Serializer.getSerializer(dep.serializer()).newInstance(); @@ -123,6 +131,7 @@ public UnsafeShuffleWriter( taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics)); this.taskContext = taskContext; this.sparkConf = sparkConf; + this.allowSpillMove = sparkConf.getBoolean("spark.shuffle.unsafe.testing.allowSpillMove", true); this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); open(); } @@ -215,8 +224,9 @@ void closeAndWriteOutput() throws IOException { } } } - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); + shuffleBlockResolver.writeIndexFile(shuffleId, mapId, stageAttemptId, partitionLengths); + mapStatus = MapStatus$.MODULE$.apply( + blockManager.shuffleServerId(), stageAttemptId, partitionLengths); } @VisibleForTesting @@ -249,7 +259,7 @@ void forceSorterToSpill() throws IOException { * @return the partition lengths in the merged file. */ private long[] mergeSpills(SpillInfo[] spills) throws IOException { - final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId); + final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId, stageAttemptId); final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true); final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = @@ -260,7 +270,7 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException { if (spills.length == 0) { new FileOutputStream(outputFile).close(); // Create an empty file return new long[partitioner.numPartitions()]; - } else if (spills.length == 1) { + } else if (spills.length == 1 && allowSpillMove) { // Here, we don't need to perform any metrics updates because the bytes written to this // output file would have already been counted as shuffle bytes written. 
Files.move(spills[0].file, outputFile); @@ -325,7 +335,7 @@ private long[] mergeSpillsWithFileStream( SpillInfo[] spills, File outputFile, @Nullable CompressionCodec compressionCodec) throws IOException { - assert (spills.length >= 2); + assert (spills.length >= 2 || !allowSpillMove); final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final InputStream[] spillInputStreams = new FileInputStream[spills.length]; @@ -379,7 +389,7 @@ private long[] mergeSpillsWithFileStream( * @return the partition lengths in the merged file. */ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException { - assert (spills.length >= 2); + assert (spills.length >= 2 || !allowSpillMove); final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final FileChannel[] spillInputChannels = new FileChannel[spills.length]; @@ -463,7 +473,7 @@ public Option stop(boolean success) { return Option.apply(mapStatus); } else { // The map task failed, so delete our output data. - shuffleBlockResolver.removeDataByMap(shuffleId, mapId); + shuffleBlockResolver.removeDataByMap(shuffleId, mapId, stageAttemptId); return Option.apply(null); } } diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 72355cdfa68b3..c4549f705fbbf 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -543,7 +543,8 @@ private[spark] object MapOutputTracker extends Logging { } else { for (part <- startPartition until endPartition) { splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += - ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part))) + ((ShuffleBlockId(shuffleId, mapId, part, status.stageAttemptId), + status.getSizeForBlock(part))) } } } diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index f0ae83a9341bd..93a7f085bd84d 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -34,6 +34,7 @@ private[spark] class TaskContextImpl( @transient private val metricsSystem: MetricsSystem, internalAccumulators: Seq[Accumulator[Long]], val runningLocally: Boolean = false, + val stageAttemptId: Int = 0, // for testing val taskMetrics: TaskMetrics = TaskMetrics.empty) extends TaskContext with Logging { diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 13241b77bf97b..842b22c3286d1 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -84,12 +84,13 @@ case class FetchFailed( shuffleId: Int, mapId: Int, reduceId: Int, + stageAttemptId: Int, message: String) extends TaskFailedReason { override def toErrorString: String = { val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " + - s"message=\n$message\n)" + s"stageAttemptId=$stageAttemptId, message=\n$message\n)" } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4a9518fff4e7b..e8300cd6dd339 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1227,7 +1227,7 @@ class DAGScheduler( logInfo("Resubmitted " + task + ", so marking it as still running") stage.pendingPartitions += task.partitionId - case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => + case FetchFailed(bmAddress, shuffleId, mapId, reduceId, stageAttemptId, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleToMapStage(shuffleId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index f96eb8ca0ae00..2f0af776df759 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -223,7 +223,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId + " STAGE_ID=" + taskEnd.stageId stageLogInfo(taskEnd.stageId, taskStatus) - case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) => + case FetchFailed(bmAddress, shuffleId, mapId, reduceId, stageAttemptId, message) => taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" + taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + mapId + " REDUCE_ID=" + reduceId diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 180c8d1827e13..30323d3615eca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -31,6 +31,9 @@ private[spark] sealed trait MapStatus { /** Location where this task was run. */ def location: BlockManagerId + /** stage attempt for the ShuffleMapTask */ + def stageAttemptId: Int + /** * Estimated size for the reduce block, in bytes. 
* @@ -43,11 +46,11 @@ private[spark] sealed trait MapStatus { private[spark] object MapStatus { - def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { + def apply(loc: BlockManagerId, stageAttemptId: Int, uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > 2000) { - HighlyCompressedMapStatus(loc, uncompressedSizes) + HighlyCompressedMapStatus(loc, stageAttemptId, uncompressedSizes) } else { - new CompressedMapStatus(loc, uncompressedSizes) + new CompressedMapStatus(loc, stageAttemptId, uncompressedSizes) } } @@ -90,29 +93,34 @@ private[spark] object MapStatus { */ private[spark] class CompressedMapStatus( private[this] var loc: BlockManagerId, + private[this] var _stageAttemptId: Int, private[this] var compressedSizes: Array[Byte]) extends MapStatus with Externalizable { - protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only + protected def this() = this(null, 0, null.asInstanceOf[Array[Byte]]) // For deserialization only - def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) { - this(loc, uncompressedSizes.map(MapStatus.compressSize)) + def this(loc: BlockManagerId, stageAttemptId: Int, uncompressedSizes: Array[Long]) { + this(loc, stageAttemptId, uncompressedSizes.map(MapStatus.compressSize)) } override def location: BlockManagerId = loc + override def stageAttemptId: Int = _stageAttemptId + override def getSizeForBlock(reduceId: Int): Long = { MapStatus.decompressSize(compressedSizes(reduceId)) } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) + out.writeInt(_stageAttemptId) out.writeInt(compressedSizes.length) out.write(compressedSizes) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) + _stageAttemptId = in.readInt() val len = in.readInt() compressedSizes = new Array[Byte](len) in.readFully(compressedSizes) @@ -131,6 +139,7 @@ private[spark] class CompressedMapStatus( */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, + private[this] var _stageAttemptId: Int, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: BitSet, private[this] var avgSize: Long) @@ -140,10 +149,12 @@ private[spark] class HighlyCompressedMapStatus private ( require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1) // For deserialization only + protected def this() = this(null, 0, -1, null, -1) // For deserialization only override def location: BlockManagerId = loc + override def stageAttemptId: Int = _stageAttemptId + override def getSizeForBlock(reduceId: Int): Long = { if (emptyBlocks.get(reduceId)) { 0 @@ -154,12 +165,14 @@ private[spark] class HighlyCompressedMapStatus private ( override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) + out.writeInt(_stageAttemptId) emptyBlocks.writeExternal(out) out.writeLong(avgSize) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) + _stageAttemptId = in.readInt() emptyBlocks = new BitSet emptyBlocks.readExternal(in) avgSize = in.readLong() @@ -167,7 +180,10 @@ private[spark] class HighlyCompressedMapStatus private ( } private[spark] object HighlyCompressedMapStatus { - def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { + def apply( + loc: 
BlockManagerId, + stageAttemptId: Int, + uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { // We must keep track of which blocks are empty so that we don't report a zero-sized // block as being non-empty (or vice-versa) when using the average block size. var i = 0 @@ -193,6 +209,6 @@ private[spark] object HighlyCompressedMapStatus { } else { 0 } - new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize) + new HighlyCompressedMapStatus(loc, stageAttemptId, numNonEmptyBlocks, emptyBlocks, avgSize) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index ea97ef0e746d8..20276ef0d862c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -69,7 +69,7 @@ private[spark] class ShuffleMapTask( var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager - writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) + writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, stageAttemptId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 4fb32ba8cb188..41f54375b07ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -69,13 +69,14 @@ private[spark] abstract class Task[T]( metricsSystem: MetricsSystem) : (T, AccumulatorUpdates) = { context = new TaskContextImpl( - stageId, - partitionId, - taskAttemptId, - attemptNumber, - taskMemoryManager, - metricsSystem, - internalAccumulators, + stageId = stageId, + stageAttemptId = stageAttemptId, + partitionId = partitionId, + taskAttemptId = taskAttemptId, + attemptNumber = attemptNumber, + taskMemoryManager = taskMemoryManager, + metricsSystem = metricsSystem, + internalAccumulators = internalAccumulators, runningLocally = false) TaskContext.setTaskContext(context) context.taskMetrics.setHostname(Utils.localHostName()) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 114468c48c44c..54da4ae5ee3c8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -615,6 +615,7 @@ private[spark] class TaskSetManager( val index = info.index info.markSuccessful() removeRunningTask(tid) + val task = tasks(index) // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not // "deserialize" the value when holding a lock to avoid blocking other threads. So we call @@ -622,11 +623,14 @@ private[spark] class TaskSetManager( // Note: "result.value()" only deserializes the value when it's called at the first time, so // here "result.value()" just returns the value and won't block other threads. 
sched.dagScheduler.taskEnded( - tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics) + task, Success, result.value(), result.accumUpdates, info, result.metrics) if (!successful(index)) { tasksSuccessful += 1 - logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format( - info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks)) + // include the partition here b/c on a stage retry, the partition is *not* necessarily + // the same as info.id + logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}, " + + s"partition ${task.partitionId}) in ${info.duration} ms on executor ${info.executorId} " + + s"(${info.host}) ($tasksSuccessful/$numTasks)") // Mark successful and stop if all the tasks have succeeded. successful(index) = true if (tasksSuccessful == numTasks) { diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala index be184464e0ae9..46b2395c181d4 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala @@ -32,6 +32,7 @@ private[spark] class FetchFailedException( shuffleId: Int, mapId: Int, reduceId: Int, + stageAttemptId: Int, message: String, cause: Throwable = null) extends Exception(message, cause) { @@ -41,12 +42,13 @@ private[spark] class FetchFailedException( shuffleId: Int, mapId: Int, reduceId: Int, + stageAttemptId: Int, cause: Throwable) { - this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause) + this(bmAddress, shuffleId, mapId, reduceId, stageAttemptId, cause.getMessage, cause) } def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, - Utils.exceptionString(this)) + stageAttemptId, Utils.exceptionString(this)) } /** @@ -56,4 +58,4 @@ private[spark] class MetadataFetchFailedException( shuffleId: Int, reduceId: Int, message: String) - extends FetchFailedException(null, shuffleId, -1, reduceId, message) + extends FetchFailedException(null, shuffleId, -1, reduceId, -1, message) diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index cd253a78c2b19..3f1ba872fea71 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -17,10 +17,13 @@ package org.apache.spark.shuffle +import java.io.File import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ +import com.google.common.annotations.VisibleForTesting + import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} @@ -63,7 +66,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) val completedMapTasks = new ConcurrentLinkedQueue[Int]() } - private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] + private val shuffleStates = new TimeStampedHashMap[ShuffleIdAndAttempt, ShuffleState] private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf) @@ -72,17 +75,22 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) * Get a ShuffleWriterGroup for the given map task, which will register it as complete * 
when the writers are closed successfully */ - def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer, + def forMapTask( + shuffleAndAttempt: ShuffleIdAndAttempt, + mapId: Int, + numReducers: Int, + serializer: Serializer, writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { new ShuffleWriterGroup { - shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers)) - private val shuffleState = shuffleStates(shuffleId) + shuffleStates.putIfAbsent(shuffleAndAttempt, new ShuffleState(numReducers)) + private val shuffleState = shuffleStates(shuffleAndAttempt) val openStartTime = System.nanoTime val serializerInstance = serializer.newInstance() val writers: Array[DiskBlockObjectWriter] = { Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId => - val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) + val blockId = ShuffleBlockId(shuffleAndAttempt.shuffleId, mapId, bucketId, + shuffleAndAttempt.stageAttemptId) val blockFile = blockManager.diskBlockManager.getFile(blockId) // Because of previous failures, the shuffle file may already exist on this machine. // If so, remove it. @@ -113,29 +121,35 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) } /** Remove all the blocks / files and metadata related to a particular shuffle. */ - def removeShuffle(shuffleId: ShuffleId): Boolean = { + def removeShuffle(shuffleAndAttempt: ShuffleIdAndAttempt): Boolean = { // Do not change the ordering of this, if shuffleStates should be removed only // after the corresponding shuffle blocks have been removed - val cleaned = removeShuffleBlocks(shuffleId) - shuffleStates.remove(shuffleId) + val cleaned = removeShuffleBlocks(shuffleAndAttempt) + shuffleStates.remove(shuffleAndAttempt) cleaned } + @VisibleForTesting + private[shuffle] def getShuffleFiles(blockId: ShuffleBlockId): Seq[File] = { + Seq(blockManager.diskBlockManager.getFile(blockId)) + } + /** Remove all the blocks / files related to a particular shuffle. 
*/ - private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = { - shuffleStates.get(shuffleId) match { + private def removeShuffleBlocks(shuffleAndAttempt: ShuffleIdAndAttempt): Boolean = { + shuffleStates.get(shuffleAndAttempt) match { case Some(state) => for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) { - val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId) + val blockId = new ShuffleBlockId(shuffleAndAttempt.shuffleId, mapId, reduceId, + shuffleAndAttempt.stageAttemptId) val file = blockManager.diskBlockManager.getFile(blockId) if (!file.delete()) { logWarning(s"Error deleting ${file.getPath()}") } } - logInfo("Deleted all files for shuffle " + shuffleId) + logInfo("Deleted all files for shuffle " + shuffleAndAttempt) true case None => - logInfo("Could not find files for shuffle " + shuffleId + " for deleting") + logInfo("Could not find files for shuffle " + shuffleAndAttempt + " for deleting") false } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 5e4c2b5d0a5c4..550b300517fcd 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -19,16 +19,16 @@ package org.apache.spark.shuffle import java.io._ +import com.google.common.annotations.VisibleForTesting import com.google.common.io.ByteStreams import org.apache.spark.{SparkConf, SparkEnv, Logging} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID import org.apache.spark.storage._ import org.apache.spark.util.Utils -import IndexShuffleBlockResolver.NOOP_REDUCE_ID - /** * Create and maintain the shuffle blocks' mapping between logic block and physical file location. * Data of shuffle blocks from the same map task are stored in a single consolidated data file. @@ -47,26 +47,29 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB private val transportConf = SparkTransportConf.fromSparkConf(conf) - def getDataFile(shuffleId: Int, mapId: Int): File = { - blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) + def getDataFile(shuffleId: Int, mapId: Int, stageAttemptId: Int): File = { + blockManager.diskBlockManager.getFile( + ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID, stageAttemptId)) } - private def getIndexFile(shuffleId: Int, mapId: Int): File = { - blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) + @VisibleForTesting + private[shuffle] def getIndexFile(shuffleId: Int, mapId: Int, stageAttemptId: Int): File = { + blockManager.diskBlockManager.getFile( + ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID, stageAttemptId)) } /** * Remove data file and index file that contain the output data from one map. 
* */ - def removeDataByMap(shuffleId: Int, mapId: Int): Unit = { - var file = getDataFile(shuffleId, mapId) + def removeDataByMap(shuffleId: Int, mapId: Int, stageAttemptId: Int): Unit = { + var file = getDataFile(shuffleId, mapId, stageAttemptId) if (file.exists()) { if (!file.delete()) { logWarning(s"Error deleting data ${file.getPath()}") } } - file = getIndexFile(shuffleId, mapId) + file = getIndexFile(shuffleId, mapId, stageAttemptId) if (file.exists()) { if (!file.delete()) { logWarning(s"Error deleting index ${file.getPath()}") @@ -79,8 +82,12 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB * end of the output file. This will be used by getBlockData to figure out where each block * begins and ends. * */ - def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = { - val indexFile = getIndexFile(shuffleId, mapId) + def writeIndexFile( + shuffleId: Int, + mapId: Int, + stageAttemptId: Int, + lengths: Array[Long]): Unit = { + val indexFile = getIndexFile(shuffleId, mapId, stageAttemptId) val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) Utils.tryWithSafeFinally { // We take in lengths of each block, need to convert it to offsets. @@ -98,7 +105,7 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { // The block is actually going to be a range of a single map output file for this map, so // find out the consolidated file, then the offset within that from our index - val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) + val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId, blockId.stageAttemptId) val in = new DataInputStream(new FileInputStream(indexFile)) try { @@ -107,7 +114,7 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB val nextOffset = in.readLong() new FileSegmentManagedBuffer( transportConf, - getDataFile(blockId.shuffleId, blockId.mapId), + getDataFile(blockId.shuffleId, blockId.mapId, blockId.stageAttemptId), offset, nextOffset - offset) } finally { diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala index 4342b0d598b16..51c30abbbf1ef 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala @@ -17,7 +17,6 @@ package org.apache.spark.shuffle -import java.nio.ByteBuffer import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.storage.ShuffleBlockId @@ -29,8 +28,6 @@ private[spark] * implementations when shuffle data is retrieved. */ trait ShuffleBlockResolver { - type ShuffleId = Int - /** * Retrieve the data for the specified block. If the data for that block is not available, * throws an unspecified exception. 
@@ -39,3 +36,5 @@ trait ShuffleBlockResolver { def stop(): Unit } + +private[spark] case class ShuffleIdAndAttempt(shuffleId: Int, stageAttemptId: Int) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 978366d1a1d1b..9eac6f255f273 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -17,7 +17,14 @@ package org.apache.spark.shuffle -import org.apache.spark.{TaskContext, ShuffleDependency} +import java.io.File +import java.util.concurrent.{CopyOnWriteArraySet, ConcurrentHashMap} + +import scala.collection.JavaConverters._ + +import com.google.common.annotations.VisibleForTesting + +import org.apache.spark.{ShuffleDependency, TaskContext} /** * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver @@ -36,8 +43,16 @@ private[spark] trait ShuffleManager { numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle - /** Get a writer for a given partition. Called on executors by map tasks. */ - def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] + /** + * Get a writer for a given partition. Called on executors by map tasks. + * Implementations should call [[addShuffleAttempt]] to update internal state, so we can track + * all attempts for each shuffle. + */ + def getWriter[K, V]( + handle: ShuffleHandle, + mapId: Int, + stageAttemptId: Int, + context: TaskContext): ShuffleWriter[K, V] /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). @@ -49,6 +64,21 @@ private[spark] trait ShuffleManager { endPartition: Int, context: TaskContext): ShuffleReader[K, C] + + /** + * Get all the files associated with the given shuffle. + * + * This method exists just so that general shuffle tests can make sure shuffle files are cleaned + * up correctly. + */ + @VisibleForTesting + private[shuffle] def getShuffleFiles( + handle: ShuffleHandle, + mapId: Int, + reduceId: Int, + stageAttemptId: Int): Seq[File] + + /** * Remove a shuffle's metadata from the ShuffleManager. * @return true if the metadata removed successfully, otherwise false. @@ -62,4 +92,29 @@ private[spark] trait ShuffleManager { /** Shut down this ShuffleManager. 
*/ def stop(): Unit + + private[this] val shuffleToAttempts = new ConcurrentHashMap[Int, CopyOnWriteArraySet[Int]]() + + /** + * Register a stage attempt for the given shuffle, so we can clean up all attempts when + * the shuffle is unregistered + */ + protected def addShuffleAttempt(shuffleId: Int, stageAttemptId: Int): Unit = { + shuffleToAttempts.putIfAbsent(shuffleId, new CopyOnWriteArraySet[Int]()) + shuffleToAttempts.get(shuffleId).add(stageAttemptId) + } + + /** + * Clear internal state which tracks attempts for each shuffle, and return the set of attempts + * so implementations can perform extra cleanup on each attempt (eg., delete shuffle files) + */ + @VisibleForTesting + private[shuffle] def clearStageAttemptsForShuffle(shuffleId: Int): Iterable[Int] = { + val attempts = shuffleToAttempts.remove(shuffleId) + if (attempts == null) { + Iterable[Int]() + } else { + attempts.asScala + } + } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala index d2e2fc4c110a7..319d473a301f4 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala @@ -17,8 +17,11 @@ package org.apache.spark.shuffle.hash +import java.io.File + import org.apache.spark._ import org.apache.spark.shuffle._ +import org.apache.spark.storage.ShuffleBlockId /** * A ShuffleManager using hashing, that creates one output file per reduce partition on each @@ -56,15 +59,30 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager } /** Get a writer for a given partition. Called on executors by map tasks. */ - override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) - : ShuffleWriter[K, V] = { - new HashShuffleWriter( - shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + override def getWriter[K, V]( + handle: ShuffleHandle, + mapId: Int, + stageAttemptId: Int, + context: TaskContext): ShuffleWriter[K, V] = { + addShuffleAttempt(handle.shuffleId, stageAttemptId) + new HashShuffleWriter(shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], + mapId, stageAttemptId, context) } /** Remove a shuffle's metadata from the ShuffleManager. 
*/ override def unregisterShuffle(shuffleId: Int): Boolean = { - shuffleBlockResolver.removeShuffle(shuffleId) + clearStageAttemptsForShuffle(shuffleId).forall { stageAttemptId => + shuffleBlockResolver.removeShuffle(ShuffleIdAndAttempt(shuffleId, stageAttemptId)) + } + } + + private[shuffle] override def getShuffleFiles( + handle: ShuffleHandle, + mapId: Int, + reduceId: Int, + stageAttemptId: Int): Seq[File] = { + val blockId = ShuffleBlockId(handle.shuffleId, mapId, reduceId, stageAttemptId) + fileShuffleBlockResolver.getShuffleFiles(blockId) } override def shuffleBlockResolver: FileShuffleBlockResolver = { diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 41df70c602c30..54de961b33b2b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -28,6 +28,7 @@ private[spark] class HashShuffleWriter[K, V]( shuffleBlockResolver: FileShuffleBlockResolver, handle: BaseShuffleHandle[K, V, _], mapId: Int, + stageAttemptId: Int, context: TaskContext) extends ShuffleWriter[K, V] with Logging { @@ -45,8 +46,9 @@ private[spark] class HashShuffleWriter[K, V]( private val blockManager = SparkEnv.get.blockManager private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) - private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, - writeMetrics) + private val shuffleAndAttempt = ShuffleIdAndAttempt(dep.shuffleId, stageAttemptId) + private val shuffle = shuffleBlockResolver.forMapTask(shuffleAndAttempt, mapId, numOutputSplits, + ser, writeMetrics) /** Write a bunch of records to this task's output */ override def write(records: Iterator[Product2[K, V]]): Unit = { @@ -106,7 +108,7 @@ private[spark] class HashShuffleWriter[K, V]( writer.commitAndClose() writer.fileSegment().length } - MapStatus(blockManager.shuffleServerId, sizes) + MapStatus(blockManager.shuffleServerId, stageAttemptId, sizes) } private def revertWrites(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 66b6bbc61fe8e..2fb0e21b3955f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -17,6 +17,7 @@ package org.apache.spark.shuffle.sort +import java.io.File import java.util.concurrent.ConcurrentHashMap import org.apache.spark._ @@ -123,9 +124,13 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager override def getWriter[K, V]( handle: ShuffleHandle, mapId: Int, + stageAttemptId: Int, context: TaskContext): ShuffleWriter[K, V] = { + val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]] + val shuffleId = baseShuffleHandle.shuffleId + addShuffleAttempt(shuffleId, stageAttemptId) numMapsForShuffle.putIfAbsent( - handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) + handle.shuffleId, baseShuffleHandle.numMaps) val env = SparkEnv.get handle match { case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => @@ -135,6 +140,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager context.taskMemoryManager(), unsafeShuffleHandle, mapId, + stageAttemptId, context, env.conf) case bypassMergeSortHandle: 
BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => @@ -143,18 +149,22 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], bypassMergeSortHandle, mapId, + stageAttemptId, context, env.conf) case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => - new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) + new SortShuffleWriter(shuffleBlockResolver, other, mapId, stageAttemptId, context) } } /** Remove a shuffle's metadata from the ShuffleManager. */ override def unregisterShuffle(shuffleId: Int): Boolean = { Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps => + val attempts = clearStageAttemptsForShuffle(shuffleId) (0 until numMaps).foreach { mapId => - shuffleBlockResolver.removeDataByMap(shuffleId, mapId) + attempts.foreach { stageAttemptId => + shuffleBlockResolver.removeDataByMap(shuffleId, mapId, stageAttemptId) + } } } true @@ -164,8 +174,18 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager override def stop(): Unit = { shuffleBlockResolver.stop() } -} + private[shuffle] override def getShuffleFiles( + handle: ShuffleHandle, + mapId: Int, + reduceId: Int, + stageAttemptId: Int): Seq[File] = { + Seq( + shuffleBlockResolver.getDataFile(handle.shuffleId, mapId, stageAttemptId), + shuffleBlockResolver.getIndexFile(handle.shuffleId, mapId, stageAttemptId) + ) + } +} private[spark] object SortShuffleManager extends Logging { diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 808317b017a0f..538f5ba189389 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -28,6 +28,7 @@ private[spark] class SortShuffleWriter[K, V, C]( shuffleBlockResolver: IndexShuffleBlockResolver, handle: BaseShuffleHandle[K, V, C], mapId: Int, + stageAttemptId: Int, context: TaskContext) extends ShuffleWriter[K, V] with Logging { @@ -65,12 +66,13 @@ private[spark] class SortShuffleWriter[K, V, C]( // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). - val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) - val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) + val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId, stageAttemptId) + val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID, + stageAttemptId) val partitionLengths = sorter.writePartitionedFile(blockId, outputFile) - shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths) + shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, stageAttemptId, partitionLengths) - mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) + mapStatus = MapStatus(blockManager.shuffleServerId, stageAttemptId, partitionLengths) } /** Close this writer, passing along whether the map completed */ @@ -84,7 +86,7 @@ private[spark] class SortShuffleWriter[K, V, C]( return Option(mapStatus) } else { // The map task failed, so delete our output data. 
- shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId) + shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId, stageAttemptId) return None } } finally { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 524f6970992a5..201847853e4b2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -56,18 +56,27 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { // Format of the shuffle block ids (including data and index) should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi -case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { - override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId +case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, stageAttemptId: Int) + extends BlockId { + override def name: String = { + s"shuffle_${shuffleId}_${mapId}_${reduceId}_$stageAttemptId" + } } @DeveloperApi -case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { - override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" +case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int, stageAttemptId: Int) + extends BlockId { + override def name: String = { + s"shuffle_${shuffleId}_${mapId}_${reduceId}_$stageAttemptId.data" + } } @DeveloperApi -case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { - override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" +case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int, stageAttemptId: Int) + extends BlockId { + override def name: String = { + s"shuffle_${shuffleId}_${mapId}_${reduceId}_$stageAttemptId.index" + } } @DeveloperApi @@ -103,9 +112,9 @@ private[spark] case class TestBlockId(id: String) extends BlockId { @DeveloperApi object BlockId { val RDD = "rdd_([0-9]+)_([0-9]+)".r - val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r - val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r - val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r + val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+)".r + val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+).data".r + val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+).index".r val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r val TASKRESULT = "taskresult_([0-9]+)".r val STREAM = "input-([0-9]+)-([0-9]+)".r @@ -115,12 +124,12 @@ object BlockId { def apply(id: String): BlockId = id match { case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt) - case SHUFFLE(shuffleId, mapId, reduceId) => - ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) - case SHUFFLE_DATA(shuffleId, mapId, reduceId) => - ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) - case SHUFFLE_INDEX(shuffleId, mapId, reduceId) => - ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) + case SHUFFLE(shuffleId, mapId, reduceId, stageAttemptId) => + ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt, stageAttemptId.toInt) + case SHUFFLE_DATA(shuffleId, mapId, reduceId, stageAttemptId) => + ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt, stageAttemptId.toInt) + case SHUFFLE_INDEX(shuffleId, mapId, reduceId, stageAttemptId) => + 
ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt, stageAttemptId.toInt) case BROADCAST(broadcastId, field) => BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_")) case TASKRESULT(taskId) => diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 0d0448feb5b06..a29f94fb2ef0f 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -319,8 +319,9 @@ final class ShuffleBlockFetcherIterator( private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = { blockId match { - case ShuffleBlockId(shufId, mapId, reduceId) => - throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e) + case ShuffleBlockId(shufId, mapId, reduceId, stageAttemptId) => + throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, stageAttemptId, + e) case _ => throw new SparkException( "Failed to get block " + blockId + ", which is not a shuffle block", e) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c9beeb25e05af..ed19ce261d68f 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -354,6 +354,7 @@ private[spark] object JsonProtocol { ("Shuffle ID" -> fetchFailed.shuffleId) ~ ("Map ID" -> fetchFailed.mapId) ~ ("Reduce ID" -> fetchFailed.reduceId) ~ + ("Stage Attempt ID" -> fetchFailed.stageAttemptId) ~ ("Message" -> fetchFailed.message) case exceptionFailure: ExceptionFailure => val stackTrace = stackTraceToJson(exceptionFailure.stackTrace) @@ -795,8 +796,10 @@ private[spark] object JsonProtocol { val shuffleId = (json \ "Shuffle ID").extract[Int] val mapId = (json \ "Map ID").extract[Int] val reduceId = (json \ "Reduce ID").extract[Int] + val stageAttemptId = Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]) + .getOrElse(0) val message = Utils.jsonOption(json \ "Message").map(_.extract[String]) - new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId, + new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId, stageAttemptId, message.getOrElse("Unknown reason")) case `exceptionFailure` => val className = (json \ "Class Name").extract[String] diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 0e0eca515afc1..b97dbef904e89 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -164,14 +164,15 @@ public OutputStream answer(InvocationOnMock invocation) throws Throwable { } ); - when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile); + when(shuffleBlockResolver.getDataFile(anyInt(), anyInt(), anyInt())) + .thenReturn(mergedOutputFile); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; + partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[3]; return null; } - }).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), any(long[].class)); + 
}).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), anyInt(), any(long[].class)); when(diskBlockManager.createTempShuffleBlock()).thenAnswer( new Answer>() { @@ -201,6 +202,7 @@ private UnsafeShuffleWriter createWriter( taskMemoryManager, new SerializedShuffleHandle(0, 1, shuffleDep), 0, // map id + 0, // stage attempt id taskContext, conf ); @@ -501,6 +503,7 @@ public void testPeakMemoryUsed() throws Exception { taskMemoryManager, new SerializedShuffleHandle<>(0, 1, shuffleDep), 0, // map id + 0, // stageAttemptId taskContext, conf); diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 0c14bef7befd8..cf88281397e85 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -603,8 +603,8 @@ class CleanerTester( private def getShuffleBlocks(shuffleId: Int): Seq[BlockId] = { blockManager.master.getMatchingBlockIds( _ match { - case ShuffleBlockId(`shuffleId`, _, _) => true - case ShuffleIndexBlockId(`shuffleId`, _, _) => true + case ShuffleBlockId(`shuffleId`, _, _, _) => true + case ShuffleIndexBlockId(`shuffleId`, _, _, _) => true case _ => false }, askSlaves = true) } diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 231f4631e0a47..4efd107753df8 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server.TransportServer import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleClient} +import org.apache.spark.shuffle.ShuffleSuite /** * This suite creates an external shuffle server and routes all shuffle fetches through it. diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala index 19180e88ebe0a..700f6f7bc8888 100644 --- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark import org.scalatest.BeforeAndAfterAll +import org.apache.spark.shuffle.ShuffleSuite + class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { // This test suite should run all tests in ShuffleSuite with hash-based shuffle. 
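
The MapOutputTracker and block-manager tests that follow construct four-argument ShuffleBlockIds. As a rough sketch (not part of the patch) of the naming scheme this diff introduces, assuming the reduceId-before-stageAttemptId field order used by ShuffleBlockId and the resolver call sites, with named arguments so the example does not depend on positional order:

// Sketch only: the stage attempt id becomes the last component of every shuffle block name.
import org.apache.spark.storage.{BlockId, ShuffleBlockId, ShuffleDataBlockId}

object ShuffleBlockNamingSketch {
  def main(args: Array[String]): Unit = {
    val block = ShuffleBlockId(shuffleId = 10, mapId = 0, reduceId = 3, stageAttemptId = 1)
    assert(block.name == "shuffle_10_0_3_1")       // shuffle_<shuffle>_<map>_<reduce>_<attempt>
    assert(BlockId("shuffle_10_0_3_1") == block)   // BlockId.apply round-trips the longer format
    val data = ShuffleDataBlockId(shuffleId = 10, mapId = 0, reduceId = 3, stageAttemptId = 1)
    assert(data.name == "shuffle_10_0_3_1.data")   // data/index files share the suffix plus an extension
  }
}
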
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 7e70308bb360c..13205a73049cd 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -53,15 +53,16 @@ class MapOutputTrackerSuite extends SparkFunSuite { assert(tracker.containsShuffle(10)) val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), 0, Array(1000L, 10000L))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), 0, Array(10000L, 1000L))) val statuses = tracker.getMapSizesByExecutorId(10, 0) assert(statuses.toSet === - Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))), - (BlockManagerId("b", "hostB", 1000), ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000)))) - .toSet) + Seq( + (BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0, 0), size1000))), + (BlockManagerId("b", "hostB", 1000), ArrayBuffer((ShuffleBlockId(10, 1, 0, 0), size10000))) + ).toSet) tracker.stop() rpcEnv.shutdown() } @@ -74,9 +75,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerShuffle(10, 2) val compressedSize1000 = MapStatus.compressSize(1000L) val compressedSize10000 = MapStatus.compressSize(10000L) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), 0, Array(compressedSize1000, compressedSize10000))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), 0, Array(compressedSize10000, compressedSize1000))) assert(tracker.containsShuffle(10)) assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty) @@ -96,9 +97,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerShuffle(10, 2) val compressedSize1000 = MapStatus.compressSize(1000L) val compressedSize10000 = MapStatus.compressSize(10000L) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), 0, Array(compressedSize1000, compressedSize1000, compressedSize1000))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), 0, Array(compressedSize10000, compressedSize1000, compressedSize1000))) // As if we had two simultaneous fetch failures @@ -134,11 +135,12 @@ class MapOutputTrackerSuite extends SparkFunSuite { val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) masterTracker.registerMapOutput(10, 0, MapStatus( - BlockManagerId("a", "hostA", 1000), Array(1000L))) + BlockManagerId("a", "hostA", 1000), 0, Array(1000L))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) assert(slaveTracker.getMapSizesByExecutorId(10, 0) === - Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) + Seq((BlockManagerId("a", "hostA", 1000), + ArrayBuffer((ShuffleBlockId(10, 0, 0, 
0), size1000))))) masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) masterTracker.incrementEpoch() @@ -167,7 +169,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { // Frame size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) masterTracker.registerMapOutput(10, 0, MapStatus( - BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0))) + BlockManagerId("88", "mph", 1000), 0, Array.fill[Long](10)(0))) val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) when(rpcCallContext.senderAddress).thenReturn(senderAddress) @@ -195,7 +197,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { masterTracker.registerShuffle(20, 100) (0 until 100).foreach { i => masterTracker.registerMapOutput(20, i, new CompressedMapStatus( - BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0))) + BlockManagerId("999", "mps", 1000), 0, Array.fill[Long](4000000)(0))) } val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) @@ -218,11 +220,11 @@ class MapOutputTrackerSuite extends SparkFunSuite { // on hostA with output size 2 // on hostB with output size 3 tracker.registerShuffle(10, 3) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), 0, Array(2L))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000), 0, Array(2L))) - tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000), 0, Array(3L))) // When the threshold is 50%, only host A should be returned as a preferred location diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index d78c99c2e1e06..58ab6bcdb1b74 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -19,6 +19,8 @@ package org.apache.spark import org.scalatest.BeforeAndAfterAll +import org.apache.spark.shuffle.ShuffleSuite + class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll { // This test suite should run all tests in ShuffleSuite with Netty shuffle mode. 
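
The ShuffleManager changes above make unregisterShuffle attempt-aware: getWriter records each (shuffleId, stageAttemptId) pair via addShuffleAttempt, and unregistration drains that set so every attempt's files can be removed. A minimal standalone sketch of that bookkeeping, mirroring the ConcurrentHashMap plus CopyOnWriteArraySet approach in the trait (the class name here is illustrative):

import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArraySet}
import scala.collection.JavaConverters._

// Sketch of the attempt bookkeeping added to ShuffleManager: writers register their
// stage attempt, and unregistering a shuffle drains every attempt that was recorded.
class AttemptTrackerSketch {
  private val shuffleToAttempts = new ConcurrentHashMap[Int, CopyOnWriteArraySet[Int]]()

  def addShuffleAttempt(shuffleId: Int, stageAttemptId: Int): Unit = {
    shuffleToAttempts.putIfAbsent(shuffleId, new CopyOnWriteArraySet[Int]())
    shuffleToAttempts.get(shuffleId).add(stageAttemptId)
  }

  def clearStageAttemptsForShuffle(shuffleId: Int): Iterable[Int] = {
    val attempts = shuffleToAttempts.remove(shuffleId)
    if (attempts == null) Iterable.empty[Int] else attempts.asScala
  }
}

SortShuffleManager.unregisterShuffle then iterates over every recorded attempt for each map id and calls removeDataByMap, so output from a first attempt is not orphaned when a retried stage writes new files.
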
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala index b8ab227517cc4..d72477aebeebe 100644 --- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala @@ -26,6 +26,7 @@ import org.apache.commons.io.filefilter.TrueFileFilter import org.scalatest.BeforeAndAfterAll import org.apache.spark.rdd.ShuffledRDD +import org.apache.spark.shuffle.ShuffleSuite import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.util.Utils diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 3940527fb874e..c3345d120d69f 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -100,7 +100,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi */ private def testConnection(conf0: SparkConf, conf1: SparkConf): Try[Unit] = { val blockManager = mock[BlockDataManager] - val blockId = ShuffleBlockId(0, 1, 2) + val blockId = ShuffleBlockId(0, 1, 2, 0) val blockString = "Hello, world!" val blockBuffer = new NioManagedBuffer(ByteBuffer.wrap(blockString.getBytes)) when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3816b8c4a09aa..8205adbe53b10 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -505,7 +505,7 @@ class DAGSchedulerSuite // the 2nd ResultTask failed complete(taskSets(1), Seq( (Success, 42), - (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null))) + (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, 0, "ignored"), null))) // this will get called // blockManagerMaster.removeExecutor("exec-hostA") // ask the scheduler to try it again @@ -583,7 +583,8 @@ class DAGSchedulerSuite val stageAttempt = taskSets.last checkStageId(stageId, attemptIdx, stageAttempt) complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) => - (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null) + (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, attemptIdx, + "ignored"), null) }.toSeq) } @@ -810,7 +811,7 @@ class DAGSchedulerSuite // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(CompletionEvent( taskSets(1).tasks(0), - FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, 0, "ignored"), null, Map[Long, Any](), createFakeTaskInfo(), @@ -821,7 +822,7 @@ class DAGSchedulerSuite // The second ResultTask fails, with a fetch failure for the output from the second mapper. 
runEvent(CompletionEvent( taskSets(1).tasks(0), - FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, 0, "ignored"), null, Map[Long, Any](), createFakeTaskInfo(), @@ -863,7 +864,7 @@ class DAGSchedulerSuite // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(CompletionEvent( taskSets(1).tasks(0), - FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, 0, "ignored"), null, Map[Long, Any](), createFakeTaskInfo(), @@ -881,7 +882,7 @@ class DAGSchedulerSuite // The second ResultTask fails, with a fetch failure for the output from the second mapper. runEvent(CompletionEvent( taskSets(1).tasks(1), - FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"), + FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, 0, "ignored"), null, Map[Long, Any](), createFakeTaskInfo(), @@ -933,7 +934,7 @@ class DAGSchedulerSuite // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(CompletionEvent( taskSets(1).tasks(0), - FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, 0, "ignored"), null, Map[Long, Any](), createFakeTaskInfo(), @@ -952,7 +953,7 @@ class DAGSchedulerSuite // A late FetchFailed arrives from the second task in the original reduce stage. runEvent(CompletionEvent( taskSets(1).tasks(1), - FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"), + FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, 0, "ignored"), null, Map[Long, Any](), createFakeTaskInfo(), @@ -1080,7 +1081,7 @@ class DAGSchedulerSuite runEvent(ExecutorLost("exec-hostA")) runEvent(CompletionEvent( taskSets(1).tasks(0), - FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"), + FetchFailed(null, firstShuffleId, 2, 0, 0, "Fetch failed"), null, null, createFakeTaskInfo(), @@ -1315,7 +1316,8 @@ class DAGSchedulerSuite (Success, makeMapStatus("hostC", 1)))) // fail the third stage because hostA went down complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null))) + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, 0, "ignored"), + null))) // TODO assert this: // blockManagerMaster.removeExecutor("exec-hostA") // have DAGScheduler try again @@ -1346,7 +1348,8 @@ class DAGSchedulerSuite (Success, makeMapStatus("hostB", 1)))) // pretend stage 2 failed because hostA went down complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null))) + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, 0, "ignored"), + null))) // TODO assert this: // blockManagerMaster.removeExecutor("exec-hostA") // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. 
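Illustrative sketch (not part of the patch): the DAGSchedulerSuite edits above all stem from FetchFailed carrying an extra stage attempt ID. A minimal REPL-style Scala example of the new positional argument order, assuming only what these tests exercise:

    import org.apache.spark.FetchFailed
    import org.apache.spark.storage.BlockManagerId

    // Order is (bmAddress, shuffleId, mapId, reduceId, stageAttemptId, message).
    val reason = FetchFailed(
      BlockManagerId("exec-hostA", "hostA", 12345),
      0,         // shuffleId
      0,         // mapId
      0,         // reduceId
      0,         // stageAttemptId (new in this change)
      "ignored") // message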
@@ -1600,7 +1603,7 @@ class DAGSchedulerSuite submit(reduceRdd, Array(0, 1)) complete(taskSets(1), Seq( (Success, 42), - (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null))) + (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, 0, "ignored"), null))) // Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch // from, then TaskSet 3 will run the reduce stage scheduler.resubmitFailedStages() @@ -1659,7 +1662,7 @@ class DAGSchedulerSuite assert(taskSets(1).stageId === 1) complete(taskSets(1), Seq( (Success, makeMapStatus("hostA", rdd2.partitions.length)), - (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null))) + (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, 0, "ignored"), null))) scheduler.resubmitFailedStages() assert(listener2.results.size === 0) // Second stage listener should not have a result yet @@ -1685,7 +1688,7 @@ class DAGSchedulerSuite assert(taskSets(4).stageId === 2) complete(taskSets(4), Seq( (Success, 52), - (FetchFailed(makeBlockManagerId("hostD"), dep2.shuffleId, 0, 0, "ignored"), null))) + (FetchFailed(makeBlockManagerId("hostD"), dep2.shuffleId, 0, 0, 0, "ignored"), null))) scheduler.resubmitFailedStages() // TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2 @@ -1763,7 +1766,7 @@ class DAGSchedulerSuite } private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus = - MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes)) + MapStatus(makeBlockManagerId(host), 0, Array.fill[Long](reduces)(sizes)) private def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index b8e466fab4506..11a3b67c508b9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -54,7 +54,7 @@ class MapStatusSuite extends SparkFunSuite { stddev <- Seq(0.0, 0.01, 0.5, 1.0) ) { val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean) - val status = MapStatus(BlockManagerId("a", "b", 10), sizes) + val status = MapStatus(BlockManagerId("a", "b", 10), 0, sizes) val status1 = compressAndDecompressMapStatus(status) for (i <- 0 until numSizes) { if (sizes(i) != 0) { @@ -68,8 +68,9 @@ class MapStatusSuite extends SparkFunSuite { test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) { val sizes = Array.fill[Long](2001)(150L) - val status = MapStatus(null, sizes) + val status = MapStatus(null, 1, sizes) assert(status.isInstanceOf[HighlyCompressedMapStatus]) + assert(status.stageAttemptId === 1) assert(status.getSizeForBlock(10) === 150L) assert(status.getSizeForBlock(50) === 150L) assert(status.getSizeForBlock(99) === 150L) @@ -80,10 +81,11 @@ class MapStatusSuite extends SparkFunSuite { val sizes = Array.tabulate[Long](3000) { i => i.toLong } val avg = sizes.sum / sizes.filter(_ != 0).length val loc = BlockManagerId("a", "b", 10) - val status = MapStatus(loc, sizes) + val status = MapStatus(loc, 1, sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) assert(status1.location == loc) + assert(status1.stageAttemptId === 1) for (i <- 0 until 3000) { val estimate = status1.getSizeForBlock(i) if (sizes(i) > 0) { diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index afe2e80358ca0..0c553c2749ce6 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -326,7 +326,8 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { val denseBlockSizes = new Array[Long](5000) val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L) Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes => - ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes)) + ser.serialize( + HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), 0, blockSizes)) } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index 26a372d6a905d..3eec55b7a61f9 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -103,7 +103,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // Setup the blockManager mock so the buffer gets returned when the shuffle code tries to // fetch shuffle data. - val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId) + val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId, 0) when(blockManager.getBlockData(shuffleBlockId)).thenReturn(managedBuffer) when(blockManager.wrapForCompression(meq(shuffleBlockId), isA(classOf[InputStream]))) .thenAnswer(dummyCompressionFunction) @@ -118,7 +118,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => - val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId) + val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId, 0) (shuffleBlockId, byteOutputStream.size().toLong) } Seq((localBlockManagerId, shuffleBlockIdsAndSizes)) diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleSuite.scala similarity index 71% rename from core/src/test/scala/org/apache/spark/ShuffleSuite.scala rename to core/src/test/scala/org/apache/spark/shuffle/ShuffleSuite.scala index 4a0877d86f2c6..65fb72652236c 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleSuite.scala @@ -15,15 +15,20 @@ * limitations under the License. 
*/ -package org.apache.spark +package org.apache.spark.shuffle + +import java.util.concurrent._ import org.scalatest.Matchers -import org.apache.spark.ShuffleSuite.NonJavaSerializableClass +import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD} -import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import org.apache.spark.scheduler.{MyRDD, SparkListener, SparkListenerTaskEnd} import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleBlockId} +import org.apache.spark.shuffle.ShuffleSuite.NonJavaSerializableClass +import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId} import org.apache.spark.util.MutablePair abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext { @@ -270,8 +275,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC rdd.count() // Delete one of the local shuffle blocks. - val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)) - val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0)) + val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0, 0)) + val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0, 0)) assert(hashFile.exists() || sortFile.exists()) if (hashFile.exists()) { @@ -317,6 +322,119 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(metrics.bytesWritten === metrics.bytesRead) assert(metrics.bytesWritten > 0) } + + def multipleAttemptConfs: Seq[(String, SparkConf)] = Seq("basic" -> conf) + + multipleAttemptConfs.foreach { case (name, multipleAttemptConf) => + test("multiple attempts for one task: conf = " + name) { + sc = new SparkContext("local", "test", multipleAttemptConf) + val mapTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val manager = sc.env.shuffleManager + val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0) + val metricsSystem = sc.env.metricsSystem + val shuffleMapRdd = new MyRDD(sc, 1, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) + val shuffleHandle = manager.registerShuffle(0, 1, shuffleDep) + + // first attempt -- it's successful + val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0, 0, + new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, metricsSystem, + InternalAccumulator.create(sc), false, stageAttemptId = 0, taskMetrics = new TaskMetrics)) + val data1 = (1 to 10).map { x => x -> x} + + // second attempt -- also successful. We'll write out different data, + // just to simulate the fact that the records may get written differently + // depending on what gets spilled, what gets combined, etc.
+ val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0, 1, + new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, metricsSystem, + InternalAccumulator.create(sc), false, stageAttemptId = 1, taskMetrics = new TaskMetrics)) + val data2 = (11 to 20).map { x => x -> x} + + // interleave writes of both attempts -- we want to test that both attempts can occur + // simultaneously, and everything is still OK + val interleaver = new InterleaveIterators( + data1, {iter: Iterator[(Int, Int)] => writer1.write(iter); writer1.stop(true)}, + data2, {iter: Iterator[(Int, Int)] => writer2.write(iter); writer2.stop(true)}) + val (mapOutput1, mapOutput2) = interleaver.run() + + // register the output from attempt 1, and try to read it + mapOutput1.foreach { mapStatus => mapTrackerMaster.registerMapOutputs(0, Array(mapStatus))} + val reader1 = manager.getReader[Int, Int](shuffleHandle, 0, 1, + new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, metricsSystem, + InternalAccumulator.create(sc), false, taskMetrics = new TaskMetrics)) + reader1.read().toIndexedSeq should be (data1.toIndexedSeq) + + // now for attempt 2 (registerMapOutputs always blows away all previous outputs, so we + // won't find the output for attempt 1) + mapOutput2.foreach { mapStatus => mapTrackerMaster.registerMapOutputs(0, Array(mapStatus))} + + val reader2 = manager.getReader[Int, Int](shuffleHandle, 0, 1, + new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, metricsSystem, + InternalAccumulator.create(sc), false, taskMetrics = new TaskMetrics)) + reader2.read().toIndexedSeq should be(data2.toIndexedSeq) + + // make sure that when the shuffle gets unregistered, we clean up from all attempts + val shuffleFiles1 = manager.getShuffleFiles(shuffleHandle, 0, 0, 0) + val shuffleFiles2 = manager.getShuffleFiles(shuffleHandle, 0, 0, 1) + // we are relying on getShuffleFiles to be accurate. We can't be positive it's correct, but + // at least this makes sure it is returning something which seems plausible + assert(shuffleFiles1.nonEmpty) + assert(shuffleFiles2.nonEmpty) + assert(shuffleFiles1.toSet.intersect(shuffleFiles2.toSet).isEmpty) + val shuffleFiles = shuffleFiles1 ++ shuffleFiles2 + shuffleFiles.foreach { file => assert(file.exists()) } + + // now unregister, and check all the files were deleted + manager.unregisterShuffle(0) + shuffleFiles.foreach { file => assert(!file.exists()) } + // also make sure shuffleToAttempts gets cleaned up + manager.clearStageAttemptsForShuffle(0).size should be (0) + } + } + +} + +/** + * Utility to help tests make sure that we can process two different iterators simultaneously + * in different threads. This makes sure that in your test, you don't completely process data1 with + * f1 before processing data2 with f2 (or vice versa). It adds a barrier so that the functions only + * process one element, before pausing to wait for the other function to "catch up".
+ */ +class InterleaveIterators[T, R]( + data1: Seq[T], + f1: Iterator[T] => R, + data2: Seq[T], + f2: Iterator[T] => R) { + + require(data1.size == data2.size) + + val barrier = new CyclicBarrier(2) + class BarrierIterator[E](id: Int, sub: Iterator[E]) extends Iterator[E] { + def hasNext: Boolean = sub.hasNext + + def next: E = { + barrier.await() + sub.next() + } + } + + val c1 = new Callable[R] { + override def call(): R = f1(new BarrierIterator(1, data1.iterator)) + } + val c2 = new Callable[R] { + override def call(): R = f2(new BarrierIterator(2, data2.iterator)) + } + + val e: ExecutorService = Executors.newFixedThreadPool(2) + + def run(): (R, R) = { + val future1 = e.submit(c1) + val future2 = e.submit(c2) + val r1 = future1.get() + val r2 = future2.get() + e.shutdown() + (r1, r2) + } } object ShuffleSuite { diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index b92a302806f76..9a6ef73018acf 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark._ import org.apache.spark.executor.{TaskMetrics, ShuffleWriteMetrics} import org.apache.spark.shuffle.IndexShuffleBlockResolver -import org.apache.spark.serializer.{JavaSerializer, SerializerInstance} +import org.apache.spark.serializer.{Serializer, JavaSerializer, SerializerInstance} import org.apache.spark.storage._ import org.apache.spark.util.Utils @@ -52,6 +52,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte private val conf: SparkConf = new SparkConf(loadDefaults = false) private val temporaryFilesCreated: mutable.Buffer[File] = new ArrayBuffer[File]() private val blockIdToFileMap: mutable.Map[BlockId, File] = new mutable.HashMap[BlockId, File] + private val shuffleBlockId: ShuffleBlockId = new ShuffleBlockId(0, 0, 0, 0) + private val serializer: Serializer = new JavaSerializer(conf) private var shuffleHandle: BypassMergeSortShuffleHandle[Int, Int] = _ override def beforeEach(): Unit = { @@ -67,7 +69,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte when(dependency.partitioner).thenReturn(new HashPartitioner(7)) when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf))) when(taskContext.taskMetrics()).thenReturn(taskMetrics) - when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) + when(blockResolver.getDataFile(0, 0, 0)).thenReturn(outputFile) when(blockManager.diskBlockManager).thenReturn(diskBlockManager) when(blockManager.getDiskWriter( any[BlockId], @@ -118,6 +120,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockResolver, shuffleHandle, 0, // MapId + 0, // StageAttemptId taskContext, conf ) @@ -142,6 +145,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockResolver, shuffleHandle, 0, // MapId + 0, // StageAttemptId taskContext, conf ) @@ -163,6 +167,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockResolver, shuffleHandle, 0, // MapId + 0, // StageAttemptId taskContext, conf ) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala index 89ed031b6fcd1..92e0cd01ad25e 
100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala @@ -55,14 +55,16 @@ class BlockIdSuite extends SparkFunSuite { } test("shuffle") { - val id = ShuffleBlockId(1, 2, 3) - assertSame(id, ShuffleBlockId(1, 2, 3)) - assertDifferent(id, ShuffleBlockId(3, 2, 3)) - assert(id.name === "shuffle_1_2_3") + val id = ShuffleBlockId(1, 2, 3, 4) + assertSame(id, ShuffleBlockId(1, 2, 3, 4)) + assertDifferent(id, ShuffleBlockId(3, 2, 3, 4)) + assertDifferent(id, ShuffleBlockId(1, 2, 3, 1)) + assert(id.name === "shuffle_1_2_3_4") assert(id.asRDDId === None) assert(id.shuffleId === 1) assert(id.mapId === 2) assert(id.reduceId === 3) + assert(id.stageAttemptId === 4) assert(id.isShuffle) assertSame(id, BlockId(id.toString)) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 53991d8a1aede..c59009645ecc3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -764,16 +764,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE try { conf.set("spark.shuffle.compress", "true") store = makeBlockManager(20000, "exec1") - store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100, + store.putSingle( + ShuffleBlockId(0, 0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0, 0)) <= 100, "shuffle_0_0_0 was not compressed") store.stop() store = null conf.set("spark.shuffle.compress", "false") store = makeBlockManager(20000, "exec2") - store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000, + store.putSingle( + ShuffleBlockId(0, 0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0, 0)) >= 10000, "shuffle_0_0_0 was compressed") store.stop() store = null diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 828153bdbfc44..6f1226129be1a 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -74,9 +74,9 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT // Make sure blockManager.getBlockData would return the blocks val localBlocks = Map[BlockId, ManagedBuffer]( - ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(), - ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer(), - ShuffleBlockId(0, 2, 0) -> createMockManagedBuffer()) + ShuffleBlockId(0, 0, 0, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 1, 0, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 2, 0, 0) -> createMockManagedBuffer()) localBlocks.foreach { case (blockId, buf) => doReturn(buf).when(blockManager).getBlockData(meq(blockId)) } @@ -84,8 +84,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT // Make sure remote blocks would return val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) val remoteBlocks = Map[BlockId, 
ManagedBuffer]( - ShuffleBlockId(0, 3, 0) -> createMockManagedBuffer(), - ShuffleBlockId(0, 4, 0) -> createMockManagedBuffer()) + ShuffleBlockId(0, 3, 0, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 4, 0, 0) -> createMockManagedBuffer()) val transfer = createMockTransfer(remoteBlocks) @@ -138,9 +138,9 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT // Make sure remote blocks would return val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) val blocks = Map[BlockId, ManagedBuffer]( - ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(), - ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer(), - ShuffleBlockId(0, 2, 0) -> createMockManagedBuffer()) + ShuffleBlockId(0, 0, 0, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 1, 0, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 2, 0, 0) -> createMockManagedBuffer()) // Semaphore to coordinate event sequence in two different threads. val sem = new Semaphore(0) @@ -152,12 +152,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT future { // Return the first two blocks, and wait till task completion before returning the 3rd one listener.onBlockFetchSuccess( - ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0))) + ShuffleBlockId(0, 0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0, 0))) listener.onBlockFetchSuccess( - ShuffleBlockId(0, 1, 0).toString, blocks(ShuffleBlockId(0, 1, 0))) + ShuffleBlockId(0, 1, 0, 0).toString, blocks(ShuffleBlockId(0, 1, 0, 0))) sem.acquire() listener.onBlockFetchSuccess( - ShuffleBlockId(0, 2, 0).toString, blocks(ShuffleBlockId(0, 2, 0))) + ShuffleBlockId(0, 2, 0, 0).toString, blocks(ShuffleBlockId(0, 2, 0, 0))) } } }) @@ -173,22 +173,22 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT blocksByAddress, 48 * 1024 * 1024) - verify(blocks(ShuffleBlockId(0, 0, 0)), times(0)).release() + verify(blocks(ShuffleBlockId(0, 0, 0, 0)), times(0)).release() iterator.next()._2.close() // close() first block's input stream - verify(blocks(ShuffleBlockId(0, 0, 0)), times(1)).release() + verify(blocks(ShuffleBlockId(0, 0, 0, 0)), times(1)).release() // Get the 2nd block but do not exhaust the iterator val subIter = iterator.next()._2 // Complete the task; then the 2nd block buffer should be exhausted - verify(blocks(ShuffleBlockId(0, 1, 0)), times(0)).release() + verify(blocks(ShuffleBlockId(0, 1, 0, 0)), times(0)).release() taskContext.markTaskCompleted() - verify(blocks(ShuffleBlockId(0, 1, 0)), times(1)).release() + verify(blocks(ShuffleBlockId(0, 1, 0, 0)), times(1)).release() // The 3rd block should not be retained because the iterator is already in zombie state sem.release() - verify(blocks(ShuffleBlockId(0, 2, 0)), times(0)).retain() - verify(blocks(ShuffleBlockId(0, 2, 0)), times(0)).release() + verify(blocks(ShuffleBlockId(0, 2, 0, 0)), times(0)).retain() + verify(blocks(ShuffleBlockId(0, 2, 0, 0)), times(0)).release() } test("fail all blocks if any of the remote request fails") { @@ -199,9 +199,9 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT // Make sure remote blocks would return val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) val blocks = Map[BlockId, ManagedBuffer]( - ShuffleBlockId(0, 0, 0) -> mock(classOf[ManagedBuffer]), - ShuffleBlockId(0, 1, 0) -> mock(classOf[ManagedBuffer]), - ShuffleBlockId(0, 2, 0) -> mock(classOf[ManagedBuffer]) + ShuffleBlockId(0, 0, 0, 0) -> mock(classOf[ManagedBuffer]), + 
ShuffleBlockId(0, 1, 0, 0) -> mock(classOf[ManagedBuffer]), + ShuffleBlockId(0, 2, 0, 0) -> mock(classOf[ManagedBuffer]) ) // Semaphore to coordinate event sequence in two different threads. @@ -214,11 +214,11 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT future { // Return the first block, and then fail. listener.onBlockFetchSuccess( - ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0))) + ShuffleBlockId(0, 0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0, 0))) listener.onBlockFetchFailure( - ShuffleBlockId(0, 1, 0).toString, new BlockNotFoundException("blah")) + ShuffleBlockId(0, 1, 0, 0).toString, new BlockNotFoundException("blah")) listener.onBlockFetchFailure( - ShuffleBlockId(0, 2, 0).toString, new BlockNotFoundException("blah")) + ShuffleBlockId(0, 2, 0, 0).toString, new BlockNotFoundException("blah")) sem.release() } } diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index ceecfd665bf87..0c930a06bf7ee 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -253,8 +253,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B val shuffleId = shuffleHandle.shuffleId val mapId = 0 val reduceId = taskContext.partitionId() + val stageAttemptId = 0 val message = "Simulated fetch failure" - throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message) + throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, stageAttemptId, + message) } else { x } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index e02f5a1b20fe3..38684b238bad2 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -239,7 +239,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with // Go through all the failure cases to make sure we are counting them as failures. 
val taskFailedReasons = Seq( Resubmitted, - new FetchFailed(null, 0, 0, 0, "ignored"), + new FetchFailed(null, 0, 0, 0, 0, "ignored"), ExceptionFailure("Exception", "description", null, null, None, None), TaskResultLost, TaskKilled, diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index 61601016e005e..630fe672e3bbd 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -104,14 +104,14 @@ class AkkaUtilsSuite extends SparkFunSuite with LocalSparkContext with ResetSyst val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) masterTracker.registerMapOutput(10, 0, - MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L))) + MapStatus(BlockManagerId("a", "hostA", 1000), 0, Array(1000L))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) // this should succeed since security off assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq === Seq((BlockManagerId("a", "hostA", 1000), - ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) + ArrayBuffer((ShuffleBlockId(10, 0, 0, 0), size1000))))) rpcEnv.shutdown() slaveRpcEnv.shutdown() @@ -151,14 +151,14 @@ class AkkaUtilsSuite extends SparkFunSuite with LocalSparkContext with ResetSyst val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) masterTracker.registerMapOutput(10, 0, MapStatus( - BlockManagerId("a", "hostA", 1000), Array(1000L))) + BlockManagerId("a", "hostA", 1000), 0, Array(1000L))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) // this should succeed since security on and passwords match assert(slaveTracker.getMapSizesByExecutorId(10, 0) === Seq((BlockManagerId("a", "hostA", 1000), - ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) + ArrayBuffer((ShuffleBlockId(10, 0, 0, 0), size1000))))) rpcEnv.shutdown() slaveRpcEnv.shutdown() @@ -231,13 +231,14 @@ class AkkaUtilsSuite extends SparkFunSuite with LocalSparkContext with ResetSyst val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) masterTracker.registerMapOutput(10, 0, - MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L))) + MapStatus(BlockManagerId("a", "hostA", 1000), 0, Array(1000L))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) // this should succeed since security off assert(slaveTracker.getMapSizesByExecutorId(10, 0) === - Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) + Seq((BlockManagerId("a", "hostA", 1000), + ArrayBuffer((ShuffleBlockId(10, 0, 0, 0), size1000))))) rpcEnv.shutdown() slaveRpcEnv.shutdown() @@ -278,12 +279,13 @@ class AkkaUtilsSuite extends SparkFunSuite with LocalSparkContext with ResetSyst val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) masterTracker.registerMapOutput(10, 0, - MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L))) + MapStatus(BlockManagerId("a", "hostA", 1000), 0, Array(1000L))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) assert(slaveTracker.getMapSizesByExecutorId(10, 0) === - Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) + Seq((BlockManagerId("a", "hostA", 1000), + ArrayBuffer((ShuffleBlockId(10, 0, 0, 0), size1000))))) rpcEnv.shutdown() slaveRpcEnv.shutdown() diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 3f94ef7041914..c67cbc39c7676 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -141,7 +141,7 @@ class JsonProtocolSuite extends SparkFunSuite { // TaskEndReason val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, - "Some exception") + 20, "Some exception") val fetchMetadataFailed = new MetadataFetchFailedException(17, 19, "metadata Fetch failed exception").toTaskEndReason val exceptionFailure = new ExceptionFailure(exception, None) @@ -158,7 +158,7 @@ class JsonProtocolSuite extends SparkFunSuite { // BlockId testBlockId(RDDBlockId(1, 2)) - testBlockId(ShuffleBlockId(1, 2, 3)) + testBlockId(ShuffleBlockId(1, 2, 3, 4)) testBlockId(BroadcastBlockId(1L, "insert_words_of_wisdom_here")) testBlockId(TaskResultBlockId(1L)) testBlockId(StreamBlockId(1, 2L)) @@ -266,13 +266,20 @@ class JsonProtocolSuite extends SparkFunSuite { test("FetchFailed backwards compatibility") { // FetchFailed in Spark 1.1.0 does not have an "Message" property. - val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, + val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, 20, "ignored") val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed) .removeField({ _._1 == "Message" }) val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, - "Unknown reason") + 20, "Unknown reason") assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent)) + + // FetchFailed pre Spark 1.6.0 does not have "Stage Attempt ID" property + val pre16Event = JsonProtocol.taskEndReasonToJson(fetchFailed) + .removeField({ _._1 == "Stage Attempt ID" }) + val expectedPre16FetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, + 19, 0, "ignored") + assert(expectedPre16FetchFailed === JsonProtocol.taskEndReasonFromJson(pre16Event)) } test("ShuffleReadMetrics: Local bytes read and time taken backwards compatibility") { diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 0d4dd6afac769..1dd2e8c38ffcd 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -163,12 +163,12 @@ public void registerExecutor( /** * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the - * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make - * assumptions about how the hash and sort based shuffles store their data. + * format "shuffle_ShuffleId_MapId_ReduceId_StageAttemptId" (from ShuffleBlockId), and + * additionally make assumptions about how the hash and sort based shuffles store their data. 
*/ public ManagedBuffer getBlockData(String appId, String execId, String blockId) { String[] blockIdParts = blockId.split("_"); - if (blockIdParts.length < 4) { + if (blockIdParts.length < 4 || blockIdParts.length > 5) { throw new IllegalArgumentException("Unexpected block id format: " + blockId); } else if (!blockIdParts[0].equals("shuffle")) { throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId); @@ -187,7 +187,14 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) { return getHashBasedShuffleBlockData(executor, blockId); } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager) || "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager".equals(executor.shuffleManager)) { - return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId); + // for backwards compatibility, we also handle legacy shuffle block ids which don't have + // a stageAttemptId + String baseFileName = "shuffle_" + shuffleId + "_" + mapId + "_0"; + if (blockIdParts.length == 5) { + int stageAttemptId = Integer.parseInt(blockIdParts[4]); + baseFileName = baseFileName + "_" + stageAttemptId; + } + return getSortBasedShuffleBlockData(executor, baseFileName, reduceId); } else { throw new UnsupportedOperationException( "Unsupported shuffle manager: " + executor.shuffleManager); @@ -266,9 +273,11 @@ private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId. */ private ManagedBuffer getSortBasedShuffleBlockData( - ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { + ExecutorShuffleInfo executor, + String baseFileName, + int reduceId) { File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, - "shuffle_" + shuffleId + "_" + mapId + "_0.index"); + baseFileName + ".index"); DataInputStream in = null; try { @@ -278,8 +287,7 @@ private ManagedBuffer getSortBasedShuffleBlockData( long nextOffset = in.readLong(); return new FileSegmentManagedBuffer( conf, - getFile(executor.localDirs, executor.subDirsPerLocalDir, - "shuffle_" + shuffleId + "_" + mapId + "_0.data"), + getFile(executor.localDirs, executor.subDirsPerLocalDir, baseFileName + ".data"), offset, nextOffset - offset); } catch (IOException e) { diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 3c6cb367dea46..c137355a81f5b 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -37,9 +37,16 @@ public class ExternalShuffleBlockResolverSuite { static String sortBlock0 = "Hello!"; static String sortBlock1 = "World!"; + static String sortBlock0_1 = "supercali"; + static String sortBlock1_1 = "fragilistic"; + static String hashBlock0 = "Elementary"; static String hashBlock1 = "Tabular"; + static String hashBlock0_1 = "expiali"; + static String hashBlock1_1 = "docious"; + + static TestShuffleDataContext dataContext; static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); @@ -50,10 +57,14 @@ public static void beforeAll() throws IOException { dataContext.create(); // Write some sort and hash data. 
- dataContext.insertSortShuffleData(0, 0, + dataContext.insertSortShuffleData(0, 0, 0, new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } ); - dataContext.insertHashShuffleData(1, 0, - new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } ); + dataContext.insertSortShuffleData(0, 0, 1, + new byte[][] { sortBlock0_1.getBytes(), sortBlock1_1.getBytes() } ); + dataContext.insertHashShuffleData(1, 0, 0, + new byte[][]{hashBlock0.getBytes(), hashBlock1.getBytes()}); + dataContext.insertHashShuffleData(1, 0, 1, + new byte[][] { hashBlock0_1.getBytes(), hashBlock1_1.getBytes() } ); } @AfterClass @@ -66,7 +77,7 @@ public void testBadRequests() throws IOException { ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null); // Unregistered executor try { - resolver.getBlockData("app0", "exec1", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec1", "shuffle_1_1_0_0"); fail("Should have failed"); } catch (RuntimeException e) { assertTrue("Bad error message: " + e, e.getMessage().contains("not registered")); @@ -75,7 +86,7 @@ public void testBadRequests() throws IOException { // Invalid shuffle manager resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar")); try { - resolver.getBlockData("app0", "exec2", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec2", "shuffle_1_1_0_0"); fail("Should have failed"); } catch (UnsupportedOperationException e) { // pass @@ -85,11 +96,27 @@ public void testBadRequests() throws IOException { resolver.registerExecutor("app0", "exec3", dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); try { - resolver.getBlockData("app0", "exec3", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec3", "shuffle_1_1_0_0"); fail("Should have failed"); } catch (Exception e) { // pass } + + // wrong number of parts (note that we allow a missing stageAttemptId) + try { + resolver.getBlockData("app0", "exec1", "shuffle_1_1_0_0_0"); + fail("Should have failed"); + } catch (RuntimeException e) { + assertTrue("Bad error message: " + e, e.getMessage().contains("Unexpected block id format")); + } + + try { + resolver.getBlockData("app0", "exec1", "shuffle_1_1"); + fail("Should have failed"); + } catch (RuntimeException e) { + assertTrue("Bad error message: " + e, e.getMessage().contains("Unexpected block id format")); + } + } @Test @@ -98,17 +125,10 @@ public void testSortShuffleBlocks() throws IOException { resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); - InputStream block0Stream = - resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream(); - String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); - block0Stream.close(); - assertEquals(sortBlock0, block0); - - InputStream block1Stream = - resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream(); - String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); - block1Stream.close(); - assertEquals(sortBlock1, block1); + testReadBlockData(resolver, "shuffle_0_0_0_0", sortBlock0); + testReadBlockData(resolver, "shuffle_0_0_1_0", sortBlock1); + testReadBlockData(resolver, "shuffle_0_0_0_1", sortBlock0_1); + testReadBlockData(resolver, "shuffle_0_0_1_1", sortBlock1_1); } @Test @@ -117,17 +137,67 @@ public void testHashShuffleBlocks() throws IOException { resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager")); - 
InputStream block0Stream = - resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream(); - String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); - block0Stream.close(); - assertEquals(hashBlock0, block0); - - InputStream block1Stream = - resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream(); - String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); - block1Stream.close(); - assertEquals(hashBlock1, block1); + testReadBlockData(resolver, "shuffle_1_0_0_0", hashBlock0); + testReadBlockData(resolver, "shuffle_1_0_1_0", hashBlock1); + testReadBlockData(resolver, "shuffle_1_0_0_1", hashBlock0_1); + testReadBlockData(resolver, "shuffle_1_0_1_1", hashBlock1_1); + } + + private void testReadBlockData(ExternalShuffleBlockResolver resolver, String blockId, + String expected) throws IOException { + InputStream blockStream = + resolver.getBlockData("app0", "exec0", blockId).createInputStream(); + String block0 = CharStreams.toString(new InputStreamReader(blockStream)); + blockStream.close(); + assertEquals(expected, block0); + } + + @Test + public void supportLegacySortShuffleBlockIds() throws IOException { + // In Spark 1.6, the stage attempt ID was added to shuffle block ids (SPARK-8029). However, + // during a rolling upgrade, the shuffle service may be restarted with new code but still + // need to serve old apps. So we make sure we can still handle old blocks + + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null); + resolver.registerExecutor("app0", "exec0", + dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); + + dataContext.insertLegacySortShuffleData(2, 1, + new byte[][]{"legacy".getBytes(), "block".getBytes()}); + + testReadBlockData(resolver, "shuffle_2_1_0", "legacy"); + testReadBlockData(resolver, "shuffle_2_1_1", "block"); + + // verify everything still works when we also register some blocks which do have a + // stageAttemptId + testSortShuffleBlocks(); + + testReadBlockData(resolver, "shuffle_2_1_0", "legacy"); + testReadBlockData(resolver, "shuffle_2_1_1", "block"); + } + + @Test + public void supportLegacyHashShuffleBlockIds() throws IOException { + // In Spark 1.6, the stage attempt ID was added to shuffle block ids (SPARK-8029). However, + // during a rolling upgrade, the shuffle service may be restarted with new code but still + // need to serve old apps. 
So we make sure we can still handle old blocks + + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null); + resolver.registerExecutor("app0", "exec0", + dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager")); + + dataContext.insertLegacyHashShuffleData(2, 0, + new byte[][] { "more legacy".getBytes(), "hash".getBytes() } ); + + testReadBlockData(resolver, "shuffle_2_0_0", "more legacy"); + testReadBlockData(resolver, "shuffle_2_0_1", "hash"); + + // verify everything still works when we also register some blocks which do have a + // stageAttemptId + testHashShuffleBlocks(); + + testReadBlockData(resolver, "shuffle_2_0_0", "more legacy"); + testReadBlockData(resolver, "shuffle_2_0_1", "hash"); } @Test diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index 2f4f1d0df478b..e9807fbecbad4 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -140,10 +140,10 @@ private TestShuffleDataContext createSomeData() throws IOException { TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5); dataContext.create(); - dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), + dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), rand.nextInt(1000), new byte[][] { "ABC".getBytes(), "DEF".getBytes() } ); dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, - new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } ); + rand.nextInt(1000), new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } ); return dataContext; } } diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index a3f9a38b1aeb9..dd52c69d29420 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -72,24 +72,39 @@ public class ExternalShuffleIntegrationSuite { new byte[54321], }; + static byte[][] exec0BlocksAttempt1 = new byte[][] { + new byte[789] + }; + + static byte[][] exec1BlocksAttempt1 = new byte[][] { + new byte[345], + new byte[678] + }; + + + @BeforeClass public static void beforeAll() throws IOException { Random rand = new Random(); - for (byte[] block : exec0Blocks) { - rand.nextBytes(block); - } - for (byte[] block: exec1Blocks) { - rand.nextBytes(block); + byte[][][] allBlocks = new byte[][][]{ + exec0Blocks, exec1Blocks, exec0BlocksAttempt1, exec1BlocksAttempt1}; + for (byte[][] blockGroup: allBlocks) { + for (byte[] block : blockGroup) { + rand.nextBytes(block); + } } dataContext0 = new TestShuffleDataContext(2, 5); dataContext0.create(); - dataContext0.insertSortShuffleData(0, 0, exec0Blocks); + dataContext0.insertSortShuffleData(0, 0, 0, exec0Blocks); + dataContext0.insertSortShuffleData(0, 0, 1, exec0BlocksAttempt1); dataContext1 = new TestShuffleDataContext(6, 2); dataContext1.create(); - dataContext1.insertHashShuffleData(1, 0, exec1Blocks); + dataContext1.insertHashShuffleData(1, 0, 0, exec1Blocks); + 
    dataContext1.insertHashShuffleData(1, 0, 1, exec1BlocksAttempt1);
+
     conf = new TransportConf(new SystemPropertyConfigProvider());
     handler = new ExternalShuffleBlockHandler(conf, null);
@@ -173,19 +188,27 @@ public void onBlockFetchFailure(String blockId, Throwable exception) {
   @Test
   public void testFetchOneSort() throws Exception {
     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
-    FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" });
-    assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks);
+    FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0_0" });
+    assertEquals(Sets.newHashSet("shuffle_0_0_0_0"), exec0Fetch.successBlocks);
     assertTrue(exec0Fetch.failedBlocks.isEmpty());
     assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks[0]));
     exec0Fetch.releaseBuffers();
+
+
+    FetchResult exec0Fetch1 = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0_1" });
+    assertEquals(Sets.newHashSet("shuffle_0_0_0_1"), exec0Fetch1.successBlocks);
+    assertTrue(exec0Fetch1.failedBlocks.isEmpty());
+    assertBufferListsEqual(exec0Fetch1.buffers, Lists.newArrayList(exec0BlocksAttempt1[0]));
+    exec0Fetch1.releaseBuffers();
+
   }

   @Test
   public void testFetchThreeSort() throws Exception {
     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
     FetchResult exec0Fetch = fetchBlocks("exec-0",
-      new String[] { "shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2" });
-    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"),
+      new String[] { "shuffle_0_0_0_0", "shuffle_0_0_1_0", "shuffle_0_0_2_0" });
+    assertEquals(Sets.newHashSet("shuffle_0_0_0_0", "shuffle_0_0_1_0", "shuffle_0_0_2_0"),
       exec0Fetch.successBlocks);
     assertTrue(exec0Fetch.failedBlocks.isEmpty());
     assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks));
@@ -196,29 +219,37 @@ public void testFetchThreeSort() throws Exception {
   public void testFetchHash() throws Exception {
     registerExecutor("exec-1", dataContext1.createExecutorInfo(HASH_MANAGER));
     FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.successBlocks);
+      new String[] { "shuffle_1_0_0_0", "shuffle_1_0_1_0" });
+    assertEquals(Sets.newHashSet("shuffle_1_0_0_0", "shuffle_1_0_1_0"), execFetch.successBlocks);
     assertTrue(execFetch.failedBlocks.isEmpty());
     assertBufferListsEqual(execFetch.buffers, Lists.newArrayList(exec1Blocks));
     execFetch.releaseBuffers();
+
+
+    FetchResult exec1Fetch1 = fetchBlocks("exec-1",
+      new String[] { "shuffle_1_0_0_1", "shuffle_1_0_1_1" });
+    assertEquals(Sets.newHashSet("shuffle_1_0_0_1", "shuffle_1_0_1_1"), exec1Fetch1.successBlocks);
+    assertTrue(exec1Fetch1.failedBlocks.isEmpty());
+    assertBufferListsEqual(exec1Fetch1.buffers, Lists.newArrayList(exec1BlocksAttempt1));
+    exec1Fetch1.releaseBuffers();
   }

   @Test
   public void testFetchWrongShuffle() throws Exception {
     registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
     FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
+      new String[] { "shuffle_1_0_0_0", "shuffle_1_0_1_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
+    assertEquals(Sets.newHashSet("shuffle_1_0_0_0", "shuffle_1_0_1_0"), execFetch.failedBlocks);
   }

   @Test
   public void testFetchInvalidShuffle() throws Exception {
     registerExecutor("exec-1", dataContext1.createExecutorInfo("unknown sort manager"));
     FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0" });
+      new String[] { "shuffle_1_0_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
+    assertEquals(Sets.newHashSet("shuffle_1_0_0_0"), execFetch.failedBlocks);
   }

   @Test
@@ -234,28 +265,28 @@ public void testFetchWrongBlockId() throws Exception {
   public void testFetchNonexistent() throws Exception {
     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
     FetchResult execFetch = fetchBlocks("exec-0",
-      new String[] { "shuffle_2_0_0" });
+      new String[] { "shuffle_2_0_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_2_0_0"), execFetch.failedBlocks);
+    assertEquals(Sets.newHashSet("shuffle_2_0_0_0"), execFetch.failedBlocks);
   }

   @Test
   public void testFetchWrongExecutor() throws Exception {
     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
     FetchResult execFetch = fetchBlocks("exec-0",
-      new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
+      new String[] { "shuffle_0_0_0_0" /* right */, "shuffle_1_0_0_0" /* wrong */ });
     // Both still fail, as we start by checking for all block.
     assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
+    assertEquals(Sets.newHashSet("shuffle_0_0_0_0", "shuffle_1_0_0_0"), execFetch.failedBlocks);
   }

   @Test
   public void testFetchUnregisteredExecutor() throws Exception {
     registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
     FetchResult execFetch = fetchBlocks("exec-2",
-      new String[] { "shuffle_0_0_0", "shuffle_1_0_0" });
+      new String[] { "shuffle_0_0_0_0", "shuffle_1_0_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
+    assertEquals(Sets.newHashSet("shuffle_0_0_0_0", "shuffle_1_0_0_0"), execFetch.failedBlocks);
   }

   @Test
@@ -264,9 +295,9 @@ public void testFetchNoServer() throws Exception {
     try {
       registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
       FetchResult execFetch = fetchBlocks("exec-0",
-        new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, 1 /* port */);
+        new String[]{"shuffle_1_0_0_0", "shuffle_1_0_1_0"}, 1 /* port */);
       assertTrue(execFetch.successBlocks.isEmpty());
-      assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
+      assertEquals(Sets.newHashSet("shuffle_1_0_0_0", "shuffle_1_0_1_0"), execFetch.failedBlocks);
     } finally {
       System.clearProperty("spark.shuffle.io.maxRetries");
     }
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 3fdde054ab6c7..c4d2a85aba58f 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -57,9 +57,24 @@ public void cleanup() {
   }

   /** Creates reducer blocks in a sort-based data format within our local dirs. */
-  public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
+  public void insertSortShuffleData(
+      int shuffleId,
+      int mapId,
+      int stageAttemptId,
+      byte[][] blocks) throws IOException {
+    String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0_" + stageAttemptId;
+    insertSortShuffleData(blockId, blocks);
+  }
+
+  public void insertLegacySortShuffleData(
+      int shuffleId,
+      int mapId,
+      byte[][] blocks) throws IOException {
     String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
+    insertSortShuffleData(blockId, blocks);
+  }
+  private void insertSortShuffleData(String blockId, byte[][] blocks) throws IOException {
     OutputStream dataStream = new FileOutputStream(
       ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
     DataOutputStream indexStream = new DataOutputStream(new FileOutputStream(
@@ -77,8 +92,10 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) thr
     indexStream.close();
   }

-  /** Creates reducer blocks in a hash-based data format within our local dirs. */
-  public void insertHashShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
+  public void insertLegacyHashShuffleData(
+      int shuffleId,
+      int mapId,
+      byte[][] blocks) throws IOException {
     for (int i = 0; i < blocks.length; i ++) {
       String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
       Files.write(blocks[i],
@@ -86,6 +103,19 @@ public void insertHashShuffleData(int shuffleId, int mapId, byte[][] blocks) thr
     }
   }

+  /** Creates reducer blocks in a hash-based data format within our local dirs. */
+  public void insertHashShuffleData(
+      int shuffleId,
+      int mapId,
+      int stageAttemptId,
+      byte[][] blocks) throws IOException {
+    for (int i = 0; i < blocks.length; i ++) {
+      String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i + "_" + stageAttemptId;
+      Files.write(blocks[i],
+        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
+    }
+  }
+
   /**
    * Creates an ExecutorShuffleInfo object based on the given shuffle manager which targets this
    * context's directories.
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 50220790d1f84..7b6ebeef9038d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -108,6 +108,9 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.sql.SQLContext.createSession")
       ) ++ Seq(
+        // SPARK-8029. False positive, this is a @Private java class
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.this"),
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.SparkContext.preferredNodeLocationData_="),
         ProblemFilters.exclude[MissingClassProblem](
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 09e258299de5a..9649a5b9e935d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -129,7 +129,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       // Merging spilled files should not throw assertion error
       taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
-      sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
+      sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0, 0), outputFile)
     } {
       // Clean up
       if (sc != null) {
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 0dc2861253f17..ce66c133b0a57 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong

 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.shuffle.ShuffleIdAndAttempt
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -60,8 +61,8 @@ object StoragePerfTester {
     val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]

     def writeOutputBytes(mapId: Int, total: AtomicLong): Unit = {
-      val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
-        new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
+      val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(ShuffleIdAndAttempt(1, 0),
+        mapId, numOutputSplits, new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
       val writers = shuffle.writers
       for (i <- 1 to recordsPerMap) {
         writers(i % numOutputSplits).write(writeKey, writeValue)
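
Note on the block-name convention exercised above (an illustration only, not part of the patch): the attempt-aware test helpers write and fetch IDs of the form shuffle_<shuffleId>_<mapId>_<reduceId>_<stageAttemptId>, while the insertLegacy* helpers keep the old four-part form shuffle_<shuffleId>_<mapId>_<reduceId>. The standalone sketch below, with a hypothetical class and method names, only shows how those strings are composed; it calls no Spark API.

// Illustration only: hypothetical helpers mirroring the block-ID strings used in the tests above.
public final class ShuffleBlockIdSketch {

  // Attempt-qualified form: shuffle_<shuffleId>_<mapId>_<reduceId>_<stageAttemptId>
  static String attemptBlockId(int shuffleId, int mapId, int reduceId, int stageAttemptId) {
    return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + stageAttemptId;
  }

  // Legacy four-part form, as written by the insertLegacy* helpers: shuffle_<shuffleId>_<mapId>_<reduceId>
  static String legacyBlockId(int shuffleId, int mapId, int reduceId) {
    return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
  }

  public static void main(String[] args) {
    // Same block, two stage attempts -- matches the pair requested in testFetchOneSort.
    System.out.println(attemptBlockId(0, 0, 0, 0)); // shuffle_0_0_0_0
    System.out.println(attemptBlockId(0, 0, 0, 1)); // shuffle_0_0_0_1
    System.out.println(legacyBlockId(0, 0, 0));     // shuffle_0_0_0
  }
}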