@@ -96,6 +96,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
9696 }
9797 }
9898
99+ test(" binary records stream" ) {
100+ testBinaryRecordsStream()
101+ }
99102
100103 test(" file input stream - newFilesOnly = true" ) {
101104 testFileStream(newFilesOnly = true )
@@ -233,6 +236,46 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
233236 }
234237 }
235238
239+ def testBinaryRecordsStream () {
240+ var ssc : StreamingContext = null
241+ val testDir : File = null
242+ try {
243+ val testDir = Utils .createTempDir()
244+
245+ Thread .sleep(1000 )
246+ // Set up the streaming context and input streams
247+ val newConf = conf.clone.set(
248+ " spark.streaming.clock" , " org.apache.spark.streaming.util.SystemClock" )
249+ ssc = new StreamingContext (newConf, batchDuration)
250+
251+ val fileStream = ssc.binaryRecordsStream(testDir.toString, 1 )
252+
253+ val outputBuffer = new ArrayBuffer [Seq [Array [Byte ]]] with SynchronizedBuffer [Seq [Array [Byte ]]]
254+ val outputStream = new TestOutputStream (fileStream, outputBuffer)
255+ outputStream.register()
256+ ssc.start()
257+
258+ // Create files in the directory with binary data
259+ val input = Seq (1 , 2 , 3 , 4 , 5 )
260+ input.foreach { i =>
261+ Thread .sleep(batchDuration.milliseconds)
262+ val file = new File (testDir, i.toString)
263+ Files .write(Array [Byte ](i.toByte), file)
264+ logInfo(" Created file " + file)
265+ }
266+
267+ // Verify contents of output
268+ eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
269+ val expectedOutput = input.map(i => i.toByte)
270+ val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0 ).toByte)
271+ assert(obtainedOutput === expectedOutput)
272+ }
273+ } finally {
274+ if (ssc != null ) ssc.stop()
275+ if (testDir != null ) Utils .deleteRecursively(testDir)
276+ }
277+ }
278+
236279 def testFileStream (newFilesOnly : Boolean ) {
237280 var ssc : StreamingContext = null
238281 val testDir : File = null
0 commit comments