
Commit 40a8fef

tmalaska authored and tdas committed
[SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915
This is a modified version of this PR apache#1168 done by @tmalaska.
Adds MIMA binary check exclusions.

Author: tmalaska <[email protected]>
Author: Tathagata Das <[email protected]>

Closes apache#1347 from tdas/FLUME-1915 and squashes the following commits:

96065df [Tathagata Das] Added Mima exclusion for FlumeReceiver.
41d5338 [tmalaska] Address line 57 that was too long
12617e5 [tmalaska] SPARK-1478: Upgrade FlumeInputDStream's Flume...
1 parent 369aa84 commit 40a8fef
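
In practical terms, the commit exposes the new behavior through an extra FlumeUtils.createStream overload that takes an enableDecompression flag (see the FlumeUtils.scala diff below). A minimal Scala sketch of a caller opting in; the master, application name, batch interval, host, and port are illustrative and not part of this commit:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Illustrative context setup; any existing StreamingContext works the same way.
val conf = new SparkConf().setMaster("local[2]").setAppName("FlumeDecompressionExample")
val ssc = new StreamingContext(conf, Seconds(2))

// New overload added by this commit: the trailing flag tells the receiver's
// Netty server to inflate zlib-compressed Avro event batches from the Flume sink,
// so the sending side is expected to compress what it delivers.
val stream = FlumeUtils.createStream(
  ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2, enableDecompression = true)

// Count received Flume events per batch.
stream.map(_.event).count().print()

ssc.start()
ssc.awaitTermination()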

File tree

5 files changed (+147, -16 lines)


external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

Lines changed: 67 additions & 9 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
-
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver
 
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
+
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
 }
 
@@ -134,22 +143,71 @@ private[streaming]
 class FlumeReceiver(
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }
 
   def onStart() {
-    server.start()
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver being asked to start more than once without close")
+      }
+    }
     logInfo("Flume receiver started")
   }
 
   def onStop() {
-    server.close()
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
     logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
+
+  /** A Netty pipeline factory that decompresses incoming data from
+    * the Netty client and compresses data going back to the client.
+    *
+    * The compression on the return path is required because Flume requires
+    * a successful response to indicate it can remove the event/batch
+    * from the configured channel.
+    */
+  private[streaming]
+  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+    def getPipeline() = {
+      val pipeline = Channels.pipeline()
+      val encoder = new ZlibEncoder(6)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      pipeline
+    }
+  }
 }
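
A note on the pipeline wiring in CompressionChannelPipelineFactory above: ChannelPipeline.addFirst prepends, so after the two calls the "inflater" sits ahead of the "deflater". Inbound traffic from the Flume client is therefore decompressed before it reaches the Avro handler, while the response written back passes through the ZlibEncoder, which is the compressed return path the doc comment says Flume needs before it will drop the batch from its channel. A tiny standalone sketch, assuming only Netty 3.x on the classpath, that just prints the resulting handler order:

import org.jboss.netty.channel.Channels
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

object PipelineOrderCheck extends App {
  val pipeline = Channels.pipeline()
  // Same two calls as the factory above; addFirst prepends each handler.
  pipeline.addFirst("deflater", new ZlibEncoder(6))
  pipeline.addFirst("inflater", new ZlibDecoder())
  // Expected output: [inflater, deflater]
  println(pipeline.getNames)
}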

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 39 additions & 2 deletions
@@ -36,7 +36,27 @@ object FlumeUtils
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Should the Netty server decompress the input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc, hostname, port, storageLevel, enableDecompression)
+
     inputStream
   }
 
@@ -66,6 +86,23 @@ object FlumeUtils
       port: Int,
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Should the Netty server decompress the input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 }

external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java

Lines changed: 2 additions & 0 deletions
@@ -30,5 +30,7 @@ public void testFlumeStream() {
     JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
   }
 }

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 36 additions & 5 deletions
@@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.handler.codec.compression._
 
-  val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {
 
   test("flume input stream") {
+    runFlumeStreamTest(false, 9998)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9997)
+  }
+
+  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase {
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+    var client: AvroSourceProtocol = null
+
+    if (enableDecompression) {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol],
+        new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+          new CompressionChannelFactory(6)))
+    } else {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol], transceiver)
+    }
 
     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase {
       clock.addToTime(batchDuration.milliseconds)
     }
 
+    Thread.sleep(1000)
+
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder: ZlibEncoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
 }

project/MimaExcludes.scala

Lines changed: 3 additions & 0 deletions
@@ -64,6 +64,9 @@ object MimaExcludes
             "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
               + "createZero$1")
         ) ++
+        Seq(
+          ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
+        ) ++
         Seq( // Ignore some private methods in ALS.
           ProblemFilters.exclude[MissingMethodProblem](
             "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
