@@ -206,56 +206,14 @@ class PythonTransformedDStream(
206206}
207207*/
208208
209- <<<<<<< HEAD
210- =======
211- /**
212- * This is a input stream just for the unitest. This is equivalent to a checkpointable,
213- * replayable, reliable message queue like Kafka. It requires a sequence as input, and
214- * returns the i_th element at the i_th batch under manual clock.
215- */
216- class PythonTestInputStream (ssc_ : JavaStreamingContext , inputFiles : JArrayList [String ], numPartitions : Int )
217- extends InputDStream [Array [Byte ]](JavaStreamingContext .toStreamingContext(ssc_)){
218-
219- def start () {}
220-
221- def stop () {}
222-
223- def compute (validTime : Time ): Option [RDD [Array [Byte ]]] = {
224- logInfo(" Computing RDD for time " + validTime)
225- inputFiles.foreach(logInfo(_))
226- // make a temporary file
227- // make empty RDD
228- val prefix = " spark"
229- val suffix = " .tmp"
230- val tempFile = File .createTempFile(prefix, suffix)
231- val index = ((validTime - zeroTime) / slideDuration - 1 ).toInt
232- logInfo(" Index: " + index)
233-
234- val selectedInputFile : String = {
235- if (inputFiles.isEmpty){
236- tempFile.getAbsolutePath
237- }else if (index < inputFiles.size()) {
238- inputFiles.get(index)
239- } else {
240- tempFile.getAbsolutePath
241- }
242- }
243- val rdd = PythonRDD .readRDDFromFile(JavaSparkContext .fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
244- logInfo(" Created RDD " + rdd.id + " with " + selectedInputFile)
245- Some (rdd)
246- }
247-
248- val asJavaDStream = JavaDStream .fromDStream(this )
249- }
250-
251209/**
252210 * This is a input stream just for the unitest. This is equivalent to a checkpointable,
253211 * replayable, reliable message queue like Kafka. It requires a sequence as input, and
254212 * returns the i_th element at the i_th batch under manual clock.
255213 * This implementation is close to QueStream
256214 */
257215
258- class PythonTestInputStream2 (ssc_ : JavaStreamingContext , inputRDDs : JArrayList [JavaRDD [Array [Byte ]]])
216+ class PythonTestInputStream (ssc_ : JavaStreamingContext , inputRDDs : JArrayList [JavaRDD [Array [Byte ]]])
259217 extends InputDStream [Array [Byte ]](JavaStreamingContext .toStreamingContext(ssc_)) {
260218
261219 def start () {}
@@ -280,21 +238,3 @@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[
280238
281239 val asJavaDStream = JavaDStream .fromDStream(this )
282240}
283-
284-
285- class PythonTestInputStream3 (ssc_ : JavaStreamingContext )
286- extends InputDStream [Any ](JavaStreamingContext .toStreamingContext(ssc_)) {
287-
288- def start () {}
289-
290- def stop () {}
291-
292- def compute (validTime : Time ): Option [RDD [Any ]] = {
293- val index = ((validTime - zeroTime) / slideDuration - 1 ).toInt
294- val selectedInput = ArrayBuffer (1 , 2 , 3 ).toSeq
295- val rdd : RDD [Any ] = ssc.sc.makeRDD(selectedInput, 2 )
296- Some (rdd)
297- }
298-
299- val asJavaDStream = JavaDStream .fromDStream(this )
300- }>>>>>>> broke something
0 commit comments