Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ class ShuffleReadMetrics extends Serializable {
*/
var fetchWaitTime: Long = _

/**
* Total time spent fetching remote shuffle blocks. This aggregates the time spent fetching all
* input blocks. Since block fetches are both pipelined and parallelized, this can
* exceed fetchWaitTime and executorRunTime.
*/
var remoteFetchTime: Long = _

/**
* Total number of remote bytes read from the shuffle by this task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ class JobLogger(val user: String, val logDirName: String)
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
" REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead
case None => ""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi
def totalBlocks: Int
def numLocalBlocks: Int
def numRemoteBlocks: Int
def remoteFetchTime: Long
def fetchWaitTime: Long
def remoteBytesRead: Long
}
Expand Down Expand Up @@ -79,7 +78,6 @@ object BlockFetcherIterator {
import blockManager._

private var _remoteBytesRead = 0L
private var _remoteFetchTime = 0L
private var _fetchWaitTime = 0L

if (blocksByAddress == null) {
Expand Down Expand Up @@ -125,7 +123,6 @@ object BlockFetcherIterator {
future.onSuccess {
case Some(message) => {
val fetchDone = System.currentTimeMillis()
_remoteFetchTime += fetchDone - fetchStart
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
for (blockMessage <- blockMessageArray) {
Expand Down Expand Up @@ -241,7 +238,6 @@ object BlockFetcherIterator {
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
sm.localBlocksFetched should be > (0)
sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l)
sm.remoteFetchTime should be (0l)
}
}
}
Expand Down