Commit 0eb1152

mlaflamm authored and tdas committed
[STREAMING] SPARK-2343: Fix QueueInputDStream with oneAtATime false
Fix QueueInputDStream which was not removing dequeued items when used with the oneAtATime flag disabled.

Author: Manuel Laflamme <[email protected]>

Closes apache#1285 from mlaflamm/spark-2343 and squashes the following commits:

61c9e38 [Manuel Laflamme] Unit tests for queue input stream
c51d029 [Manuel Laflamme] Fix QueueInputDStream with oneAtATime false
1 parent 339441f · commit 0eb1152

2 files changed: +92 −2 lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ class QueueInputDStream[T: ClassTag](
     if (oneAtATime && queue.size > 0) {
       buffer += queue.dequeue()
     } else {
-      buffer ++= queue
+      buffer ++= queue.dequeueAll(_ => true)
     }
     if (buffer.size > 0) {
       if (oneAtATime) {
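
Why this one-line change matters: with oneAtATime disabled, `buffer ++= queue` only copies the queued RDDs into the batch buffer and leaves them in the queue, so the same items were re-delivered in every subsequent batch; `queue.dequeueAll(_ => true)` returns and removes every element. A minimal standalone sketch of the difference, assuming the pre-2.13 Scala mutable collections Spark used at the time (DequeueDemo is an illustrative name, not Spark code):

import scala.collection.mutable.{ArrayBuffer, Queue}

object DequeueDemo extends App {
  val queue = Queue("a", "b", "c")
  val buffer = ArrayBuffer[String]()

  buffer ++= queue        // copies the elements; the queue still holds all three
  println(queue.size)     // 3 -- these items would be consumed again next batch (the bug)

  buffer ++= queue.dequeueAll(_ => true)  // returns and removes every element
  println(queue.size)     // 0 -- the queue is drained, as intended
}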

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 91 additions & 1 deletion
@@ -29,7 +29,7 @@ import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger

-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}

 import com.google.common.io.Files
 import org.scalatest.BeforeAndAfter
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.rdd.RDD

 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

@@ -234,6 +235,95 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     logInfo("--------------------------------")
     assert(output.sum === numTotalRecords)
   }
+
+  test("queue input stream - oneAtATime=true") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val queue = new SynchronizedQueue[RDD[String]]()
+    val queueStream = ssc.queueStream(queue, oneAtATime = true)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(queueStream, outputBuffer)
+    def output = outputBuffer.filter(_.size > 0)
+    outputStream.register()
+    ssc.start()
+
+    // Set up the data to be queued into the stream
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq("1", "2", "3", "4", "5")
+    val expectedOutput = input.map(Seq(_))
+    //Thread.sleep(1000)
+    val inputIterator = input.toIterator
+    for (i <- 0 until input.size) {
+      // Enqueue more than one item per tick; they should still be dequeued one at a time
+      inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      clock.addToTime(batchDuration.milliseconds)
+    }
+    Thread.sleep(1000)
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether the data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
+
+  test("queue input stream - oneAtATime=false") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val queue = new SynchronizedQueue[RDD[String]]()
+    val queueStream = ssc.queueStream(queue, oneAtATime = false)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(queueStream, outputBuffer)
+    def output = outputBuffer.filter(_.size > 0)
+    outputStream.register()
+    ssc.start()
+
+    // Set up the data to be queued into the stream
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq("1", "2", "3", "4", "5")
+    val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
+
+    // Enqueue the first 3 items (one by one); they should be merged into the next batch
+    val inputIterator = input.toIterator
+    inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+    clock.addToTime(batchDuration.milliseconds)
+    Thread.sleep(1000)
+
+    // Enqueue the remaining items (again one by one); they should be merged into the final batch
+    inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+    clock.addToTime(batchDuration.milliseconds)
+    Thread.sleep(1000)
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether the data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
 }
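
For context, the new tests exercise the public queueStream API end to end. Below is a hedged usage sketch of the behavior they pin down, assuming a Spark 1.x local StreamingContext (QueueStreamExample and its parameter choices are illustrative, not from this commit):

import scala.collection.mutable.SynchronizedQueue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // With oneAtATime = false, each batch consumes every RDD currently queued;
    // after this fix the consumed RDDs are actually removed from the queue,
    // so each record is delivered exactly once.
    val queue = new SynchronizedQueue[RDD[Int]]()
    val stream = ssc.queueStream(queue, oneAtATime = false)
    stream.count().print()

    ssc.start()
    for (i <- 1 to 5) {
      queue += ssc.sparkContext.makeRDD(1 to 100)
      Thread.sleep(500)
    }
    ssc.stop()
  }
}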