
Commit 8d2a36c

kayousterhout authored and rxin committed
[SPARK-6754] Remove unnecessary TaskContextHelper
The TaskContextHelper was originally necessary because TaskContext was written in Java, which does not have a way to specify that classes are package-private, so TaskContextHelper existed to work around this. Now that TaskContext has been re-written in Scala, this class is no longer necessary.

rxin can you look at this? It looks like you missed this bit of cleanup when you moved TaskContext from Java to Scala in apache#4324.

cc ScrapCodes and pwendell who added this originally.

Author: Kay Ousterhout <[email protected]>

Closes apache#5402 from kayousterhout/SPARK-6754 and squashes the following commits:

f089800 [Kay Ousterhout] [SPARK-6754] Remove unnecessary TaskContextHelper
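For context, here is a minimal sketch of the Scala-level visibility that makes the helper unnecessary. This is illustrative only, not the exact Spark source: beyond TaskContext.setTaskContext and TaskContext.unset (which appear in the diffs below), the member names and modifiers are assumptions.

    package org.apache.spark

    // Sketch only: assumes TaskContext is now a Scala class with a
    // companion object, as done in the rewrite in apache#4324.
    abstract class TaskContext { /* task-level accessors elided */ }

    object TaskContext {
      // Thread-local holder for the context of the task on this thread.
      private val taskContext = new ThreadLocal[TaskContext]

      // Public: user code can read the active context.
      def get(): TaskContext = taskContext.get

      // private[spark]: visible throughout the org.apache.spark package
      // but hidden from users. Java has no modifier with this meaning,
      // which is why the Java version needed a separate helper object.
      private[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)
      private[spark] def unset(): Unit = taskContext.remove()
    }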
1 parent d138aa8 · commit 8d2a36c

File tree: 3 files changed (+5, −34 lines)


core/src/main/scala/org/apache/spark/TaskContextHelper.scala

Lines changed: 0 additions & 29 deletions
This file was deleted.
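The deleted file's contents are collapsed above. A plausible reconstruction of such a forwarding helper (a sketch under the assumptions in the commit message, not the verbatim 29 deleted lines):

    package org.apache.spark

    // Reconstruction (not verbatim): a forwarding object whose only purpose
    // was to re-expose TaskContext's setters as package-private Scala
    // methods while TaskContext itself was still written in Java.
    private[spark] object TaskContextHelper {
      def setTaskContext(context: TaskContext): Unit = TaskContext.setTaskContext(context)
      def unset(): Unit = TaskContext.unset()
    }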

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions
@@ -645,13 +645,13 @@ class DAGScheduler(
       val split = rdd.partitions(job.partitions(0))
       val taskContext = new TaskContextImpl(job.finalStage.id, job.partitions(0), taskAttemptId = 0,
         attemptNumber = 0, runningLocally = true)
-      TaskContextHelper.setTaskContext(taskContext)
+      TaskContext.setTaskContext(taskContext)
       try {
         val result = job.func(taskContext, rdd.iterator(split, taskContext))
         job.listener.taskSucceeded(0, result)
       } finally {
         taskContext.markTaskCompleted()
-        TaskContextHelper.unset()
+        TaskContext.unset()
       }
     } catch {
       case e: Exception =>

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 3 additions & 3 deletions
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer

 import scala.collection.mutable.HashMap

-import org.apache.spark.{TaskContextHelper, TaskContextImpl, TaskContext}
+import org.apache.spark.{TaskContextImpl, TaskContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.ByteBufferInputStream
@@ -54,7 +54,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   final def run(taskAttemptId: Long, attemptNumber: Int): T = {
     context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
       taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
-    TaskContextHelper.setTaskContext(context)
+    TaskContext.setTaskContext(context)
     context.taskMetrics.setHostname(Utils.localHostName())
     taskThread = Thread.currentThread()
     if (_killed) {
@@ -64,7 +64,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
       runTask(context)
     } finally {
       context.markTaskCompleted()
-      TaskContextHelper.unset()
+      TaskContext.unset()
     }
   }
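Both call sites, here and in DAGScheduler above, follow the same discipline: install the context on the current thread before user code runs, then clear it in a finally block so the context never leaks to a reused executor thread. A condensed sketch of that pattern (runWithContext is a hypothetical name, not Spark API):

    // Hypothetical helper condensing the pattern used at both call sites.
    object ContextPattern {
      def runWithContext[T](context: TaskContextImpl)(body: => T): T = {
        TaskContext.setTaskContext(context) // visible via TaskContext.get() on this thread
        try {
          body                              // task or local-job code runs here
        } finally {
          context.markTaskCompleted()       // run completion callbacks
          TaskContext.unset()               // don't leak the context to reused pool threads
        }
      }
    }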
