From c433827db5dfda6f5b1b6aa11e45447525b4aac4 Mon Sep 17 00:00:00 2001
From: tmalaska
Date: Mon, 14 Apr 2014 13:37:01 -0400
Subject: [PATCH] SPARK-1478

---
 external/flume/pom.xml                            |  2 +-
 .../streaming/flume/FlumeInputDStream.scala       | 58 ++++++++++++++++---
 .../spark/streaming/flume/FlumeUtils.scala        | 50 +++++++++++++++-
 .../streaming/flume/JavaFlumeStreamSuite.java     |  2 +
 .../streaming/flume/FlumeStreamSuite.scala        | 55 +++++++++++++++---
 project/SparkBuild.scala                          |  2 +-
 6 files changed, 149 insertions(+), 20 deletions(-)

diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index f21963531574b..771c5b7553449 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -61,7 +61,7 @@
     <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-sdk</artifactId>
-      <version>1.2.0</version>
+      <version>1.3.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.jboss.netty</groupId>
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 34012b846e21e..d08a0d5ae186b 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -20,31 +20,44 @@ package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
-
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
-
 import org.apache.flume.source.avro.AvroSourceProtocol
 import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
-
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import java.util.concurrent.Executors
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression.ZlibDecoder
+import org.jboss.netty.handler.codec.compression.ZlibEncoder
+import org.jboss.netty.handler.execution.ExecutionHandler
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableCompression: Boolean
 ) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
 
+  def this(
+      @transient ssc_ : StreamingContext,
+      host: String,
+      port: Int,
+      storageLevel: StorageLevel) = this(ssc_, host, port, storageLevel, false)
+
   override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableCompression)
   }
 }
@@ -132,7 +145,8 @@ private[streaming]
 class FlumeReceiver(
     host: String,
     port: Int,
-    storageLevel: StorageLevel
+    storageLevel: StorageLevel,
+    enableCompression: Boolean
   ) extends NetworkReceiver[SparkFlumeEvent] {
 
   lazy val blockGenerator = new BlockGenerator(storageLevel)
@@ -140,7 +154,25 @@
   protected override def onStart() {
     val responder = new SpecificResponder(
       classOf[AvroSourceProtocol], new FlumeEventServer(this))
-    val server = new NettyServer(responder, new InetSocketAddress(host, port))
+
+    // Compression requires a custom pipeline that inflates inbound data before Avro decodes it.
+    val server = if (enableCompression) {
+      val channelFactory: ChannelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory: ChannelPipelineFactory = new CompressionChannelPipelineFactory()
+      // A null ExecutionHandler keeps RPC handling on Netty's own I/O threads.
+      val executionHandler: ExecutionHandler = null
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        executionHandler)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+
     blockGenerator.start()
     server.start()
     logInfo("Flume receiver started")
@@ -153,3 +185,15 @@
 
   override def getLocationPreference = Some(host)
 }
+
+private[streaming]
+class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+  def getPipeline(): ChannelPipeline = {
+    val pipeline = Channels.pipeline()
+    // addFirst prepends: inbound data hits the inflater first; responses exit via the deflater.
+    pipeline.addFirst("deflater", new ZlibEncoder(6))
+    pipeline.addFirst("inflater", new ZlibDecoder())
+    pipeline
+  }
+}
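The receiver change above relies on Avro's NettyServer accepting a caller-supplied ChannelPipelineFactory, which is the hook used to splice zlib codecs into the Netty 3 pipeline. As a standalone sketch of what the new CompressionChannelPipelineFactory sets up (the class name here is illustrative, not part of the patch):

```scala
import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

// Illustrative sketch of the server-side pipeline built above. After the two
// addFirst calls the head of the pipeline is [inflater, deflater]: upstream
// (inbound) traffic is inflated before Avro's own handlers see it, while
// downstream (outbound) responses pass through the deflater on the way out.
class ZlibPipelineSketch extends ChannelPipelineFactory {
  override def getPipeline(): ChannelPipeline = {
    val pipeline = Channels.pipeline()
    pipeline.addFirst("deflater", new ZlibEncoder(6)) // 6 is zlib's usual speed/size trade-off
    pipeline.addFirst("inflater", new ZlibDecoder())  // prepended, so it runs first on inbound data
    pipeline
  }
}
```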
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 654ba451e72fb..d5f31416a1bea 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -29,6 +29,7 @@ object FlumeUtils {
    * @param ssc StreamingContext object
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port Port of the slave machine to which the flume data will be sent
    * @param storageLevel Storage level to use for storing the received objects
+   * (Compression is off for this overload; see the enableCompression variant below.)
    */
   def createStream (
@@ -36,7 +37,30 @@
     port: Int,
     storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
   ): DStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableCompression Whether the Netty server should zlib-decompress the client's stream
+   */
+  def createStream (
+    ssc: StreamingContext,
+    hostname: String,
+    port: Int,
+    storageLevel: StorageLevel,
+    enableCompression: Boolean
+  ): DStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc,
+      hostname,
+      port,
+      storageLevel,
+      enableCompression)
     inputStream
   }
 
@@ -66,6 +90,30 @@
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
     ): JavaDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param jssc JavaStreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableCompression Whether the Netty server should zlib-decompress the client's stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableCompression: Boolean
+    ): JavaDStream[SparkFlumeEvent] = {
+    createStream(
+      jssc.ssc,
+      hostname,
+      port,
+      storageLevel,
+      enableCompression)
+  }
 }
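With the new overloads in place, enabling compression from an application is a one-argument change. A minimal sketch, assuming a receiver on localhost:9999 (the host, port, and app name are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object CompressedFlumeCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CompressedFlumeCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // The final argument turns on zlib decompression in the receiver's Netty pipeline.
    val stream = FlumeUtils.createStream(
      ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2, true)

    stream.count().map(cnt => "Received " + cnt + " Flume events").print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that the flag only affects the receiving side: the sender must produce a compressed stream itself, for example a Flume Avro sink with deflate compression enabled (available in newer Flume releases) or a zlib-aware Avro client like the one in the test suite below.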
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index 733389b98d22d..c2e50054ba6df 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -30,5 +30,7 @@ public void testFlumeStream() {
     JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), true);
   }
 }
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 8bc43972ab6a0..cd86bb7c5bfd4 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -19,27 +19,41 @@ package org.apache.spark.streaming.flume
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
 import java.nio.charset.Charset
-
 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
 import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
-
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression.ZlibDecoder
+import org.jboss.netty.handler.codec.compression.ZlibEncoder
 
 class FlumeStreamSuite extends TestSuiteBase {
 
-  val testPort = 9999
-
   test("flume input stream") {
+    runFlumeStreamTest(false, 9999)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9998)
+  }
+
+  def runFlumeStreamTest(enableCompression: Boolean, testPort: Int) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val flumeStream = FlumeUtils.createStream(
+      ssc,
+      "localhost",
+      testPort,
+      StorageLevel.MEMORY_AND_DISK,
+      enableCompression)
+
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
@@ -49,9 +63,20 @@
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
-    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+
+    // A compressed stream needs a zlib-aware client; CompressionChannelFactory
+    // below mirrors the pipeline installed on the receiver.
+    val client: AvroSourceProtocol =
+      if (enableCompression) {
+        SpecificRequestor.getClient(
+          classOf[AvroSourceProtocol],
+          new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+            new CompressionChannelFactory(6)))
+      } else {
+        SpecificRequestor.getClient(
+          classOf[AvroSourceProtocol],
+          new NettyTransceiver(new InetSocketAddress("localhost", testPort)))
+      }
 
     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -83,4 +108,16 @@
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  /** Client-side channel factory that zlib-codes the Avro RPC stream. */
+  class CompressionChannelFactory(compressionLevel: Int)
+    extends NioClientSocketChannelFactory {
+
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      // Prepend the codec handlers so they wrap Avro's own frame handlers.
+      pipeline.addFirst("deflater", new ZlibEncoder(compressionLevel))
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
 }
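For completeness, here is the client half of the handshake distilled from the test above into a standalone sender. The object name, port, and payload are illustrative; the channel factory is the same pattern the suite uses:

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.charset.Charset

import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

object CompressedAvroSender {

  // Client-side mirror of the receiver's pipeline: deflate requests, inflate responses.
  class ZlibChannelFactory(level: Int) extends NioClientSocketChannelFactory {
    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
      pipeline.addFirst("deflater", new ZlibEncoder(level))
      pipeline.addFirst("inflater", new ZlibDecoder())
      super.newChannel(pipeline)
    }
  }

  def main(args: Array[String]) {
    val transceiver = new NettyTransceiver(
      new InetSocketAddress("localhost", 9999), new ZlibChannelFactory(6))
    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)

    val event = new AvroFlumeEvent
    event.setBody(ByteBuffer.wrap("hello".getBytes(Charset.forName("UTF-8"))))
    event.setHeaders(new java.util.HashMap[CharSequence, CharSequence]())
    client.append(event) // zlib-compressed on the wire
    transceiver.close()
  }
}
```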
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a6058bba3d211..0115b85496d73 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -556,7 +556,7 @@ object SparkBuild extends Build {
     name := "spark-streaming-flume",
     previousArtifact := sparkPreviousArtifact("spark-streaming-flume"),
     libraryDependencies ++= Seq(
-      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty)
+      "org.apache.flume" % "flume-ng-sdk" % "1.3.0" % "compile" excludeAll(excludeNetty)
     )
   )
 
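An application picking up this connector declares it in its own build rather than through SparkBuild.scala. A hypothetical build.sbt fragment (the version numbers are illustrative; use whichever Spark release carries this patch):

```scala
// Hypothetical application-side build.sbt fragment.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.0.0",
  "org.apache.spark" %% "spark-streaming-flume" % "1.0.0"
)
```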