
Commit f7c620c

Address a number of minor review comments
1 parent 71d67fe commit f7c620c

File tree

4 files changed: +22 / -11 lines


core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 11 additions & 5 deletions
@@ -21,12 +21,15 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import javax.annotation.Nullable;
 
+import scala.None$;
 import scala.Option;
 import scala.Product2;
 import scala.Tuple2;
 import scala.collection.Iterator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,8 +48,6 @@
 import org.apache.spark.storage.*;
 import org.apache.spark.util.Utils;
 
-import javax.annotation.Nullable;
-
 /**
  * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
  * writes incoming records to separate files, one file per reduce partition, then concatenates these
@@ -160,11 +161,16 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
 
-  // Exposed for testing
+  @VisibleForTesting
   long[] getPartitionLengths() {
     return partitionLengths;
   }
 
+  /**
+   * Concatenate all of the per-partition files into a single combined file.
+   *
+   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
+   */
   private long[] writePartitionedFile(File outputFile) throws IOException {
     // Track location of the partition starts in the output file
     final long[] lengths = new long[numPartitions];
@@ -202,7 +208,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
   @Override
   public Option<MapStatus> stop(boolean success) {
     if (stopping) {
-      return Option.apply(null);
+      return None$.empty();
     } else {
       stopping = true;
       if (success) {
@@ -226,7 +232,7 @@ public Option<MapStatus> stop(boolean success) {
           }
         }
        shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
-       return Option.apply(null);
+       return None$.empty();
      }
    }
  }
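The substantive changes in this file are the explicit None return (None$.empty() instead of Option.apply(null), which also evaluates to None but obscures the intent) and Guava's @VisibleForTesting annotation replacing the ad-hoc "// Exposed for testing" comment. Below is a minimal, self-contained sketch of the same Java-to-Scala Option interop and annotation usage; the class and member names are illustrative only (not the actual Spark sources), and it assumes scala-library and Guava are on the classpath.

import scala.None$;
import scala.Option;

import com.google.common.annotations.VisibleForTesting;

// Illustrative stand-in for a writer-style class; not the actual Spark code.
public class OptionInteropSketch {

  private boolean stopping = false;
  private String result = null;

  public Option<String> stop(boolean success) {
    if (stopping || !success) {
      // Explicitly "no result"; equivalent to Option.apply(null), but self-documenting.
      return None$.empty();
    }
    stopping = true;
    // Option.apply wraps null as None and non-null values as Some.
    return Option.apply(result);
  }

  // Package-private accessor; the annotation replaces an ad-hoc "exposed for testing" comment
  // and lets static analysis flag callers outside of tests.
  @VisibleForTesting
  boolean isStopping() {
    return stopping;
  }

  public static void main(String[] args) {
    System.out.println(new OptionInteropSketch().stop(false)); // prints "None"
  }
}

Either form hands Scala callers a None; the explicit None$.empty() simply reads the way the Option<MapStatus> signature intends.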

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala

Lines changed: 7 additions & 2 deletions
@@ -74,7 +74,11 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       " Shuffle will continue to spill to disk when necessary.")
   }
 
+  /**
+   * A mapping from shuffle ids to the number of mappers producing output for those shuffles.
+   */
   private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
+
   override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
 
   /**
@@ -168,8 +172,9 @@ private[spark] object SortShuffleManager extends Logging {
 
   /**
    * The maximum number of shuffle output partitions that SortShuffleManager supports when
-   *
-   */
+   * buffering map outputs in a serialized form. This is an extreme defensive programming measure,
+   * since it's extremely unlikely that a single shuffle produces over 16 million output partitions.
+   * */
   val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
     PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
 
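The rewritten Scaladoc pins the cap to PackedRecordPointer.MAXIMUM_PARTITION_ID + 1 and calls it "over 16 million" partitions. A quick arithmetic check of that figure, assuming the partition id occupies 24 bits of the packed 64-bit record pointer (the bit width is an assumption here, not something this diff states):

// Sanity check of the "over 16 million output partitions" figure,
// assuming a 24-bit partition-id field in the packed record pointer.
public class PartitionLimitCheck {
  public static void main(String[] args) {
    final int assumedPartitionIdBits = 24;
    final int maximumPartitionId = (1 << assumedPartitionIdBits) - 1;   // 16777215
    final int maxPartitionsForSerializedMode = maximumPartitionId + 1;  // 16777216
    System.out.println("MAXIMUM_PARTITION_ID        = " + maximumPartitionId);
    System.out.println("max partitions (serialized) = " + maxPartitionsForSerializedMode);
  }
}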

core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,6 @@
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.spark.shuffle.sort.SerializedShuffleHandle;
 import scala.*;
 import scala.collection.Iterator;
 import scala.runtime.AbstractFunction1;
@@ -56,6 +55,7 @@
 import org.apache.spark.scheduler.MapStatus;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.ShuffleMemoryManager;
+import org.apache.spark.shuffle.sort.SerializedShuffleHandle;
 import org.apache.spark.storage.*;
 import org.apache.spark.unsafe.memory.ExecutorMemoryManager;
 import org.apache.spark.unsafe.memory.MemoryAllocator;
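The only change in this test file moves an import to where the file's grouping expects it: java/javax first, then scala, then third-party libraries, then org.apache.spark, each group alphabetized and separated by a blank line. That grouping is my reading of the surrounding imports rather than something the diff spells out; under it, SerializedShuffleHandle belongs in the final block, as the result reads after the change:

// Import grouping as it reads after this change (names taken from the diff above).
import java.nio.ByteBuffer;
import java.util.*;

import scala.*;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;

import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.shuffle.sort.SerializedShuffleHandle;
import org.apache.spark.storage.*;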

core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -122,7 +122,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       conf
     )
     writer.write(Iterator.empty)
-    writer.stop(true)
+    writer.stop( /* success = */ true)
     assert(writer.getPartitionLengths.sum === 0)
     assert(outputFile.exists())
     assert(outputFile.length() === 0)
@@ -146,7 +146,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       conf
     )
     writer.write(records)
-    writer.stop(true)
+    writer.stop( /* success = */ true)
     assert(temporaryFilesCreated.nonEmpty)
     assert(writer.getPartitionLengths.sum === outputFile.length())
     assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
@@ -175,7 +175,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       }))
     }
     assert(temporaryFilesCreated.nonEmpty)
-    writer.stop(false)
+    writer.stop( /* success = */ false)
     assert(temporaryFilesCreated.count(_.exists()) === 0)
   }
 
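These test changes only annotate the boolean argument at each call site, so writer.stop(true) and writer.stop(false) read without a trip to the method signature. A minimal Java illustration of the same call-site commenting convention (the method and messages are invented for the example):

// Minimal illustration of naming a boolean argument at the call site.
public class BooleanArgComment {

  // Illustrative method: at a bare call site like stop(true), the flag's meaning is invisible.
  static void stop(boolean success) {
    System.out.println(success ? "committing output" : "discarding output");
  }

  public static void main(String[] args) {
    stop( /* success = */ true);   // reads as "stop, treating the attempt as successful"
    stop( /* success = */ false);  // reads as "stop, discarding this attempt's output"
  }
}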
