5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.incBytesRead(inputMetrics.bytesRead)
+          .getInputMetricsForReadMethod(blockResult.readMethod)
+        existingMetrics.incBytesRead(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {
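The hunk above stops pulling a pre-built InputMetrics off the block result and instead folds the block's raw readMethod and bytes into the task's existing input metrics. As an aside (not part of the diff), the following standalone Scala sketch shows that accumulation pattern in isolation; SimpleTaskMetrics and SimpleInputMetrics are hypothetical stand-ins, not Spark's real TaskMetrics/InputMetrics API:

// Minimal sketch of the "look up the per-read-method counter, then increment it"
// pattern used in the new CacheManager code. All names here are local stand-ins.
object DataReadMethod extends Enumeration {
  val Memory, Disk, Hadoop, Network = Value
}

// One mutable byte counter per read method, in the spirit of Spark's InputMetrics.
class SimpleInputMetrics(val readMethod: DataReadMethod.Value) {
  private var _bytesRead = 0L
  def bytesRead: Long = _bytesRead
  def incBytesRead(n: Long): Unit = { _bytesRead += n }
}

// Per-task container that hands back (or creates) the metrics for a read method.
class SimpleTaskMetrics {
  private val metricsByMethod =
    scala.collection.mutable.Map.empty[DataReadMethod.Value, SimpleInputMetrics]
  def getInputMetricsForReadMethod(m: DataReadMethod.Value): SimpleInputMetrics =
    metricsByMethod.getOrElseUpdate(m, new SimpleInputMetrics(m))
}

object CacheHitMetricsSketch {
  def main(args: Array[String]): Unit = {
    val taskMetrics = new SimpleTaskMetrics
    // A cached-block hit now reports just (readMethod, bytes); the caller adds
    // those bytes to the task-level counter for that read method.
    taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Memory).incBytesRead(1024L)
    taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Memory).incBytesRead(2048L)
    println(taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Memory).bytesRead) // 3072
  }
}

One apparent benefit, judging only from the diff: bytes from repeated cache hits accumulate in a single task-level counter instead of each hit carrying its own freshly allocated metrics object.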
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import scala.util.Random
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
-import org.apache.spark.executor._
+import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -50,11 +50,8 @@ private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
-    readMethod: DataReadMethod.Value,
-    bytes: Long) {
-  val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.incBytesRead(bytes)
-}
+    val readMethod: DataReadMethod.Value,
+    val bytes: Long)
 
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
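The BlockResult change above replaces the eagerly built InputMetrics with two plain vals, leaving aggregation to callers such as CacheManager. The standalone Scala sketch below (not part of the diff) contrasts the two shapes; InputMetrics and DataReadMethod here are simplified local stand-ins for the org.apache.spark.executor classes of the same names:

object DataReadMethod extends Enumeration {
  val Memory, Disk, Hadoop, Network = Value
}

// Simplified stand-in for org.apache.spark.executor.InputMetrics.
class InputMetrics(val readMethod: DataReadMethod.Value) {
  var bytesRead = 0L
  def incBytesRead(n: Long): Unit = { bytesRead += n }
}

// Old shape: every fetched block eagerly allocates and populates its own InputMetrics.
class OldBlockResult(
    val data: Iterator[Any],
    readMethod: DataReadMethod.Value,
    bytes: Long) {
  val inputMetrics = new InputMetrics(readMethod)
  inputMetrics.incBytesRead(bytes)
}

// New shape: a plain carrier of the raw facts (read method and size);
// callers decide how to fold them into task-level metrics.
class NewBlockResult(
    val data: Iterator[Any],
    val readMethod: DataReadMethod.Value,
    val bytes: Long)

object BlockResultShapesSketch {
  def main(args: Array[String]): Unit = {
    val result = new NewBlockResult(Iterator(1, 2, 3), DataReadMethod.Disk, 42L)
    println(s"${result.readMethod} ${result.bytes}") // Disk 42
  }
}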
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -428,19 +428,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
-    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
-    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list1Get.get.bytes === list1SizeEstimate)
+    assert(list1Get.get.readMethod === DataReadMethod.Memory)
     val list2MemoryGet = store.get("list2memory")
     assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
     assert(list2MemoryGet.get.data.size === 3)
-    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
-    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list2MemoryGet.get.bytes === list2SizeEstimate)
+    assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2memory expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
-    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
-    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+    assert(list2DiskGet.get.bytes > 0)
+    assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
   test("in-memory LRU storage") {