Skip to content

Commit 9d66e81

Browse files
committed
Added test_batch_info_reports() to streaming/tests.py
1 parent 7d50848 commit 9d66e81

File tree

1 file changed

+61
-2
lines changed

1 file changed

+61
-2
lines changed

python/pyspark/streaming/tests.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from pyspark.streaming.flume import FlumeUtils
4444
from pyspark.streaming.mqtt import MQTTUtils
4545
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
46+
from pyspark.streaming.listener import StreamingListener
4647

4748

4849
class PySparkStreamingTestCase(unittest.TestCase):
@@ -72,7 +73,7 @@ def tearDown(self):
7273
if self.ssc is not None:
7374
self.ssc.stop(False)
7475
# Clean up in the JVM just in case there has been some issues in Python API
75-
jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
76+
jStreamingContextOption = StreamingContext.getActive()._jvm.SparkContext.get()
7677
if jStreamingContextOption.nonEmpty():
7778
jStreamingContextOption.get().stop(False)
7879

@@ -392,6 +393,63 @@ def func(dstream):
392393
self._test_func(input, func, expected)
393394

394395

396+
class StreamingListenerTests(PySparkStreamingTestCase):
397+
398+
class BatchInfoCollector(StreamingListener):
399+
400+
def __init__(self):
401+
super(StreamingListener, self).__init__()
402+
self.batchInfosCompleted = []
403+
self.batchInfosStarted = []
404+
self.batchInfosSubmitted = []
405+
406+
def onBatchSubmitted(self, batchSubmitted):
407+
self.batchInfosSubmitted.append(self.getEventInfo(batchSubmitted))
408+
409+
def onBatchStarted(self, batchStarted):
410+
self.batchInfosStarted.append(self.getEventInfo(batchStarted))
411+
412+
def onBatchCompleted(self, batchCompleted):
413+
self.batchInfosCompleted.append(self.getEventInfo(batchCompleted))
414+
415+
def test_batch_info_reports(self):
416+
batch_collector = self.BatchInfoCollector()
417+
self.ssc.addStreamingListener(batch_collector)
418+
input = [[1], [2], [3], [4]]
419+
420+
def func(dstream):
421+
return dstream.map(int)
422+
expected = [[1], [2], [3], [4]]
423+
self._test_func(input, func, expected)
424+
425+
batchInfosSubmitted = batch_collector.batchInfosSubmitted
426+
self.assertEqual(len(batchInfosSubmitted), 4)
427+
428+
for info in batchInfosSubmitted:
429+
self.assertEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$')
430+
self.assertEqual(info.processingDelay().getClass().getSimpleName(), u'None$')
431+
self.assertEqual(info.totalDelay().getClass().getSimpleName(), u'None$')
432+
433+
batchInfosStarted = batch_collector.batchInfosStarted
434+
self.assertEqual(len(batchInfosStarted), 4)
435+
for info in batchInfosStarted:
436+
self.assertNotEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$')
437+
self.assertGreaterEqual(info.schedulingDelay().get(), 0L)
438+
self.assertEqual(info.processingDelay().getClass().getSimpleName(), u'None$')
439+
self.assertEqual(info.totalDelay().getClass().getSimpleName(), u'None$')
440+
441+
batchInfosCompleted = batch_collector.batchInfosCompleted
442+
self.assertEqual(len(batchInfosCompleted), 4)
443+
444+
for info in batchInfosCompleted:
445+
self.assertNotEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$')
446+
self.assertNotEqual(info.processingDelay().getClass().getSimpleName(), u'None$')
447+
self.assertNotEqual(info.totalDelay().getClass().getSimpleName(), u'None$')
448+
self.assertGreaterEqual(info.schedulingDelay().get(), 0L)
449+
self.assertGreaterEqual(info.processingDelay().get(), 0L)
450+
self.assertGreaterEqual(info.totalDelay().get(), 0L)
451+
452+
395453
class WindowFunctionTests(PySparkStreamingTestCase):
396454

397455
timeout = 15
@@ -1275,7 +1333,8 @@ def search_kinesis_asl_assembly_jar():
12751333

12761334
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
12771335
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
1278-
KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
1336+
KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
1337+
StreamingListenerTests]
12791338

12801339
if kinesis_jar_present is True:
12811340
testcases.append(KinesisStreamTests)

0 commit comments

Comments
 (0)