Closed

Commits (36)
- 149ea3e  ShuffleWriters write to temp file, then go through (squito, Oct 21, 2015)
- cf8118e  assorted cleanup (squito, Oct 22, 2015)
- ea1ae07  style (squito, Oct 22, 2015)
- 9356c67  fix compilation in StoragePerfTester (squito, Oct 22, 2015)
- 2b42eb5  mima (squito, Oct 22, 2015)
- 32d4b3b  update UnsafeShuffleWriterSuite (squito, Oct 22, 2015)
- 550e198  fix imports (squito, Oct 22, 2015)
- 4ff98bf  should work now, but needs cleanup (squito, Oct 23, 2015)
- 4a19702  only consider tmp files that exist; only consider the dest pre-existi… (squito, Oct 23, 2015)
- 89063dd  cleanup (squito, Oct 23, 2015)
- 4145651  ShuffleOutputCoordinatorSuite (squito, Oct 23, 2015)
- 2089e12  cleanup (squito, Oct 23, 2015)
- 2e9bbaa  Merge branch 'master' into SPARK-8029_first_wins (squito, Oct 26, 2015)
- 4cd423e  write the winning mapStatus to disk, so subsequent tasks can respond … (squito, Oct 26, 2015)
- dc4b7f6  fix imports (squito, Oct 26, 2015)
- b7a0981  fixes (squito, Oct 26, 2015)
- 830a097  shuffle writers must write always write all tmp files (squito, Oct 27, 2015)
- 5d11eca  more fixes for zero-sized blocks (squito, Oct 27, 2015)
- 3f5af9c  dont make ShuffleWriter return mapStatusFile (squito, Oct 27, 2015)
- 4b7c71a  rather than requiring all tmp files to exist, just write a zero-lengt… (squito, Oct 27, 2015)
- eabf978  update test case (squito, Oct 27, 2015)
- 5bbeec3  minor cleanup (squito, Oct 27, 2015)
- e141d82  test that shuffle output files are always the same (squito, Oct 27, 2015)
- 4df7955  fix compression settings of tmp files; minor cleanup (squito, Oct 27, 2015)
- dc076b8  fix tests (squito, Oct 27, 2015)
- cfdfd2c  review feedback (squito, Nov 3, 2015)
- 86f468a  Merge branch 'master' into SPARK-8029_first_wins (squito, Nov 4, 2015)
- 5c8b247  fix imports (squito, Nov 4, 2015)
- 4d66df1  fix more imports (squito, Nov 4, 2015)
- e59df41  couple more nits ... (squito, Nov 4, 2015)
- c206fc5  minor cleanup (squito, Nov 5, 2015)
- c0edff1  style (squito, Nov 5, 2015)
- da33519  Merge branch 'master' into SPARK-8029_first_wins (squito, Nov 11, 2015)
- c0b93a5  create temporary files in same location as destination files (squito, Nov 11, 2015)
- 9d0d9d9  no more @VisibleForTesting (squito, Nov 11, 2015)
- 80e037d  unused import (squito, Nov 11, 2015)
File: BypassMergeSortShuffleWriter.java
@@ -21,26 +21,27 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import javax.annotation.Nullable;

import scala.None$;
import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.*;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.shuffle.TmpDestShuffleFile;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
@@ -121,13 +122,26 @@ public BypassMergeSortShuffleWriter(
}

@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
public Seq<TmpDestShuffleFile> write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
final File indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId);
final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
final File tmpIndexFile = tmpShuffleFile(indexFile);
final File tmpDataFile = tmpShuffleFile(dataFile);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
// create empty data file so we always commit same set of shuffle output files, even if
// data is non-deterministic
if (!tmpDataFile.createNewFile()) {
// only possible if the file already exists, from a race in createTempShuffleBlock, which
// should be super-rare
throw new IOException("could not create shuffle data file: " + tmpDataFile);
}
return JavaConverters.asScalaBufferConverter(Arrays.asList(
new TmpDestShuffleFile(tmpIndexFile, indexFile),
new TmpDestShuffleFile(tmpDataFile, dataFile)
)).asScala();
}
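The tmpShuffleFile(...) helper used just above is not part of this excerpt. A hedged sketch of what it plausibly does, given the later commit "create temporary files in same location as destination files": the temp file sits next to its destination, so the final commit can be a cheap same-filesystem rename. The random-suffix naming below is an assumption for illustration, not necessarily the PR's actual scheme.

```scala
import java.io.File
import java.util.UUID

object TmpShuffleFileSketch {
  // Create the temp file in the same directory as its destination, so the eventual
  // commit can use File.renameTo rather than a cross-filesystem copy.
  def tmpShuffleFile(dest: File): File =
    new File(dest.getParentFile, dest.getName + "." + UUID.randomUUID())
}
```

Usage would mirror the lines above, e.g. `val tmpIndexFile = tmpShuffleFile(indexFile)`.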
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
@@ -155,10 +169,14 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
writer.commitAndClose();
}

partitionLengths =
writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
partitionLengths = writePartitionedFile(tmpDataFile);
Inline comment from the PR author (squito):
the type of tmp block used here doesn't matter, since writePartitionedFile doesn't go through blockManager.getDiskWriter. However, I'm a bit confused how this worked before. The original partition files might be compressed, and those bytes just get copied to the final data file, though data files seem like they are never compressed?

Reply from a reviewer:
If compression is employed, then each region of the single "partitioned" file is compressed separately. So the concatenation here should be correct.
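
That observation is worth making concrete: when shuffle compression is enabled, each per-partition region is written as its own self-contained compressed stream, so concatenating the raw bytes and recording the region lengths is sufficient. A standalone sketch of the idea, deliberately using java.util.zip rather than Spark's CompressionCodec, so nothing here is Spark's actual codec path:

```scala
import java.io._
import java.nio.file.Files
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object ConcatCompressedRegions {
  def main(args: Array[String]): Unit = {
    val dataFile = File.createTempFile("shuffle", ".data")
    val partitions = Seq("records for reducer 0", "records for reducer 1")
    val lengths = new Array[Long](partitions.size)

    // Write: compress each partition as its own, self-contained gzip stream and append it.
    val out = new FileOutputStream(dataFile)
    try {
      partitions.zipWithIndex.foreach { case (payload, i) =>
        val buf = new ByteArrayOutputStream()
        val gz = new GZIPOutputStream(buf)
        gz.write(payload.getBytes("UTF-8"))
        gz.close()
        out.write(buf.toByteArray)
        lengths(i) = buf.size().toLong // what the index file would record
      }
    } finally {
      out.close()
    }

    // Read: slice out region 1 by offset/length and decompress only that slice.
    val all = Files.readAllBytes(dataFile.toPath)
    val offset = lengths(0).toInt
    val region = java.util.Arrays.copyOfRange(all, offset, offset + lengths(1).toInt)
    val reader = new BufferedReader(new InputStreamReader(
      new GZIPInputStream(new ByteArrayInputStream(region)), "UTF-8"))
    println(reader.readLine()) // prints: records for reducer 1
  }
}
```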

shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths, tmpIndexFile);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return JavaConverters.asScalaBufferConverter(Arrays.asList(
new TmpDestShuffleFile(tmpIndexFile, indexFile),
new TmpDestShuffleFile(tmpDataFile, dataFile)
)).asScala();

}

@VisibleForTesting
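TmpDestShuffleFile is introduced by this PR but its definition is not shown in this excerpt; from the call sites above it appears to be nothing more than a pairing of the temporary file a writer produced with the destination it should eventually be renamed to. A hedged guess at its shape (the field names are assumptions):

```scala
import java.io.File

// Presumably just "write here now, rename to there when the output is committed".
case class TmpDestShuffleFile(tmpFile: File, dstFile: File)
```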
File: UnsafeShuffleWriter.java
@@ -20,11 +20,13 @@
import javax.annotation.Nullable;
import java.io.*;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Iterator;

import scala.Option;
import scala.Product2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
@@ -42,18 +44,19 @@
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.io.LZFCompressionCodec;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.TmpDestShuffleFile;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.memory.TaskMemoryManager;

@Private
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -149,12 +152,12 @@ public long getPeakMemoryUsedBytes() {
* This convenience method should only be called in test code.
*/
@VisibleForTesting
public void write(Iterator<Product2<K, V>> records) throws IOException {
write(JavaConverters.asScalaIteratorConverter(records).asScala());
public Seq<TmpDestShuffleFile> write(Iterator<Product2<K, V>> records) throws IOException {
return write(JavaConverters.asScalaIteratorConverter(records).asScala());
}

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
public Seq<TmpDestShuffleFile> write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
@@ -163,8 +166,9 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
while (records.hasNext()) {
insertRecordIntoSorter(records.next());
}
closeAndWriteOutput();
final Seq<TmpDestShuffleFile> result = closeAndWriteOutput();
success = true;
return result;
} finally {
if (sorter != null) {
try {
@@ -198,25 +202,34 @@ private void open() throws IOException {
}

@VisibleForTesting
void closeAndWriteOutput() throws IOException {
Seq<TmpDestShuffleFile> closeAndWriteOutput() throws IOException {
assert(sorter != null);
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
final File indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId);
final File tmpDataFile = tmpShuffleFile(dataFile);
final File tmpIndexFile = tmpShuffleFile(indexFile);
try {
partitionLengths = mergeSpills(spills);
partitionLengths = mergeSpills(spills, tmpDataFile);
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
}
}
}
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths, tmpIndexFile);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);

return JavaConverters.asScalaBufferConverter(Arrays.asList(
new TmpDestShuffleFile(tmpIndexFile, indexFile),
new TmpDestShuffleFile(tmpDataFile, dataFile)
)).asScala();
}

@VisibleForTesting
@@ -248,8 +261,7 @@ void forceSorterToSpill() throws IOException {
*
* @return the partition lengths in the merged file.
*/
private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
@@ -475,4 +487,5 @@ public Option<MapStatus> stop(boolean success) {
}
}
}

}
File: ShuffleMapTask.scala
@@ -24,7 +24,7 @@ import scala.language.existentials
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriter
import org.apache.spark.shuffle.{ShuffleOutputCoordinator, ShuffleWriter}

/**
* A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
@@ -70,8 +70,12 @@ private[spark] class ShuffleMapTask(
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
val tmpToDestFiles = writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
val mapStatus = writer.stop(success = true).get
// SPARK-8029 make sure only one task on this executor writes the final shuffle files
ShuffleOutputCoordinator.commitOutputs(dep.shuffleId, partitionId, tmpToDestFiles, mapStatus,
SparkEnv.get)._2
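ShuffleOutputCoordinator itself is not part of this excerpt, so the sketch below only illustrates the "first wins" behaviour the call above relies on; the method shape, the locking, the plain (File, File) pairs standing in for TmpDestShuffleFile, and the return value are all assumptions. The idea: under a lock, if the destination files already exist then an earlier attempt has committed and this attempt's temp files are discarded; otherwise the temp files are renamed into place and this attempt's MapStatus wins.

```scala
import java.io.File

import org.apache.spark.scheduler.MapStatus

object FirstWinsCommitSketch {
  // Returns (didThisAttemptWin, mapStatusToReport). In the real coordinator the lock
  // would presumably be scoped per (shuffleId, partitionId) rather than global.
  def commitOutputs(
      shuffleId: Int,
      partitionId: Int,
      tmpToDest: Seq[(File, File)],
      mapStatus: MapStatus): (Boolean, MapStatus) = synchronized {
    val alreadyCommitted =
      tmpToDest.nonEmpty && tmpToDest.forall { case (_, dest) => dest.exists() }
    if (alreadyCommitted) {
      // Another attempt on this executor won: drop this attempt's temp files.
      tmpToDest.foreach { case (tmp, _) => tmp.delete() }
      (false, mapStatus) // the real coordinator reloads the winning MapStatus from disk
    } else {
      tmpToDest.foreach { case (tmp, dest) =>
        if (dest.exists()) dest.delete() // clear any stale partial output
        tmp.renameTo(dest)               // same directory, so this is a cheap rename
      }
      (true, mapStatus)
    }
  }
}
```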
} catch {
case e: Exception =>
try {
File: TaskSetManager.scala
@@ -615,18 +615,22 @@ private[spark] class TaskSetManager(
val index = info.index
info.markSuccessful()
removeRunningTask(tid)
val task = tasks(index)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
task, Success, result.value(), result.accumUpdates, info, result.metrics)
if (!successful(index)) {
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
// include the partition here b/c on a stage retry, the partition is *not* necessarily
// the same as info.id
logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}}, " +
s"partition ${task.partitionId}) in ${info.duration} ms on executor ${info.executorId} " +
s"(${info.host}) ($tasksSuccessful/$numTasks)")
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
File: FileShuffleBlockResolver.scala
@@ -17,6 +17,7 @@

package org.apache.spark.shuffle

import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
@@ -31,7 +32,7 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH

/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
val writers: Array[DiskBlockObjectWriter]
val writers: Array[(DiskBlockObjectWriter, File)]

/** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
def releaseWriters(success: Boolean)
@@ -80,10 +81,11 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)

val openStartTime = System.nanoTime
val serializerInstance = serializer.newInstance()
val writers: Array[DiskBlockObjectWriter] = {
Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
val writers: Array[(DiskBlockObjectWriter, File)] = {
Array.tabulate[(DiskBlockObjectWriter, File)](numReducers) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
val tmpBlockFile = ShuffleWriter.tmpShuffleFile(blockFile)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
if (blockFile.exists) {
@@ -93,8 +95,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
writeMetrics)
blockManager.getDiskWriter(blockId, tmpBlockFile, serializerInstance, bufferSize,
writeMetrics) -> blockFile
}
}
// Creating the file to write to and creating a disk writer both involve interacting with
Expand Down Expand Up @@ -132,6 +134,13 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
logWarning(s"Error deleting ${file.getPath()}")
}
}
for (mapId <- state.completedMapTasks.asScala) {
val mapStatusFile =
blockManager.diskBlockManager.getFile(ShuffleMapStatusBlockId(shuffleId, mapId))
if (mapStatusFile.exists() && !mapStatusFile.delete()) {
logWarning(s"Error deleting MapStatus file ${mapStatusFile.getPath()}")
}
}
logInfo("Deleted all files for shuffle " + shuffleId)
true
case None =>
File: IndexShuffleBlockResolver.scala
@@ -51,7 +51,7 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

private def getIndexFile(shuffleId: Int, mapId: Int): File = {
private[shuffle] def getIndexFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

@@ -72,16 +72,20 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
logWarning(s"Error deleting index ${file.getPath()}")
}
}

file = blockManager.diskBlockManager.getFile(ShuffleMapStatusBlockId(shuffleId, mapId))
if (file.exists() && !file.delete()) {
logWarning(s"Error deleting MapStatus file ${file.getPath()}")
}
}

/**
* Write an index file with the offsets of each block, plus a final offset at the end for the
* end of the output file. This will be used by getBlockData to figure out where each block
* begins and ends.
* */
def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
* begins and ends. Writes to a temp file, and returns that file.
*/
def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long], tmpIndexFile: File): Unit = {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tmpIndexFile)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
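The method body above is cut off after `var offset = 0L`; for reference, a minimal standalone sketch of the index layout writeIndexFile produces: a leading 0, then one cumulative offset per block, so getBlockData can locate block i as the byte range [offsets(i), offsets(i + 1)). The sketch skips Spark's Utils.tryWithSafeFinally and error handling.

```scala
import java.io.{BufferedOutputStream, DataOutputStream, File, FileOutputStream}

object IndexFileSketch {
  def writeIndex(tmpIndexFile: File, lengths: Array[Long]): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tmpIndexFile)))
    try {
      // Convert per-block lengths into cumulative offsets, one long per entry.
      var offset = 0L
      out.writeLong(offset)   // the first block always starts at byte 0
      for (length <- lengths) {
        offset += length
        out.writeLong(offset) // end of this block == start of the next
      }
    } finally {
      out.close()
    }
  }
}
```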