Skip to content

Commit 7d50848

Browse files
committed
Refactored StreamingListener methods and added getEventInfo.
1 parent 3523e1f commit 7d50848

File tree

1 file changed

+19
-22
lines changed

1 file changed

+19
-22
lines changed

python/pyspark/streaming/listener.py

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,13 @@
1515
# limitations under the License.
1616
#
1717

18-
19-
from py4j.java_gateway import JavaGateway
20-
21-
22-
"""
23-
class StreamingListenerEvent(object):
24-
25-
class StreamingListenerBatchSubmitted(StreamingListenerEvent, batchInfo):
26-
27-
class StreamingListenerBatchCompleted(StreamingListenerEvent, batchInfo):
28-
29-
class StreamingListenerBatchStarted(StreamingListenerEvent, batchInfo):
30-
31-
class StreamingListenerStarted(StreamingListenerEvent, receiverInfo):
32-
33-
class StreamingListenerReceiverError(StreamingListenerEvent, receiverInfo):
34-
35-
class StreamingListenerReceiverStopped(StreamingListenerEvent, receiverInfo):
36-
"""
37-
3818
__all__ = ["StreamingListener"]
3919

4020

4121
class StreamingListener(object):
4222

4323
def __init__(self):
24+
pass
4425

4526
# Called when a receiver has been started.
4627
def onReceiverStarted(self, receiverStarted):
@@ -50,12 +31,13 @@ def onReceiverStarted(self, receiverStarted):
5031
def onReceiverError(self, receiverError):
5132
pass
5233

53-
# Called when a receiver has been stopped
34+
# Called when a receiver has been stopped.
5435
def onReceiverStopped(self, receiverStopped):
5536
pass
5637

5738
# Called when a batch of jobs has been submitted for processing.
5839
def onBatchSubmitted(self, batchSubmitted):
40+
pass
5941

6042
# Called when processing of a batch of jobs has started.
6143
def onBatchStarted(self, batchStarted):
@@ -73,6 +55,21 @@ def onOutputOperationStarted(self, outputOperationStarted):
7355
def onOutputOperationCompleted(self, outputOperationCompleted):
7456
pass
7557

58+
def getEventInfo(self, event):
59+
"""
60+
:param event: StreamingListenerEvent
61+
:return Returns a BatchInfo, OutputOperationInfo, or ReceiverInfo based on
62+
event passed.
63+
"""
64+
event_name = event.getClass().getSimpleName()
65+
if 'Batch' in event_name:
66+
return event.batchInfo()
67+
68+
elif 'Output' in event_name:
69+
return event.outputOperationInfo()
70+
71+
elif 'Receiver' in event_name:
72+
return event.receiverInfo()
73+
7674
class Java:
7775
implements = ["org.apache.spark.streaming.scheduler.StreamingListener"]
78-

0 commit comments

Comments
 (0)