@@ -18,15 +18,18 @@
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
-import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+import java.util.concurrent._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import org.apache.flume.Context
 import org.apache.flume.channel.MemoryChannel
 import org.apache.flume.conf.Configurables
 import org.apache.flume.event.EventBuilder
+import org.scalatest.concurrent.Eventually._
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
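Note on the import changes: the `java.util.concurrent._` wildcard is what later brings `CountDownLatch` and `TimeUnit` into scope, and the `scala.concurrent.duration._` plus `scala.language.postfixOps` pair exists so that ScalaTest's `eventually(timeout(10 seconds), interval(100 milliseconds))` syntax compiles cleanly. A minimal standalone sketch of why both imports are needed:

```scala
import scala.concurrent.duration._  // implicit conversion enabling 10.seconds
import scala.language.postfixOps    // permits the dot-free postfix form `10 seconds`

object DurationSyntaxDemo extends App {
  // Without postfixOps this line produces a feature warning (an error under
  // -Xfatal-warnings); without the duration import it does not compile at all.
  val t: FiniteDuration = 10 seconds
  println(t.toMillis)  // prints 10000
}
```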
@@ -57,11 +60,11 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
 
   before(beforeFunction())
 
-  ignore("flume polling test") {
+  test("flume polling test") {
     testMultipleTimes(testFlumePolling)
   }
 
-  ignore("flume polling test multiple hosts") {
+  test("flume polling test multiple hosts") {
     testMultipleTimes(testFlumePollingMultipleHost)
   }
 
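These two tests were previously `ignore`d for flakiness and are re-enabled here. They still run through the suite's `testMultipleTimes` wrapper, whose body is outside this diff; a hedged sketch of what such a retry wrapper typically looks like (the name, attempt count, and rethrow policy below are assumptions, not the suite's actual code):

```scala
import scala.util.control.NonFatal

// Hypothetical helper in the spirit of testMultipleTimes: run the body up to
// `attempts` times, succeed on the first passing run, rethrow the last failure.
def runWithRetries[T](attempts: Int)(body: => T): T = {
  var lastFailure: Throwable = null
  var i = 0
  while (i < attempts) {
    try {
      return body  // a while loop (not a for) keeps this `return` local
    } catch {
      case NonFatal(t) => lastFailure = t
    }
    i += 1
  }
  throw lastFailure
}
```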
@@ -100,18 +103,8 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
-        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
-
-    writeAndVerify(Seq(channel), ssc, outputBuffer)
+    writeAndVerify(Seq(sink), Seq(channel))
     assertChannelIsEmpty(channel)
     sink.stop()
     channel.stop()
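The single-sink test now delegates all StreamingContext setup and event verification to the shared `writeAndVerify(sinks, channels)` defined further down, keeping only the drain check. For reference, one self-contained way to test that a Flume `MemoryChannel` is empty; the suite's actual `assertChannelIsEmpty` may inspect channel internals instead, so treat this as an illustrative assumption:

```scala
import org.apache.flume.channel.MemoryChannel

// Inside a transaction, take() returns null once the channel holds no events.
// Caveat: this consumes an event when the channel is NOT empty, so it is only
// appropriate as a final end-of-test check.
def channelIsEmpty(channel: MemoryChannel): Boolean = {
  val tx = channel.getTransaction
  tx.begin()
  try {
    channel.take() == null
  } finally {
    tx.commit()
    tx.close()
  }
}
```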
Expand Down Expand Up @@ -142,10 +135,22 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
Configurables.configure(sink2, context)
sink2.setChannel(channel2)
sink2.start()
try {
writeAndVerify(Seq(sink, sink2), Seq(channel, channel2))
assertChannelIsEmpty(channel)
assertChannelIsEmpty(channel2)
} finally {
sink.stop()
sink2.stop()
channel.stop()
channel2.stop()
}
}

def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel]) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
val addresses = sinks.map(sink => new InetSocketAddress("localhost", sink.getPort()))
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
eventsPerBatch, 5)
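Two structural fixes land here: the multi-host test wraps its body in `try`/`finally` so sinks and channels are stopped even when an assertion throws, and `writeAndVerify` is reworked to take the sinks and channels as parameters, replacing the hard-coded two-sink address list with a `map` over whatever sinks the caller passes. The cleanup discipline generalizes to a small loan-pattern helper; a sketch, with the `Stoppable` shape assumed purely for illustration:

```scala
import scala.util.control.NonFatal

trait Stoppable { def stop(): Unit }

// Run the test body, then stop every resource in reverse start order,
// regardless of whether the body succeeded or threw.
def withResources[A](resources: Seq[Stoppable])(body: => A): A = {
  try {
    body
  } finally {
    resources.reverse.foreach { r =>
      try r.stop() catch { case NonFatal(_) => () }  // best-effort cleanup
    }
  }
}
```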
@@ -155,61 +160,49 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     outputStream.register()
 
     ssc.start()
-    try {
-      writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-      assertChannelIsEmpty(channel)
-      assertChannelIsEmpty(channel2)
-    } finally {
-      sink.stop()
-      sink2.stop()
-      channel.stop()
-      channel2.stop()
-    }
-  }
-
-  def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
-    channels.map(channel => {
+
+    val latch = new CountDownLatch(batchCount * channels.size)
+    sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+    channels.foreach(channel => {
       executorCompletion.submit(new TxnSubmitter(channel, clock))
     })
+
     for (i <- 0 until channels.size) {
       executorCompletion.take()
     }
-    val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < batchCount * channels.size &&
-      System.currentTimeMillis() - startTime < 15000) {
-      logInfo("output.size = " + outputBuffer.size)
-      Thread.sleep(100)
-    }
-    val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
-    logInfo("Stopping context")
-    ssc.stop()
-
-    val flattenedBuffer = outputBuffer.flatten
-    assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
-    var counter = 0
-    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
-      val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
-        String.valueOf(i)).getBytes("utf-8"),
-        Map[String, String]("test-" + i.toString -> "header"))
-      var found = false
-      var j = 0
-      while (j < flattenedBuffer.size && !found) {
-        val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
-        if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
-          eventToVerify.getHeaders.get("test-" + i.toString)
-            .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
-          found = true
-          counter += 1
-        }
-        j += 1
-      }
-    }
-    assert(counter === totalEventsPerChannel * channels.size)
+    latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+    clock.advance(batchDuration.milliseconds)
+
+    // The eventually is required to ensure that all data in the batch has been processed.
+    eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      val flattenedBuffer = outputBuffer.flatten
+      assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
+      var counter = 0
+      for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+        val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+          String.valueOf(i)).getBytes("utf-8"),
+          Map[String, String]("test-" + i.toString -> "header"))
+        var found = false
+        var j = 0
+        while (j < flattenedBuffer.size && !found) {
+          val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
+          if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
+            eventToVerify.getHeaders.get("test-" + i.toString)
+              .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
+            found = true
+            counter += 1
+          }
+          j += 1
+        }
+      }
+      assert(counter === totalEventsPerChannel * channels.size)
+    }
+    ssc.stop()
   }
 
   def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
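This hunk is the heart of the de-flaking. The old code polled `outputBuffer.size` against wall-clock time (a 15-second busy-wait) and let each `TxnSubmitter` advance the clock, which raced with the driver. The new flow is deterministic: block on a `CountDownLatch` that the sinks count down once per received batch, advance the `ManualClock` exactly once so Spark Streaming closes the batch, then wrap the assertions in ScalaTest's `eventually`, which retries them until they pass or the patience expires. The same three-step recipe in isolation (names and counts are illustrative, not from the suite); note also that `latch.await` returns `false` on timeout, a value the diff discards but that is worth asserting:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.concurrent.Eventually._

object LatchEventuallyDemo extends App {
  val expectedBatches = 5                            // illustrative count
  val latch = new CountDownLatch(expectedBatches)
  val results = new ConcurrentLinkedQueue[String]()

  // Stand-in producer: deliver one "batch" per iteration on another thread.
  new Thread(new Runnable {
    def run(): Unit = for (i <- 1 to expectedBatches) {
      results.add("batch-" + i)
      latch.countDown()
    }
  }).start()

  // Unlike the diff, check the boolean: false here means the wait timed out.
  assert(latch.await(15, TimeUnit.SECONDS), "timed out waiting for all batches")
  // In the suite, the ManualClock is advanced exactly once at this point.

  // The block is re-evaluated every 100 ms until it passes or 10 s elapse.
  eventually(timeout(10 seconds), interval(100 milliseconds)) {
    assert(results.size == expectedBatches)
  }
}
```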
@@ -234,7 +227,6 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
         tx.commit()
         tx.close()
         Thread.sleep(500) // Allow some time for the events to reach
-        clock.advance(batchDuration.milliseconds)
       }
       null
     }
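One line, but essential: with the test thread now advancing the clock once after the latch fires, `TxnSubmitter` must no longer advance it per transaction; multiple submitter threads moving one shared manual clock made batch boundaries nondeterministic, which appears to be a source of the flakiness. For intuition, a minimal manual clock in the spirit of Spark's `ManualClock` (a sketch, not the real `org.apache.spark.util.ManualClock`):

```scala
// Time is frozen until the test advances it, so the streaming scheduler's
// batch boundaries fire exactly when the test decides, never on their own.
class FakeManualClock(private var now: Long = 0L) {
  def getTimeMillis(): Long = synchronized { now }

  def advance(deltaMs: Long): Unit = synchronized {
    now += deltaMs
    notifyAll()  // wake any thread blocked in waitTillTime
  }

  def waitTillTime(target: Long): Long = synchronized {
    while (now < target) wait()  // while-loop guards against spurious wakeups
    now
  }
}
```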