@@ -17,6 +17,8 @@

package org.apache.spark.executor

+ import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
@@ -167,7 +169,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
/**
* Total bytes read.
*/
- var bytesRead: Long = 0L
+ var bytesRead: AtomicLong = new AtomicLong(0)
[Review comment from a Contributor on the added line above]

This is part of the public API; could we fix the issue without changing the API?

Also, it looks different from the others. I'd prefer to keep the type unchanged.

}


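The review comment above asks whether the race can be fixed without changing the public type of bytesRead. One way to read that suggestion is to keep a Long-typed accessor as the public API and hide the atomic counter behind it. The sketch below is illustrative only and not part of this patch: the class name, the _bytesRead field, and the incBytesRead helper are made up, and readMethod is simplified to a String to keep the snippet self-contained.

import java.util.concurrent.atomic.AtomicLong

class InputMetricsSketch(val readMethod: String) {
  // Internal, thread-safe accumulator; never exposed directly.
  private val _bytesRead = new AtomicLong(0L)

  // The externally visible API keeps the original Long type.
  def bytesRead: Long = _bytesRead.get()
  def bytesRead_=(value: Long): Unit = _bytesRead.set(value)

  // Hypothetical accumulation helper for callers such as HadoopRDD.compute(),
  // which may run more than once per task (e.g. under CoalescedRDD).
  def incBytesRead(delta: Long): Long = _bytesRead.addAndGet(delta)
}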
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -185,6 +185,11 @@ class HadoopRDD[K, V](
array
}

+ // Task input metrics are added to for each execution of compute(). This is not instantiated
+ // inside compute() for the CoalescedRDD case which calls compute() multiple times for a single
+ // task. See SPARK-2630
+ private val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {

@@ -202,13 +207,11 @@
val key: K = reader.createKey()
val value: V = reader.createValue()

- // Set the task input metrics.
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
try {
/* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
* always at record boundaries, so tasks may need to read into other splits to complete
* a record. */
- inputMetrics.bytesRead = split.inputSplit.value.getLength()
+ inputMetrics.bytesRead.addAndGet(split.inputSplit.value.getLength())
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
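The comment added to HadoopRDD above explains the motivation: when a CoalescedRDD wraps this RDD, a single task calls compute() once per parent partition, so a fresh InputMetrics created inside compute() would only ever report the last split. A toy illustration of the accumulating counter (the two split lengths are made up):

import java.util.concurrent.atomic.AtomicLong

val bytesRead = new AtomicLong(0L)            // one counter shared by every compute() call in the task
val splitLengths = Seq(64L << 20, 48L << 20)  // hypothetical lengths of two coalesced parent splits
splitLengths.foreach(bytesRead.addAndGet)     // each compute() call adds its own split's size
assert(bytesRead.get() == splitLengths.sum)   // the task's total input, not just the last split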
@@ -122,7 +122,7 @@ class NewHadoopRDD[K, V](
/* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
* always at record boundaries, so tasks may need to read into other splits to complete
* a record. */
- inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+ inputMetrics.bytesRead.addAndGet(split.serializableHadoopSplit.value.getLength())
} catch {
case e: Exception =>
logWarning("Unable to get input split size in order to set task input bytes", e)
@@ -50,7 +50,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
- inputMetrics.bytesRead = bytes
+ inputMetrics.bytesRead.addAndGet(bytes)
}

private[spark] class BlockManager(
@@ -71,7 +71,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
if (metrics != null) {
metrics.inputMetrics.foreach { inputMetrics =>
executorToInputBytes(eid) =
- executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+ executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead.get()
}
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =
@@ -209,8 +209,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
execSummary.shuffleRead += shuffleReadDelta

val inputBytesDelta =
- (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
- - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+ (taskMetrics.inputMetrics.map(_.bytesRead.get()).getOrElse(0L)
+ - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead.get()).getOrElse(0L))
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta

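In the listener, the delta arithmetic itself is unchanged; the byte counts are simply read through get() before subtracting. A standalone rendition, where newInput and oldInput stand in for the listener's Option-wrapped metrics rather than being actual fields:

import java.util.concurrent.atomic.AtomicLong

val newInput: Option[AtomicLong] = Some(new AtomicLong(700L))  // bytesRead from the latest task metrics
val oldInput: Option[AtomicLong] = Some(new AtomicLong(200L))  // previously recorded value, if any
val inputBytesDelta =
  newInput.map(_.get()).getOrElse(0L) - oldInput.map(_.get()).getOrElse(0L)
// inputBytesDelta == 500L, the amount added to stageData.inputBytes and execSummary.inputBytes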
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -170,7 +170,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
Distribution(data).get.getQuantiles().map(d => <td>{Utils.bytesToString(d.toLong)}</td>)

val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+ metrics.get.inputMetrics.map(_.bytesRead.get()).getOrElse(0L).toDouble
}
val inputQuantiles = <td>Input</td> +: getQuantileCols(inputSizes)

@@ -247,7 +247,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val maybeInput = metrics.flatMap(_.inputMetrics)
val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
val inputReadable = maybeInput
- .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+ .map(m =>
+ s"${Utils.bytesToString(m.bytesRead.get())} (${m.readMethod.toString.toLowerCase()})")
.getOrElse("")

val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -270,7 +270,7 @@ private[spark] object JsonProtocol {

def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
("Data Read Method" -> inputMetrics.readMethod.toString) ~
("Bytes Read" -> inputMetrics.bytesRead)
("Bytes Read" -> inputMetrics.bytesRead.get())
}

def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
@@ -609,7 +609,7 @@ private[spark] object JsonProtocol {
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
- metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+ metrics.bytesRead.set((json \ "Bytes Read").extract[Long])
metrics
}

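The JSON wire format is unchanged ("Bytes Read" is still a plain long); only the access pattern moves to get() on write and set() on read. The round-trip symmetry, reduced to the AtomicLong itself with the json4s plumbing left out:

import java.util.concurrent.atomic.AtomicLong

val writerSide = new AtomicLong(0L)
writerSide.addAndGet(4096L)                   // value accumulated while the task ran
val bytesReadField: Long = writerSide.get()   // what inputMetricsToJson emits as "Bytes Read"

val readerSide = new AtomicLong(0L)           // counter of a freshly built InputMetrics
readerSide.set(bytesReadField)                // inputMetricsFromJson restores it from the parsed long
assert(readerSide.get() == writerSide.get())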
@@ -438,19 +438,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val list1Get = store.get("list1")
assert(list1Get.isDefined, "list1 expected to be in store")
assert(list1Get.get.data.size === 2)
- assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+ assert(list1Get.get.inputMetrics.bytesRead.get() === list1SizeEstimate)
assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
val list2MemoryGet = store.get("list2memory")
assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
assert(list2MemoryGet.get.data.size === 3)
- assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+ assert(list2MemoryGet.get.inputMetrics.bytesRead.get() === list2SizeEstimate)
assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
val list2DiskGet = store.get("list2disk")
assert(list2DiskGet.isDefined, "list2memory expected to be in store")
assert(list2DiskGet.get.data.size === 3)
System.out.println(list2DiskGet)
// We don't know the exact size of the data on disk, but it should certainly be > 0.
- assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+ assert(list2DiskGet.get.inputMetrics.bytesRead.get() > 0)
assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
}

@@ -158,7 +158,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.memoryBytesSpilled = base + 6
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.inputMetrics = Some(inputMetrics)
- inputMetrics.bytesRead = base + 7
+ inputMetrics.bytesRead.set(base + 7)
taskMetrics
}

@@ -370,7 +370,7 @@ class JsonProtocolSuite extends FunSuite {

private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
assert(metrics1.readMethod === metrics2.readMethod)
- assert(metrics1.bytesRead === metrics2.bytesRead)
+ assert(metrics1.bytesRead.get() === metrics2.bytesRead.get())
}

private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
@@ -564,7 +564,7 @@

if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- inputMetrics.bytesRead = d + e + f
+ inputMetrics.bytesRead.set(d + e + f)
t.inputMetrics = Some(inputMetrics)
} else {
val sr = new ShuffleReadMetrics