6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -121,7 +121,11 @@ private[spark] object UIUtils extends Logging {
(records, "")
}
}
"%.1f%s".formatLocal(Locale.US, value, unit)
if (unit.isEmpty) {
"%d".formatLocal(Locale.US, value)
} else {
"%.1f%s".formatLocal(Locale.US, value, unit)
}
}

// Yarn has to go through a proxy so the base uri is provided and has to be on all links
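To illustrate the effect of this change, here is a hedged, standalone sketch of the formatting logic (not Spark's actual UIUtils; the thresholds and unit suffixes are assumed to mirror the surrounding code): unitless values are now rendered as integers instead of with one decimal place.

```scala
import java.util.Locale

// Standalone sketch of UIUtils.formatNumber-style formatting.
def formatNumber(records: Double): String = {
  val trillion = 1e12; val billion = 1e9; val million = 1e6; val thousand = 1e3
  val (value, unit) =
    if (records >= 2 * trillion) (records / trillion, " T")
    else if (records >= 2 * billion) (records / billion, " B")
    else if (records >= 2 * million) (records / million, " M")
    else if (records >= 2 * thousand) (records / thousand, " K")
    else (records, "")
  if (unit.isEmpty) {
    "%d".formatLocal(Locale.US, value.toInt)       // new branch: "950" rather than "950.0"
  } else {
    "%.1f%s".formatLocal(Locale.US, value, unit)   // unchanged branch: e.g. "5.0 M"
  }
}

// formatNumber(950.0)     == "950"    (previously "950.0")
// formatNumber(5000000.0) == "5.0 M"
```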
@@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.util.AkkaUtils
import org.apache.spark.streaming.receiver.ActorHelper

case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -81,14 +81,14 @@ class FeederActor extends Actor {
* @see [[org.apache.spark.streaming.examples.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
extends Actor with Receiver {
extends Actor with ActorHelper {

lazy private val remotePublisher = context.actorSelection(urlOfPublisher)

override def preStart = remotePublisher ! SubscribeReceiver(context.self)

def receive = {
case msg => pushBlock(msg.asInstanceOf[T])
case msg => store(msg.asInstanceOf[T])
}

override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
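For reference, a receiver actor written against the new ActorHelper API is wired into a StreamingContext through actorStream, just as before. The sketch below is a minimal, hypothetical example (actor class, app name, and batch interval are illustrative, not part of this PR):

```scala
import akka.actor.{Actor, Props}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper

// Hypothetical actor receiver: every String message it receives is stored
// into Spark Streaming via ActorHelper.store (the old pushBlock is gone).
class EchoReceiverActor extends Actor with ActorHelper {
  def receive = {
    case s: String => store(s)
  }
}

object EchoStream {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("EchoStream"), Seconds(2))
    val lines = ssc.actorStream[String](Props(new EchoReceiverActor), "EchoReceiverActor")
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```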
@@ -34,16 +34,18 @@ import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {

override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}

override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
events.foreach (event =>
receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
}
@@ -133,23 +135,21 @@ class FlumeReceiver(
host: String,
port: Int,
storageLevel: StorageLevel
) extends NetworkReceiver[SparkFlumeEvent] {
) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {

lazy val blockGenerator = new BlockGenerator(storageLevel)
lazy val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))

protected override def onStart() {
val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
val server = new NettyServer(responder, new InetSocketAddress(host, port))
blockGenerator.start()
def onStart() {
server.start()
logInfo("Flume receiver started")
}

protected override def onStop() {
blockGenerator.stop()
def onStop() {
server.close()
logInfo("Flume receiver stopped")
}

override def getLocationPreference = Some(host)
override def preferredLocation = Some(host)
}
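The pattern above — extend Receiver[T](storageLevel), keep onStart() non-blocking, release resources in onStop(), and push records with store() — is what this PR migrates every receiver to. A minimal, hedged sketch of a custom receiver against that contract (the line-oriented socket protocol, class name, and storage level are illustrative, not part of this PR):

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.{ConnectException, Socket}
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that reads newline-terminated strings from a socket.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart() {
    // onStart() must return quickly, so receive on a separate thread.
    new Thread("Line Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do: receive() checks isStopped() and lets its thread exit.
  }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)              // replaces the old blockGenerator += ... idiom
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: ConnectException => restart("Could not connect to " + host + ":" + port, e)
      case t: Throwable        => restart("Error receiving data", t)
    }
  }
}
```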
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

object FlumeUtils {
/**
@@ -35,7 +35,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
@@ -50,7 +50,7 @@
jssc: JavaStreamingContext,
hostname: String,
port: Int
): JavaDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}

@@ -65,7 +65,7 @@
hostname: String,
port: Int,
storageLevel: StorageLevel
): JavaDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
}
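Because ReceiverInputDStream (and JavaReceiverInputDStream) still extend DStream, existing callers of FlumeUtils.createStream continue to compile; the narrower return type is simply available when wanted. A brief, hedged usage sketch in spark-shell style (app name, host, port, and batch interval are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

val conf = new SparkConf().setAppName("FlumeEventCount")
val ssc = new StreamingContext(conf, Seconds(2))

// The refined return type can be named explicitly...
val events: ReceiverInputDStream[SparkFlumeEvent] =
  FlumeUtils.createStream(ssc, "localhost", 41414)

// ...but it is still a DStream, so downstream code is unchanged.
events.count().map(cnt => "Received " + cnt + " flume events.").print()

ssc.start()
ssc.awaitTermination()
```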
@@ -19,16 +19,16 @@

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;

public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
// tests the API, does not actually test data receiving
JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream

class FlumeStreamSuite extends TestSuiteBase {

@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
outputStream.register()
ssc.start()

@@ -33,6 +33,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver

/**
* Input stream that pulls messages from a Kafka Broker.
@@ -53,11 +54,11 @@ class KafkaInputDStream[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

def getReceiver(): NetworkReceiver[(K, V)] = {
def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[NetworkReceiver[(K, V)]]
.asInstanceOf[Receiver[(K, V)]]
}
}

@@ -70,21 +71,15 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
) extends Receiver[Any](storageLevel) with Logging {

// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
var consumerConnector : ConsumerConnector = null

def onStop() {
blockGenerator.stop()
}
def onStop() { }

def onStart() {

blockGenerator.start()

// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))

@@ -130,7 +125,7 @@ class KafkaReceiver[
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
store((msgAndMetadata.key, msgAndMetadata.message))
}
}
}
@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}


object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[(String, String)] = {
): ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): DStream[(K, V)] = {
): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}

@@ -88,7 +88,7 @@
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
): JavaPairDStream[String, String] = {
): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairDStream[String, String] = {
): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairDStream[K, V] = {
): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
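Similarly for Kafka: the Scala variants now return ReceiverInputDStream[(K, V)] and the Java variants JavaPairReceiverInputDStream<K, V>, while remaining usable as ordinary (pair) DStreams. A hedged usage sketch (ZooKeeper address, group id, and topic map are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // pair DStream operations
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(conf, Seconds(2))

val topics = Map("my-topic" -> 1)   // topic -> number of consumer threads
val messages: ReceiverInputDStream[(String, String)] =
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group", topics)

// Keys are the (possibly null) Kafka message keys; values are the payloads.
val words = messages.map(_._2).flatMap(_.split(" "))
words.map(w => (w, 1L)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()
```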
@@ -18,27 +18,29 @@
package org.apache.spark.streaming.kafka;

import java.util.HashMap;

import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();

// tests the API, does not actually test data receiving
JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
JavaPairReceiverInputDStream<String, String> test1 =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());

HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream

class KafkaStreamSuite extends TestSuiteBase {

@@ -28,10 +29,13 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)

// tests the API, does not actually test data receiving
val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val test1: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
val test2: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
val test3: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

// TODO: Actually test receiving data