Skip to content

Commit ad03c48

Browse files
author
Marcelo Vanzin
committed
Revert "Fix a thread-safety issue in "local" mode."
This reverts commit f26556b. Conflicts: core/src/main/scala/org/apache/spark/executor/Executor.scala
1 parent 0b509d0 commit ad03c48

File tree

2 files changed

+5
-14
lines changed

2 files changed

+5
-14
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,9 @@ private[spark] class Executor(
4343
executorId: String,
4444
executorHostname: String,
4545
env: SparkEnv,
46-
conf: SparkConf,
4746
isLocal: Boolean = false)
4847
extends Logging
4948
{
50-
51-
def this(executorId: String,
52-
slaveHostname: String,
53-
env: SparkEnv,
54-
isLocal: Boolean = false) = {
55-
this(executorId, slaveHostname, env, env.conf, isLocal)
56-
}
57-
5849
logInfo(s"Starting executor ID $executorId on host $executorHostname")
5950

6051
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -64,6 +55,8 @@ private[spark] class Executor(
6455

6556
private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
6657

58+
private val conf = env.conf
59+
6760
@volatile private var isStopped = false
6861

6962
// No ip or host:port - just hostname

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
2121

2222
import akka.actor.{Actor, ActorRef, Props}
2323

24-
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
24+
import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
2525
import org.apache.spark.TaskState.TaskState
2626
import org.apache.spark.executor.{Executor, ExecutorBackend}
2727
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
@@ -43,7 +43,6 @@ private case class StopExecutor()
4343
private[spark] class LocalActor(
4444
scheduler: TaskSchedulerImpl,
4545
executorBackend: LocalBackend,
46-
conf: SparkConf,
4746
private val totalCores: Int)
4847
extends Actor with ActorLogReceive with Logging {
4948

@@ -53,7 +52,7 @@ private[spark] class LocalActor(
5352
private val localExecutorHostname = "localhost"
5453

5554
private val executor = new Executor(
56-
localExecutorId, localExecutorHostname, SparkEnv.get, conf, isLocal = true)
55+
localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
5756

5857
override def receiveWithLogging = {
5958
case ReviveOffers =>
@@ -92,12 +91,11 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
9291
extends SchedulerBackend with ExecutorBackend {
9392

9493
private val appId = "local-" + System.currentTimeMillis
95-
private val conf = SparkEnv.get.conf.clone()
9694
var localActor: ActorRef = null
9795

9896
override def start() {
9997
localActor = SparkEnv.get.actorSystem.actorOf(
100-
Props(new LocalActor(scheduler, this, conf, totalCores)),
98+
Props(new LocalActor(scheduler, this, totalCores)),
10199
"LocalBackendActor")
102100
}
103101

0 commit comments

Comments
 (0)