From 1ebb1f329ab3d16d06754453ae2c57295133d750 Mon Sep 17 00:00:00 2001
From: christophe
Date: Tue, 22 Apr 2014 21:11:23 -0700
Subject: [PATCH] [STREAMING] SPARK-1581: Allow One Flume Avro RPC Server for
 Each Worker rather than Just One Worker

---
 .../streaming/flume/FlumeInputDStream.scala |  8 ++---
 .../spark/streaming/flume/FlumeUtils.scala  | 31 +++++++++++++++----
 .../streaming/flume/FlumeStreamSuite.scala  | 26 +++++++++++++---
 3 files changed, 50 insertions(+), 15 deletions(-)

diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 34012b846e21e..83f67583c9ac5 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.dstream._
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
-  host: String,
+  host: Option[String],
   port: Int,
   storageLevel: StorageLevel
 ) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
@@ -130,7 +130,7 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
  * Flume Avro interface. */
 private[streaming]
 class FlumeReceiver(
-  host: String,
+  host: Option[String],
   port: Int,
   storageLevel: StorageLevel
 ) extends NetworkReceiver[SparkFlumeEvent] {
@@ -140,7 +140,7 @@ class FlumeReceiver(
   protected override def onStart() {
     val responder = new SpecificResponder(
       classOf[AvroSourceProtocol], new FlumeEventServer(this))
-    val server = new NettyServer(responder, new InetSocketAddress(host, port))
+    val server = new NettyServer(responder, new InetSocketAddress(host.getOrElse("0.0.0.0"), port))
     blockGenerator.start()
     server.start()
     logInfo("Flume receiver started")
@@ -151,5 +151,5 @@ class FlumeReceiver(
     logInfo("Flume receiver stopped")
   }
 
-  override def getLocationPreference = Some(host)
+  override def getLocationPreference = host
 }
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 654ba451e72fb..3f3c8b556beb3 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -24,19 +24,38 @@ import org.apache.spark.streaming.dstream.DStream
 
 object FlumeUtils {
   /**
-   * Create a input stream from a Flume source.
+   * Create an input stream from a Flume source by starting an Avro RPC server on each worker.
    * @param ssc StreamingContext object
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port Port of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machines to which the flume data will be sent
    * @param storageLevel Storage level to use for storing the received objects
    */
   def createStream (
     ssc: StreamingContext,
-    hostname: String,
     port: Int,
-    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    storageLevel: StorageLevel
   ): DStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, None, port, storageLevel)
+    inputStream
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createStream (
+    ssc: StreamingContext,
+    hostname: String,
+    port: Int,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+  ): DStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc,
+      Some(hostname),
+      port,
+      storageLevel)
     inputStream
   }
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 8bc43972ab6a0..f0664bcbc5257 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,15 +31,14 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.dstream.DStream
 
 class FlumeStreamSuite extends TestSuiteBase {
 
-  val testPort = 9999
 
-  test("flume input stream") {
+  def testStream(flumeStream: DStream[SparkFlumeEvent], port: Int) {
     // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val ssc = flumeStream.ssc
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
@@ -49,7 +48,7 @@
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
-    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
+    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", port))
     val client = SpecificRequestor.getClient(
       classOf[AvroSourceProtocol], transceiver)
 
@@ -71,16 +70,33 @@
     val timeTaken = System.currentTimeMillis() - startTime
     assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
     logInfo("Stopping context")
+    transceiver.close()
     ssc.stop()
 
     val decoder = Charset.forName("UTF-8").newDecoder()
+    println(outputBuffer)
 
     assert(outputBuffer.size === input.length)
     for (i <- 0 until outputBuffer.size) {
+      println(outputBuffer(i))
       assert(outputBuffer(i).size === 1)
       val str = decoder.decode(outputBuffer(i).head.event.getBody)
       assert(str.toString === input(i).toString)
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  test("flume input stream") {
+    val ssc = new StreamingContext(conf, batchDuration)
+    val port = 9901
+    val flumeStream = FlumeUtils.createStream(ssc, "localhost", port, StorageLevel.MEMORY_AND_DISK)
+    testStream(flumeStream, port)
+  }
+
+  test("flume multi-worker input stream") {
+    val ssc = new StreamingContext(conf, batchDuration)
+    val port = 9902
+    val multiWorkerFlumeListeningStream = FlumeUtils.createStream(ssc, port, StorageLevel.MEMORY_AND_DISK)
+    testStream(multiWorkerFlumeListeningStream, port)
+  }
 }
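
Usage note (not part of the patch): a minimal sketch of how the two createStream
overloads behave after this change. The master URL, app name, batch interval,
and port numbers below are illustrative assumptions, not values taken from the
patch.

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumeStreamExample {
      def main(args: Array[String]) {
        // Illustrative context; master, app name and batch interval are arbitrary.
        val ssc = new StreamingContext("local[4]", "FlumeStreamExample", Seconds(1))

        // Existing overload: the hostname becomes the receiver's location
        // preference, so the single Avro RPC server is pinned to that worker.
        val singleWorkerStream =
          FlumeUtils.createStream(ssc, "localhost", 9999)

        // New overload from this patch: no hostname, so the receiver carries no
        // location preference and its Avro RPC server binds to 0.0.0.0:9998 on
        // whichever worker runs it, rather than on one fixed host.
        val multiWorkerStream =
          FlumeUtils.createStream(ssc, 9998, StorageLevel.MEMORY_AND_DISK_SER_2)

        singleWorkerStream.count().print()
        multiWorkerStream.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }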