Skip to content

Commit 7cdaf37

Browse files
committed
Added more checks for BatchInfo
1 parent 5415389 commit 7cdaf37

File tree

2 files changed

+19
-4
lines changed

2 files changed

+19
-4
lines changed

python/pyspark/streaming/tests.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -443,24 +443,39 @@ def func(dstream):
443443

444444
self.assertEqual(len(batchInfosSubmitted), 4)
445445
for info in batchInfosSubmitted:
446-
446+
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
447+
self.assertGreaterEqual(info.submissionTime(), 0)
448+
self.assertTrue(info.streamIdToInputInfo().isEmpty())
449+
self.assertFalse(info.outputOperationInfos().isEmpty())
450+
self.assertIsNotNone(info.outputOperationInfos().get(0))
447451
self.assertEqual(info.schedulingDelay(), -1)
448452
self.assertEqual(info.processingDelay(), -1)
449453
self.assertEqual(info.totalDelay(), -1)
454+
self.assertEqual(info.numRecords(), 0)
450455

451456
self.assertEqual(len(batchInfosStarted), 4)
452457
for info in batchInfosStarted:
458+
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
459+
self.assertGreaterEqual(info.submissionTime(), 0)
460+
self.assertTrue(info.streamIdToInputInfo().isEmpty())
461+
self.assertFalse(info.outputOperationInfos().isEmpty())
462+
self.assertIsNotNone(info.outputOperationInfos().get(0))
453463
self.assertGreaterEqual(info.schedulingDelay(), 0)
454464
self.assertEqual(info.processingDelay(), -1)
455465
self.assertEqual(info.totalDelay(), -1)
466+
self.assertEqual(info.numRecords(), 0)
456467

457468
self.assertEqual(len(batchInfosCompleted), 4)
458-
459469
for info in batchInfosCompleted:
460-
470+
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
471+
self.assertGreaterEqual(info.submissionTime(), 0)
472+
self.assertTrue(info.streamIdToInputInfo().isEmpty())
473+
self.assertFalse(info.outputOperationInfos().isEmpty())
474+
self.assertIsNotNone(info.outputOperationInfos().get(0))
461475
self.assertGreaterEqual(info.schedulingDelay(), 0)
462476
self.assertGreaterEqual(info.processingDelay(), 0)
463477
self.assertGreaterEqual(info.totalDelay(), 0)
478+
self.assertEqual(info.numRecords(), 0)
464479

465480

466481
class WindowFunctionTests(PySparkStreamingTestCase):

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ private[streaming] trait PythonStreamingListener{
4949
outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { }
5050
}
5151

52-
class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
52+
private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
5353
extends JavaStreamingListener {
5454

5555
/** Called when a receiver has been started */

0 commit comments

Comments
 (0)