@@ -29,6 +29,8 @@ import org.apache.spark.flume.sink.{SparkSinkConfig, SparkSink}
2929import scala .collection .JavaConversions ._
3030import org .apache .flume .event .EventBuilder
3131import org .apache .spark .streaming .dstream .ReceiverInputDStream
32+ import java .net .InetSocketAddress
33+ import java .util .concurrent .{Callable , ExecutorCompletionService , Executors }
3234
3335class FlumePollingReceiverSuite extends TestSuiteBase {
3436
@@ -38,7 +40,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
3840 // Set up the streaming context and input streams
3941 val ssc = new StreamingContext (conf, batchDuration)
4042 val flumeStream : ReceiverInputDStream [SparkPollingEvent ] =
41- FlumeUtils .createPollingStream(ssc, " localhost" , testPort, 100 , 1 ,
43+ FlumeUtils .createPollingStream(ssc, Seq ( new InetSocketAddress ( " localhost" , testPort)) , 100 , 5 ,
4244 StorageLevel .MEMORY_AND_DISK )
4345 val outputBuffer = new ArrayBuffer [Seq [SparkPollingEvent ]]
4446 with SynchronizedBuffer [Seq [SparkPollingEvent ]]
@@ -60,42 +62,81 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
6062 sink.setChannel(channel)
6163 sink.start()
6264 ssc.start()
65+ writeAndVerify(Seq (channel), ssc, outputBuffer)
66+ sink.stop()
67+ channel.stop()
68+ }
6369
64- val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
65- var t = 0
66- for (i <- 0 until 5 ) {
67- val tx = channel.getTransaction
68- tx.begin()
69- for (j <- 0 until 5 ) {
70- channel.put(EventBuilder .withBody(
71- String .valueOf(t).getBytes(" utf-8" ),
72- Map [String , String ](" test-" + t.toString -> " header" )))
73- t += 1
74- }
70+ test(" flume polling test multiple hosts" ) {
71+ // Set up the streaming context and input streams
72+ val ssc = new StreamingContext (conf, batchDuration)
73+ val flumeStream : ReceiverInputDStream [SparkPollingEvent ] =
74+ FlumeUtils .createPollingStream(ssc, Seq (new InetSocketAddress (" localhost" , testPort),
75+ new InetSocketAddress (" localhost" , testPort + 1 )), 100 , 5 ,
76+ StorageLevel .MEMORY_AND_DISK )
77+ val outputBuffer = new ArrayBuffer [Seq [SparkPollingEvent ]]
78+ with SynchronizedBuffer [Seq [SparkPollingEvent ]]
79+ val outputStream = new TestOutputStream (flumeStream, outputBuffer)
80+ outputStream.register()
81+
82+ // Start the channel and sink.
83+ val context = new Context ()
84+ context.put(" capacity" , " 5000" )
85+ context.put(" transactionCapacity" , " 1000" )
86+ context.put(" keep-alive" , " 0" )
87+ val channel = new MemoryChannel ()
88+ Configurables .configure(channel, context)
89+
90+ val channel2 = new MemoryChannel ()
91+ Configurables .configure(channel2, context)
7592
76- tx.commit()
77- tx.close()
78- Thread .sleep(500 ) // Allow some time for the events to reach
79- clock.addToTime(batchDuration.milliseconds)
93+ val sink = new SparkSink ()
94+ context.put(SparkSinkConfig .CONF_HOSTNAME , " localhost" )
95+ context.put(SparkSinkConfig .CONF_PORT , String .valueOf(testPort))
96+ Configurables .configure(sink, context)
97+ sink.setChannel(channel)
98+ sink.start()
99+
100+ val sink2 = new SparkSink ()
101+ context.put(SparkSinkConfig .CONF_HOSTNAME , " localhost" )
102+ context.put(SparkSinkConfig .CONF_PORT , String .valueOf(testPort + 1 ))
103+ Configurables .configure(sink2, context)
104+ sink2.setChannel(channel2)
105+ sink2.start()
106+ ssc.start()
107+ writeAndVerify(Seq (channel, channel2), ssc, outputBuffer)
108+ sink.stop()
109+ channel.stop()
110+
111+ }
112+
113+ def writeAndVerify (channels : Seq [MemoryChannel ], ssc : StreamingContext ,
114+ outputBuffer : ArrayBuffer [Seq [SparkPollingEvent ]]) {
115+ val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
116+ val executor = Executors .newCachedThreadPool()
117+ val executorCompletion = new ExecutorCompletionService [Void ](executor)
118+ channels.map(channel => {
119+ executorCompletion.submit(new TxnSubmitter (channel, clock))
120+ })
121+ for (i <- 0 until channels.size) {
122+ executorCompletion.take()
80123 }
81124 val startTime = System .currentTimeMillis()
82- while (outputBuffer.size < 5 && System .currentTimeMillis() - startTime < maxWaitTimeMillis) {
125+ while (outputBuffer.size < 5 * channels.size && System .currentTimeMillis() - startTime < maxWaitTimeMillis) {
83126 logInfo(" output.size = " + outputBuffer.size)
84127 Thread .sleep(100 )
85128 }
86129 val timeTaken = System .currentTimeMillis() - startTime
87130 assert(timeTaken < maxWaitTimeMillis, " Operation timed out after " + timeTaken + " ms" )
88131 logInfo(" Stopping context" )
89132 ssc.stop()
90- sink.stop()
91- channel.stop()
92133
93134 val flattenedBuffer = outputBuffer.flatten
94- assert(flattenedBuffer.size === 25 )
135+ assert(flattenedBuffer.size === 25 * channels.size )
95136 var counter = 0
96- for (i <- 0 until 25 ) {
97- val eventToVerify = EventBuilder .withBody(
98- String .valueOf(i).getBytes(" utf-8" ),
137+ for (k <- 0 until channels.size; i <- 0 until 25 ) {
138+ val eventToVerify = EventBuilder .withBody((channels(k).getName + " - " +
139+ String .valueOf(i)) .getBytes(" utf-8" ),
99140 Map [String , String ](" test-" + i.toString -> " header" ))
100141 var found = false
101142 var j = 0
@@ -110,7 +151,26 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
110151 j += 1
111152 }
112153 }
113- assert (counter === 25 )
154+ assert(counter === 25 * channels.size )
114155 }
115156
157+ private class TxnSubmitter (channel : MemoryChannel , clock : ManualClock ) extends Callable [Void ] {
158+ override def call (): Void = {
159+ var t = 0
160+ for (i <- 0 until 5 ) {
161+ val tx = channel.getTransaction
162+ tx.begin()
163+ for (j <- 0 until 5 ) {
164+ channel.put(EventBuilder .withBody((channel.getName + " - " + String .valueOf(t)).getBytes(" utf-8" ),
165+ Map [String , String ](" test-" + t.toString -> " header" )))
166+ t += 1
167+ }
168+ tx.commit()
169+ tx.close()
170+ Thread .sleep(500 ) // Allow some time for the events to reach
171+ clock.addToTime(batchDuration.milliseconds)
172+ }
173+ null
174+ }
175+ }
116176}
0 commit comments