
Commit ae04d99
Remove input metrics for parallel collections
1 parent 719f19d

2 files changed: 4 additions and 12 deletions

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
2 additions & 2 deletions

@@ -67,8 +67,8 @@ class TaskMetrics extends Serializable {
   var diskBytesSpilled: Long = _

   /**
-   * If this task reads from a HadoopRDD, from cached data, or from a parallelized collection,
-   * metrics on how much data was read are stored here.
+   * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
+   * are stored here.
    */
   var inputMetrics: Option[InputMetrics] = None

core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
2 additions & 10 deletions

@@ -26,9 +26,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag

 import org.apache.spark._
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.util.Utils

 private[spark] class ParallelCollectionPartition[T: ClassTag](
     var rddId: Long,
@@ -100,14 +99,7 @@ private[spark] class ParallelCollectionRDD[T: ClassTag](
   }

   override def compute(s: Partition, context: TaskContext) = {
-    val parallelCollectionsPartition = s.asInstanceOf[ParallelCollectionPartition[T]]
-
-    // Set the input metrics for the task.
-    val inputMetrics = new InputMetrics(DataReadMethod.Memory)
-    inputMetrics.bytesRead = SizeEstimator.estimate(parallelCollectionsPartition.values)
-    context.taskMetrics.inputMetrics = Some(inputMetrics)
-
-    new InterruptibleIterator(context, parallelCollectionsPartition.iterator)
+    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
   }

   override def getPreferredLocations(s: Partition): Seq[String] = {
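
Not part of the commit, but a minimal sketch of how the change surfaces to an application: a SparkListener that prints each finished task's inputMetrics. After this commit, a task computed from sc.parallelize should report None, while HadoopRDD reads and persisted data still populate the field. The object name, app name, and println format below are illustrative; the listener APIs (addSparkListener, SparkListenerTaskEnd, taskMetrics) are from the Spark codebase this diff targets.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical driver program for observing input metrics per task.
object InputMetricsCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("input-metrics-check").setMaster("local[2]")
    val sc = new SparkContext(conf)

    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // taskMetrics may be null for failed tasks, hence the Option wrapper.
        val input = Option(taskEnd.taskMetrics).flatMap(_.inputMetrics)
        // After this commit: None for parallelized collections;
        // Some(InputMetrics(...)) for HadoopRDD reads and persisted data.
        println(s"stage ${taskEnd.stageId}: inputMetrics = $input")
      }
    })

    sc.parallelize(1 to 1000, 4).count() // expect inputMetrics = None per task
    sc.stop()
  }
}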
