@@ -190,48 +190,60 @@ public Map<String, Metric> getMetrics() {
}
}

private boolean isShuffleBlock(String[] blockIdParts) {
// length == 4: ShuffleBlockId
// length == 5: ContinuousShuffleBlockId
return (blockIdParts.length == 4 || blockIdParts.length == 5) &&
blockIdParts[0].equals("shuffle");
}

private class ManagedBufferIterator implements Iterator<ManagedBuffer> {

private int index = 0;
private final String appId;
private final String execId;
private final int shuffleId;
// An array containing mapId and reduceId pairs.
private final int[] mapIdAndReduceIds;
// An array containing mapId, reduceId and numBlocks tuples
private final int[] shuffleBlockIds;

ManagedBufferIterator(String appId, String execId, String[] blockIds) {
this.appId = appId;
this.execId = execId;
String[] blockId0Parts = blockIds[0].split("_");
if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
if (!isShuffleBlock(blockId0Parts)) {
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
}
this.shuffleId = Integer.parseInt(blockId0Parts[1]);
mapIdAndReduceIds = new int[2 * blockIds.length];
shuffleBlockIds = new int[3 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
if (!isShuffleBlock(blockIdParts)) {
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
}
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockIds[i]);
}
mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
shuffleBlockIds[3 * i] = Integer.parseInt(blockIdParts[2]);
shuffleBlockIds[3 * i + 1] = Integer.parseInt(blockIdParts[3]);
if (blockIdParts.length == 4) {
shuffleBlockIds[3 * i + 2] = 1;
} else {
shuffleBlockIds[3 * i + 2] = Integer.parseInt(blockIdParts[4]);
}
}
}

@Override
public boolean hasNext() {
return index < mapIdAndReduceIds.length;
return index < shuffleBlockIds.length;
}

@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
index += 2;
shuffleBlockIds[index], shuffleBlockIds[index + 1], shuffleBlockIds[index + 2]);
index += 3;
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
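For orientation, a minimal sketch (not code from this patch; the helper name and sample ids are made up) of how the two block id string formats map onto the values the iterator above parses:

// Illustrative only: mirrors the parsing done by ManagedBufferIterator.
// "shuffle_<shuffleId>_<mapId>_<reduceId>"             -> 4 parts (ShuffleBlockId)
// "shuffle_<shuffleId>_<mapId>_<reduceId>_<numBlocks>" -> 5 parts (ContinuousShuffleBlockId)
def parseShuffleBlockId(blockId: String): (Int, Int, Int, Int) = {
  val parts = blockId.split("_")
  require((parts.length == 4 || parts.length == 5) && parts(0) == "shuffle",
    s"Unexpected shuffle block id format: $blockId")
  val numBlocks = if (parts.length == 4) 1 else parts(4).toInt
  (parts(1).toInt, parts(2).toInt, parts(3).toInt, numBlocks)
}

parseShuffleBlockId("shuffle_0_2_3")    // (0, 2, 3, 1): a single reduce partition
parseShuffleBlockId("shuffle_0_2_3_4")  // (0, 2, 3, 4): partitions 3 to 6 of map 2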
@@ -157,21 +157,22 @@ public void registerExecutor(
}

/**
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
* about how the hash and sort based shuffles store their data.
*/
public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
int mapId,
int reduceId) {
int reduceId,
int numBlocks) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
}

/**
@@ -232,13 +233,13 @@ private void deleteExecutorDirs(String[] dirs) {
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int numBlocks) {
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");

try {
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId, numBlocks);
return new FileSegmentManagedBuffer(
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
@@ -59,9 +59,9 @@ public int getSize() {
/**
* Get index offset for a particular reducer.
*/
public ShuffleIndexRecord getIndex(int reduceId) {
public ShuffleIndexRecord getIndex(int reduceId, int numBlocks) {
long offset = offsets.get(reduceId);
long nextOffset = offsets.get(reduceId + 1);
long nextOffset = offsets.get(reduceId + numBlocks);
return new ShuffleIndexRecord(offset, nextOffset - offset);
}
}
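To make the offset arithmetic concrete, a small sketch with a toy offsets array (the values and helper below are illustrative, not the real ShuffleIndexInformation internals):

// The .index file stores numPartitions + 1 longs; partition r's data occupies
// [offsets(r), offsets(r + 1)) in the .data file. Toy values for illustration:
val offsets = Array(0L, 10L, 25L, 60L, 100L)

def segment(reduceId: Int, numBlocks: Int): (Long, Long) = {
  val offset = offsets(reduceId)
  val nextOffset = offsets(reduceId + numBlocks)
  (offset, nextOffset - offset)  // (offset, length), as in ShuffleIndexRecord
}

segment(1, 1)  // (10, 15): only reduce partition 1
segment(1, 3)  // (10, 90): partitions 1, 2 and 3 read as one contiguous range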
@@ -83,8 +83,8 @@ public void testOpenShuffleBlocks() {

ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0, 1)).thenReturn(block0Marker);
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1, 1)).thenReturn(block1Marker);
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
.toByteBuffer();
@@ -106,8 +106,8 @@ public void testOpenShuffleBlocks() {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0, 1);
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1, 1);

// Verify open block request latency metrics
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
@@ -65,7 +65,7 @@ public void testBadRequests() throws IOException {
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
// Unregistered executor
try {
resolver.getBlockData("app0", "exec1", 1, 1, 0);
resolver.getBlockData("app0", "exec1", 1, 1, 0, 1);
fail("Should have failed");
} catch (RuntimeException e) {
assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
@@ -74,7 +74,7 @@ public void testBadRequests() throws IOException {
// Invalid shuffle manager
try {
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
resolver.getBlockData("app0", "exec2", 1, 1, 0);
resolver.getBlockData("app0", "exec2", 1, 1, 0, 1);
fail("Should have failed");
} catch (UnsupportedOperationException e) {
// pass
@@ -84,7 +84,7 @@ public void testBadRequests() throws IOException {
resolver.registerExecutor("app0", "exec3",
dataContext.createExecutorInfo(SORT_MANAGER));
try {
resolver.getBlockData("app0", "exec3", 1, 1, 0);
resolver.getBlockData("app0", "exec3", 1, 1, 0, 1);
fail("Should have failed");
} catch (Exception e) {
// pass
@@ -98,18 +98,25 @@ public void testSortShuffleBlocks() throws IOException {
dataContext.createExecutorInfo(SORT_MANAGER));

InputStream block0Stream =
resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
resolver.getBlockData("app0", "exec0", 0, 0, 0, 1).createInputStream();
String block0 = CharStreams.toString(
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
block0Stream.close();
assertEquals(sortBlock0, block0);

InputStream block1Stream =
resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
resolver.getBlockData("app0", "exec0", 0, 0, 1, 1).createInputStream();
String block1 = CharStreams.toString(
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
block1Stream.close();
assertEquals(sortBlock1, block1);

InputStream block01Stream =
resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
String block01 = CharStreams.toString(
new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
block01Stream.close();
assertEquals(sortBlock0 + sortBlock1, block01);
}

@Test
61 changes: 48 additions & 13 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -31,10 +31,12 @@ import scala.util.control.NonFatal
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.storage.{BlockId, BlockManagerId, ContinuousShuffleBlockId, ShuffleBlockId}
import org.apache.spark.util._

/**
@@ -280,10 +282,23 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}

protected def supportsContinuousBlockBatchFetch(serializerRelocatable: Boolean): Boolean = {
if (!serializerRelocatable) {
false
} else {
if (!conf.getBoolean("spark.shuffle.compress", true)) {
true
} else {
val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
CompressionCodec.supportsConcatenationOfSerializedStreams(compressionCodec)
}
}
}

// For testing
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1, serializerRelocatable = false)
}

/**
@@ -295,8 +310,11 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* and the second item is a sequence of (shuffle block id, shuffle block size) tuples
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])]
def getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int,
serializerRelocatable: Boolean): Seq[(BlockManagerId, Seq[(BlockId, Long)])]

/**
* Deletes map output status information for the specified shuffle stage.
@@ -633,13 +651,17 @@ private[spark] class MapOutputTrackerMaster(
}

// This method is only called in local-mode.
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
def getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int,
serializerRelocatable: Boolean): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
shuffleStatuses.get(shuffleId) match {
case Some (shuffleStatus) =>
shuffleStatus.withMapStatuses { statuses =>
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses,
supportsContinuousBlockBatchFetch(serializerRelocatable))
}
case None =>
Seq.empty
@@ -669,12 +691,16 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
/** Remembers which map output locations are currently being fetched on an executor. */
private val fetching = new HashSet[Int]

override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
override def getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int,
serializerRelocatable: Boolean): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
try {
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses,
supportsContinuousBlockBatchFetch(serializerRelocatable))
} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
@@ -849,6 +875,7 @@ private[spark] object MapOutputTracker extends Logging {
* @param startPartition Start of map output partition ID range (included in range)
* @param endPartition End of map output partition ID range (excluded from range)
* @param statuses List of map statuses, indexed by map ID.
* @param supportsContinuousBlockBatchFetch if true, merge contiguous partitions in one IO
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block ID, shuffle block size) tuples
* describing the shuffle blocks that are stored at that block manager.
@@ -857,7 +884,8 @@
shuffleId: Int,
startPartition: Int,
endPartition: Int,
statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
statuses: Array[MapStatus],
supportsContinuousBlockBatchFetch: Boolean): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
assert (statuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
for ((status, mapId) <- statuses.zipWithIndex) {
@@ -866,9 +894,16 @@
logError(errorMessage)
throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
} else {
for (part <- startPartition until endPartition) {
if (endPartition - startPartition > 1 && supportsContinuousBlockBatchFetch) {
val totalSize: Long = (startPartition until endPartition).map(status.getSizeForBlock).sum
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
((ContinuousShuffleBlockId(shuffleId, mapId,
startPartition, endPartition - startPartition), totalSize))
} else {
for (part <- startPartition until endPartition) {
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
}
}
}
}
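A rough sketch of what the new branch in convertMapStatuses does per mapper, assuming batch fetch is allowed only when the serializer supports relocation and shuffle compression is either off or uses a codec that can concatenate serialized streams (the condition checked by supportsContinuousBlockBatchFetch). The case classes, helper and sizes below are stand-ins, not Spark's real types:

// Stand-ins for ShuffleBlockId / ContinuousShuffleBlockId, for illustration only.
case class ShuffleBlock(shuffleId: Int, mapId: Int, reduceId: Int)
case class ContinuousShuffleBlock(shuffleId: Int, mapId: Int, reduceId: Int, numBlocks: Int)

def blocksForMapper(
    shuffleId: Int,
    mapId: Int,
    sizeOf: Int => Long,  // stand-in for status.getSizeForBlock
    startPartition: Int,
    endPartition: Int,
    batchFetch: Boolean): Seq[(Any, Long)] = {
  if (endPartition - startPartition > 1 && batchFetch) {
    // Merge the contiguous partition range into one block per mapper.
    val totalSize = (startPartition until endPartition).map(sizeOf).sum
    Seq((ContinuousShuffleBlock(shuffleId, mapId, startPartition,
      endPartition - startPartition), totalSize))
  } else {
    // Fall back to one block per reduce partition, as before.
    (startPartition until endPartition).map { part =>
      (ShuffleBlock(shuffleId, mapId, part), sizeOf(part))
    }
  }
}

val sizeOf = Map(2 -> 10L, 3 -> 0L, 4 -> 30L)
blocksForMapper(0, 7, sizeOf, 2, 5, batchFetch = true)
// Seq((ContinuousShuffleBlock(0,7,2,3), 40)): one fetch covering partitions 2..4
blocksForMapper(0, 7, sizeOf, 2, 5, batchFetch = false)
// Seq((ShuffleBlock(0,7,2), 10), (ShuffleBlock(0,7,3), 0), (ShuffleBlock(0,7,4), 30))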
@@ -109,6 +109,7 @@ private[spark] class SerializerManager(
private def shouldCompress(blockId: BlockId): Boolean = {
blockId match {
case _: ShuffleBlockId => compressShuffle
case _: ContinuousShuffleBlockId => compressShuffle
case _: BroadcastBlockId => compressBroadcast
case _: RDDBlockId => compressRdds
case _: TempLocalBlockId => compressShuffleSpill
@@ -46,7 +46,9 @@ private[spark] class BlockStoreShuffleReader[K, C](
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startPartition, endPartition,
dep.serializer.supportsRelocationOfSerializedObjects),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
@@ -191,7 +191,7 @@ private[spark] class IndexShuffleBlockResolver(
}
}

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
override def getBlockData(blockId: ShuffleBlockIdBase): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
@@ -203,13 +203,21 @@
// class of issue from re-occurring in the future which is why they are left here even though
// SPARK-22982 is fixed.
val channel = Files.newByteChannel(indexFile.toPath)
channel.position(blockId.reduceId * 8L)
val in = new DataInputStream(Channels.newInputStream(channel))
try {
channel.position(blockId.reduceId * 8)
val offset = in.readLong()
var expectedPosition = 0
blockId match {
case bid: ContinuousShuffleBlockId =>
val tempId = blockId.reduceId + bid.numBlocks
channel.position(tempId * 8)
expectedPosition = tempId * 8 + 8
case _ =>
expectedPosition = blockId.reduceId * 8 + 16
}
val nextOffset = in.readLong()
val actualPosition = channel.position()
val expectedPosition = blockId.reduceId * 8L + 16
if (actualPosition != expectedPosition) {
throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
s"expected $expectedPosition but actual position was $actualPosition.")
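The channel-position bookkeeping added above is easier to follow with concrete numbers; a worked sketch (the reduce id and block count are chosen arbitrarily):

// Each index entry is one 8-byte long.
// ShuffleBlockId(reduceId = r): read offsets(r) at position r * 8, then the next
//   long in sequence, so the channel should end at r * 8 + 16.
// ContinuousShuffleBlockId(reduceId = r, numBlocks = n): read offsets(r) at r * 8,
//   seek to (r + n) * 8 and read offsets(r + n), ending at (r + n) * 8 + 8.
val r = 3
val n = 4
val singleBlockEnd = r * 8 + 16      // 40, the `case _` expectedPosition
val continuousEnd = (r + n) * 8 + 8  // 64, the ContinuousShuffleBlockId expectedPosition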