Changes to StreamingKMeans (org.apache.spark.mllib.clustering):

@@ -20,8 +20,7 @@ package org.apache.spark.mllib.clustering
 import scala.reflect.ClassTag
 
 import org.apache.spark.Logging
-import org.apache.spark.SparkContext._
-import org.apache.spark.annotation.{Experimental, DeveloperApi}
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.dstream.DStream
@@ -165,7 +164,7 @@ class StreamingKMeansModel(
 class StreamingKMeans(
     var k: Int,
     var decayFactor: Double,
-    var timeUnit: String) extends Logging {
+    var timeUnit: String) extends Logging with Serializable {
 
   def this() = this(2, 1.0, StreamingKMeans.BATCHES)
 
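The `with Serializable` addition above follows a general Spark rule: when a driver-side object's fields are read inside an RDD or DStream closure, the closure captures the whole object, so the object must be serializable for tasks to be shipped to executors. The sketch below illustrates that rule with a hypothetical DecayedScaler class and made-up values; it is not MLlib code, and the specific closure inside StreamingKMeans that motivates the change is assumed rather than shown in this diff.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical class, not part of MLlib: reading `decayFactor` inside map()
// captures `this`, so the whole object is shipped to executors and must be
// serializable.
class DecayedScaler(val decayFactor: Double) extends Serializable {
  def scale(sc: SparkContext): Array[Double] =
    sc.parallelize(Seq(1.0, 2.0, 3.0)).map(_ * decayFactor).collect()
}

object DecayedScalerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("scaler-sketch"))
    try {
      // Prints 0.5, 1.0, 1.5; dropping `extends Serializable` above makes the
      // job fail with a task-serialization error instead.
      println(new DecayedScaler(0.5).scale(sc).mkString(", "))
    } finally {
      sc.stop()
    }
  }
}

Removing `extends Serializable` from the sketch reproduces the familiar "Task not serializable" failure, which is presumably what the change to StreamingKMeans avoids.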
Changes to JobGenerator (org.apache.spark.streaming.scheduler):

@@ -19,12 +19,10 @@ package org.apache.spark.streaming.scheduler
 
 import scala.util.{Failure, Success, Try}
 
-import akka.actor.{ActorRef, Props, Actor}
-
 import org.apache.spark.{SparkEnv, Logging}
 import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, EventLoop, ManualClock}
 
 /** Event classes for JobGenerator */
 private[scheduler] sealed trait JobGeneratorEvent
@@ -58,7 +56,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   }
 
   private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-    longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
+    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
 
   // This is marked lazy so that this is initialized after checkpoint duration has been set
   // in the context and the generator has been started.
@@ -70,22 +68,26 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     null
   }
 
-  // eventActor is created when generator starts.
+  // eventLoop is created when generator starts.
   // This not being null means the scheduler has been started and not stopped
-  private var eventActor: ActorRef = null
+  private var eventLoop: EventLoop[JobGeneratorEvent] = null
 
   // last batch whose completion,checkpointing and metadata cleanup has been completed
   private var lastProcessedBatch: Time = null
 
   /** Start generation of jobs */
   def start(): Unit = synchronized {
-    if (eventActor != null) return // generator has already been started
+    if (eventLoop != null) return // generator has already been started
 
-    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-      override def receive: PartialFunction[Any, Unit] = {
-        case event: JobGeneratorEvent => processEvent(event)
-      }
-    }), "JobGenerator")
+    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
+      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
+
+      override protected def onError(e: Throwable): Unit = {
+        jobScheduler.reportError("Error in job generator", e)
+      }
+    }
+    eventLoop.start()
+
     if (ssc.isCheckpointPresent) {
       restart()
     } else {

(A reviewer commented on the `if (ssc.isCheckpointPresent) {` line: add a blank line here.)
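The replacement code relies on org.apache.spark.util.EventLoop, whose definition is not part of this diff. Judging only from the calls made here (post, start, stop, and the overridden onReceive/onError hooks), it behaves like a single-threaded event loop. The SimpleEventLoop sketch below (a made-up name) shows one plausible shape for such a class under those assumptions; it is an illustration, not Spark's actual implementation.

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

import scala.util.control.NonFatal

// Rough sketch: one daemon thread drains a queue and dispatches each event to
// onReceive, routing handler failures to onError.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = queue.take()          // blocks until an event is posted
          try {
            onReceive(event)
          } catch {
            case NonFatal(t) => onError(t)  // handler errors go to the failure hook
          }
        }
      } catch {
        case _: InterruptedException =>     // stop() interrupts a blocked take(); just exit
      }
    }
  }

  def start(): Unit = thread.start()

  def post(event: E): Unit = queue.put(event)

  def stop(): Unit = {
    stopped.set(true)
    thread.interrupt()
  }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

The practical effect mirrors what the Akka actor provided before: any thread may post an event, but onReceive, and therefore processEvent, always runs on one dedicated thread, so JobGenerator's internal state needs no extra locking; failures now surface through the explicit onError hook instead of actor supervision.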
@@ -99,7 +101,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
    * checkpoints written.
    */
   def stop(processReceivedData: Boolean): Unit = synchronized {
-    if (eventActor == null) return // generator has already been stopped
+    if (eventLoop == null) return // generator has already been stopped
 
     if (processReceivedData) {
       logInfo("Stopping JobGenerator gracefully")
@@ -146,25 +148,25 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       graph.stop()
     }
 
-    // Stop the actor and checkpoint writer
+    // Stop the event loop and checkpoint writer
     if (shouldCheckpoint) checkpointWriter.stop()
-    ssc.env.actorSystem.stop(eventActor)
+    eventLoop.stop()
     logInfo("Stopped JobGenerator")
   }
 
   /**
    * Callback called when a batch has been completely processed.
    */
   def onBatchCompletion(time: Time) {
-    eventActor ! ClearMetadata(time)
+    eventLoop.post(ClearMetadata(time))
   }
 
   /**
    * Callback called when the checkpoint of a batch has been written.
    */
   def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
     if (clearCheckpointDataLater) {
-      eventActor ! ClearCheckpointData(time)
+      eventLoop.post(ClearCheckpointData(time))
     }
   }
 
@@ -247,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
     }
-    eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)
+    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
   }
 
   /** Clear DStream metadata for the given `time`. */
@@ -257,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // If checkpointing is enabled, then checkpoint,
     // else mark batch to be fully processed
     if (shouldCheckpoint) {
-      eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)
+      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
     } else {
       // If checkpointing is not enabled, then delete metadata information about
       // received blocks (block data not saved in any case). Otherwise, wait for
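To make the timer wiring at the top of this file concrete, here is a hedged usage sketch that reuses the SimpleEventLoop illustration above and a plain java.util.Timer as a stand-in for Spark's RecurringTimer (whose API is not shown in this diff): each tick posts a GenerateJobs-style event from the timer thread, and handling stays on the loop's own thread.

import java.util.{Timer, TimerTask}

object GeneratorSketch {
  sealed trait Event
  final case class GenerateJobs(timeMs: Long) extends Event

  def main(args: Array[String]): Unit = {
    val loop = new SimpleEventLoop[Event]("JobGenerator-sketch") {
      override protected def onReceive(event: Event): Unit = event match {
        case GenerateJobs(t) => println(s"generate jobs for batch time $t")
      }
      override protected def onError(e: Throwable): Unit = e.printStackTrace()
    }
    loop.start()

    // The timer fires on its own thread, but handling still happens only on
    // the event loop's thread, the same split that RecurringTimer plus
    // EventLoop give the real JobGenerator.
    val timer = new Timer("batch-timer-sketch", true)
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = loop.post(GenerateJobs(System.currentTimeMillis()))
    }, 0L, 500L)

    Thread.sleep(2000)
    timer.cancel()
    loop.stop()
  }
}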
Changes to JobScheduler (org.apache.spark.streaming.scheduler):

@@ -17,13 +17,15 @@
 
 package org.apache.spark.streaming.scheduler
 
-import scala.util.{Failure, Success, Try}
-import scala.collection.JavaConversions._
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
-import akka.actor.{ActorRef, Actor, Props}
-import org.apache.spark.{SparkException, Logging, SparkEnv}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success}
+
+import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
+import org.apache.spark.util.EventLoop
 
 
 private[scheduler] sealed trait JobSchedulerEvent
@@ -46,20 +48,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   val listenerBus = new StreamingListenerBus()
 
   // These two are created only when scheduler starts.
-  // eventActor not being null means the scheduler has been started and not stopped
+  // eventLoop not being null means the scheduler has been started and not stopped
   var receiverTracker: ReceiverTracker = null
-  private var eventActor: ActorRef = null
+  private var eventLoop: EventLoop[JobSchedulerEvent] = null
 
   def start(): Unit = synchronized {
-    if (eventActor != null) return // scheduler has already been started
+    if (eventLoop != null) return // scheduler has already been started
 
     logDebug("Starting JobScheduler")
-    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-      override def receive: PartialFunction[Any, Unit] = {
-        case event: JobSchedulerEvent => processEvent(event)
-      }
-    }), "JobScheduler")
+    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
+      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
+
+      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
+    }
+    eventLoop.start()
 
     listenerBus.start(ssc.sparkContext)
     receiverTracker = new ReceiverTracker(ssc)

(A reviewer commented on the `listenerBus.start(ssc.sparkContext)` line: blank line here.)
@@ -69,7 +71,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   def stop(processAllReceivedData: Boolean): Unit = synchronized {
-    if (eventActor == null) return // scheduler has already been stopped
+    if (eventLoop == null) return // scheduler has already been stopped
     logDebug("Stopping JobScheduler")
 
     // First, stop receiving
@@ -96,8 +98,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
     // Stop everything else
     listenerBus.stop()
-    ssc.env.actorSystem.stop(eventActor)
-    eventActor = null
+    eventLoop.stop()
+    eventLoop = null
     logInfo("Stopped JobScheduler")
   }
 
@@ -117,7 +119,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   def reportError(msg: String, e: Throwable) {
-    eventActor ! ErrorReported(msg, e)
+    eventLoop.post(ErrorReported(msg, e))
   }
 
   private def processEvent(event: JobSchedulerEvent) {
@@ -172,14 +174,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private class JobHandler(job: Job) extends Runnable {
     def run() {
-      eventActor ! JobStarted(job)
+      eventLoop.post(JobStarted(job))
       // Disable checks for existing output directories in jobs launched by the streaming scheduler,
       // since we may need to write output to an existing directory during checkpoint recovery;
      // see SPARK-4835 for more details.
       PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
         job.run()
       }
-      eventActor ! JobCompleted(job)
+      eventLoop.post(JobCompleted(job))
     }
   }
 }
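The JobHandler change is the same pattern seen from the worker side: jobs run on the scheduler's thread pool but only communicate by posting events. The sketch below (a made-up SchedulerSketch, again built on the SimpleEventLoop illustration) shows why that keeps the scheduler's bookkeeping single-threaded even with several jobs running concurrently.

import java.util.concurrent.{Executors, TimeUnit}

object SchedulerSketch {
  sealed trait Event
  final case class JobStarted(id: Int) extends Event
  final case class JobCompleted(id: Int) extends Event

  def main(args: Array[String]): Unit = {
    val loop = new SimpleEventLoop[Event]("JobScheduler-sketch") {
      // All bookkeeping happens on the loop's single thread, so no locking is
      // needed around `running`.
      private var running = Set.empty[Int]
      override protected def onReceive(event: Event): Unit = event match {
        case JobStarted(id) => running += id; println(s"started $id, running: $running")
        case JobCompleted(id) => running -= id; println(s"completed $id, running: $running")
      }
      override protected def onError(e: Throwable): Unit = e.printStackTrace()
    }
    loop.start()

    // Jobs run on a pool (like JobScheduler's jobExecutor) but only talk to the
    // scheduler by posting events, mirroring JobHandler above.
    val pool = Executors.newFixedThreadPool(2)
    (1 to 4).foreach { id =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          loop.post(JobStarted(id))
          Thread.sleep(100)              // stand-in for job.run()
          loop.post(JobCompleted(id))
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    Thread.sleep(200)                    // let the loop drain before stopping this sketch
    loop.stop()
  }
}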