
Commit 03e85b4

[SPARK-7046] Remove InputMetrics from BlockResult
This is a code cleanup. The BlockResult class originally contained an InputMetrics object so that it could be used directly as the InputMetrics for the whole task. Now we copy the fields out of it, and the object's presence is confusing because it holds only partial input metrics (it does not include the records read). Since the object is no longer useful (and is confusing), it should be removed.

Author: Kay Ousterhout <[email protected]>

Closes apache#5627 from kayousterhout/SPARK-7046 and squashes the following commits:

bf64bbe [Kay Ousterhout] Import fix
a08ca19 [Kay Ousterhout] [SPARK-7046] Remove InputMetrics from BlockResult
Parent: d206860
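In short (a sketch distilled from the diffs below, with the enum stubbed so it stands alone; the real types live in org.apache.spark.executor and org.apache.spark.storage): BlockResult stops building a partial InputMetrics and becomes a plain carrier of the read method and byte count, which callers fold into the task-level metrics themselves.

// Stub of the read-method enum so this sketch is self-contained;
// Spark's version lives in org.apache.spark.executor.
object DataReadMethod extends Enumeration {
  val Memory, Disk, Hadoop, Network = Value
}

// After this commit, BlockResult is just a value carrier:
class BlockResult(
    val data: Iterator[Any],
    val readMethod: DataReadMethod.Value,
    val bytes: Long)

// Before, the class body additionally did:
//   val inputMetrics = new InputMetrics(readMethod)
//   inputMetrics.incBytesRead(bytes)
// i.e. it pre-built a partial InputMetrics that never included records read.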

3 files changed, 11 insertions(+), 15 deletions(-)

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 2 additions & 3 deletions
@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.incBytesRead(inputMetrics.bytesRead)
+          .getInputMetricsForReadMethod(blockResult.readMethod)
+        existingMetrics.incBytesRead(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {
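As its name suggests, getInputMetricsForReadMethod hands back the task's single InputMetrics for a given read method, so repeated cache hits accumulate into one counter instead of each block carrying its own partial metrics object. A behavioral sketch with stubbed types, reusing the DataReadMethod stub above (this is not Spark's actual TaskMetrics, just the contract the hunk above relies on):

// Stubs illustrating the aggregation contract assumed by CacheManager.
class InputMetrics(val readMethod: DataReadMethod.Value) {
  private var _bytesRead = 0L
  def incBytesRead(n: Long): Unit = _bytesRead += n
  def bytesRead: Long = _bytesRead
}

class TaskMetrics {
  private val byMethod =
    scala.collection.mutable.Map.empty[DataReadMethod.Value, InputMetrics]
  // Get-or-create one InputMetrics per read method for this task.
  def getInputMetricsForReadMethod(m: DataReadMethod.Value): InputMetrics =
    byMethod.getOrElseUpdate(m, new InputMetrics(m))
}

// Two memory hits of 10 and 20 bytes land in the same counter: 30.
val tm = new TaskMetrics
Seq(10L, 20L).foreach { n =>
  tm.getInputMetricsForReadMethod(DataReadMethod.Memory).incBytesRead(n)
}
assert(tm.getInputMetricsForReadMethod(DataReadMethod.Memory).bytesRead == 30L)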

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 3 additions & 6 deletions
@@ -29,7 +29,7 @@ import scala.util.Random
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
-import org.apache.spark.executor._
+import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -50,11 +50,8 @@ private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
-    readMethod: DataReadMethod.Value,
-    bytes: Long) {
-  val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.incBytesRead(bytes)
-}
+    val readMethod: DataReadMethod.Value,
+    val bytes: Long)
 
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
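On the producing side, nothing needs an InputMetrics any more: a BlockResult is built by passing the three values straight through. A hypothetical call site (values and estimatedSize are illustrative names, not taken from this diff):

// Hypothetical: a memory hit inside BlockManager.get could now return this.
new BlockResult(
  values.iterator,        // the cached values
  DataReadMethod.Memory,  // where the bytes were read from
  estimatedSize)          // estimated in-memory size, in bytes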

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 6 additions & 6 deletions
@@ -428,19 +428,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
-    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
-    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list1Get.get.bytes === list1SizeEstimate)
+    assert(list1Get.get.readMethod === DataReadMethod.Memory)
     val list2MemoryGet = store.get("list2memory")
     assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
     assert(list2MemoryGet.get.data.size === 3)
-    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
-    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list2MemoryGet.get.bytes === list2SizeEstimate)
+    assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2memory expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
-    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
-    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+    assert(list2DiskGet.get.bytes > 0)
+    assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
   test("in-memory LRU storage") {
