diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index ee82d679935c0..a348f2e0176b3 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -21,6 +21,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Arrays; import javax.annotation.Nullable; import scala.None$; @@ -28,19 +29,19 @@ import scala.Product2; import scala.Tuple2; import scala.collection.Iterator; +import scala.collection.JavaConverters; +import scala.collection.Seq; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Closeables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.spark.Partitioner; -import org.apache.spark.ShuffleDependency; -import org.apache.spark.SparkConf; -import org.apache.spark.TaskContext; +import org.apache.spark.*; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.scheduler.MapStatus; import org.apache.spark.scheduler.MapStatus$; +import org.apache.spark.shuffle.TmpDestShuffleFile; import org.apache.spark.serializer.Serializer; import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.shuffle.IndexShuffleBlockResolver; @@ -121,13 +122,26 @@ public BypassMergeSortShuffleWriter( } @Override - public void write(Iterator> records) throws IOException { + public Seq write(Iterator> records) throws IOException { assert (partitionWriters == null); + final File indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId); + final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId); + final File tmpIndexFile = tmpShuffleFile(indexFile); + final File tmpDataFile = tmpShuffleFile(dataFile); if (!records.hasNext()) { partitionLengths = new long[numPartitions]; - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); - return; + // create empty data file so we always commit same set of shuffle output files, even if + // data is non-deterministic + if (!tmpDataFile.createNewFile()) { + // only possible if the file already exists, from a race in createTempShuffleBlock, which + // should be super-rare + throw new IOException("could not create shuffle data file: " + tmpDataFile); + } + return JavaConverters.asScalaBufferConverter(Arrays.asList( + new TmpDestShuffleFile(tmpIndexFile, indexFile), + new TmpDestShuffleFile(tmpDataFile, dataFile) + )).asScala(); } final SerializerInstance serInstance = serializer.newInstance(); final long openStartTime = System.nanoTime(); @@ -155,10 +169,14 @@ public void write(Iterator> records) throws IOException { writer.commitAndClose(); } - partitionLengths = - writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId)); - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); + partitionLengths = writePartitionedFile(tmpDataFile); + shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths, tmpIndexFile); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); + return JavaConverters.asScalaBufferConverter(Arrays.asList( + new TmpDestShuffleFile(tmpIndexFile, indexFile), + new TmpDestShuffleFile(tmpDataFile, dataFile) + )).asScala(); + } @VisibleForTesting diff 
--git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 6a0a89e81c321..27e8357ac3e2e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -20,11 +20,13 @@ import javax.annotation.Nullable; import java.io.*; import java.nio.channels.FileChannel; +import java.util.Arrays; import java.util.Iterator; import scala.Option; import scala.Product2; import scala.collection.JavaConverters; +import scala.collection.Seq; import scala.collection.immutable.Map; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; @@ -42,6 +44,7 @@ import org.apache.spark.io.CompressionCodec; import org.apache.spark.io.CompressionCodec$; import org.apache.spark.io.LZFCompressionCodec; +import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.network.util.LimitedInputStream; import org.apache.spark.scheduler.MapStatus; import org.apache.spark.scheduler.MapStatus$; @@ -49,11 +52,11 @@ import org.apache.spark.serializer.Serializer; import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.shuffle.IndexShuffleBlockResolver; +import org.apache.spark.shuffle.TmpDestShuffleFile; import org.apache.spark.shuffle.ShuffleWriter; import org.apache.spark.storage.BlockManager; import org.apache.spark.storage.TimeTrackingOutputStream; import org.apache.spark.unsafe.Platform; -import org.apache.spark.memory.TaskMemoryManager; @Private public class UnsafeShuffleWriter extends ShuffleWriter { @@ -149,12 +152,12 @@ public long getPeakMemoryUsedBytes() { * This convenience method should only be called in test code. */ @VisibleForTesting - public void write(Iterator> records) throws IOException { - write(JavaConverters.asScalaIteratorConverter(records).asScala()); + public Seq write(Iterator> records) throws IOException { + return write(JavaConverters.asScalaIteratorConverter(records).asScala()); } @Override - public void write(scala.collection.Iterator> records) throws IOException { + public Seq write(scala.collection.Iterator> records) throws IOException { // Keep track of success so we know if we encountered an exception // We do this rather than a standard try/catch/re-throw to handle // generic throwables. @@ -163,8 +166,9 @@ public void write(scala.collection.Iterator> records) throws IOEx while (records.hasNext()) { insertRecordIntoSorter(records.next()); } - closeAndWriteOutput(); + final Seq result = closeAndWriteOutput(); success = true; + return result; } finally { if (sorter != null) { try { @@ -198,7 +202,7 @@ private void open() throws IOException { } @VisibleForTesting - void closeAndWriteOutput() throws IOException { + Seq closeAndWriteOutput() throws IOException { assert(sorter != null); updatePeakMemoryUsed(); serBuffer = null; @@ -206,8 +210,12 @@ void closeAndWriteOutput() throws IOException { final SpillInfo[] spills = sorter.closeAndGetSpills(); sorter = null; final long[] partitionLengths; + final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId); + final File indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId); + final File tmpDataFile = tmpShuffleFile(dataFile); + final File tmpIndexFile = tmpShuffleFile(indexFile); try { - partitionLengths = mergeSpills(spills); + partitionLengths = mergeSpills(spills, tmpDataFile); } finally { for (SpillInfo spill : spills) { if (spill.file.exists() && ! 
spill.file.delete()) { @@ -215,8 +223,13 @@ void closeAndWriteOutput() throws IOException { } } } - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); + shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths, tmpIndexFile); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); + + return JavaConverters.asScalaBufferConverter(Arrays.asList( + new TmpDestShuffleFile(tmpIndexFile, indexFile), + new TmpDestShuffleFile(tmpDataFile, dataFile) + )).asScala(); } @VisibleForTesting @@ -248,8 +261,7 @@ void forceSorterToSpill() throws IOException { * * @return the partition lengths in the merged file. */ - private long[] mergeSpills(SpillInfo[] spills) throws IOException { - final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId); + private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException { final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true); final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = @@ -475,4 +487,5 @@ public Option stop(boolean success) { } } } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index ea97ef0e746d8..7b6d91ca0fabc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -24,7 +24,7 @@ import scala.language.existentials import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD -import org.apache.spark.shuffle.ShuffleWriter +import org.apache.spark.shuffle.{ShuffleOutputCoordinator, ShuffleWriter} /** * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner @@ -70,8 +70,12 @@ private[spark] class ShuffleMapTask( try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) - writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) - writer.stop(success = true).get + val tmpToDestFiles = writer.write( + rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) + val mapStatus = writer.stop(success = true).get + // SPARK-8029 make sure only one task on this executor writes the final shuffle files + ShuffleOutputCoordinator.commitOutputs(dep.shuffleId, partitionId, tmpToDestFiles, mapStatus, + SparkEnv.get)._2 } catch { case e: Exception => try { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 114468c48c44c..54da4ae5ee3c8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -615,6 +615,7 @@ private[spark] class TaskSetManager( val index = info.index info.markSuccessful() removeRunningTask(tid) + val task = tasks(index) // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not // "deserialize" the value when holding a lock to avoid blocking other threads. 
So we call @@ -622,11 +623,14 @@ private[spark] class TaskSetManager( // Note: "result.value()" only deserializes the value when it's called at the first time, so // here "result.value()" just returns the value and won't block other threads. sched.dagScheduler.taskEnded( - tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics) + task, Success, result.value(), result.accumUpdates, info, result.metrics) if (!successful(index)) { tasksSuccessful += 1 - logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format( - info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks)) + // include the partition here b/c on a stage retry, the partition is *not* necessarily + // the same as info.id + logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}, " + + s"partition ${task.partitionId}) in ${info.duration} ms on executor ${info.executorId} " + + s"(${info.host}) ($tasksSuccessful/$numTasks)") // Mark successful and stop if all the tasks have succeeded. successful(index) = true if (tasksSuccessful == numTasks) { diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index cd253a78c2b19..1f2bf387e9b53 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -17,6 +17,7 @@ package org.apache.spark.shuffle +import java.io.File import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ @@ -31,7 +32,7 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH /** A group of writers for a ShuffleMapTask, one writer per reducer. */ private[spark] trait ShuffleWriterGroup { - val writers: Array[DiskBlockObjectWriter] + val writers: Array[(DiskBlockObjectWriter, File)] /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */ def releaseWriters(success: Boolean) @@ -80,10 +81,11 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) val openStartTime = System.nanoTime val serializerInstance = serializer.newInstance() - val writers: Array[DiskBlockObjectWriter] = { - Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId => + val writers: Array[(DiskBlockObjectWriter, File)] = { + Array.tabulate[(DiskBlockObjectWriter, File)](numReducers) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockFile = blockManager.diskBlockManager.getFile(blockId) + val tmpBlockFile = ShuffleWriter.tmpShuffleFile(blockFile) // Because of previous failures, the shuffle file may already exist on this machine. // If so, remove it.
if (blockFile.exists) { @@ -93,8 +95,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) logWarning(s"Failed to remove existing shuffle file $blockFile") } } - blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize, - writeMetrics) + blockManager.getDiskWriter(blockId, tmpBlockFile, serializerInstance, bufferSize, + writeMetrics) -> blockFile } } // Creating the file to write to and creating a disk writer both involve interacting with @@ -132,6 +134,13 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) logWarning(s"Error deleting ${file.getPath()}") } } + for (mapId <- state.completedMapTasks.asScala) { + val mapStatusFile = + blockManager.diskBlockManager.getFile(ShuffleMapStatusBlockId(shuffleId, mapId)) + if (mapStatusFile.exists() && !mapStatusFile.delete()) { + logWarning(s"Error deleting MapStatus file ${mapStatusFile.getPath()}") + } + } logInfo("Deleted all files for shuffle " + shuffleId) true case None => diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 5e4c2b5d0a5c4..be6cd20f47dc1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -51,7 +51,7 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) } - private def getIndexFile(shuffleId: Int, mapId: Int): File = { + private[shuffle] def getIndexFile(shuffleId: Int, mapId: Int): File = { blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) } @@ -72,16 +72,20 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB logWarning(s"Error deleting index ${file.getPath()}") } } + + file = blockManager.diskBlockManager.getFile(ShuffleMapStatusBlockId(shuffleId, mapId)) + if (file.exists() && !file.delete()) { + logWarning(s"Error deleting MapStatus file ${file.getPath()}") + } } /** * Write an index file with the offsets of each block, plus a final offset at the end for the * end of the output file. This will be used by getBlockData to figure out where each block - * begins and ends. - * */ - def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = { - val indexFile = getIndexFile(shuffleId, mapId) - val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) + * begins and ends. Writes the index data to the given temp file. + */ + def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long], tmpIndexFile: File): Unit = { + val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tmpIndexFile))) Utils.tryWithSafeFinally { // We take in lengths of each block, need to convert it to offsets. var offset = 0L diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleOutputCoordinator.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleOutputCoordinator.scala new file mode 100644 index 0000000000000..0cc75f5e2dee4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleOutputCoordinator.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle + +import java.io.{File, FileInputStream, FileOutputStream, IOException} + +import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.serializer.SerializerInstance +import org.apache.spark.storage.ShuffleMapStatusBlockId +import org.apache.spark.util.Utils + +/** + * Ensures that on each executor, there are no conflicting writes to the same shuffle files. It + * implements "first write wins" by atomically moving all shuffle files into their final location, + * only if the files did not already exist. See SPARK-8029 + */ +private[spark] object ShuffleOutputCoordinator extends Logging { + + /** + * If any of the destination files do not exist, then move all of the temporary files to their + * destinations, and return (true, the given MapStatus). If all destination files exist, then + * delete all temporary files, and return (false, the MapStatus from previously committed shuffle + * output). + * + * Note that this will write to all destination files. If the tmp file is missing, then a + * zero-length destination file will be created. This is so the ShuffleOutputCoordinator can work + * even when there is non-deterministic data, where the output exists in one attempt, but is + * empty in another attempt. + * + * @param tmpToDest Seq of (temporary, destination) file pairs + * @param mapStatus the [[MapStatus]] for the output already written to the temporary files + * @return pair of: (1) true iff the set of temporary files was moved to the destination and (2) + * the MapStatus of the committed attempt. + */ + def commitOutputs( + shuffleId: Int, + partitionId: Int, + tmpToDest: Seq[TmpDestShuffleFile], + mapStatus: MapStatus, + sparkEnv: SparkEnv): (Boolean, MapStatus) = synchronized { + val mapStatusFile = sparkEnv.blockManager.diskBlockManager.getFile( + ShuffleMapStatusBlockId(shuffleId, partitionId)) + val ser = sparkEnv.serializer.newInstance() + commitOutputs(shuffleId, partitionId, tmpToDest, mapStatus, mapStatusFile, ser) + } + + /** Exposed for testing. */ + def commitOutputs( + shuffleId: Int, + partitionId: Int, + tmpToDest: Seq[TmpDestShuffleFile], + mapStatus: MapStatus, + mapStatusFile: File, + serializer: SerializerInstance): (Boolean, MapStatus) = synchronized { + // due to SPARK-4085, we only consider the previous attempt "committed" if all its output + // files are present + val destAlreadyExists = tmpToDest.forall(_.dstFile.exists()) && mapStatusFile.exists() + if (!destAlreadyExists) { + tmpToDest.foreach { case TmpDestShuffleFile(tmp, dest) => + // If *some* of the destination files exist, but not all of them, then it's not clear + // what to do. There could be a task already reading from this dest file when we delete + // it -- but then again, something in that taskset would be doomed to fail in any case when + // it got to the missing files. Better to just put consistent output into place.
+ // Note that for this to work with non-deterministic data, it is *critical* that each + // attempt always produces the exact same set of destination files (even if they are empty). + if (dest.exists()) { + dest.delete() + } + if (tmp.exists()) { + tmp.renameTo(dest) + } else { + // we always create the destination files, so this works correctly even when the + // input data is non-deterministic (potentially empty in one iteration, and non-empty + // in another) + if (!dest.createNewFile()) { + throw new IOException(s"could not create file: $dest") + } + } + } + val out = serializer.serializeStream(new FileOutputStream(mapStatusFile)) + out.writeObject(mapStatus) + out.close() + (true, mapStatus) + } else { + logInfo(s"shuffle output for shuffle $shuffleId, partition $partitionId already exists, " + + s"not overwriting. Another task must have created this shuffle output.") + tmpToDest.foreach { tmpAndDest => tmpAndDest.tmpFile.delete() } + val in = serializer.deserializeStream(new FileInputStream(mapStatusFile)) + Utils.tryWithSafeFinally { + (false, in.readObject[MapStatus]()) + } { + in.close() + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala index 4cc4ef5f1886e..e84c4f377a621 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala @@ -17,7 +17,8 @@ package org.apache.spark.shuffle -import java.io.IOException +import java.io.{File, IOException} +import java.util.UUID import org.apache.spark.scheduler.MapStatus @@ -25,10 +26,44 @@ import org.apache.spark.scheduler.MapStatus * Obtained inside a map task to write out records to the shuffle system. */ private[spark] abstract class ShuffleWriter[K, V] { - /** Write a sequence of records to this task's output */ + /** + * Write a sequence of records to this task's output. This should write all data + * to temporary files, but return (temporaryFile, destinationFile) pairs for each + * file written. The temporary files will get moved to their destination or deleted + * by the [[ShuffleOutputCoordinator]]. Note that for the ShuffleOutputCoordinator + * to work correctly, each attempt *must* have the exact same set of destination files. + * If the temporary file is empty, the ShuffleWriter does not have to create the file -- however + * it *must* still be in the result Seq, just pointing to a non-existent file. + */ @throws[IOException] - def write(records: Iterator[Product2[K, V]]): Unit + def write(records: Iterator[Product2[K, V]]): Seq[TmpDestShuffleFile] /** Close this writer, passing along whether the map completed */ def stop(success: Boolean): Option[MapStatus] + + /** + * Returns the file with a random UUID appended. Useful for getting a tmp file in the same + * dir which can be atomically renamed to the final destination file. + */ + def tmpShuffleFile(file: File): File = ShuffleWriter.tmpShuffleFile(file) +} + + +private[spark] object ShuffleWriter { + /** + * Returns the file with a random UUID appended. Useful for getting a tmp file in the same + * dir which can be atomically renamed to the final destination file. + */ + def tmpShuffleFile(file: File): File = new File(file.getAbsolutePath + "." + UUID.randomUUID()) } + +/** + * The location of one shuffle file written by a [[ShuffleWriter]].
Holds both the temporary + * file, which is written to by the ShuffleWriter itself, and the destination file, where the + * file should get moved by the [[ShuffleOutputCoordinator]]. The ShuffleWriter is responsible + * for specifying both locations, though it only writes the temp file. + */ +private[shuffle] case class TmpDestShuffleFile( + val tmpFile: File, + val dstFile: File +) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 41df70c602c30..011270c050459 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -17,12 +17,14 @@ package org.apache.spark.shuffle.hash +import java.io.File + import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle._ -import org.apache.spark.storage.DiskBlockObjectWriter +import org.apache.spark.storage.{DiskBlockObjectWriter, ShuffleMapStatusBlockId} private[spark] class HashShuffleWriter[K, V]( shuffleBlockResolver: FileShuffleBlockResolver, @@ -49,7 +51,7 @@ private[spark] class HashShuffleWriter[K, V]( writeMetrics) /** Write a bunch of records to this task's output */ - override def write(records: Iterator[Product2[K, V]]): Unit = { + override def write(records: Iterator[Product2[K, V]]): Seq[TmpDestShuffleFile] = { val iter = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { dep.aggregator.get.combineValuesByKey(records, context) @@ -63,8 +65,9 @@ private[spark] class HashShuffleWriter[K, V]( for (elem <- iter) { val bucketId = dep.partitioner.getPartition(elem._1) - shuffle.writers(bucketId).write(elem._1, elem._2) + shuffle.writers(bucketId)._1.write(elem._1, elem._2) } + shuffle.writers.map { case (writer, destFile) => TmpDestShuffleFile(writer.file, destFile) } } /** Close this writer, passing along whether the map completed */ @@ -102,7 +105,7 @@ private[spark] class HashShuffleWriter[K, V]( private def commitWritesAndBuildStatus(): MapStatus = { // Commit the writes. Get the size of each bucket block (total block size). 
- val sizes: Array[Long] = shuffle.writers.map { writer: DiskBlockObjectWriter => + val sizes: Array[Long] = shuffle.writers.map { case (writer: DiskBlockObjectWriter, _) => writer.commitAndClose() writer.fileSegment().length } @@ -112,7 +115,7 @@ private[spark] class HashShuffleWriter[K, V]( private def revertWrites(): Unit = { if (shuffle != null && shuffle.writers != null) { for (writer <- shuffle.writers) { - writer.revertPartialWritesAndClose() + writer._1.revertPartialWritesAndClose() } } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 808317b017a0f..f10ca6adb92e3 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -20,7 +20,7 @@ package org.apache.spark.shuffle.sort import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus -import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle} +import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter, TmpDestShuffleFile} import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter @@ -48,7 +48,7 @@ private[spark] class SortShuffleWriter[K, V, C]( context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics) /** Write a bunch of records to this task's output */ - override def write(records: Iterator[Product2[K, V]]): Unit = { + override def write(records: Iterator[Product2[K, V]]): Seq[TmpDestShuffleFile] = { sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( @@ -65,12 +65,19 @@ private[spark] class SortShuffleWriter[K, V, C]( // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). 
- val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) + val dataFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) + val indexFile = shuffleBlockResolver.getIndexFile(dep.shuffleId, mapId) + val tmpDataFile = tmpShuffleFile(dataFile) + val tmpIndexFile = tmpShuffleFile(indexFile) val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) - val partitionLengths = sorter.writePartitionedFile(blockId, outputFile) - shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths) + val partitionLengths = sorter.writePartitionedFile(blockId, tmpDataFile) + shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths, tmpIndexFile) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) + Seq( + TmpDestShuffleFile(tmpDataFile, dataFile), + TmpDestShuffleFile(tmpIndexFile, indexFile) + ) } /** Close this writer, passing along whether the map completed */ @@ -98,6 +105,7 @@ private[spark] class SortShuffleWriter[K, V, C]( } } } + } private[spark] object SortShuffleWriter { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 524f6970992a5..1eaf43f23cefa 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -60,6 +60,16 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends Blo override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId } +/** + * Used to get the canonical filename for the [[org.apache.spark.shuffle.ShuffleOutputCoordinator]] + * to store the MapStatus between attempts. See ShuffleOutputCoordinator for more details. Note + * that this "block" is never shared between executors, it's just used between tasks on one executor. + * It's just a convenient way to get a canonical file to store this data.
+ */ +case class ShuffleMapStatusBlockId(shuffleId: Int, mapId: Int) extends BlockId { + override def name: String = "shuffle_" + shuffleId + "_" + mapId + ".mapstatus" +} + @DeveloperApi case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" @@ -106,6 +116,7 @@ object BlockId { val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r + val SHUFFLE_MAPSTATUS = "shuffle_([0-9]+)_([0-9]+).mapstatus".r val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r val TASKRESULT = "taskresult_([0-9]+)".r val STREAM = "input-([0-9]+)-([0-9]+)".r @@ -121,6 +132,8 @@ object BlockId { ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) case SHUFFLE_INDEX(shuffleId, mapId, reduceId) => ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) + case SHUFFLE_MAPSTATUS(shuffleId, mapId) => + ShuffleMapStatusBlockId(shuffleId.toInt, mapId.toInt) case BROADCAST(broadcastId, field) => BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_")) case TASKRESULT(taskId) => diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 80d426fadc65e..4469bb22f3e10 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils * reopened again. */ private[spark] class DiskBlockObjectWriter( - file: File, + val file: File, serializerInstance: SerializerInstance, bufferSize: Int, compressStream: OutputStream => OutputStream, diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index bd6844d045cad..2440139ac95e9 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -638,7 +638,6 @@ private[spark] class ExternalSorter[K, V, C]( * called by the SortShuffleWriter. * * @param blockId block ID to write to. The index file will be blockId.name + ".index". - * @param context a TaskContext for a running Spark task, for us to update shuffle metrics. 
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker) */ def writePartitionedFile( diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 0e0eca515afc1..9b48425d812a9 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -23,6 +23,7 @@ import scala.*; import scala.collection.Iterator; +import scala.collection.Seq; import scala.runtime.AbstractFunction1; import com.google.common.collect.Iterators; @@ -49,13 +50,15 @@ import org.apache.spark.io.SnappyCompressionCodec; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.memory.TestMemoryManager; import org.apache.spark.network.util.LimitedInputStream; import org.apache.spark.serializer.*; import org.apache.spark.scheduler.MapStatus; import org.apache.spark.shuffle.IndexShuffleBlockResolver; +import org.apache.spark.shuffle.ShuffleOutputCoordinator; +import org.apache.spark.shuffle.TmpDestShuffleFile; import org.apache.spark.storage.*; -import org.apache.spark.memory.TestMemoryManager; -import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.util.Utils; public class UnsafeShuffleWriterSuite { @@ -65,6 +68,8 @@ public class UnsafeShuffleWriterSuite { TaskMemoryManager taskMemoryManager; final HashPartitioner hashPartitioner = new HashPartitioner(NUM_PARTITITONS); File mergedOutputFile; + File indexFile; + File mapStatusFile; File tempDir; long[] partitionSizesInMergedFile; final LinkedList spillFilesCreated = new LinkedList(); @@ -104,6 +109,12 @@ public void setUp() throws IOException { MockitoAnnotations.initMocks(this); tempDir = Utils.createTempDir("test", "test"); mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir); + indexFile = File.createTempFile("shuffle",".index", tempDir); + mapStatusFile = File.createTempFile("shuffle", ".mapstatus", tempDir); + // the ShuffleOutputCoordinator requires that this file does not exist + mergedOutputFile.delete(); + indexFile.delete(); + mapStatusFile.delete(); partitionSizesInMergedFile = null; spillFilesCreated.clear(); conf = new SparkConf() @@ -163,15 +174,18 @@ public OutputStream answer(InvocationOnMock invocation) throws Throwable { } } ); + when(blockManager.shuffleServerId()).thenReturn(BlockManagerId$.MODULE$.apply("1", "a.b.c", 1)); when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile); + when(shuffleBlockResolver.getIndexFile(anyInt(), anyInt())).thenReturn(indexFile); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocationOnMock) throws Throwable { partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; return null; } - }).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), any(long[].class)); + }).when(shuffleBlockResolver) + .writeIndexFile(anyInt(), anyInt(), any(long[].class), any(File.class)); when(diskBlockManager.createTempShuffleBlock()).thenAnswer( new Answer>() { @@ -269,9 +283,11 @@ class BadRecords extends scala.collection.AbstractIterator writer = createWriter(true); - writer.write(Iterators.>emptyIterator()); + Seq files = writer.write(Iterators.>emptyIterator()); final Option mapStatus = writer.stop(true); assertTrue(mapStatus.isDefined()); + 
ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus.get(), mapStatusFile, + serializer.newInstance()); assertTrue(mergedOutputFile.exists()); assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile); assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleRecordsWritten()); @@ -289,8 +305,10 @@ public void writeWithoutSpilling() throws Exception { dataToWrite.add(new Tuple2(i, i)); } final UnsafeShuffleWriter writer = createWriter(true); - writer.write(dataToWrite.iterator()); + Seq files = writer.write(dataToWrite.iterator()); final Option mapStatus = writer.stop(true); + ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus.get(), mapStatusFile, + serializer.newInstance()); assertTrue(mapStatus.isDefined()); assertTrue(mergedOutputFile.exists()); @@ -334,8 +352,10 @@ private void testMergingSpills( writer.forceSorterToSpill(); writer.insertRecordIntoSorter(dataToWrite.get(4)); writer.insertRecordIntoSorter(dataToWrite.get(5)); - writer.closeAndWriteOutput(); + Seq files = writer.closeAndWriteOutput(); final Option mapStatus = writer.stop(true); + ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus.get(), mapStatusFile, + serializer.newInstance()); assertTrue(mapStatus.isDefined()); assertTrue(mergedOutputFile.exists()); assertEquals(2, spillFilesCreated.size()); @@ -405,9 +425,11 @@ public void writeEnoughDataToTriggerSpill() throws Exception { for (int i = 0; i < 10 + 1; i++) { dataToWrite.add(new Tuple2(i, bigByteArray)); } - writer.write(dataToWrite.iterator()); + Seq files = writer.write(dataToWrite.iterator()); assertEquals(2, spillFilesCreated.size()); - writer.stop(true); + MapStatus mapStatus = writer.stop(true).get(); + ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus, mapStatusFile, + serializer.newInstance()); readRecordsFromFile(); assertSpillFilesWereCleanedUp(); ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); @@ -426,9 +448,11 @@ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exce for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE + 1; i++) { dataToWrite.add(new Tuple2(i, i)); } - writer.write(dataToWrite.iterator()); + Seq files = writer.write(dataToWrite.iterator()); assertEquals(2, spillFilesCreated.size()); - writer.stop(true); + MapStatus mapStatus = writer.stop(true).get(); + ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus, mapStatusFile, + serializer.newInstance()); readRecordsFromFile(); assertSpillFilesWereCleanedUp(); ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); @@ -447,8 +471,10 @@ public void writeRecordsThatAreBiggerThanDiskWriteBufferSize() throws Exception final byte[] bytes = new byte[(int) (ShuffleExternalSorter.DISK_WRITE_BUFFER_SIZE * 2.5)]; new Random(42).nextBytes(bytes); dataToWrite.add(new Tuple2(1, ByteBuffer.wrap(bytes))); - writer.write(dataToWrite.iterator()); - writer.stop(true); + Seq files = writer.write(dataToWrite.iterator()); + MapStatus mapStatus = writer.stop(true).get(); + ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus, mapStatusFile, + serializer.newInstance()); assertEquals( HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile())); @@ -468,8 +494,10 @@ public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception { final byte[] exceedsMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes()]; new Random(42).nextBytes(exceedsMaxRecordSize); dataToWrite.add(new Tuple2(3, 
ByteBuffer.wrap(exceedsMaxRecordSize))); - writer.write(dataToWrite.iterator()); - writer.stop(true); + Seq files = writer.write(dataToWrite.iterator()); + MapStatus mapStatus = writer.stop(true).get(); + ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus, mapStatusFile, + serializer.newInstance()); assertEquals( HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile())); diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 4a0877d86f2c6..6d5d7dd5f5c2d 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -17,14 +17,23 @@ package org.apache.spark +import java.io.File +import java.util.concurrent.{Callable, CyclicBarrier, ExecutorService, Executors} + +import scala.collection.JavaConverters._ + +import org.apache.commons.io.FileUtils +import org.apache.commons.io.filefilter.TrueFileFilter import org.scalatest.Matchers import org.apache.spark.ShuffleSuite.NonJavaSerializableClass +import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD} -import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} -import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleBlockId} -import org.apache.spark.util.MutablePair +import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd} +import org.apache.spark.serializer.{KryoSerializer, Serializer} +import org.apache.spark.shuffle.{ShuffleOutputCoordinator, ShuffleWriter} +import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleMapStatusBlockId} +import org.apache.spark.util.{MutablePair, Utils} abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext { @@ -88,33 +97,16 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC } test("zero sized blocks") { - // Use a local cluster with 2 processes to make sure there are both local and remote blocks - sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) - - // 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys - val NUM_BLOCKS = 201 - val a = sc.parallelize(1 to 4, NUM_BLOCKS) - val b = a.map(x => (x, x*2)) - // NOTE: The default Java serializer doesn't create zero-sized blocks. 
// So, use Kryo - val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS)) - .setSerializer(new KryoSerializer(conf)) - - val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId - assert(c.count === 4) - - val blockSizes = (0 until NUM_BLOCKS).flatMap { id => - val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, id) - statuses.flatMap(_._2.map(_._2)) - } - val nonEmptyBlocks = blockSizes.filter(x => x > 0) - - // We should have at most 4 non-zero sized partitions - assert(nonEmptyBlocks.size <= 4) + testZeroSizedBlocks(Some(new KryoSerializer(conf))) } test("zero sized blocks without kryo") { + testZeroSizedBlocks(None) + } + + def testZeroSizedBlocks(serOpt: Option[Serializer]): Unit = { // Use a local cluster with 2 processes to make sure there are both local and remote blocks sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) @@ -123,8 +115,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val a = sc.parallelize(1 to 4, NUM_BLOCKS) val b = a.map(x => (x, x*2)) - // NOTE: The default Java serializer should create zero-sized blocks val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS)) + serOpt.foreach(c.setSerializer(_)) val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId assert(c.count === 4) @@ -317,6 +309,156 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(metrics.bytesWritten === metrics.byresRead) assert(metrics.bytesWritten > 0) } + + /** + * ShuffleOutputCoordinator requires that a given shuffle always generate the same set of files on + * all attempts, even if the data is non-deterministic. We test the extreme case where the data + * is completely missing in one case + */ + test("same set of shuffle files regardless of data") { + + def shuffleAndGetShuffleFiles(data: Seq[Int]): Map[String, Long] = { + var tempDir: File = null + try { + tempDir = Utils.createTempDir() + conf.set("spark.local.dir", tempDir.getAbsolutePath) + sc = new SparkContext("local", "test", conf) + val rdd = sc.parallelize(data, 10).map(x => (x, x)) + val shuffledRdd = new ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4)) + shuffledRdd.count() + val shuffleFiles = + FileUtils.listFiles(tempDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE) + sc.stop() + shuffleFiles.asScala.map { file: File => + file.getName -> file.length() + }.toMap + } finally { + conf.remove("spark.local.dir") + Utils.deleteRecursively(tempDir) + } + } + + val shuffleFilesWithData = shuffleAndGetShuffleFiles(0 until 100) + val shuffleFilesNoData = shuffleAndGetShuffleFiles(Seq[Int]()) + assert(shuffleFilesNoData.keySet === shuffleFilesWithData.keySet) + // make sure our test is doing what it is supposed to -- at least some of the + // "no data" files are empty + assert(shuffleFilesNoData.filter{ case (name, size) => + size == 0L && shuffleFilesWithData(name) != 0L + }.nonEmpty) + } + + test("multiple simultaneous attempts for one task (SPARK-8029)") { + sc = new SparkContext("local", "test", conf) + val mapStatusFile = sc.env.blockManager.diskBlockManager.getFile(ShuffleMapStatusBlockId(0, 0)) + val mapTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val manager = sc.env.shuffleManager + + val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0L) + val metricsSystem = sc.env.metricsSystem + val shuffleMapRdd = new MyRDD(sc, 1, Nil) + val shuffleDep = new 
ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) + val shuffleHandle = manager.registerShuffle(0, 1, shuffleDep) + + // first attempt -- its successful + val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0, + new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, metricsSystem, + InternalAccumulator.create(sc))) + val data1 = (1 to 10).map { x => x -> x} + + // second attempt -- also successful. We'll write out different data, + // just to simulate the fact that the records may get written differently + // depending on what gets spilled, what gets combined, etc. + val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0, + new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, metricsSystem, + InternalAccumulator.create(sc))) + val data2 = (11 to 20).map { x => x -> x} + + // interleave writes of both attempts -- we want to test that both attempts can occur + // simultaneously, and everything is still OK + + def writeAndClose( + writer: ShuffleWriter[Int, Int])( + iter: Iterator[(Int, Int)]): Option[(Boolean, MapStatus)] = { + val files = writer.write(iter) + val output = writer.stop(true) + output.map(ShuffleOutputCoordinator.commitOutputs(0, 0, files, _, SparkEnv.get)) + } + val interleaver = new InterleaveIterators( + data1, writeAndClose(writer1), data2, writeAndClose(writer2)) + + val (mapOutput1, mapOutput2) = interleaver.run() + + + // check that we can read the map output and it has the right data (can be either from + // either task, but must be consistent) + assert(mapOutput1.isDefined) + assert(mapOutput2.isDefined) + // exactly one succeeded + assert(mapOutput1.get._1 ^ mapOutput2.get._1) + // The mapstatuses should be equivalent, but not the same object, since they will be + // deserialized independently. Unfortunately we can't check that they are the same since + // MapStatus doesn't override equals() + + // register one of the map outputs -- doesn't matter which one + mapOutput1.foreach { case (_, mapStatus) => + mapTrackerMaster.registerMapOutputs(0, Array(mapStatus)) + } + + val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1, + new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, metricsSystem, + InternalAccumulator.create(sc))) + val readData = reader.read().toIndexedSeq + assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq) + + assert(mapStatusFile.exists()) + manager.unregisterShuffle(0) + assert(!mapStatusFile.exists(), s"$mapStatusFile did not get deleted") + } + +} + +/** + * Utility to help tests make sure that we can process two different iterators simultaneously + * in different threads. This makes sure that in your test, you don't completely process data1 with + * f1 before processing data2 with f2 (or vice versa). It adds a barrier so that the functions only + * process one element, before pausing to wait for the other function to "catch up". 
+ */ +class InterleaveIterators[T, R]( + data1: Seq[T], + f1: Iterator[T] => R, + data2: Seq[T], + f2: Iterator[T] => R) { + + require(data1.size == data2.size) + + val barrier = new CyclicBarrier(2) + class BarrierIterator[E](id: Int, sub: Iterator[E]) extends Iterator[E] { + def hasNext: Boolean = sub.hasNext + + def next: E = { + barrier.await() + sub.next() + } + } + + val c1 = new Callable[R] { + override def call(): R = f1(new BarrierIterator(1, data1.iterator)) + } + val c2 = new Callable[R] { + override def call(): R = f2(new BarrierIterator(2, data2.iterator)) + } + + val e: ExecutorService = Executors.newFixedThreadPool(2) + + def run(): (R, R) = { + val future1 = e.submit(c1) + val future2 = e.submit(c2) + val r1 = future1.get() + val r2 = future2.get() + e.shutdown() + (r1, r2) + } } object ShuffleSuite { diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala index b8ab227517cc4..67f9c006929a0 100644 --- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala @@ -26,8 +26,8 @@ import org.apache.commons.io.filefilter.TrueFileFilter import org.scalatest.BeforeAndAfterAll import org.apache.spark.rdd.ShuffledRDD -import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.util.Utils class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { @@ -83,8 +83,8 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { shuffledRdd.count() // Ensure that the shuffle actually created files that will need to be cleaned up val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle - filesCreatedByShuffle.map(_.getName) should be - Set("shuffle_0_0_0.data", "shuffle_0_0_0.index") + filesCreatedByShuffle.map(_.getName) should be (Set( + "shuffle_0_0_0.data", "shuffle_0_0_0.index", "shuffle_0_0.mapstatus")) // Check that the cleanup actually removes the files sc.env.blockManager.master.removeShuffle(0, blocking = true) for (file <- filesCreatedByShuffle) { diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleOutputCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleOutputCoordinatorSuite.scala new file mode 100644 index 0000000000000..62ccb6932a395 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleOutputCoordinatorSuite.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.shuffle + +import java.io.{File, FileInputStream, FileOutputStream} + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.serializer.{JavaSerializer, SerializerInstance} +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.Utils + +class ShuffleOutputCoordinatorSuite extends SparkFunSuite with BeforeAndAfterEach { + + var tempDir: File = _ + var mapStatusFile: File = _ + // use the "port" as a way to distinguish mapstatuses, just for the test + def mapStatus(attemptId: Int): MapStatus = { + MapStatus(BlockManagerId("1", "a.b.c", attemptId), Array(0L, 1L)) + } + def ser: SerializerInstance = new JavaSerializer(new SparkConf()).newInstance() + + override def beforeEach(): Unit = { + tempDir = Utils.createTempDir() + mapStatusFile = File.createTempFile("shuffle", ".mapstatus", tempDir) + mapStatusFile.delete() + } + + override def afterEach(): Unit = { + Utils.deleteRecursively(tempDir) + } + + private def writeFile(filename: String, data: Int): File = { + val f = new File(tempDir, filename) + val out = new FileOutputStream(f) + out.write(data) + out.close() + f + } + + private def verifyFiles(successfulAttempt: Int): Unit = { + (0 until 3).foreach { idx => + val exp = successfulAttempt* 3 + idx + val file = new File(tempDir, s"d$idx") + withClue(s"checking dest file $file") { + assert(file.length === 1) + val in = new FileInputStream(file) + assert(in.read() === exp) + in.close() + + } + } + } + + private def generateAttempt(attempt: Int): Seq[TmpDestShuffleFile] = { + (0 until 3).map { idx => + val j = attempt * 3 + idx + TmpDestShuffleFile(writeFile(s"t$j", j), new File(tempDir, s"d$idx")) + } + } + + private def commit(files: Seq[TmpDestShuffleFile], attemptId: Int): (Boolean, MapStatus) = { + ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus(attemptId), mapStatusFile, ser) + } + + test("move files if dest missing") { + val firstAttempt = generateAttempt(0) + val firstCommit = commit(firstAttempt, 1) + assert(firstCommit._1) + // "port" is just our holder for the attempt that succeeded in this test setup + assert(firstCommit._2.location.port === 1) + verifyFiles(0) + firstAttempt.foreach{ case TmpDestShuffleFile(t, d) => assert(!t.exists())} + + val secondAttempt = generateAttempt(1) + // second commit fails, and also deletes the tmp files + val secondCommit = commit(secondAttempt, 2) + assert(!secondCommit._1) + // still the mapstatus from the first commit + assert(firstCommit._2.location.port === 1) + verifyFiles(0) + // make sure we delete the temp files if the dest exists + secondAttempt.foreach{ case TmpDestShuffleFile(t, d) => assert(!t.exists())} + } + + test("move files if just map status file missing") { + val firstAttempt = generateAttempt(0) + val firstCommit = commit(firstAttempt, 1) + assert(firstCommit._1) + // "port" is just our holder for the attempt that succeeded in this test setup + assert(firstCommit._2.location.port === 1) + verifyFiles(0) + firstAttempt.foreach{ case TmpDestShuffleFile(t, d) => assert(!t.exists())} + + val secondAttempt = generateAttempt(1) + mapStatusFile.delete() + // second commit now succeeds since one destination file is missing + val secondCommit = commit(secondAttempt, 2) + assert(secondCommit._1) + assert(secondCommit._2.location.port === 2) + verifyFiles(1) + secondAttempt.foreach{ case TmpDestShuffleFile(t, d) => assert(!t.exists())} + } + + test("missing tmp 
files become zero-length destination files") { + val extraDestFile = new File(tempDir, "blah") + val firstAttempt = generateAttempt(0) ++ + Seq(TmpDestShuffleFile(new File(tempDir, "bogus"), extraDestFile)) + assert(commit(firstAttempt, 1)._1) + verifyFiles(0) + assert(extraDestFile.exists()) + assert(extraDestFile.length() === 0) + + // if we attempt the move again and *only* the missing tmp file is missing, we still + // do the move + extraDestFile.delete() + val secondAttempt = generateAttempt(1) ++ + Seq(TmpDestShuffleFile(new File(tempDir, "flippy"), extraDestFile)) + assert(commit(secondAttempt, 2)._1) + verifyFiles(1) + assert(extraDestFile.exists()) + assert(extraDestFile.length() === 0) + } +} diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index b92a302806f76..766f8fb915628 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -32,9 +32,9 @@ import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfterEach import org.apache.spark._ -import org.apache.spark.executor.{TaskMetrics, ShuffleWriteMetrics} -import org.apache.spark.shuffle.IndexShuffleBlockResolver +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.serializer.{JavaSerializer, SerializerInstance} +import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleOutputCoordinator} import org.apache.spark.storage._ import org.apache.spark.util.Utils @@ -49,6 +49,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte private var taskMetrics: TaskMetrics = _ private var tempDir: File = _ private var outputFile: File = _ + private var indexFile: File = _ + private var mapStatusFile: File = _ private val conf: SparkConf = new SparkConf(loadDefaults = false) private val temporaryFilesCreated: mutable.Buffer[File] = new ArrayBuffer[File]() private val blockIdToFileMap: mutable.Map[BlockId, File] = new mutable.HashMap[BlockId, File] @@ -57,6 +59,12 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte override def beforeEach(): Unit = { tempDir = Utils.createTempDir() outputFile = File.createTempFile("shuffle", null, tempDir) + indexFile = File.createTempFile("shuffle", ".index", tempDir) + mapStatusFile = File.createTempFile("shuffle", ".mapstatus", tempDir) + // ShuffleOutputCoordinator requires these files to not exist yet + outputFile.delete() + indexFile.delete() + mapStatusFile.delete() taskMetrics = new TaskMetrics MockitoAnnotations.initMocks(this) shuffleHandle = new BypassMergeSortShuffleHandle[Int, Int]( @@ -68,6 +76,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf))) when(taskContext.taskMetrics()).thenReturn(taskMetrics) when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) + when(blockResolver.getIndexFile(0, 0)).thenReturn(indexFile) + // the index file will be empty, but that is fine for these tests when(blockManager.diskBlockManager).thenReturn(diskBlockManager) when(blockManager.getDiskWriter( any[BlockId], @@ -88,11 +98,14 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte ) } }) + when(blockManager.shuffleServerId).thenReturn(BlockManagerId.apply("1", "a.b.c", 1)) + 
when(diskBlockManager.createTempShuffleBlock()).thenAnswer( new Answer[(TempShuffleBlockId, File)] { override def answer(invocation: InvocationOnMock): (TempShuffleBlockId, File) = { val blockId = new TempShuffleBlockId(UUID.randomUUID) val file = File.createTempFile(blockId.toString, null, tempDir) + file.delete() blockIdToFileMap.put(blockId, file) temporaryFilesCreated.append(file) (blockId, file) @@ -121,8 +134,10 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte taskContext, conf ) - writer.write(Iterator.empty) - writer.stop( /* success = */ true) + val files = writer.write(Iterator.empty) + val mapStatus = writer.stop( /* success = */ true).get + val ser = new JavaSerializer(conf).newInstance() + assert(ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus, mapStatusFile, ser)._1) assert(writer.getPartitionLengths.sum === 0) assert(outputFile.exists()) assert(outputFile.length() === 0) @@ -145,8 +160,10 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte taskContext, conf ) - writer.write(records) - writer.stop( /* success = */ true) + val files = writer.write(records) + val mapStatus = writer.stop( /* success = */ true).get + val ser = new JavaSerializer(conf).newInstance() + ShuffleOutputCoordinator.commitOutputs(0, 0, files, mapStatus, mapStatusFile, ser) assert(temporaryFilesCreated.nonEmpty) assert(writer.getPartitionLengths.sum === outputFile.length()) assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index d7b2d07a40052..d9661b842f7bc 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -481,15 +481,15 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { intercept[SparkException] { data.reduceByKey(_ + _).count() } - // After the shuffle, there should be only 2 files on disk: the output of task 1 and - // its index. All other files (map 2's output and intermediate merge files) should + // After the shuffle, there should be only 3 files on disk: the output of task 1, its index, + // and mapstatus file. All other files (map 2's output and intermediate merge files) should // have been deleted. - assert(diskBlockManager.getAllFiles().length === 2) + assert(diskBlockManager.getAllFiles().length === 3) } else { assert(data.reduceByKey(_ + _).count() === size) - // After the shuffle, there should be only 4 files on disk: the output of both tasks - // and their indices. All intermediate merge files should have been deleted. - assert(diskBlockManager.getAllFiles().length === 4) + // After the shuffle, there should be only 6 files on disk: the output of both tasks their + // indices, and mapstatus files. All intermediate merge files should have been deleted. 
+ assert(diskBlockManager.getAllFiles().length === 6) } } } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 50220790d1f84..014ca15f10118 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -110,6 +110,9 @@ object MimaExcludes { ) ++ Seq( ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.SparkContext.preferredNodeLocationData_="), + // SPARK-8029 -- this is a private[spark] method but that has to be public in java + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.write"), ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.rdd.MapPartitionsWithPreparationRDD"), ProblemFilters.exclude[MissingClassProblem]( diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index 0dc2861253f17..50a68d4116f83 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -64,9 +64,9 @@ object StoragePerfTester { new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) val writers = shuffle.writers for (i <- 1 to recordsPerMap) { - writers(i % numOutputSplits).write(writeKey, writeValue) + writers(i % numOutputSplits)._1.write(writeKey, writeValue) } - writers.map { w => + writers.map { case (w, _) => w.commitAndClose() total.addAndGet(w.fileSegment().length) }
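A note for readers skimming the patch: the heart of this change is the "first write wins" commit protocol in ShuffleOutputCoordinator.commitOutputs. The sketch below restates that protocol in isolation, stripped of Spark's block-manager and serializer plumbing. The names here (TmpDest, FirstWriteWinsSketch, commit, marker) are hypothetical stand-ins rather than the API added by the patch, and the real coordinator additionally serializes the winning attempt's MapStatus into the marker file so that losing attempts can return it.

import java.io.{File, IOException}

// Hypothetical, simplified stand-in for the patch's TmpDestShuffleFile pair.
case class TmpDest(tmp: File, dest: File)

object FirstWriteWinsSketch {
  // Commit `outputs` only if no earlier attempt has already committed. The marker file plays
  // the role of the MapStatus file in the patch: it flags that a previous attempt finished.
  // Returns true if this attempt's files were moved into place, false if an earlier attempt won.
  def commit(outputs: Seq[TmpDest], marker: File): Boolean = synchronized {
    val alreadyCommitted = marker.exists() && outputs.forall(_.dest.exists())
    if (alreadyCommitted) {
      // Another attempt on this executor already published its files; discard ours.
      outputs.foreach(_.tmp.delete())
      false
    } else {
      outputs.foreach { case TmpDest(tmp, dest) =>
        if (dest.exists()) dest.delete()
        if (tmp.exists()) {
          // The temp file lives next to its destination, so this is a simple same-directory move.
          if (!tmp.renameTo(dest)) throw new IOException(s"could not rename $tmp to $dest")
        } else if (!dest.createNewFile()) {
          // Every attempt must publish the same set of destination files, even empty ones,
          // so a missing temp file still becomes a zero-length destination file.
          throw new IOException(s"could not create $dest")
        }
      }
      if (!marker.createNewFile()) throw new IOException(s"could not create marker file $marker")
      true
    }
  }
}

This also illustrates why ShuffleWriter.tmpShuffleFile appends a UUID to the destination path rather than using a separate temp directory: keeping the temp file in the same directory keeps the rename within one filesystem, so renameTo can complete as a plain move.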