Commit 12617e5
Author: tmalaska

SPARK-1478: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915

1 parent ca5d8b5 commit 12617e5

File tree: 4 files changed, +142 -16 lines
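The four diffs below thread a new enableDecompression flag from the public FlumeUtils.createStream overloads down to the Netty server inside FlumeReceiver. For orientation, here is a minimal driver sketch (not part of this commit) showing how the flag would be used once merged; the app name, host, port, and batch interval are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeDecompressExample {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("FlumeDecompressExample")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // The trailing `true` is the new enableDecompression argument.
    val stream = FlumeUtils.createStream(
      ssc, "localhost", 9997, StorageLevel.MEMORY_AND_DISK_SER_2, true)
    stream.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}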

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

Lines changed: 67 additions & 9 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
+import java.util.concurrent.Executors

 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
-
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver

+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
+
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {

   override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
 }

@@ -134,22 +143,71 @@ private[streaming]
 class FlumeReceiver(
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {

   lazy val responder = new SpecificResponder(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }

   def onStart() {
-    server.start()
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver asked to start more than once without being closed")
+      }
+    }
     logInfo("Flume receiver started")
   }

   def onStop() {
-    server.close()
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
     logInfo("Flume receiver stopped")
   }

   override def preferredLocation = Some(host)
+
+  /** A Netty pipeline factory that decompresses incoming data from
+    * the Netty client and compresses data going back to the client.
+    *
+    * The compression on the return path is required because Flume requires
+    * a successful response to indicate it can remove the event/batch
+    * from the configured channel.
+    */
+  private[streaming]
+  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+    def getPipeline() = {
+      val pipeline = Channels.pipeline()
+      val encoder = new ZlibEncoder(6)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      pipeline
+    }
+  }
 }
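The decompressing server above only sees compressed traffic if the upstream Flume agent's Avro sink compresses its batches, which is the capability FLUME-1915 added. A sketch of the matching agent configuration, assuming the compression-type/compression-level property names from Flume 1.4+; the agent, sink, and channel names are illustrative:

# Avro sink pointed at the Spark FlumeReceiver
agent1.sinks.sparkSink.type = avro
agent1.sinks.sparkSink.hostname = localhost
agent1.sinks.sparkSink.port = 9997
agent1.sinks.sparkSink.channel = memChannel
# FLUME-1915: zlib-compress outgoing batches; the receiver's
# CompressionChannelPipelineFactory inflates them on arrival
agent1.sinks.sparkSink.compression-type = deflate
agent1.sinks.sparkSink.compression-level = 6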

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 37 additions & 2 deletions
@@ -36,7 +36,25 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Whether the Netty server should decompress the input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression)
     inputStream
   }

@@ -66,6 +84,23 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Whether the Netty server should decompress the input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 }
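Whichever overload is called, records arrive as SparkFlumeEvent wrappers around Avro's AvroFlumeEvent. A brief sketch of unwrapping them, assuming `stream` comes from one of the createStream calls above and that event bodies are UTF-8 text:

// Hypothetical downstream step: unwrap each SparkFlumeEvent.
val decoded = stream.map { sparkEvent =>
  val avroEvent = sparkEvent.event  // the underlying AvroFlumeEvent
  val body = new String(avroEvent.getBody.array(), "UTF-8")
  (avroEvent.getHeaders, body)      // headers are a java.util.Map
}
decoded.print()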

external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java

Lines changed: 2 additions & 0 deletions
@@ -30,5 +30,7 @@ public void testFlumeStream() {
     JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
   }
 }

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 36 additions & 5 deletions
@@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream

-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.handler.codec.compression._

-  val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {

   test("flume input stream") {
+    runFlumeStreamTest(false, 9998)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9997)
+  }
+
+  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase {
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+    var client: AvroSourceProtocol = null
+
+    if (enableDecompression) {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol],
+        new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+          new CompressionChannelFactory(6)))
+    } else {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol], transceiver)
+    }

     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase {
       clock.addToTime(batchDuration.milliseconds)
     }

+    Thread.sleep(1000)
+
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
 }

0 commit comments