@@ -80,6 +80,9 @@ def disconnect_all(self,
8080 self ._wait_for_timeout (after_idle_for , after )
8181 return ray .get (self .actor .disconnect_all .remote (stream_drain_timeout ))
8282
83+ def event_count (self ):
84+ return ray .get (self .actor .event_count .remote ())
85+
8386 def _meta (self , action , * args , ** kwargs ):
8487 return ray .get (self .actor ._meta .remote (action , * args , ** kwargs ))
8588
@@ -119,6 +122,7 @@ def __init__(self, name, operator=None):
119122 self ._sinks = {}
120123 self ._latest_sent_event_timestamp = None
121124 self ._limit_subscribers = False
125+ self ._event_counter = 0
122126
123127 def send_to (self , subscriber , name = None ):
124128 if self ._limit_subscribers :
@@ -142,6 +146,7 @@ def append(self, data):
142146 continue
143147 _eval (subscriber , data )
144148 self ._latest_sent_event_timestamp = time .time ()
149+ self ._event_counter += 1
145150
146151 def add_operator (self , operator ):
147152 self ._operator = operator
@@ -195,6 +200,9 @@ def disconnect_all(self, stream_drain_timeout):
195200 for sink_name in dict (self ._sinks ):
196201 self .disconnect_sink (sink_name )
197202
203+ def event_count (self ):
204+ return self .event_count
205+
198206 def _meta (self , action , * args , ** kwargs ):
199207 return verify_do (self , _global_camel , action , * args , ** kwargs )
200208
0 commit comments