external/flume/pom.xml (1 addition, 1 deletion)
@@ -61,7 +61,7 @@
     <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-sdk</artifactId>
-      <version>1.2.0</version>
+      <version>1.3.0</version>
Contributor: Still 1.3? Have you merged master branch with your branch?

       <exclusions>
         <exclusion>
           <groupId>org.jboss.netty</groupId>
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -20,31 +20,44 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer

 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag

 import org.apache.flume.source.avro.AvroSourceProtocol
 import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer

 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.channel.ChannelPipelineFactory
+import java.util.concurrent.Executors
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.handler.codec.compression.ZlibDecoder
+import org.jboss.netty.handler.codec.compression.ZlibEncoder
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.handler.execution.ExecutionHandler

 private[streaming]
 class FlumeInputDStream[T: ClassTag](
     @transient ssc_ : StreamingContext,
     host: String,
     port: Int,
-    storageLevel: StorageLevel
+    storageLevel: StorageLevel,
+    enableCompression: Boolean
   ) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {

+  def this(
+      @transient ssc_ : StreamingContext,
+      host: String,
+      port: Int,
+      storageLevel: StorageLevel) = this(ssc_, host, port, storageLevel, false)
+
   override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableCompression)
   }
 }

@@ -132,15 +145,34 @@ private[streaming]
 class FlumeReceiver(
     host: String,
     port: Int,
-    storageLevel: StorageLevel
+    storageLevel: StorageLevel,
+    enableCompression: Boolean
   ) extends NetworkReceiver[SparkFlumeEvent] {

   lazy val blockGenerator = new BlockGenerator(storageLevel)

   protected override def onStart() {
     val responder = new SpecificResponder(
       classOf[AvroSourceProtocol], new FlumeEventServer(this))
-    val server = new NettyServer(responder, new InetSocketAddress(host, port))
+
+    var server: NettyServer = null
+
+    if (enableCompression) {
+      val channelFactory: ChannelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory: ChannelPipelineFactory = new CompressionChannelPipelineFactory()
+      val executionHandler: ExecutionHandler = null
+
+      server = new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        executionHandler)
+    } else {
+      server = new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+
     blockGenerator.start()
     server.start()
     logInfo("Flume receiver started")
@@ -153,3 +185,15 @@ class FlumeReceiver(

   override def getLocationPreference = Some(host)
 }
+
+private[streaming]
+class CompressionChannelPipelineFactory() extends ChannelPipelineFactory {
+
+  def getPipeline() = {
+    val pipeline = Channels.pipeline()
+    val encoder = new ZlibEncoder(6)
+    pipeline.addFirst("deflater", encoder)
+    pipeline.addFirst("inflater", new ZlibDecoder())
+    pipeline
+  }
+}
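A note on the pipeline above: because both handlers are added with addFirst, the inflater ends up ahead of the deflater, so inbound bytes are decompressed before they reach the Avro responder, while outbound responses pass through the deflater and are compressed on the way out. Separately, the val/if shape the reviewer suggests for the test client below would tidy up onStart as well. A sketch only, reusing the five-argument NettyServer constructor already shown in this diff; the null ExecutionHandler simply means the Avro handlers stay on Netty's I/O threads:

// Sketch, not part of the PR: the reviewer's val/if pattern applied to onStart.
val server: NettyServer =
  if (enableCompression) {
    val channelFactory = new NioServerSocketChannelFactory(
      Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
    new NettyServer(
      responder,
      new InetSocketAddress(host, port),
      channelFactory,
      new CompressionChannelPipelineFactory(),
      null) // no ExecutionHandler
  } else {
    new NettyServer(responder, new InetSocketAddress(host, port))
  }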
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -29,14 +29,38 @@ object FlumeUtils {
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port Port of the slave machine to which the flume data will be sent
    * @param storageLevel Storage level to use for storing the received objects
+   * @param enableCompression Should Netty Server decode input stream from client
Contributor: I am a little confused. What does "should netty server decode input stream" have to do with "compression"? Maybe you wanted to say "should netty server decompress input stream"?

Contributor: Also, this parameter does not exist in this version of the createStream method!

    */
   def createStream (
       ssc: StreamingContext,
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
   }

+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableCompression Should Netty Server decode input stream from client
Contributor: Same comment as above, for enableCompression.

+   */
+  def createStream (
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableCompression: Boolean
+    ): DStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc,
+      hostname,
+      port,
+      storageLevel,
+      enableCompression)
+    inputStream
+  }

@@ -66,6 +90,28 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel
     ): JavaDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
   }

+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableCompression Should Netty Server decode input stream from client
Contributor: Same comment as above, for enableCompression.

+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableCompression: Boolean
+    ): JavaDStream[SparkFlumeEvent] = {
+    createStream(
+      jssc.ssc,
+      hostname,
+      port,
+      storageLevel,
+      enableCompression)
+  }
}
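Taken together, old call sites keep working (compression defaults to off) and callers opt in per stream. A minimal usage sketch, not part of the PR; the host, ports, and the surrounding SparkConf are placeholder assumptions:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext(conf, Seconds(1))  // conf: your SparkConf
// Plain stream, exactly as before:
val events = FlumeUtils.createStream(ssc, "localhost", 4141)
// Zlib-compressed transport; the sending side must deflate with a matching codec:
val compressed = FlumeUtils.createStream(
  ssc, "localhost", 4142, StorageLevel.MEMORY_AND_DISK_SER_2, true)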
external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -30,5 +30,7 @@ public void testFlumeStream() {
     JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), true);
   }
 }
external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -19,27 +19,41 @@ package org.apache.spark.streaming.flume

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
 import java.nio.charset.Charset

 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
 import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.handler.codec.compression.ZlibEncoder
+import org.jboss.netty.handler.codec.compression.ZlibDecoder
+import org.jboss.netty.channel.socket.SocketChannel

 class FlumeStreamSuite extends TestSuiteBase {

-  val testPort = 9999
-
   test("flume input stream") {
+    runFlumeStreamTest(false, 9999)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9998)
+  }
+
+  def runFlumeStreamTest(enableCompression: Boolean, testPort: Int) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val flumeStream = FlumeUtils.createStream(
+      ssc,
+      "localhost",
+      testPort,
+      StorageLevel.MEMORY_AND_DISK,
+      enableCompression)

     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
@@ -49,9 +63,20 @@ class FlumeStreamSuite extends TestSuiteBase {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
-    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+
+    var client: AvroSourceProtocol = null
+
+    if (enableCompression) {
Contributor: The Scala way of writing this would be

val client: AvroSourceProtocol = {
  if (enableCompression) {
    SpecificRequestor.getClient( .... )
  } else {
    SpecificRequestor.getClient( .... )
  }
}

+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol],
+        new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+          new CompressionChannelFactory(6)))
+    } else {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol],
+        new NettyTransceiver(new InetSocketAddress("localhost", testPort)))
+    }


     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -83,4 +108,16 @@ class FlumeStreamSuite extends TestSuiteBase {
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends
+    NioClientSocketChannelFactory {
Contributor: No need to wrap the line here. Usually, it is either

class X extends Y

or

class X
  extends Y


+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      var encoder: ZlibEncoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }

}
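The client factory passes compression level 6, the same level the server pipeline uses; zlib levels only affect compression ratio, not compatibility, so any level inflates with a plain ZlibDecoder. To sanity-check the codec pair without a full streaming context, a standalone round-trip sketch, assuming Netty 3.x's codec embedder test helpers:

import java.nio.charset.Charset
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
import org.jboss.netty.handler.codec.embedder.{DecoderEmbedder, EncoderEmbedder}

val utf8 = Charset.forName("UTF-8")
val payload = ChannelBuffers.copiedBuffer("hello flume", utf8)

// Deflate with the same level the suite uses, then inflate it back.
val enc = new EncoderEmbedder[ChannelBuffer](new ZlibEncoder(6))
enc.offer(payload)
val deflated = enc.poll()

val dec = new DecoderEmbedder[ChannelBuffer](new ZlibDecoder())
dec.offer(deflated)
assert(dec.poll().toString(utf8) == "hello flume")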
project/SparkBuild.scala (1 addition, 1 deletion)
@@ -556,7 +556,7 @@ object SparkBuild extends Build {
     name := "spark-streaming-flume",
     previousArtifact := sparkPreviousArtifact("spark-streaming-flume"),
     libraryDependencies ++= Seq(
-      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty)
+      "org.apache.flume" % "flume-ng-sdk" % "1.3.0" % "compile" excludeAll(excludeNetty)
Contributor: This should be 1.4 if I am not mistaken, please merge with master.

)
)
