From a63ef33188c14796dc0ca662448443ca2f8bb423 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Mon, 5 Oct 2015 17:12:31 -0700 Subject: [PATCH 01/17] Added skeleton classes for listener.py --- python/pyspark/streaming/listener.py | 56 ++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 python/pyspark/streaming/listener.py diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py new file mode 100644 index 0000000000000..b161ed7c84125 --- /dev/null +++ b/python/pyspark/streaming/listener.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class StreamingListenerEvent(object): + +class StreamingListenerBatchSubmitted(StreamingListenerEvent, batchInfo): + +class StreamingListenerBatchCompleted(StreamingListenerEvent, batchInfo): + +class StreamingListenerBatchStarted(StreamingListenerEvent, batchInfo): + +class StreamingListenerStarted(StreamingListenerEvent, receiverInfo): + +class StreamingListenerReceiverError(StreamingListenerEvent, receiverInfo): + +class StreamingListenerReceiverStopped(StreamingListenerEvent, receiverInfo): + +class StreamingListener(object): + + # Called when a receiver has been started. + def onReceiverStarted(self, receiverStarted): + pass + + # Called when a receiver has reported an error. + def onReceiverError(self, receiverError): + pass + + # Called when a receiver has been stopped + def onReceiverStopped(self, receiverStopped): + pass + + # Called when a batch of jobs has been submitted for processing. + def onBatchSubmitted(self, batchSubmitted): + pass + + # Called when processing of a batch of jobs has started. + def onBatchStarted(self, batchStarted): + pass + + # Called when processing of a batch of jobs has completed. + def onBatchCompleted(self, batchCompleted): + pass From f6c91e9b06e238b3d8e8671705124e1f42a44b73 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Mon, 12 Oct 2015 10:37:16 -0700 Subject: [PATCH 02/17] Added skeleton class for Python API streamingListener --- python/pyspark/streaming/listener.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index b161ed7c84125..e969a624740bf 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -15,6 +15,8 @@ # limitations under the License. # +from py4j.java_gateway import JavaObject + class StreamingListenerEvent(object): class StreamingListenerBatchSubmitted(StreamingListenerEvent, batchInfo): @@ -31,6 +33,9 @@ class StreamingListenerReceiverStopped(StreamingListenerEvent, receiverInfo): class StreamingListener(object): + def __init__(self): + + # Called when a receiver has been started. 
def onReceiverStarted(self, receiverStarted): pass From 1e6a87e6adf940e72c0e9048130b349067c16b73 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Tue, 13 Oct 2015 10:07:48 -0700 Subject: [PATCH 03/17] Added listener.py to __init__.py --- python/pyspark/streaming/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py index d2644a1d4ffab..66e8f8ef001e3 100644 --- a/python/pyspark/streaming/__init__.py +++ b/python/pyspark/streaming/__init__.py @@ -17,5 +17,6 @@ from pyspark.streaming.context import StreamingContext from pyspark.streaming.dstream import DStream +from pyspark.streaming.listener import StreamingListener -__all__ = ['StreamingContext', 'DStream'] +__all__ = ['StreamingContext', 'DStream', 'StreamingListener'] From 2579f644bafc231a50f14ab814b14e2253bc991a Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Tue, 13 Oct 2015 16:15:20 -0700 Subject: [PATCH 04/17] Working implementation of StreamingListener for Python API. Need to implement StreamingListenerEvent --- python/pyspark/streaming/context.py | 9 +++++++++ python/pyspark/streaming/listener.py | 23 ++++++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index a8c9ffc235b9e..a52a9a919c86f 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -28,6 +28,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer +from pyspark.streaming.listener import StreamingListener __all__ = ["StreamingContext"] @@ -401,3 +402,11 @@ def union(self, *dstreams): first = dstreams[0] jrest = [d._jdstream for d in dstreams[1:]] return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer) + + def addStreamingListener(self): + """ + + :return: + """ + self._jssc.addStreamingListener(StreamingListener()) + #pass \ No newline at end of file diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index e969a624740bf..5f48a90aca471 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -15,8 +15,11 @@ # limitations under the License. # -from py4j.java_gateway import JavaObject +from py4j.java_gateway import JavaGateway + + +""" class StreamingListenerEvent(object): class StreamingListenerBatchSubmitted(StreamingListenerEvent, batchInfo): @@ -30,12 +33,15 @@ class StreamingListenerStarted(StreamingListenerEvent, receiverInfo): class StreamingListenerReceiverError(StreamingListenerEvent, receiverInfo): class StreamingListenerReceiverStopped(StreamingListenerEvent, receiverInfo): +""" + +__all__ = ["StreamingListener"] + class StreamingListener(object): def __init__(self): - # Called when a receiver has been started. def onReceiverStarted(self, receiverStarted): pass @@ -50,7 +56,6 @@ def onReceiverStopped(self, receiverStopped): # Called when a batch of jobs has been submitted for processing. def onBatchSubmitted(self, batchSubmitted): - pass # Called when processing of a batch of jobs has started. def onBatchStarted(self, batchStarted): @@ -59,3 +64,15 @@ def onBatchStarted(self, batchStarted): # Called when processing of a batch of jobs has completed. def onBatchCompleted(self, batchCompleted): pass + + # Called when processing of a job of a batch has started. 
+ def onOutputOperationStarted(self, outputOperationStarted): + pass + + # Called when processing of a job of a batch has completed + def onOutputOperationCompleted(self, outputOperationCompleted): + pass + + class Java: + implements = ["org.apache.spark.streaming.scheduler.StreamingListener"] + From afad086208676578d586c2a4b8450cfb17d0eec2 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Wed, 14 Oct 2015 12:19:37 -0700 Subject: [PATCH 05/17] Modified signature for addStreamingListener to match Scala API, added comment. --- python/pyspark/streaming/context.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index a52a9a919c86f..9b7002567c3d5 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -403,10 +403,9 @@ def union(self, *dstreams): jrest = [d._jdstream for d in dstreams[1:]] return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer) - def addStreamingListener(self): + def addStreamingListener(self, streamingListener): """ - - :return: + Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for + receiving system events related to streaming. """ - self._jssc.addStreamingListener(StreamingListener()) - #pass \ No newline at end of file + self._jssc.addStreamingListener(streamingListener) From 7d50848f19a2f000f5fb7c6f63ece799d39ddada Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Fri, 16 Oct 2015 12:06:27 -0700 Subject: [PATCH 06/17] Refactored StreamingListener methods and added getEventInfo. --- python/pyspark/streaming/listener.py | 41 +++++++++++++--------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index 5f48a90aca471..dd0fa764683b7 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -15,32 +15,13 @@ # limitations under the License. # - -from py4j.java_gateway import JavaGateway - - -""" -class StreamingListenerEvent(object): - -class StreamingListenerBatchSubmitted(StreamingListenerEvent, batchInfo): - -class StreamingListenerBatchCompleted(StreamingListenerEvent, batchInfo): - -class StreamingListenerBatchStarted(StreamingListenerEvent, batchInfo): - -class StreamingListenerStarted(StreamingListenerEvent, receiverInfo): - -class StreamingListenerReceiverError(StreamingListenerEvent, receiverInfo): - -class StreamingListenerReceiverStopped(StreamingListenerEvent, receiverInfo): -""" - __all__ = ["StreamingListener"] class StreamingListener(object): def __init__(self): + pass # Called when a receiver has been started. def onReceiverStarted(self, receiverStarted): @@ -50,12 +31,13 @@ def onReceiverStarted(self, receiverStarted): def onReceiverError(self, receiverError): pass - # Called when a receiver has been stopped + # Called when a receiver has been stopped. def onReceiverStopped(self, receiverStopped): pass # Called when a batch of jobs has been submitted for processing. def onBatchSubmitted(self, batchSubmitted): + pass # Called when processing of a batch of jobs has started. def onBatchStarted(self, batchStarted): @@ -73,6 +55,21 @@ def onOutputOperationStarted(self, outputOperationStarted): def onOutputOperationCompleted(self, outputOperationCompleted): pass + def getEventInfo(self, event): + """ + :param event: StreamingListenerEvent + :return Returns a BatchInfo, OutputOperationInfo, or ReceiverInfo based on + event passed. 
+ """ + event_name = event.getClass().getSimpleName() + if 'Batch' in event_name: + return event.batchInfo() + + elif 'Output' in event_name: + return event.outputOperationInfo() + + elif 'Receiver' in event_name: + return event.receiverInfo() + class Java: implements = ["org.apache.spark.streaming.scheduler.StreamingListener"] - From 9d66e81211bd600bc5ff8940a1cb9abb3c624b16 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Tue, 20 Oct 2015 12:39:16 -0700 Subject: [PATCH 07/17] Added test_batch_info_reports() to streaming/tests.py --- python/pyspark/streaming/tests.py | 63 ++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index e4e56fff3b3fc..7a816a7f71b71 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -43,6 +43,7 @@ from pyspark.streaming.flume import FlumeUtils from pyspark.streaming.mqtt import MQTTUtils from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream +from pyspark.streaming.listener import StreamingListener class PySparkStreamingTestCase(unittest.TestCase): @@ -72,7 +73,7 @@ def tearDown(self): if self.ssc is not None: self.ssc.stop(False) # Clean up in the JVM just in case there has been some issues in Python API - jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive() + jStreamingContextOption = StreamingContext.getActive()._jvm.SparkContext.get() if jStreamingContextOption.nonEmpty(): jStreamingContextOption.get().stop(False) @@ -392,6 +393,63 @@ def func(dstream): self._test_func(input, func, expected) +class StreamingListenerTests(PySparkStreamingTestCase): + + class BatchInfoCollector(StreamingListener): + + def __init__(self): + super(StreamingListener, self).__init__() + self.batchInfosCompleted = [] + self.batchInfosStarted = [] + self.batchInfosSubmitted = [] + + def onBatchSubmitted(self, batchSubmitted): + self.batchInfosSubmitted.append(self.getEventInfo(batchSubmitted)) + + def onBatchStarted(self, batchStarted): + self.batchInfosStarted.append(self.getEventInfo(batchStarted)) + + def onBatchCompleted(self, batchCompleted): + self.batchInfosCompleted.append(self.getEventInfo(batchCompleted)) + + def test_batch_info_reports(self): + batch_collector = self.BatchInfoCollector() + self.ssc.addStreamingListener(batch_collector) + input = [[1], [2], [3], [4]] + + def func(dstream): + return dstream.map(int) + expected = [[1], [2], [3], [4]] + self._test_func(input, func, expected) + + batchInfosSubmitted = batch_collector.batchInfosSubmitted + self.assertEqual(len(batchInfosSubmitted), 4) + + for info in batchInfosSubmitted: + self.assertEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$') + self.assertEqual(info.processingDelay().getClass().getSimpleName(), u'None$') + self.assertEqual(info.totalDelay().getClass().getSimpleName(), u'None$') + + batchInfosStarted = batch_collector.batchInfosStarted + self.assertEqual(len(batchInfosStarted), 4) + for info in batchInfosStarted: + self.assertNotEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$') + self.assertGreaterEqual(info.schedulingDelay().get(), 0L) + self.assertEqual(info.processingDelay().getClass().getSimpleName(), u'None$') + self.assertEqual(info.totalDelay().getClass().getSimpleName(), u'None$') + + batchInfosCompleted = batch_collector.batchInfosCompleted + self.assertEqual(len(batchInfosCompleted), 4) + + for info in batchInfosCompleted: + 
self.assertNotEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$') + self.assertNotEqual(info.processingDelay().getClass().getSimpleName(), u'None$') + self.assertNotEqual(info.totalDelay().getClass().getSimpleName(), u'None$') + self.assertGreaterEqual(info.schedulingDelay().get(), 0L) + self.assertGreaterEqual(info.processingDelay().get(), 0L) + self.assertGreaterEqual(info.totalDelay().get(), 0L) + + class WindowFunctionTests(PySparkStreamingTestCase): timeout = 15 @@ -1275,7 +1333,8 @@ def search_kinesis_asl_assembly_jar(): os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests, - KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests] + KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests, + StreamingListenerTests] if kinesis_jar_present is True: testcases.append(KinesisStreamTests) From 0ac3df63fc00d0fc6fe99f284496b9ad136666ef Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Fri, 23 Oct 2015 11:54:33 -0700 Subject: [PATCH 08/17] Added Python friendly classes for BatchInfo, OutputOperationInfo, and ReceiverInfo --- python/pyspark/streaming/listener.py | 140 ++++++++++++++++++++++++--- python/pyspark/streaming/tests.py | 29 +++--- 2 files changed, 144 insertions(+), 25 deletions(-) diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index dd0fa764683b7..69a2fb7b3124b 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -23,36 +23,52 @@ class StreamingListener(object): def __init__(self): pass - # Called when a receiver has been started. def onReceiverStarted(self, receiverStarted): + """ + Called when a receiver has been started + """ pass - # Called when a receiver has reported an error. def onReceiverError(self, receiverError): + """ + Called when a receiver has reported an error + """ pass - # Called when a receiver has been stopped. def onReceiverStopped(self, receiverStopped): + """ + Called when a receiver has been stopped + """ pass - # Called when a batch of jobs has been submitted for processing. def onBatchSubmitted(self, batchSubmitted): + """ + Called when a batch of jobs has been submitted for processing. + """ pass - # Called when processing of a batch of jobs has started. def onBatchStarted(self, batchStarted): + """ + Called when processing of a batch of jobs has started. + """ pass - # Called when processing of a batch of jobs has completed. def onBatchCompleted(self, batchCompleted): + """ + Called when processing of a batch of jobs has completed. + """ pass - # Called when processing of a job of a batch has started. def onOutputOperationStarted(self, outputOperationStarted): + """ + Called when processing of a job of a batch has started. 
+ """ pass - # Called when processing of a job of a batch has completed def onOutputOperationCompleted(self, outputOperationCompleted): + """ + Called when processing of a job of a batch has completed + """ pass def getEventInfo(self, event): @@ -63,13 +79,115 @@ def getEventInfo(self, event): """ event_name = event.getClass().getSimpleName() if 'Batch' in event_name: - return event.batchInfo() + return BatchInfo(event.batchInfo()) elif 'Output' in event_name: - return event.outputOperationInfo() + return OutputOperationInfo(event.outputOperationInfo()) elif 'Receiver' in event_name: - return event.receiverInfo() + return ReceiverInfo(event.receiverInfo()) class Java: implements = ["org.apache.spark.streaming.scheduler.StreamingListener"] + + +class BatchInfo(object): + + def __init__(self, javaBatchInfo): + + self.processingStartTime = None + self.processingEndTime = None + + self.batchTime = javaBatchInfo.batchTime() + self.streamIdToInputInfo = self._map2dict(javaBatchInfo.streamIdToInputInfo()) + + self.submissionTime = javaBatchInfo.submissionTime() + if javaBatchInfo.processingStartTime().isEmpty() is False: + self.processingStartTime = javaBatchInfo.processingStartTime().get() + if javaBatchInfo.processingEndTime().isEmpty() is False: + self.processingEndTime = javaBatchInfo.processingEndTime().get() + + self.outputOperationInfos = self._map2dict(javaBatchInfo.outputOperationInfos()) + + + def schedulingDelay(self): + """ + Time taken for the first job of this batch to start processing from the time this batch + was submitted to the streaming scheduler. + """ + if self.processingStartTime is None: + return None + else: + return self.processingStartTime - self.submissionTime + + def processingDelay(self): + """ + Time taken for the all jobs of this batch to finish processing from the time they started + processing. + """ + if self.processingEndTime is None or self.processingStartTime is None: + return None + else: + return self.processingEndTime - self.processingStartTime + + def totalDelay(self): + """ + Time taken for all the jobs of this batch to finish processing from the time they + were submitted + """ + if self.processingEndTime is None or self.processingStartTime is None: + return None + else: + return self.processingDelay() + self.schedulingDelay() + + def numRecords(self): + """ + The number of recorders received by the receivers in this batch. 
+ """ + return len(self.streamIdToInputInfo) + + def _map2dict(self, javaMap): + """ + Converts a scala.collection.immutable.Map to a Python dict + """ + mapping = dict() + map_iterator = javaMap.iterator() + while map_iterator.hasNext(): + entry = map_iterator.next() + mapping[entry._1()] = entry._2() + return mapping + + +class OutputOperationInfo(object): + + def __init__(self, outputOperationInfo): + self.batchTime = outputOperationInfo.batchTime() + self.id = outputOperationInfo.id() + self.name = outputOperationInfo.name() + self.startTime = None + if outputOperationInfo.startTime().isEmpty() is False: + self.startTime = outputOperationInfo.startTime().get() + self.endTime = None + if outputOperationInfo.endTime().isEmpty() is False: + self.endTime = outputOperationInfo.endTime().get() + self.failureReason = None + if outputOperationInfo.failureReason().isEmpty() is False: + self.failureReason = outputOperationInfo.failureReason().get() + + def duration(self): + if self.endTime is None or self.startTime is None: + return None + else: + return self.endTime - self.startTime + + +class ReceiverInfo(object): + + def __init__(self, receiverInfo): + self.streamId = receiverInfo.streamId() + self.name = receiverInfo.name() + self.active = receiverInfo.active() + self.location = receiverInfo.location() + self.lastErrorMessage = receiverInfo.lastErrorMessage() + self.lastError = receiverInfo.lastError() + self.lastErrorTime = receiverInfo.lastErrorTime() diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index c868c92feae78..cd8aba989a19c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -43,7 +43,7 @@ from pyspark.streaming.flume import FlumeUtils from pyspark.streaming.mqtt import MQTTUtils from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream -from pyspark.streaming.listener import StreamingListener +from pyspark.streaming.listener import StreamingListener, BatchInfo class PySparkStreamingTestCase(unittest.TestCase): @@ -432,28 +432,29 @@ def func(dstream): self.assertEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: - self.assertEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$') - self.assertEqual(info.processingDelay().getClass().getSimpleName(), u'None$') - self.assertEqual(info.totalDelay().getClass().getSimpleName(), u'None$') + + self.assertIsNone(info.schedulingDelay()) + self.assertIsNone(info.processingDelay()) + self.assertIsNone(info.totalDelay()) batchInfosStarted = batch_collector.batchInfosStarted self.assertEqual(len(batchInfosStarted), 4) for info in batchInfosStarted: - self.assertNotEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$') - self.assertGreaterEqual(info.schedulingDelay().get(), 0) - self.assertEqual(info.processingDelay().getClass().getSimpleName(), u'None$') - self.assertEqual(info.totalDelay().getClass().getSimpleName(), u'None$') + self.assertIsNotNone(info.schedulingDelay()) + self.assertGreaterEqual(info.schedulingDelay(), 0) + self.assertIsNone(info.processingDelay()) + self.assertIsNone(info.totalDelay()) batchInfosCompleted = batch_collector.batchInfosCompleted self.assertEqual(len(batchInfosCompleted), 4) for info in batchInfosCompleted: - self.assertNotEqual(info.schedulingDelay().getClass().getSimpleName(), u'None$') - self.assertNotEqual(info.processingDelay().getClass().getSimpleName(), u'None$') - self.assertNotEqual(info.totalDelay().getClass().getSimpleName(), u'None$') - 
self.assertGreaterEqual(info.schedulingDelay().get(), 0) - self.assertGreaterEqual(info.processingDelay().get(), 0) - self.assertGreaterEqual(info.totalDelay().get(), 0) + self.assertIsNotNone(info.schedulingDelay()) + self.assertIsNotNone(info.processingDelay()) + self.assertIsNotNone(info.totalDelay()) + self.assertGreaterEqual(info.schedulingDelay(), 0) + self.assertGreaterEqual(info.processingDelay(), 0) + self.assertGreaterEqual(info.totalDelay(), 0) class WindowFunctionTests(PySparkStreamingTestCase): From 233104d8cfc761eeaa9b3c21808f21209cbdac93 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Fri, 23 Oct 2015 16:10:24 -0700 Subject: [PATCH 09/17] Fixed pep8 check. Added delay to test_batch_info_reports --- python/pyspark/streaming/listener.py | 1 - python/pyspark/streaming/tests.py | 5 +++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index 69a2fb7b3124b..221767bf85a79 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -109,7 +109,6 @@ def __init__(self, javaBatchInfo): self.outputOperationInfos = self._map2dict(javaBatchInfo.outputOperationInfos()) - def schedulingDelay(self): """ Time taken for the first job of this batch to start processing from the time this batch diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index cd8aba989a19c..fc76c6ea2300f 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -401,6 +401,8 @@ def func(dstream): class StreamingListenerTests(PySparkStreamingTestCase): + duration = .5 + class BatchInfoCollector(StreamingListener): def __init__(self): @@ -428,6 +430,9 @@ def func(dstream): expected = [[1], [2], [3], [4]] self._test_func(input, func, expected) + # Test occasionally fails without a delay + time.sleep(.1) + batchInfosSubmitted = batch_collector.batchInfosSubmitted self.assertEqual(len(batchInfosSubmitted), 4) From 79d70abe4d31f4b021ef9b35ed26464bed6fd16c Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Mon, 26 Oct 2015 14:32:34 -0700 Subject: [PATCH 10/17] Added StreamingListenerAdapter for Python. --- python/pyspark/streaming/context.py | 5 +- python/pyspark/streaming/listener.py | 123 +++++++++++++++++++++++---- python/pyspark/streaming/tests.py | 6 +- 3 files changed, 114 insertions(+), 20 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index cb928cf892ff9..ef0980e902f8a 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -28,7 +28,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer -from pyspark.streaming.listener import StreamingListener +from pyspark.streaming.listener import StreamingListenerAdapter __all__ = ["StreamingContext"] @@ -370,4 +370,5 @@ def addStreamingListener(self, streamingListener): Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for receiving system events related to streaming. 
""" - self._jssc.addStreamingListener(streamingListener) + streamingListenerAdapter = StreamingListenerAdapter(streamingListener) + self._jssc.addStreamingListener(streamingListenerAdapter) diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index 221767bf85a79..80d138ecf1c05 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -18,6 +18,68 @@ __all__ = ["StreamingListener"] +class StreamingListenerEvent(object): + + def __init__(self): + pass + + +class StreamingListenerBatchSubmitted(StreamingListenerEvent): + + def __init__(self, batchInfo): + super(StreamingListenerEvent, self).__init__() + self.batchInfo = batchInfo + + +class StreamingListenerBatchCompleted(StreamingListenerEvent): + + def __init__(self, batchInfo): + super(StreamingListenerEvent, self).__init__() + self.batchInfo = batchInfo + + +class StreamingListenerBatchStarted(StreamingListenerEvent): + + def __init__(self, batchInfo): + super(StreamingListenerEvent, self).__init__() + self.batchInfo = batchInfo + + +class StreamingListenerOutputOperationStarted(StreamingListenerEvent): + + def __init__(self, outputOperationInfo): + super(StreamingListenerEvent, self).__init__() + self.outputOperationInfo = outputOperationInfo + + +class StreamingListenerOutputOperationCompleted(StreamingListenerEvent): + + def __init__(self, outputOperationInfo): + super(StreamingListenerEvent, self).__init__() + self.outputOperationInfo = outputOperationInfo + + +class StreamingListenerReceieverStarted(StreamingListenerEvent): + + def __init__(self, receiverInfo): + super(StreamingListenerEvent, self).__init__() + self.receiverInfo = receiverInfo + + +class StreamingListenerReceiverError(StreamingListenerEvent): + + def __init__(self, receiverInfo): + super(StreamingListenerEvent, self).__init__() + self.receiverInfo = receiverInfo + + +class StreamingListenerReceiverStopped(StreamingListenerEvent): + + def __init__(self, receiverInfo): + super(StreamingListenerEvent, self).__init__() + self.receiverInfo = receiverInfo + + class StreamingListener(object): def __init__(self): @@ -71,24 +133,55 @@ def onOutputOperationCompleted(self, outputOperationCompleted): """ pass - def getEventInfo(self, event): - """ - :param event: StreamingListenerEvent - :return Returns a BatchInfo, OutputOperationInfo, or ReceiverInfo based on - event passed. 
- """ - event_name = event.getClass().getSimpleName() - if 'Batch' in event_name: - return BatchInfo(event.batchInfo()) + class Java: + implements = ["org.apache.spark.streaming.scheduler.StreamingListener"] - elif 'Output' in event_name: - return OutputOperationInfo(event.outputOperationInfo()) - elif 'Receiver' in event_name: - return ReceiverInfo(event.receiverInfo()) +class StreamingListenerAdapter(StreamingListener): - class Java: - implements = ["org.apache.spark.streaming.scheduler.StreamingListener"] + def __init__(self, streamingListener): + super(StreamingListener, self).__init__() + self.userStreamingListener = streamingListener + + def onReceiverStarted(self, receiverStarted): + receiver_info = ReceiverInfo(receiverStarted.receiverInfo()) + receiver_started = StreamingListenerReceieverStarted(receiver_info) + self.userStreamingListener.onReceiverStarted(receiver_started) + + def onReceiverError(self, receiverError): + receiver_info = ReceiverInfo(receiverError.receiverInfo()) + receiver_error = StreamingListenerReceiverError(receiver_info) + self.userStreamingListener.onReceiverError(receiver_error) + + def onReceiverStopped(self, receiverStopped): + receiver_info = ReceiverInfo(receiverStopped.receiverInfo()) + receiver_stopped = StreamingListenerReceiverStopped(receiver_info) + self.userStreamingListener.onReceiverStopped(receiver_stopped) + + def onBatchSubmitted(self, batchSubmitted): + batch_info = BatchInfo(batchSubmitted.batchInfo()) + batch_submitted = StreamingListenerBatchSubmitted(batch_info) + self.userStreamingListener.onBatchSubmitted(batch_submitted) + + def onBatchStarted(self, batchStarted): + batch_info = BatchInfo(batchStarted.batchInfo()) + batch_started = StreamingListenerBatchStarted(batch_info) + self.userStreamingListener .onBatchStarted(batch_started) + + def onBatchCompleted(self, batchCompleted): + batch_info = BatchInfo(batchCompleted.batchInfo()) + batch_completed = StreamingListenerBatchCompleted(batch_info) + self.userStreamingListener.onBatchCompleted(batch_completed) + + def onOutputOperationStarted(self, outputOperationStarted): + output_op_info = OutputOperationInfo(outputOperationStarted.outputOperationInfo()) + output_operation_started = StreamingListenerOutputOperationStarted(output_op_info) + self.userStreamingListener.onOutputOperationStarted(output_operation_started) + + def onOutputOperationCompleted(self, outputOperationCompleted): + output_op_info = OutputOperationInfo(outputOperationCompleted.outputOperationInfo()) + output_operation_completed = StreamingListenerOutputOperationCompleted(output_op_info) + self.userStreamingListener.onOutputOperationCompleted(output_operation_completed) class BatchInfo(object): diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index fc76c6ea2300f..223984f10ad8e 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -412,13 +412,13 @@ def __init__(self): self.batchInfosSubmitted = [] def onBatchSubmitted(self, batchSubmitted): - self.batchInfosSubmitted.append(self.getEventInfo(batchSubmitted)) + self.batchInfosSubmitted.append(batchSubmitted.batchInfo) def onBatchStarted(self, batchStarted): - self.batchInfosStarted.append(self.getEventInfo(batchStarted)) + self.batchInfosStarted.append(batchStarted.batchInfo) def onBatchCompleted(self, batchCompleted): - self.batchInfosCompleted.append(self.getEventInfo(batchCompleted)) + self.batchInfosCompleted.append(batchCompleted.batchInfo) def test_batch_info_reports(self): batch_collector = 
self.BatchInfoCollector() From 47c12ed2d1b5e1f66d5a3df1999074e8eb1054db Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Tue, 27 Oct 2015 12:17:47 -0700 Subject: [PATCH 11/17] Added Python class for StreamInputInfo. Improved function for converting Scala maps to Python dict. --- python/pyspark/streaming/listener.py | 49 +++++++++++++++++++--------- python/pyspark/streaming/tests.py | 11 +++---- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index 80d138ecf1c05..9c745501255fb 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -166,7 +166,7 @@ def onBatchSubmitted(self, batchSubmitted): def onBatchStarted(self, batchStarted): batch_info = BatchInfo(batchStarted.batchInfo()) batch_started = StreamingListenerBatchStarted(batch_info) - self.userStreamingListener .onBatchStarted(batch_started) + self.userStreamingListener.onBatchStarted(batch_started) def onBatchCompleted(self, batchCompleted): batch_info = BatchInfo(batchCompleted.batchInfo()) @@ -192,15 +192,16 @@ def __init__(self, javaBatchInfo): self.processingEndTime = None self.batchTime = javaBatchInfo.batchTime() - self.streamIdToInputInfo = self._map2dict(javaBatchInfo.streamIdToInputInfo()) - + self.streamIdToInputInfo = _map2dict(javaBatchInfo.streamIdToInputInfo(), + StreamInputInfo) self.submissionTime = javaBatchInfo.submissionTime() if javaBatchInfo.processingStartTime().isEmpty() is False: self.processingStartTime = javaBatchInfo.processingStartTime().get() if javaBatchInfo.processingEndTime().isEmpty() is False: self.processingEndTime = javaBatchInfo.processingEndTime().get() - self.outputOperationInfos = self._map2dict(javaBatchInfo.outputOperationInfos()) + self.outputOperationInfos = _map2dict(javaBatchInfo.outputOperationInfos(), + OutputOperationInfo) def schedulingDelay(self): """ @@ -238,17 +239,6 @@ def numRecords(self): """ return len(self.streamIdToInputInfo) - def _map2dict(self, javaMap): - """ - Converts a scala.collection.immutable.Map to a Python dict - """ - mapping = dict() - map_iterator = javaMap.iterator() - while map_iterator.hasNext(): - entry = map_iterator.next() - mapping[entry._1()] = entry._2() - return mapping - class OutputOperationInfo(object): @@ -283,3 +273,32 @@ def __init__(self, receiverInfo): self.lastErrorMessage = receiverInfo.lastErrorMessage() self.lastError = receiverInfo.lastError() self.lastErrorTime = receiverInfo.lastErrorTime() + + +class StreamInputInfo(object): + + def __init__(self, streamInputInfo): + self.inputStreamId = streamInputInfo.inputStreamId() + self.numRecords = streamInputInfo.numRecords() + self.metadata = _map2dict(streamInputInfo.metadata()) + self.metadataDescription = None + if streamInputInfo.metadataDescription().isEmpty() is False: + self.metadataDescription = streamInputInfo.metadataDescription().get() + + +def _map2dict(scalaMap, constructor=None): + """ + Converts a scala.collection.immutable.Map to a Python dict. + Creates an instance of an object as the value if a constructor + is passed. 
+ """ + mapping = dict() + map_iterator = scalaMap.iterator() + while map_iterator.hasNext(): + entry = map_iterator.next() + if constructor is not None: + info = constructor(entry._2()) + mapping[entry._1()] = info + else: + mapping[entry._1()] = entry._2() + return mapping diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 223984f10ad8e..a58bf8d30ff03 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -430,19 +430,19 @@ def func(dstream): expected = [[1], [2], [3], [4]] self._test_func(input, func, expected) - # Test occasionally fails without a delay - time.sleep(.1) - batchInfosSubmitted = batch_collector.batchInfosSubmitted - self.assertEqual(len(batchInfosSubmitted), 4) + batchInfosStarted = batch_collector.batchInfosStarted + batchInfosCompleted = batch_collector.batchInfosCompleted + self.wait_for(batchInfosCompleted, 4) + + self.assertEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: self.assertIsNone(info.schedulingDelay()) self.assertIsNone(info.processingDelay()) self.assertIsNone(info.totalDelay()) - batchInfosStarted = batch_collector.batchInfosStarted self.assertEqual(len(batchInfosStarted), 4) for info in batchInfosStarted: self.assertIsNotNone(info.schedulingDelay()) @@ -450,7 +450,6 @@ def func(dstream): self.assertIsNone(info.processingDelay()) self.assertIsNone(info.totalDelay()) - batchInfosCompleted = batch_collector.batchInfosCompleted self.assertEqual(len(batchInfosCompleted), 4) for info in batchInfosCompleted: From 9e4e04a6c369da13c3cc8e798f5a5b0e210b24d5 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Tue, 10 Nov 2015 21:45:16 -0800 Subject: [PATCH 12/17] Simplified Python API by using JavaStreamingListener --- python/pyspark/streaming/context.py | 5 +- python/pyspark/streaming/listener.py | 231 +----------------- python/pyspark/streaming/tests.py | 23 +- .../api/java/JavaStreamingListener.scala | 76 ++++++ 4 files changed, 89 insertions(+), 246 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 848a718bf1931..1388b6d044e04 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -28,7 +28,6 @@ from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer -from pyspark.streaming.listener import StreamingListenerAdapter __all__ = ["StreamingContext"] @@ -370,5 +369,5 @@ def addStreamingListener(self, streamingListener): Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for receiving system events related to streaming. 
""" - streamingListenerAdapter = StreamingListenerAdapter(streamingListener) - self._jssc.addStreamingListener(streamingListenerAdapter) + self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper( + self._jvm.PythonStreamingListenerWrapper(streamingListener))) diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index 9c745501255fb..b830797f5c0a0 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -18,68 +18,6 @@ __all__ = ["StreamingListener"] -class StreamingListenerEvent(object): - - def __init__(self): - pass - - -class StreamingListenerBatchSubmitted(StreamingListenerEvent): - - def __init__(self, batchInfo): - super(StreamingListenerEvent, self).__init__() - self.batchInfo = batchInfo - - -class StreamingListenerBatchCompleted(StreamingListenerEvent): - - def __init__(self, batchInfo): - super(StreamingListenerEvent, self).__init__() - self.batchInfo = batchInfo - - -class StreamingListenerBatchStarted(StreamingListenerEvent): - - def __init__(self, batchInfo): - super(StreamingListenerEvent, self).__init__() - self.batchInfo = batchInfo - - -class StreamingListenerOutputOperationStarted(StreamingListenerEvent): - - def __init__(self, outputOperationInfo): - super(StreamingListenerEvent, self).__init__() - self.outputOperationInfo = outputOperationInfo - - -class StreamingListenerOutputOperationCompleted(StreamingListenerEvent): - - def __init__(self, outputOperationInfo): - super(StreamingListenerEvent, self).__init__() - self.outputOperationInfo = outputOperationInfo - - -class StreamingListenerReceieverStarted(StreamingListenerEvent): - - def __init__(self, receiverInfo): - super(StreamingListenerEvent, self).__init__() - self.receiverInfo = receiverInfo - - -class StreamingListenerReceiverError(StreamingListenerEvent): - - def __init__(self, receiverInfo): - super(StreamingListenerEvent, self).__init__() - self.receiverInfo = receiverInfo - - -class StreamingListenerReceiverStopped(StreamingListenerEvent): - - def __init__(self, receiverInfo): - super(StreamingListenerEvent, self).__init__() - self.receiverInfo = receiverInfo - - class StreamingListener(object): def __init__(self): @@ -134,171 +72,4 @@ def onOutputOperationCompleted(self, outputOperationCompleted): pass class Java: - implements = ["org.apache.spark.streaming.scheduler.StreamingListener"] - - -class StreamingListenerAdapter(StreamingListener): - - def __init__(self, streamingListener): - super(StreamingListener, self).__init__() - self.userStreamingListener = streamingListener - - def onReceiverStarted(self, receiverStarted): - receiver_info = ReceiverInfo(receiverStarted.receiverInfo()) - receiver_started = StreamingListenerReceieverStarted(receiver_info) - self.userStreamingListener.onReceiverStarted(receiver_started) - - def onReceiverError(self, receiverError): - receiver_info = ReceiverInfo(receiverError.receiverInfo()) - receiver_error = StreamingListenerReceiverError(receiver_info) - self.userStreamingListener.onReceiverError(receiver_error) - - def onReceiverStopped(self, receiverStopped): - receiver_info = ReceiverInfo(receiverStopped.receiverInfo()) - receiver_stopped = StreamingListenerReceiverStopped(receiver_info) - self.userStreamingListener.onReceiverStopped(receiver_stopped) - - def onBatchSubmitted(self, batchSubmitted): - batch_info = BatchInfo(batchSubmitted.batchInfo()) - batch_submitted = StreamingListenerBatchSubmitted(batch_info) - self.userStreamingListener.onBatchSubmitted(batch_submitted) - - def 
onBatchStarted(self, batchStarted): - batch_info = BatchInfo(batchStarted.batchInfo()) - batch_started = StreamingListenerBatchStarted(batch_info) - self.userStreamingListener.onBatchStarted(batch_started) - - def onBatchCompleted(self, batchCompleted): - batch_info = BatchInfo(batchCompleted.batchInfo()) - batch_completed = StreamingListenerBatchCompleted(batch_info) - self.userStreamingListener.onBatchCompleted(batch_completed) - - def onOutputOperationStarted(self, outputOperationStarted): - output_op_info = OutputOperationInfo(outputOperationStarted.outputOperationInfo()) - output_operation_started = StreamingListenerOutputOperationStarted(output_op_info) - self.userStreamingListener.onOutputOperationStarted(output_operation_started) - - def onOutputOperationCompleted(self, outputOperationCompleted): - output_op_info = OutputOperationInfo(outputOperationCompleted.outputOperationInfo()) - output_operation_completed = StreamingListenerOutputOperationCompleted(output_op_info) - self.userStreamingListener.onOutputOperationCompleted(output_operation_completed) - - -class BatchInfo(object): - - def __init__(self, javaBatchInfo): - - self.processingStartTime = None - self.processingEndTime = None - - self.batchTime = javaBatchInfo.batchTime() - self.streamIdToInputInfo = _map2dict(javaBatchInfo.streamIdToInputInfo(), - StreamInputInfo) - self.submissionTime = javaBatchInfo.submissionTime() - if javaBatchInfo.processingStartTime().isEmpty() is False: - self.processingStartTime = javaBatchInfo.processingStartTime().get() - if javaBatchInfo.processingEndTime().isEmpty() is False: - self.processingEndTime = javaBatchInfo.processingEndTime().get() - - self.outputOperationInfos = _map2dict(javaBatchInfo.outputOperationInfos(), - OutputOperationInfo) - - def schedulingDelay(self): - """ - Time taken for the first job of this batch to start processing from the time this batch - was submitted to the streaming scheduler. - """ - if self.processingStartTime is None: - return None - else: - return self.processingStartTime - self.submissionTime - - def processingDelay(self): - """ - Time taken for the all jobs of this batch to finish processing from the time they started - processing. - """ - if self.processingEndTime is None or self.processingStartTime is None: - return None - else: - return self.processingEndTime - self.processingStartTime - - def totalDelay(self): - """ - Time taken for all the jobs of this batch to finish processing from the time they - were submitted - """ - if self.processingEndTime is None or self.processingStartTime is None: - return None - else: - return self.processingDelay() + self.schedulingDelay() - - def numRecords(self): - """ - The number of recorders received by the receivers in this batch. 
- """ - return len(self.streamIdToInputInfo) - - -class OutputOperationInfo(object): - - def __init__(self, outputOperationInfo): - self.batchTime = outputOperationInfo.batchTime() - self.id = outputOperationInfo.id() - self.name = outputOperationInfo.name() - self.startTime = None - if outputOperationInfo.startTime().isEmpty() is False: - self.startTime = outputOperationInfo.startTime().get() - self.endTime = None - if outputOperationInfo.endTime().isEmpty() is False: - self.endTime = outputOperationInfo.endTime().get() - self.failureReason = None - if outputOperationInfo.failureReason().isEmpty() is False: - self.failureReason = outputOperationInfo.failureReason().get() - - def duration(self): - if self.endTime is None or self.startTime is None: - return None - else: - return self.endTime - self.startTime - - -class ReceiverInfo(object): - - def __init__(self, receiverInfo): - self.streamId = receiverInfo.streamId() - self.name = receiverInfo.name() - self.active = receiverInfo.active() - self.location = receiverInfo.location() - self.lastErrorMessage = receiverInfo.lastErrorMessage() - self.lastError = receiverInfo.lastError() - self.lastErrorTime = receiverInfo.lastErrorTime() - - -class StreamInputInfo(object): - - def __init__(self, streamInputInfo): - self.inputStreamId = streamInputInfo.inputStreamId() - self.numRecords = streamInputInfo.numRecords() - self.metadata = _map2dict(streamInputInfo.metadata()) - self.metadataDescription = None - if streamInputInfo.metadataDescription().isEmpty() is False: - self.metadataDescription = streamInputInfo.metadataDescription().get() - - -def _map2dict(scalaMap, constructor=None): - """ - Converts a scala.collection.immutable.Map to a Python dict. - Creates an instance of an object as the value if a constructor - is passed. 
- """ - mapping = dict() - map_iterator = scalaMap.iterator() - while map_iterator.hasNext(): - entry = map_iterator.next() - if constructor is not None: - info = constructor(entry._2()) - mapping[entry._1()] = info - else: - mapping[entry._1()] = entry._2() - return mapping + implements = ["org.apache.spark.streaming.api.java.PythonStreamingListener"] diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index fe5c318619741..ee19a59a2c9b4 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -48,7 +48,7 @@ from pyspark.streaming.flume import FlumeUtils from pyspark.streaming.mqtt import MQTTUtils from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream -from pyspark.streaming.listener import StreamingListener, BatchInfo +from pyspark.streaming.listener import StreamingListener class PySparkStreamingTestCase(unittest.TestCase): @@ -417,13 +417,13 @@ def __init__(self): self.batchInfosSubmitted = [] def onBatchSubmitted(self, batchSubmitted): - self.batchInfosSubmitted.append(batchSubmitted.batchInfo) + self.batchInfosSubmitted.append(batchSubmitted.batchInfo()) def onBatchStarted(self, batchStarted): - self.batchInfosStarted.append(batchStarted.batchInfo) + self.batchInfosStarted.append(batchStarted.batchInfo()) def onBatchCompleted(self, batchCompleted): - self.batchInfosCompleted.append(batchCompleted.batchInfo) + self.batchInfosCompleted.append(batchCompleted.batchInfo()) def test_batch_info_reports(self): batch_collector = self.BatchInfoCollector() @@ -444,23 +444,20 @@ def func(dstream): self.assertEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: - self.assertIsNone(info.schedulingDelay()) - self.assertIsNone(info.processingDelay()) - self.assertIsNone(info.totalDelay()) + self.assertEqual(info.schedulingDelay(), -1L) + self.assertEqual(info.processingDelay(), -1L) + self.assertEqual(info.totalDelay(), -1L) self.assertEqual(len(batchInfosStarted), 4) for info in batchInfosStarted: - self.assertIsNotNone(info.schedulingDelay()) self.assertGreaterEqual(info.schedulingDelay(), 0) - self.assertIsNone(info.processingDelay()) - self.assertIsNone(info.totalDelay()) + self.assertEqual(info.processingDelay(), -1L) + self.assertEqual(info.totalDelay(), -1L) self.assertEqual(len(batchInfosCompleted), 4) for info in batchInfosCompleted: - self.assertIsNotNone(info.schedulingDelay()) - self.assertIsNotNone(info.processingDelay()) - self.assertIsNotNone(info.totalDelay()) + self.assertGreaterEqual(info.schedulingDelay(), 0) self.assertGreaterEqual(info.processingDelay(), 0) self.assertGreaterEqual(info.totalDelay(), 0) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala index 34429074fe804..02beaab25b196 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala @@ -18,6 +18,82 @@ package org.apache.spark.streaming.api.java import org.apache.spark.streaming.Time +import org.apache.spark.streaming.scheduler.StreamingListener + +private[streaming] trait PythonStreamingListener{ + + /** Called when a receiver has been started */ + def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { } + + /** Called when a receiver has reported an error */ + def onReceiverError(receiverError: 
JavaStreamingListenerReceiverError) { } + + /** Called when a receiver has been stopped */ + def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped) { } + + /** Called when a batch of jobs has been submitted for processing. */ + def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted) { } + + /** Called when processing of a batch of jobs has started. */ + def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted) { } + + /** Called when processing of a batch of jobs has completed. */ + def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted) { } + + /** Called when processing of a job of a batch has started. */ + def onOutputOperationStarted( + outputOperationStarted: JavaStreamingListenerOutputOperationStarted) { } + + /** Called when processing of a job of a batch has completed. */ + def onOutputOperationCompleted( + outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { } +} + +class PythonStreamingListenerWrapper(listener: PythonStreamingListener) + extends JavaStreamingListener { + + /** Called when a receiver has been started */ + override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { + listener.onReceiverStarted(receiverStarted) + } + + /** Called when a receiver has reported an error */ + override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = { + listener.onReceiverError(receiverError) + } + + /** Called when a receiver has been stopped */ + override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = { + listener.onReceiverStopped(receiverStopped) + } + + /** Called when a batch of jobs has been submitted for processing. */ + override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = { + listener.onBatchSubmitted(batchSubmitted) + } + + /** Called when processing of a batch of jobs has started. */ + override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = { + listener.onBatchStarted(batchStarted) + } + + /** Called when processing of a batch of jobs has completed. */ + override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = { + listener.onBatchCompleted(batchCompleted) + } + + /** Called when processing of a job of a batch has started. */ + override def onOutputOperationStarted( + outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = { + listener.onOutputOperationStarted(outputOperationStarted) + } + + /** Called when processing of a job of a batch has completed. */ + override def onOutputOperationCompleted( + outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = { + listener.onOutputOperationCompleted(outputOperationCompleted) + } +} /** * A listener interface for receiving information about an ongoing streaming computation. 
From 5415389e95dbe642dab8f00acb2c57a08d8b97b3 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Tue, 10 Nov 2015 22:57:55 -0800 Subject: [PATCH 13/17] Fixed syntax error for test_batch_info_reports --- python/pyspark/streaming/tests.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index ee19a59a2c9b4..8f3368050006b 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -444,15 +444,15 @@ def func(dstream): self.assertEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: - self.assertEqual(info.schedulingDelay(), -1L) - self.assertEqual(info.processingDelay(), -1L) - self.assertEqual(info.totalDelay(), -1L) + self.assertEqual(info.schedulingDelay(), -1) + self.assertEqual(info.processingDelay(), -1) + self.assertEqual(info.totalDelay(), -1) self.assertEqual(len(batchInfosStarted), 4) for info in batchInfosStarted: self.assertGreaterEqual(info.schedulingDelay(), 0) - self.assertEqual(info.processingDelay(), -1L) - self.assertEqual(info.totalDelay(), -1L) + self.assertEqual(info.processingDelay(), -1) + self.assertEqual(info.totalDelay(), -1) self.assertEqual(len(batchInfosCompleted), 4) From 7cdaf37856a7cb279ef1dbb3f7a490971448ac7a Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Wed, 11 Nov 2015 16:13:54 -0800 Subject: [PATCH 14/17] Added more checks for BatchInfo --- python/pyspark/streaming/tests.py | 21 ++++++++++++++++--- .../api/java/JavaStreamingListener.scala | 2 +- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 8f3368050006b..f3ec84fa1e2c2 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -443,24 +443,39 @@ def func(dstream): self.assertEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: - + self.assertGreaterEqual(info.batchTime().milliseconds(), 0) + self.assertGreaterEqual(info.submissionTime(), 0) + self.assertTrue(info.streamIdToInputInfo().isEmpty()) + self.assertFalse(info.outputOperationInfos().isEmpty()) + self.assertIsNotNone(info.outputOperationInfos().get(0)) self.assertEqual(info.schedulingDelay(), -1) self.assertEqual(info.processingDelay(), -1) self.assertEqual(info.totalDelay(), -1) + self.assertEqual(info.numRecords(), 0) self.assertEqual(len(batchInfosStarted), 4) for info in batchInfosStarted: + self.assertGreaterEqual(info.batchTime().milliseconds(), 0) + self.assertGreaterEqual(info.submissionTime(), 0) + self.assertTrue(info.streamIdToInputInfo().isEmpty()) + self.assertFalse(info.outputOperationInfos().isEmpty()) + self.assertIsNotNone(info.outputOperationInfos().get(0)) self.assertGreaterEqual(info.schedulingDelay(), 0) self.assertEqual(info.processingDelay(), -1) self.assertEqual(info.totalDelay(), -1) + self.assertEqual(info.numRecords(), 0) self.assertEqual(len(batchInfosCompleted), 4) - for info in batchInfosCompleted: - + self.assertGreaterEqual(info.batchTime().milliseconds(), 0) + self.assertGreaterEqual(info.submissionTime(), 0) + self.assertTrue(info.streamIdToInputInfo().isEmpty()) + self.assertFalse(info.outputOperationInfos().isEmpty()) + self.assertIsNotNone(info.outputOperationInfos().get(0)) self.assertGreaterEqual(info.schedulingDelay(), 0) self.assertGreaterEqual(info.processingDelay(), 0) self.assertGreaterEqual(info.totalDelay(), 0) + self.assertEqual(info.numRecords(), 0) class WindowFunctionTests(PySparkStreamingTestCase): diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala index 02beaab25b196..7bfd6bd5af759 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala @@ -49,7 +49,7 @@ private[streaming] trait PythonStreamingListener{ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { } } -class PythonStreamingListenerWrapper(listener: PythonStreamingListener) +private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener) extends JavaStreamingListener { /** Called when a receiver has been started */ From ddad2551e54abe60a4148a7a494bfb695015eec4 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Fri, 13 Nov 2015 11:02:50 -0800 Subject: [PATCH 15/17] Fixed asserts in test_batch_info_reports --- python/pyspark/streaming/tests.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index f3ec84fa1e2c2..0054a467a88e2 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -441,37 +441,37 @@ def func(dstream): self.wait_for(batchInfosCompleted, 4) - self.assertEqual(len(batchInfosSubmitted), 4) + self.assertGreaterEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) self.assertGreaterEqual(info.submissionTime(), 0) self.assertTrue(info.streamIdToInputInfo().isEmpty()) self.assertFalse(info.outputOperationInfos().isEmpty()) - self.assertIsNotNone(info.outputOperationInfos().get(0)) + self.assertIsNotNone(info.outputOperationInfos()[0]) self.assertEqual(info.schedulingDelay(), -1) self.assertEqual(info.processingDelay(), -1) self.assertEqual(info.totalDelay(), -1) self.assertEqual(info.numRecords(), 0) - self.assertEqual(len(batchInfosStarted), 4) + self.assertGreaterEqual(len(batchInfosStarted), 4) for info in batchInfosStarted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) self.assertGreaterEqual(info.submissionTime(), 0) self.assertTrue(info.streamIdToInputInfo().isEmpty()) self.assertFalse(info.outputOperationInfos().isEmpty()) - self.assertIsNotNone(info.outputOperationInfos().get(0)) + self.assertIsNotNone(info.outputOperationInfos()[0]) self.assertGreaterEqual(info.schedulingDelay(), 0) self.assertEqual(info.processingDelay(), -1) self.assertEqual(info.totalDelay(), -1) self.assertEqual(info.numRecords(), 0) - self.assertEqual(len(batchInfosCompleted), 4) + self.assertGreaterEqual(len(batchInfosCompleted), 4) for info in batchInfosCompleted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) self.assertGreaterEqual(info.submissionTime(), 0) self.assertTrue(info.streamIdToInputInfo().isEmpty()) self.assertFalse(info.outputOperationInfos().isEmpty()) - self.assertIsNotNone(info.outputOperationInfos().get(0)) + self.assertIsNotNone(info.outputOperationInfos()[0]) self.assertGreaterEqual(info.schedulingDelay(), 0) self.assertGreaterEqual(info.processingDelay(), 0) self.assertGreaterEqual(info.totalDelay(), 0) From 5349c825de7ec1483b8d9c28480ec4071f75ea01 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Fri, 13 Nov 2015 15:02:34 -0800 Subject: [PATCH 16/17] Added more checks to test_batch_info_reports --- python/pyspark/streaming/tests.py | 36 +++++++++++++++++++++++-------- 
From 5349c825de7ec1483b8d9c28480ec4071f75ea01 Mon Sep 17 00:00:00 2001
From: Daniel Jalova
Date: Fri, 13 Nov 2015 15:02:34 -0800
Subject: [PATCH 16/17] Added more checks to test_batch_info_reports

---
 python/pyspark/streaming/tests.py | 36 +++++++++++++++++++++++--------
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4d4195a4a9f3e..8a285523ada16 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -445,9 +445,15 @@ def func(dstream):
         for info in batchInfosSubmitted:
             self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
             self.assertGreaterEqual(info.submissionTime(), 0)
-            self.assertTrue(info.streamIdToInputInfo().isEmpty())
-            self.assertFalse(info.outputOperationInfos().isEmpty())
-            self.assertIsNotNone(info.outputOperationInfos()[0])
+
+            for streamId in info.streamIdToInputInfo():
+                self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
+                # access fields of streamInputInfo
+
+            for outputOpId in info.outputOperationInfos():
+                self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
+                # access fields of outputOperationInfo
+
             self.assertEqual(info.schedulingDelay(), -1)
             self.assertEqual(info.processingDelay(), -1)
             self.assertEqual(info.totalDelay(), -1)
@@ -457,9 +463,15 @@ def func(dstream):
         for info in batchInfosStarted:
             self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
             self.assertGreaterEqual(info.submissionTime(), 0)
-            self.assertTrue(info.streamIdToInputInfo().isEmpty())
-            self.assertFalse(info.outputOperationInfos().isEmpty())
-            self.assertIsNotNone(info.outputOperationInfos()[0])
+
+            for streamId in info.streamIdToInputInfo():
+                self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
+                # access fields of streamInputInfo
+
+            for outputOpId in info.outputOperationInfos():
+                self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
+                # access fields of outputOperationInfo
+
             self.assertGreaterEqual(info.schedulingDelay(), 0)
             self.assertEqual(info.processingDelay(), -1)
             self.assertEqual(info.totalDelay(), -1)
@@ -469,9 +481,15 @@ def func(dstream):
         for info in batchInfosCompleted:
             self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
             self.assertGreaterEqual(info.submissionTime(), 0)
-            self.assertTrue(info.streamIdToInputInfo().isEmpty())
-            self.assertFalse(info.outputOperationInfos().isEmpty())
-            self.assertIsNotNone(info.outputOperationInfos()[0])
+
+            for streamId in info.streamIdToInputInfo():
+                self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
+                # access fields of streamInputInfo
+
+            for outputOpId in info.outputOperationInfos():
+                self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
+                # access fields of outputOperationInfo
+
             self.assertGreaterEqual(info.schedulingDelay(), 0)
             self.assertGreaterEqual(info.processingDelay(), 0)
             self.assertGreaterEqual(info.totalDelay(), 0)
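The loops added in patch 16 work because py4j exposes the java.util.Map fields of the Java listener events (streamIdToInputInfo, outputOperationInfos) through Python's mapping protocol: iteration yields the keys, and bracket lookup returns the wrapped values. A short illustration, assuming info is a batch-info object delivered to a listener callback (it is not defined in the patch itself):

    inputInfos = info.streamIdToInputInfo()     # py4j-wrapped java.util.Map
    for streamId in inputInfos:                 # iterates the map's keys
        streamInputInfo = inputInfos[streamId]  # equivalent to .get(streamId)
        # the value is a stream-input info object; its fields are method calls

The placeholder comments ("# access fields of streamInputInfo") mark where patch 17 below binds each map value once and asserts on its individual fields.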
From c941c3e5e457c8fe3fbf546c5e09aa954075be19 Mon Sep 17 00:00:00 2001
From: Daniel Jalova
Date: Fri, 13 Nov 2015 16:07:48 -0800
Subject: [PATCH 17/17] Fixed tests in test_batch_info_reports

---
 python/pyspark/streaming/tests.py | 54 ++++++++++++++++++++++++-------
 1 file changed, 42 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 8a285523ada16..2983028413bb8 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -447,12 +447,22 @@ def func(dstream):
             self.assertGreaterEqual(info.submissionTime(), 0)

             for streamId in info.streamIdToInputInfo():
-                self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
-                # access fields of streamInputInfo
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())

             for outputOpId in info.outputOperationInfos():
-                self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
-                # access fields of outputOperationInfo
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), -1)
+                self.assertGreaterEqual(outputInfo.endTime(), -1)
+                self.assertIsNone(outputInfo.failureReason())

             self.assertEqual(info.schedulingDelay(), -1)
             self.assertEqual(info.processingDelay(), -1)
@@ -465,12 +475,22 @@ def func(dstream):
             self.assertGreaterEqual(info.submissionTime(), 0)

             for streamId in info.streamIdToInputInfo():
-                self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
-                # access fields of streamInputInfo
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())

             for outputOpId in info.outputOperationInfos():
-                self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
-                # access fields of outputOperationInfo
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), -1)
+                self.assertGreaterEqual(outputInfo.endTime(), -1)
+                self.assertIsNone(outputInfo.failureReason())

             self.assertGreaterEqual(info.schedulingDelay(), 0)
             self.assertEqual(info.processingDelay(), -1)
@@ -483,12 +503,22 @@ def func(dstream):
             self.assertGreaterEqual(info.submissionTime(), 0)

             for streamId in info.streamIdToInputInfo():
-                self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
-                # access fields of streamInputInfo
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())

             for outputOpId in info.outputOperationInfos():
-                self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
-                # access fields of outputOperationInfo
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), 0)
+                self.assertGreaterEqual(outputInfo.endTime(), 0)
+                self.assertIsNone(outputInfo.failureReason())

             self.assertGreaterEqual(info.schedulingDelay(), 0)
             self.assertGreaterEqual(info.processingDelay(), 0)
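With patch 17 the test exercises every getter the Java wrappers expose: the stream-input fields (inputStreamId, numRecords, metadata, metadataDescription) and the output-operation fields (id, name, description, startTime, endTime, failureReason). The same accessors support simple batch monitoring outside the test suite; a hedged sketch, where BatchLogger and the ssc variable are illustrative and not part of this change:

    from pyspark.streaming.listener import StreamingListener

    class BatchLogger(StreamingListener):
        def onBatchCompleted(self, batchCompleted):
            info = batchCompleted.batchInfo()
            for outputOpId in info.outputOperationInfos():
                op = info.outputOperationInfos()[outputOpId]
                # startTime()/endTime() are epoch milliseconds; both are
                # >= 0 for a completed batch, as the asserts above check
                duration = op.endTime() - op.startTime()
                print("output op %d (%s): %d ms" % (op.id(), op.name(), duration))

    # Attach the listener before starting the context:
    #     ssc.addStreamingListener(BatchLogger())
    #     ssc.start()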