--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -41,7 +41,7 @@ class QueueInputDStream[T: ClassTag](
     if (oneAtATime && queue.size > 0) {
       buffer += queue.dequeue()
     } else {
-      buffer ++= queue
+      buffer ++= queue.dequeueAll(_ => true)
     }
     if (buffer.size > 0) {
       if (oneAtATime) {
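
Why this one-line change is the fix: ++= on a scala.collection.mutable.Queue copies the queue's elements but does not remove them, so with oneAtATime = false the same RDDs were re-delivered in every subsequent batch. dequeueAll(_ => true) drains the queue as it copies. A minimal sketch of the difference, using plain Scala collections (no Spark; the object name is ours, not from the PR):

    import scala.collection.mutable.Queue

    object DequeueAllDemo extends App {
      val queue = Queue(1, 2, 3)

      // ++= iterates the queue without dequeuing: the elements stay behind.
      val copied = Queue[Int]() ++= queue
      println(queue.size)              // 3 -- still full, would be re-read next batch

      // dequeueAll removes every element matching the predicate as it collects them.
      val drained = queue.dequeueAll(_ => true)
      println(queue.size)              // 0 -- queue is now empty
      println(drained.mkString(","))   // 1,2,3
    }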
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -29,7 +29,7 @@ import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger

-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}

 import com.google.common.io.Files
 import org.scalatest.BeforeAndAfter
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.rdd.RDD

 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

@@ -234,6 +235,95 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     logInfo("--------------------------------")
     assert(output.sum === numTotalRecords)
   }
+
test("queue input stream - oneAtATime=true") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val queue = new SynchronizedQueue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = true)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(queueStream, outputBuffer)
def output = outputBuffer.filter(_.size > 0)
outputStream.register()
ssc.start()

// Setup data queued into the stream
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
//Thread.sleep(1000)
val inputIterator = input.toIterator
for (i <- 0 until input.size) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
clock.addToTime(batchDuration.milliseconds)
}
Thread.sleep(1000)
logInfo("Stopping context")
ssc.stop()

// Verify whether data received was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputBuffer.size)
logInfo("output")
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")

// Verify whether all the elements received are as expected
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
assert(output(i) === expectedOutput(i))
}
}

test("queue input stream - oneAtATime=false") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val queue = new SynchronizedQueue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(queueStream, outputBuffer)
def output = outputBuffer.filter(_.size > 0)
outputStream.register()
ssc.start()

// Setup data queued into the stream
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))

// Enqueue the first 3 items (one by one), they should be merged in the next batch
val inputIterator = input.toIterator
inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
clock.addToTime(batchDuration.milliseconds)
Thread.sleep(1000)

// Enqueue the remaining items (again one by one), merged in the final batch
inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
clock.addToTime(batchDuration.milliseconds)
Thread.sleep(1000)
logInfo("Stopping context")
ssc.stop()

// Verify whether data received was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputBuffer.size)
logInfo("output")
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")

// Verify whether all the elements received are as expected
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
assert(output(i) === expectedOutput(i))
}
}
 }

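
For context, an end-to-end sketch of the API these tests exercise, as a standalone driver rather than a test (the object name, master setting, and timings are ours, not from the PR): RDDs pushed into the queue become batches of the DStream, and with oneAtATime = false every RDD queued since the last batch interval is folded into a single batch -- the behavior the dequeueAll fix restores.

    import scala.collection.mutable.SynchronizedQueue

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object QueueStreamDemo extends App {
      val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamDemo")
      val ssc = new StreamingContext(conf, Seconds(1))

      // SynchronizedQueue, as in the tests: the driver thread enqueues while
      // the streaming scheduler thread dequeues.
      val queue = new SynchronizedQueue[RDD[String]]()
      val stream = ssc.queueStream(queue, oneAtATime = false)
      stream.print()

      ssc.start()
      for (i <- 1 to 5) {
        queue += ssc.sparkContext.makeRDD(Seq(i.toString))
        Thread.sleep(300)   // enqueue faster than the 1-second batch interval
      }
      Thread.sleep(3000)    // let the remaining batches fire before shutdown
      ssc.stop()
    }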