@@ -74,7 +74,7 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
7474 * The buffer contains a sequence of RDD's, each containing a sequence of items
7575 */
7676class TestOutputStream [T : ClassTag ](parent : DStream [T ],
77- val output : ArrayBuffer [Seq [T ]] = ArrayBuffer [Seq [T ]]() )
77+ val output : SynchronizedBuffer [Seq [T ]] = new ArrayBuffer [Seq [T ]] with SynchronizedBuffer [ Seq [ T ]] )
7878 extends ForEachDStream [T ](parent, (rdd : RDD [T ], t : Time ) => {
7979 val collected = rdd.collect()
8080 output += collected
@@ -95,8 +95,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
9595 * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
9696 * containing a sequence of items.
9797 */
98- class TestOutputStreamWithPartitions [T : ClassTag ](parent : DStream [T ],
99- val output : ArrayBuffer [Seq [Seq [T ]]] = ArrayBuffer [Seq [Seq [T ]]]())
98+ class TestOutputStreamWithPartitions [T : ClassTag ](
99+ parent : DStream [T ],
100+ val output : SynchronizedBuffer [Seq [Seq [T ]]] =
101+ new ArrayBuffer [Seq [Seq [T ]]] with SynchronizedBuffer [Seq [Seq [T ]]])
100102 extends ForEachDStream [T ](parent, (rdd : RDD [T ], t : Time ) => {
101103 val collected = rdd.glom().collect().map(_.toSeq)
102104 output += collected
@@ -108,10 +110,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
108110 ois.defaultReadObject()
109111 output.clear()
110112 }
111-
112- def toTestOutputStream : TestOutputStream [T ] = {
113- new TestOutputStream [T ](this .parent, this .output.map(_.flatten))
114- }
115113}
116114
117115/**
0 commit comments