Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import akka.actor.Actor
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.util.ActorLogReceive

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
Expand All @@ -36,8 +37,10 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
/**
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
override def receive = {
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
extends Actor with ActorLogReceive with Logging {

override def receiveWithLogging = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage

/** Actor class for MapOutputTrackerMaster */
private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
extends Actor with Logging {
extends Actor with ActorLogReceive with Logging {
val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

def receive = {
override def receiveWithLogging = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = sender.path.address.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}

/**
* Proxy that relays messages to the driver.
*/
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {

var masterActor: ActorSelection = _
val timeout = AkkaUtils.askTimeout(conf)

Expand Down Expand Up @@ -114,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
}
}

override def receive = {
override def receiveWithLogging = {

case SubmitDriverResponse(success, driverId, message) =>
println(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}

/**
* Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
Expand All @@ -56,7 +56,7 @@ private[spark] class AppClient(
var registered = false
var activeMasterUrl: String = null

class ClientActor extends Actor with Logging {
class ClientActor extends Actor with ActorLogReceive with Logging {
var master: ActorSelection = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
var alreadyDead = false // To avoid calling listener.dead() multiple times
Expand Down Expand Up @@ -119,7 +119,7 @@ private[spark] class AppClient(
.contains(remoteUrl.hostPort)
}

override def receive = {
override def receiveWithLogging = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
registered = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}

private[spark] class Master(
host: String,
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager)
extends Actor with Logging {
extends Actor with ActorLogReceive with Logging {

import context.dispatcher // to use Akka's scheduler.schedule()

Expand Down Expand Up @@ -167,7 +167,7 @@ private[spark] class Master(
context.stop(leaderElectionAgent)
}

override def receive = {
override def receiveWithLogging = {
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}

/**
* @param masterUrls Each url should look like spark://host:port.
Expand All @@ -51,7 +51,7 @@ private[spark] class Worker(
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends Actor with Logging {
extends Actor with ActorLogReceive with Logging {
import context.dispatcher

Utils.checkHost(host, "Expected hostname")
Expand Down Expand Up @@ -187,7 +187,7 @@ private[spark] class Worker(
}
}

override def receive = {
override def receiveWithLogging = {
case RegisteredWorker(masterUrl, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterUrl)
registered = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, Di

import org.apache.spark.Logging
import org.apache.spark.deploy.DeployMessages.SendHeartbeat
import org.apache.spark.util.ActorLogReceive

/**
* Actor which connects to a worker process and terminates the JVM if the connection is severed.
* Provides fate sharing between a worker and its associated child processes.
*/
private[spark] class WorkerWatcher(workerUrl: String) extends Actor
with Logging {
private[spark] class WorkerWatcher(workerUrl: String)
extends Actor with ActorLogReceive with Logging {

override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

Expand All @@ -48,7 +50,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor

def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)

override def receive = {
override def receiveWithLogging = {
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}

private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
cores: Int,
sparkProperties: Seq[(String, String)]) extends Actor with ExecutorBackend with Logging {
sparkProperties: Seq[(String, String)])
extends Actor with ActorLogReceive with ExecutorBackend with Logging {

Utils.checkHostPort(hostPort, "Expected hostport")

Expand All @@ -52,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}

override def receive = {
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
import org.apache.spark.ui.JettyUtils

/**
Expand Down Expand Up @@ -61,7 +61,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {

override protected def log = CoarseGrainedSchedulerBackend.this.log

private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
Expand All @@ -79,7 +82,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}

def receive = {
def receiveWithLogging = {
case RegisterExecutor(executorId, hostPort, cores) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorActor.contains(executorId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import akka.actor.{Actor, ActorRef, Props}

import org.apache.spark.{Logging, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{TaskMetrics, Executor, ExecutorBackend}
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.ActorLogReceive

private case class ReviveOffers()

Expand All @@ -43,7 +43,7 @@ private case class StopExecutor()
private[spark] class LocalActor(
scheduler: TaskSchedulerImpl,
executorBackend: LocalBackend,
private val totalCores: Int) extends Actor with Logging {
private val totalCores: Int) extends Actor with ActorLogReceive with Logging {

private var freeCores = totalCores

Expand All @@ -53,7 +53,7 @@ private[spark] class LocalActor(
val executor = new Executor(
localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)

def receive = {
override def receiveWithLogging = {
case ReviveOffers =>
reviveOffers()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}

/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
* all slaves' block managers.
*/
private[spark]
class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
extends Actor with Logging {
extends Actor with ActorLogReceive with Logging {

// Mapping from block manager id to the block manager's information.
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
Expand All @@ -55,8 +55,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000))

val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs",
60000)
val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)

var timeoutCheckingTask: Cancellable = null

Expand All @@ -67,9 +66,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
super.preStart()
}

def receive = {
override def receiveWithLogging = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
logInfo("received a register")
register(blockManagerId, maxMemSize, slaveActor)
sender ! true

Expand Down Expand Up @@ -118,7 +116,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
sender ! true

case StopBlockManagerMaster =>
logInfo("Stopping BlockManagerMaster")
sender ! true
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.actor.{ActorRef, Actor}

import org.apache.spark.{Logging, MapOutputTracker}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.ActorLogReceive

/**
* An actor to take commands from the master to execute options. For example,
Expand All @@ -32,12 +33,12 @@ private[storage]
class BlockManagerSlaveActor(
blockManager: BlockManager,
mapOutputTracker: MapOutputTracker)
extends Actor with Logging {
extends Actor with ActorLogReceive with Logging {

import context.dispatcher

// Operations that involve removing blocks may be slow and should be done asynchronously
override def receive = {
override def receiveWithLogging = {
case RemoveBlock(blockId) =>
doAsync[Boolean]("removing block " + blockId, sender) {
blockManager.removeBlock(blockId)
Expand Down
64 changes: 64 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import akka.actor.Actor
import org.slf4j.Logger

/**
* A trait to enable logging all Akka actor messages. Here's an example of using this:
*
* {{{
* class BlockManagerMasterActor extends Actor with ActorLogReceive with Logging {
* ...
* override def receiveWithLogging = {
* case GetLocations(blockId) =>
* sender ! getLocations(blockId)
* ...
* }
* ...
* }
* }}}
*
*/
private[spark] trait ActorLogReceive {
self: Actor =>

override def receive: Actor.Receive = new Actor.Receive {

private val _receiveWithLogging = receiveWithLogging

override def isDefinedAt(o: Any): Boolean = _receiveWithLogging.isDefinedAt(o)

override def apply(o: Any): Unit = {
if (log.isDebugEnabled) {
log.debug(s"[actor] received message $o from ${self.sender}")
}
val start = System.nanoTime
_receiveWithLogging.apply(o)
val timeTaken = (System.nanoTime - start).toDouble / 1000000
if (log.isDebugEnabled) {
log.debug(s"[actor] handled message ($timeTaken ms) $o from ${self.sender}")
}
}
}

def receiveWithLogging: Actor.Receive

protected def log: Logger
}