@@ -36,17 +36,27 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.Receiver
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.channel.ChannelPipelineFactory
+import java.util.concurrent.Executors
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.handler.codec.compression.ZlibDecoder
+import org.jboss.netty.handler.codec.compression.ZlibEncoder
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.handler.execution.ExecutionHandler

private[streaming]
class FlumeInputDStream[T: ClassTag](
  @transient ssc_ : StreamingContext,
  host: String,
  port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {

  override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
  }
}

@@ -134,22 +144,64 @@ private[streaming]
class FlumeReceiver(
  host: String,
  port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {

  lazy val responder = new SpecificResponder(
    classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)  // no ExecutionHandler: the responder runs on the Netty I/O threads
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }

  def onStart() {
-    server.start()
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver being asked to start more than once without close")
+      }
+    }
    logInfo("Flume receiver started")
  }

  def onStop() {
-    server.close()
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
    logInfo("Flume receiver stopped")
  }

  override def preferredLocation = Some(host)
}

+private[streaming]
Contributor: Can you add comments to this class, explaining what this class does and why it is necessary?

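+/**
+ * A Netty pipeline factory for the receiver's NettyServer when
+ * enableDecompression is set. Each channel it builds inflates event data
+ * arriving from a compressing Flume client and deflates the responses sent
+ * back, since a client that writes a compressed stream also expects a
+ * compressed reply.
+ */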
+class CompressionChannelPipelineFactory extends ChannelPipelineFactory {

+  def getPipeline() = {

Contributor: Just a line of comment saying what pipeline does this return. For Flume noob's like me ;)

Author: Cool will do before the weekend is done. Thanks

+    // A pipeline that inflates incoming data and deflates outgoing data.
+    val pipeline = Channels.pipeline()

Contributor: Formatting issue. 2 space indents required.

+    val encoder = new ZlibEncoder(6)
+    pipeline.addFirst("deflater", encoder)
+    pipeline.addFirst("inflater", new ZlibDecoder())
+    pipeline
+  }
+}
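Note: receiver-side decompression only pays off when the upstream Flume agent compresses what it sends. A sketch of a matching Avro sink configuration (the agent and sink names and the port are placeholders, not part of this diff; compression-type and compression-level are standard Flume AvroSink properties):

  agent.sinks.sparkSink.type = avro
  agent.sinks.sparkSink.hostname = localhost
  agent.sinks.sparkSink.port = 9997
  agent.sinks.sparkSink.compression-type = deflate
  agent.sinks.sparkSink.compression-level = 6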
@@ -36,7 +36,25 @@ object FlumeUtils {
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc                 StreamingContext object
+   * @param hostname            Hostname of the slave machine to which the flume data will be sent
+   * @param port                Port of the slave machine to which the flume data will be sent
+   * @param storageLevel        Storage level to use for storing the received objects
+   * @param enableDecompression Should the Netty server decompress the input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc, hostname, port, storageLevel, enableDecompression)
    inputStream
  }

@@ -66,6 +84,23 @@ object FlumeUtils {
      port: Int,
      storageLevel: StorageLevel
    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
  }

+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname            Hostname of the slave machine to which the flume data will be sent
+   * @param port                Port of the slave machine to which the flume data will be sent
+   * @param storageLevel        Storage level to use for storing the received objects
+   * @param enableDecompression Should the Netty server decompress the input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
+  }
}
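Note: with these overloads in place, enabling decompression from user code is a one-argument change. A minimal Scala sketch (the StreamingContext `ssc` and port 9997 are assumed for illustration, not part of this diff):

  val compressedStream = FlumeUtils.createStream(
    ssc, "localhost", 9997, StorageLevel.MEMORY_AND_DISK_SER_2, enableDecompression = true)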
@@ -30,5 +30,7 @@ public void testFlumeStream() {
    JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
      StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
  }
}
@@ -33,15 +33,27 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuite
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream

-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.handler.codec.compression.ZlibEncoder
+import org.jboss.netty.handler.codec.compression.ZlibDecoder
+import org.jboss.netty.channel.socket.SocketChannel

-  val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {

  test("flume input stream") {
+    runFlumeStreamTest(false, 9999)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9998)
+  }
+
+  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
    // Set up the streaming context and input streams
    val ssc = new StreamingContext(conf, batchDuration)
    val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
    val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,9 +64,17 @@ class FlumeStreamSuite extends TestSuiteBase {
    val input = Seq(1, 2, 3, 4, 5)
    Thread.sleep(1000)
    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+    // When the server decompresses, talk to it through a channel whose
+    // pipeline deflates outgoing data and inflates the replies.
+    val client: AvroSourceProtocol =
+      if (enableDecompression) {
+        SpecificRequestor.getClient(
+          classOf[AvroSourceProtocol],
+          new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+            new CompressionChannelFactory(6)))
+      } else {
+        SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+      }
    for (i <- 0 until input.size) {
      val event = new AvroFlumeEvent
      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
@@ -63,7 +83,9 @@ class FlumeStreamSuite extends TestSuiteBase {
      Thread.sleep(500)
      clock.addToTime(batchDuration.milliseconds)
    }

+    Thread.sleep(1000)

    val startTime = System.currentTimeMillis()
    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
    }
  }
+
+  /** A client-side channel factory that adds a zlib encoder/decoder pair to
+   *  every new channel, so events are deflated before they are sent and
+   *  server responses are inflated on receipt. */
+  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
}