Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private[spark] object UIUtils extends Logging {
}
}
if (unit.isEmpty) {
"%d".formatLocal(Locale.US, value)
"%d".formatLocal(Locale.US, value.toInt)
} else {
"%.1f%s".formatLocal(Locale.US, value, unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,49 +50,42 @@ class SocketReceiver[T: ClassTag](
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {

var socket: Socket = null
var receivingThread: Thread = null

def onStart() {
receivingThread = new Thread("Socket Receiver") {
override def run() {
connect()
receive()
}
}
receivingThread.start()
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}

def onStop() {
if (socket != null) {
socket.close()
}
socket = null
if (receivingThread != null) {
receivingThread.join()
}
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself once isStopped() returns true
}

def connect() {
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
} catch {
case e: Exception =>
restart("Could not connect to " + host + ":" + port, e)
}
}

def receive() {
try {
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
logInfo("Stopped receiving")
restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: Exception =>
restart("Error receiving data from socket", e)
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
restart("Error receiving data", t)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ import akka.actor.SupervisorStrategy.{Escalate, Restart}
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.StorageLevel
import java.nio.ByteBuffer
import org.apache.spark.annotation.DeveloperApi

/** A helper with set of defaults for supervisor strategy */
/**
* :: DeveloperApi ::
* A helper with set of defaults for supervisor strategy
*/
@DeveloperApi
object ActorSupervisorStrategy {

val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
Expand All @@ -40,6 +45,7 @@ object ActorSupervisorStrategy {
}

/**
* :: DeveloperApi ::
* A receiver trait to be mixed in with your Actor to gain access to
* the API for pushing received data into Spark Streaming for being processed.
*
Expand All @@ -61,6 +67,7 @@ object ActorSupervisorStrategy {
* to ensure the type safety, i.e parametrized type of push block and InputDStream
* should be same.
*/
@DeveloperApi
trait ActorHelper {

self: Actor => // to ensure that this can be added to Actor classes only
Expand Down Expand Up @@ -92,10 +99,12 @@ trait ActorHelper {
}

/**
* :: DeveloperApi ::
* Statistics for querying the supervisor about state of workers. Used in
* conjunction with `StreamingContext.actorStream` and
* [[org.apache.spark.streaming.receiver.ActorHelper]].
*/
@DeveloperApi
case class Statistics(numberOfMsgs: Int,
numberOfWorkers: Int,
numberOfHiccups: Int,
Expand Down Expand Up @@ -188,4 +197,3 @@ private[streaming] class ActorReceiver[T: ClassTag](
supervisor ! PoisonPill
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._

import org.apache.spark.storage.StorageLevel
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Abstract class of a receiver that can be run on worker nodes to receive external data. A
* custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
* should define the setup steps necessary to start receiving data,
Expand All @@ -51,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
* }
* }}}
*/
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

/**
Expand Down Expand Up @@ -198,7 +201,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable

/** Check if receiver has been marked for stopping. */
def isStopped(): Boolean = {
!executor.isReceiverStarted()
executor.isReceiverStopped()
}

/** Get unique identifier of this receiver. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
package org.apache.spark.streaming.receiver

/** Messages sent to the receiver. */
private[streaming] sealed trait NetworkReceiverMessage
private[streaming] object StopReceiver extends NetworkReceiverMessage
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage

Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,38 @@ private[streaming] abstract class ReceiverSupervisor(
/** Report errors. */
def reportError(message: String, throwable: Throwable)

/** Start the executor */
/**
 * Hook called by `start()` before the receiver itself is started.
 * Does nothing by default; subclasses may override it.
 */
protected def onStart(): Unit = {}

/**
 * Hook called by `stop()` after the receiver has been asked to stop.
 * Does nothing by default; subclasses may override it.
 *
 * @param message the stop message that was passed to `stop()`
 * @param error the error that was passed to `stop()`, if any
 */
protected def onStop(message: String, error: Option[Throwable]): Unit = {}

/**
 * Hook called by `startReceiver()` once the receiver's own `onStart()` has returned.
 * Does nothing by default; subclasses may override it.
 */
protected def onReceiverStart(): Unit = {}

/**
 * Hook called by `stopReceiver()` once the receiver's own `onStop()` has returned.
 * Does nothing by default; subclasses may override it.
 *
 * @param message the stop message that was passed to `stopReceiver()`
 * @param error the error that was passed to `stopReceiver()`, if any
 */
protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = {}

/**
 * Start the supervisor: run the supervisor-level `onStart()` hook first,
 * then start the receiver.
 */
def start(): Unit = {
  onStart()
  startReceiver()
}

/** Mark the executor and the receiver for stopping */
/**
 * Mark the supervisor and the receiver for stopping.
 *
 * @param message human-readable reason for stopping
 * @param error the error that caused the stop, if any
 */
def stop(message: String, error: Option[Throwable]): Unit = {
  // Record the error (null when there is none) before anything else runs.
  stoppingError = error.orNull
  // Stop the receiver first, then run the supervisor-level hook.
  stopReceiver(message, error)
  onStop(message, error)
  // Count down the stop latch last, after all stop work above has run.
  stopLatch.countDown()
}

/** Start receiver */
def startReceiver(): Unit = synchronized {
try {
logInfo("Starting receiver")
receiver.onStart()
logInfo("Called receiver onStart")
onReceiverStart()
receiverState = Started
} catch {
Expand All @@ -115,7 +131,10 @@ private[streaming] abstract class ReceiverSupervisor(
/** Stop receiver */
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
try {
logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
receiverState = Stopped
receiver.onStop()
logInfo("Called receiver onStop")
onReceiverStop(message, error)
} catch {
case t: Throwable =>
Expand All @@ -130,41 +149,32 @@ private[streaming] abstract class ReceiverSupervisor(

/** Restart receiver with delay */
def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
future {
Future {
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
logDebug("Sleeping for " + delay)
Thread.sleep(delay)
logDebug("Starting receiver again")
logInfo("Starting receiver again")
startReceiver()
logInfo("Receiver started again")
}
}

/** Called when the receiver needs to be started */
protected def onReceiverStart(): Unit = synchronized {
// Call user-defined onStart()
logInfo("Calling receiver onStart")
receiver.onStart()
logInfo("Called receiver onStart")
}

/** Called when the receiver needs to be stopped */
protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized {
// Call user-defined onStop()
logInfo("Calling receiver onStop")
receiver.onStop()
logInfo("Called receiver onStop")
}

/**
 * Check if the receiver is currently in the started state.
 *
 * @return true only when the receiver state is `Started`
 */
def isReceiverStarted(): Boolean = {
  logDebug("state = " + receiverState)
  receiverState == Started
}

/** Wait the thread until the executor is stopped */
/**
 * Check if the receiver has been marked as stopped.
 *
 * @return true only when the receiver state is `Stopped`
 */
def isReceiverStopped(): Boolean = {
  logDebug("state = " + receiverState)
  receiverState == Stopped
}


/** Block the calling thread until the supervisor is stopped */
def awaitTermination() {
stopLatch.await()
logInfo("Waiting for executor stop is over")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ private[streaming] class ReceiverSupervisorImpl(
logInfo("Received stop signal")
stop("Stopped by driver", None)
}

def ref = self
}), "Receiver-" + streamId + "-" + System.currentTimeMillis())

/** Unique block ids if one wants to add blocks directly */
Expand Down Expand Up @@ -154,14 +156,23 @@ private[streaming] class ReceiverSupervisorImpl(
logWarning("Reported error " + message + " - " + error)
}

override def onReceiverStart() {
override protected def onStart() {
blockGenerator.start()
super.onReceiverStart()
}

override def onReceiverStop(message: String, error: Option[Throwable]) {
super.onReceiverStop(message, error)
override protected def onStop(message: String, error: Option[Throwable]) {
blockGenerator.stop()
env.actorSystem.stop(actor)
}

/**
 * Register this receiver with the tracker actor and block until the reply
 * arrives, waiting at most `askTimeout`.
 */
override protected def onReceiverStart() {
  val registration = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
  // Same timeout is used both for the ask itself and for awaiting its reply.
  Await.result(trackerActor.ask(registration)(askTimeout), askTimeout)
}

override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
val future = trackerActor.ask(
Expand All @@ -170,11 +181,6 @@ private[streaming] class ReceiverSupervisorImpl(
logInfo("Stopped receiver " + streamId)
}

override def stop(message: String, error: Option[Throwable]) {
super.stop(message, error)
env.actorSystem.stop(actor)
}

/** Generate new block ID */
private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.Time
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Class having information on completed batches.
* @param batchTime Time of the batch
* @param submissionTime Clock time of when jobs of this batch were submitted to
* the streaming scheduler queue
* @param processingStartTime Clock time of when the first job of this batch started processing
* @param processingEndTime Clock time of when the last job of this batch finished processing
*/
@DeveloperApi
case class BatchInfo(
batchTime: Time,
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import akka.actor.ActorRef
import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Class having information about a receiver.
 *
 * NOTE(review): the field descriptions below are inferred from the field names only;
 * confirm them against the code that constructs this class.
 *
 * @param streamId presumably the id of the input stream the receiver belongs to
 * @param name presumably the display name of the receiver
 * @param actor actor reference associated with the receiver (visible only within streaming)
 * @param active presumably whether the receiver is currently running
 * @param location presumably the host the receiver runs on
 * @param lastErrorMessage defaults to the empty string
 * @param lastError defaults to the empty string
 */
@DeveloperApi
case class ReceiverInfo(
    streamId: Int,
    name: String,
    private[streaming] val actor: ActorRef,
    active: Boolean,
    location: String,
    lastErrorMessage: String = "",
    lastError: String = "")
Loading