@@ -39,6 +39,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.HighlyCompressedMapStatus;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.Serializer;
@@ -75,6 +76,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {

private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

private final TaskContext taskContext;
private final int fileBufferSize;
private final boolean transferToEnabled;
private final int numPartitions;
@@ -115,6 +117,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
this.taskContext = taskContext;
this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
this.serializer = dep.serializer();
this.shuffleBlockResolver = shuffleBlockResolver;
@@ -169,6 +172,16 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
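// MapStatus.apply switches to HighlyCompressedMapStatus once a shuffle has many partitions;
// that status records only an average block size, so blocks larger than the average are
// underreported to reducers. The block below logs the actual size distribution at debug
// level and accumulates the total underestimation in the shuffle write metrics.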
if (mapStatus instanceof HighlyCompressedMapStatus) {
if (logger.isDebugEnabled() && partitionLengths.length > 0) {
Tuple2<String, Object> tuple = SortShuffleWriter$.MODULE$.genBlocksDistributionStr(
partitionLengths, (HighlyCompressedMapStatus) mapStatus, taskContext);
if (!tuple._1.isEmpty()) {
logger.debug(tuple._1);
}
writeMetrics.incUnderestimatedBlocksSize((Long)(tuple._2));
}
}
}

@VisibleForTesting

@@ -19,33 +19,30 @@

import javax.annotation.Nullable;
import java.io.*;
import java.lang.Long;
import java.nio.channels.FileChannel;
import java.util.Iterator;

import scala.Option;
import scala.Product2;
import scala.*;
import scala.collection.JavaConverters;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.io.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.commons.io.output.CountingOutputStream;
import org.apache.spark.*;
import org.apache.spark.annotation.Private;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.commons.io.output.CountingOutputStream;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.scheduler.*;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
@@ -228,6 +225,16 @@ void closeAndWriteOutput() throws IOException {
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
if (mapStatus instanceof HighlyCompressedMapStatus) {
if (logger.isDebugEnabled() && partitionLengths.length > 0) {
Tuple2<String, Object> tuple = SortShuffleWriter$.MODULE$.genBlocksDistributionStr(
partitionLengths, (HighlyCompressedMapStatus) mapStatus, taskContext);
if (!tuple._1.isEmpty()) {
logger.debug(tuple._1);
}
writeMetrics.incUnderestimatedBlocksSize((Long)(tuple._2));
}
}
}

@VisibleForTesting

@@ -50,6 +50,7 @@ private[spark] object InternalAccumulator {
val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "remoteBlocksFetched"
val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "localBlocksFetched"
val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead"
val REMOTE_BYTES_READ_TO_MEM = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesReadToMem"
val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
@@ -60,6 +61,7 @@ private[spark] object InternalAccumulator {
val BYTES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "bytesWritten"
val RECORDS_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "recordsWritten"
val WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "writeTime"
val UNDERESTIMATED_BLOCKS_SIZE = SHUFFLE_WRITE_METRICS_PREFIX + "underestimatedBlocksSize"
}

// Names of output metrics

@@ -31,6 +31,7 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
private[executor] val _remoteBlocksFetched = new LongAccumulator
private[executor] val _localBlocksFetched = new LongAccumulator
private[executor] val _remoteBytesRead = new LongAccumulator
private[executor] val _remoteBytesReadToMem = new LongAccumulator
private[executor] val _localBytesRead = new LongAccumulator
private[executor] val _fetchWaitTime = new LongAccumulator
private[executor] val _recordsRead = new LongAccumulator
@@ -50,6 +51,11 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
*/
def remoteBytesRead: Long = _remoteBytesRead.sum

/**
* Total number of remote bytes read to memory from the shuffle by this task.
*/
def remoteBytesReadToMem: Long = _remoteBytesReadToMem.sum

/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
@@ -80,13 +86,15 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
private[spark] def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched.add(v)
private[spark] def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched.add(v)
private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
private[spark] def incRemoteBytesReadToMem(v: Long): Unit = _remoteBytesReadToMem.add(v)
Contributor (review comment on incRemoteBytesReadToMem):
The way it seems to be coded up, this will end up being everything fetched from shuffle - and we can already infer it: remote bytes read + local bytes read.
Or did I miss something here?
private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)

private[spark] def setRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.setValue(v)
private[spark] def setLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.setValue(v)
private[spark] def setRemoteBytesRead(v: Long): Unit = _remoteBytesRead.setValue(v)
private[spark] def setRemoteBytesReadToMem(v: Long): Unit = _remoteBytesReadToMem.setValue(v)
private[spark] def setLocalBytesRead(v: Long): Unit = _localBytesRead.setValue(v)
private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
@@ -99,13 +107,15 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
_remoteBlocksFetched.setValue(0)
_localBlocksFetched.setValue(0)
_remoteBytesRead.setValue(0)
_remoteBytesReadToMem.setValue(0)
_localBytesRead.setValue(0)
_fetchWaitTime.setValue(0)
_recordsRead.setValue(0)
metrics.foreach { metric =>
_remoteBlocksFetched.add(metric.remoteBlocksFetched)
_localBlocksFetched.add(metric.localBlocksFetched)
_remoteBytesRead.add(metric.remoteBytesRead)
_remoteBytesReadToMem.add(metric.remoteBytesReadToMem)
_localBytesRead.add(metric.localBytesRead)
_fetchWaitTime.add(metric.fetchWaitTime)
_recordsRead.add(metric.recordsRead)
@@ -122,20 +132,23 @@ private[spark] class TempShuffleReadMetrics {
private[this] var _remoteBlocksFetched = 0L
private[this] var _localBlocksFetched = 0L
private[this] var _remoteBytesRead = 0L
private[this] var _remoteBytesReadToMem = 0L
private[this] var _localBytesRead = 0L
private[this] var _fetchWaitTime = 0L
private[this] var _recordsRead = 0L

def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched += v
def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched += v
def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
def incRemoteBytesReadToMem(v: Long): Unit = _remoteBytesReadToMem += v
def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
def incRecordsRead(v: Long): Unit = _recordsRead += v

def remoteBlocksFetched: Long = _remoteBlocksFetched
def localBlocksFetched: Long = _localBlocksFetched
def remoteBytesRead: Long = _remoteBytesRead
def remoteBytesReadToMem: Long = _remoteBytesReadToMem
def localBytesRead: Long = _localBytesRead
def fetchWaitTime: Long = _fetchWaitTime
def recordsRead: Long = _recordsRead

@@ -31,6 +31,7 @@ class ShuffleWriteMetrics private[spark] () extends Serializable {
private[executor] val _bytesWritten = new LongAccumulator
private[executor] val _recordsWritten = new LongAccumulator
private[executor] val _writeTime = new LongAccumulator
private[executor] val _underestimatedBlocksSize = new LongAccumulator

/**
* Number of bytes written for the shuffle by this task.
@@ -47,6 +48,11 @@ class ShuffleWriteMetrics private[spark] () extends Serializable {
*/
def writeTime: Long = _writeTime.sum

/**
* The total amount, in bytes, by which block sizes are underestimated in the MapStatus reported by this task.
*/
def underestimatedBlocksSize: Long = _underestimatedBlocksSize.value

private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
@@ -57,6 +63,10 @@ class ShuffleWriteMetrics private[spark] () extends Serializable {
_recordsWritten.setValue(recordsWritten - v)
}

private[spark] def incUnderestimatedBlocksSize(v: Long): Unit = {
_underestimatedBlocksSize.add(v)
}

// Legacy methods for backward compatibility.
// TODO: remove these once we make this class private.
@deprecated("use bytesWritten instead", "2.0.0")

@@ -215,12 +215,14 @@ class TaskMetrics private[spark] () extends Serializable {
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
shuffleRead.REMOTE_BYTES_READ_TO_MEM -> shuffleReadMetrics._remoteBytesReadToMem,
shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
shuffleWrite.UNDERESTIMATED_BLOCKS_SIZE -> shuffleWriteMetrics._underestimatedBlocksSize,
input.BYTES_READ -> inputMetrics._bytesRead,
input.RECORDS_READ -> inputMetrics._recordsRead,
output.BYTES_WRITTEN -> outputMetrics._bytesWritten,

@@ -164,6 +164,8 @@ private[spark] class HighlyCompressedMapStatus private (
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}

def getAvgSize: Long = avgSize
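// Exposing avgSize lets shuffle writers measure how far the actual per-block sizes
// deviate from the average that reducers will see
// (used by SortShuffleWriter.genBlocksDistributionStr).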
}

private[spark] object HighlyCompressedMapStatus {

@@ -19,10 +19,10 @@ package org.apache.spark.shuffle.sort

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus}
import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.Utils
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.util.collection.ExternalSorter

private[spark] class SortShuffleWriter[K, V, C](
@@ -72,6 +72,17 @@ private[spark] class SortShuffleWriter[K, V, C](
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
mapStatus match {
case hc: HighlyCompressedMapStatus =>
if (log.isDebugEnabled() && partitionLengths.length > 0) {
SortShuffleWriter.genBlocksDistributionStr(partitionLengths, hc, context) match {
case (logStr, underestimatedSize) if logStr.nonEmpty =>
logDebug(logStr)
writeMetrics.incUnderestimatedBlocksSize(underestimatedSize)
}
}
case _ => // no-op
}
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
@@ -114,4 +125,19 @@ private[spark] object SortShuffleWriter {
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
def genBlocksDistributionStr(lens: Array[Long], hc: HighlyCompressedMapStatus,
ctx: TaskContext): (String, Long) = {
// Distribution of sizes in MapStatus.
Distribution(lens.map(_.toDouble)) match {
case Some(distribution) =>
val underestimatedLengths = lens.filter(_ > hc.getAvgSize).map(_ - hc.getAvgSize)
val distributionStr = distribution.getQuantiles().mkString(", ")
(s"For task ${ctx.partitionId()}.${ctx.attemptNumber()} in stage ${ctx.stageId()} " +
s"(TID ${ctx.taskAttemptId()}), the block sizes in MapStatus are highly compressed" +
s" (average is ${hc.getAvgSize}, ${underestimatedLengths.length} blocks underestimated," +
s" the size of underestimated is ${underestimatedLengths.sum}), distribution at " +
s"probabilities(0, 0.25, 0.5, 0.75, 1.0) is $distributionStr.", underestimatedLengths.sum)
case None => ("", 0L)
}
}
}
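
To make the new bookkeeping concrete, here is a minimal, self-contained Scala sketch (not part of the patch) of the arithmetic behind genBlocksDistributionStr; the plain mean of the lengths stands in for HighlyCompressedMapStatus.getAvgSize:

object UnderestimationSketch {
  // A block is "underestimated" when its actual size exceeds the average size reported
  // in the compressed MapStatus; the gap is (actual - average).
  def underestimated(lens: Array[Long], avgSize: Long): (Int, Long) = {
    val gaps = lens.filter(_ > avgSize).map(_ - avgSize)
    (gaps.length, gaps.sum)
  }

  def main(args: Array[String]): Unit = {
    val lens = Array(10L, 20L, 30L, 1000L)  // per-partition block sizes in bytes
    val avg = lens.sum / lens.length        // 265, standing in for getAvgSize
    val (count, total) = underestimated(lens, avg)
    // Prints: 1 block(s) underreported by a total of 735 bytes
    println(s"$count block(s) underreported by a total of $total bytes")
  }
}

The total computed here corresponds to the second element of the tuple returned by genBlocksDistributionStr, which the writers above feed into incUnderestimatedBlocksSize alongside a debug log of the quantile distribution.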

@@ -200,6 +200,7 @@ private[v1] object AllStagesResource {
readBytes = submetricQuantiles(_.totalBytesRead),
readRecords = submetricQuantiles(_.recordsRead),
remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
remoteBytesReadToMem = submetricQuantiles(_.remoteBytesReadToMem),
remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
@@ -216,7 +217,8 @@ private[v1] object AllStagesResource {
def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
writeBytes = submetricQuantiles(_.bytesWritten),
writeRecords = submetricQuantiles(_.recordsWritten),
writeTime = submetricQuantiles(_.writeTime)
writeTime = submetricQuantiles(_.writeTime),
underestimatedBlocksSize = submetricQuantiles(_.underestimatedBlocksSize)
)
}.build

@@ -281,6 +283,7 @@ private[v1] object AllStagesResource {
localBlocksFetched = internal.localBlocksFetched,
fetchWaitTime = internal.fetchWaitTime,
remoteBytesRead = internal.remoteBytesRead,
remoteBytesReadToMem = internal.remoteBytesReadToMem,
localBytesRead = internal.localBytesRead,
recordsRead = internal.recordsRead
)
@@ -290,7 +293,8 @@ private[v1] object AllStagesResource {
new ShuffleWriteMetrics(
bytesWritten = internal.bytesWritten,
writeTime = internal.writeTime,
recordsWritten = internal.recordsWritten
recordsWritten = internal.recordsWritten,
underestimatedBlocksSize = internal.underestimatedBlocksSize
)
}
}

core/src/main/scala/org/apache/spark/status/api/v1/api.scala (6 additions & 2 deletions)
@@ -196,13 +196,15 @@ class ShuffleReadMetrics private[spark](
val localBlocksFetched: Long,
val fetchWaitTime: Long,
val remoteBytesRead: Long,
val remoteBytesReadToMem: Long,
val localBytesRead: Long,
val recordsRead: Long)

class ShuffleWriteMetrics private[spark](
val bytesWritten: Long,
val writeTime: Long,
val recordsWritten: Long)
val recordsWritten: Long,
val underestimatedBlocksSize: Long)

class TaskMetricDistributions private[spark](
val quantiles: IndexedSeq[Double],
@@ -237,12 +239,14 @@ class ShuffleReadMetricDistributions private[spark](
val localBlocksFetched: IndexedSeq[Double],
val fetchWaitTime: IndexedSeq[Double],
val remoteBytesRead: IndexedSeq[Double],
val remoteBytesReadToMem: IndexedSeq[Double],
val totalBlocksFetched: IndexedSeq[Double])

class ShuffleWriteMetricDistributions private[spark](
val writeBytes: IndexedSeq[Double],
val writeRecords: IndexedSeq[Double],
val writeTime: IndexedSeq[Double])
val writeTime: IndexedSeq[Double],
val underestimatedBlocksSize: IndexedSeq[Double])

class AccumulableInfo private[spark](
val id: Long,

@@ -157,6 +157,7 @@ final class ShuffleBlockFetcherIterator(
case SuccessFetchResult(_, address, _, buf, _) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
shuffleMetrics.incRemoteBytesReadToMem(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
}
buf.release()
@@ -337,6 +338,7 @@ final class ShuffleBlockFetcherIterator(
case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
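// Note: every successful remote fetch is counted here as read "to memory", so with the
// current call sites remoteBytesReadToMem tracks remoteBytesRead one-to-one (see the
// contributor comment on ShuffleReadMetrics.incRemoteBytesReadToMem above).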
shuffleMetrics.incRemoteBytesReadToMem(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
}
bytesInFlight -= size