@@ -30,11 +30,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
3030import org .apache .spark .internal .Logging
3131import org .apache .spark .scheduler .LiveListenerBus
3232
33-
34- private class ListenerEventExecutor ( listenerName : String , queueCapacity : Int ) extends Logging {
33+ private class ListenerEventExecutor [ L <: AnyRef ] ( listenerName : String , queueCapacity : Int )
34+ extends Logging {
3535 private val threadFactory = new ThreadFactoryBuilder ().setDaemon(true )
3636 .setNameFormat(listenerName + " -event-executor" )
3737 .build()
38+ val listeners = new CopyOnWriteArrayList [L ]()
3839 /** Holds the events to be processed by this listener. */
3940 private val eventQueue = new LinkedBlockingQueue [Runnable ](queueCapacity)
4041 /**
@@ -55,10 +56,18 @@ private class ListenerEventExecutor(listenerName: String, queueCapacity: Int) ex
5556 * guarantee that we do not process any event before starting the event executor.
5657 */
5758 private val isStarted = new AtomicBoolean (false )
58- private val lock = new ReentrantLock ();
59+ private val lock = new ReentrantLock ()
5960 /** Condition variable which is signaled once the event executor is started */
6061 private val startCondition : Condition = lock.newCondition
6162
63+ def addListener (listener : L ): Unit = {
64+ listeners.add(listener)
65+ }
66+
67+ def removeListener (listener : L ): Unit = {
68+ listeners.remove(listener)
69+ }
70+
6271 def start (): Unit = {
6372 isStarted.set(true )
6473 lock.lock()
@@ -133,7 +142,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
133142 // Cap the capacity of the event queue so we get an explicit error (rather than
134143 // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
135144 protected def eventQueueSize = 10000
136- private val listenerAndEventExecutors = new CopyOnWriteArrayList [(L , ListenerEventExecutor )]()
145+ private val eventGroupToEventExecutors =
146+ new ConcurrentHashMap [String , ListenerEventExecutor [L ]] ()
137147
138148 // Indicate if `start()` is called
139149 private val started = new AtomicBoolean (false )
@@ -143,11 +153,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
143153 /**
144154 * Add a listener to listen events. This method is thread-safe and can be called in any thread.
145155 */
146- final def addListener (listener : L ): Unit = {
147- val eventProcessor = new ListenerEventExecutor (listener.getClass.getName, eventQueueSize)
148- listenerAndEventExecutors.add((listener, eventProcessor))
156+ final def addListener (
157+ listener : L , eventListenerGroup : String = ListenerEventExecutor .DefaultEventListenerGroup ):
158+ Unit = synchronized {
159+ var listenerEventExecutor = eventGroupToEventExecutors.get(eventListenerGroup)
160+ if (listenerEventExecutor == null ) {
161+ listenerEventExecutor =
162+ new ListenerEventExecutor [L ](listener.getClass.getName, eventQueueSize)
163+ eventGroupToEventExecutors.put(eventListenerGroup, listenerEventExecutor)
164+
165+ }
166+ listenerEventExecutor.addListener(listener)
149167 if (started.get()) {
150- eventProcessor .start
168+ listenerEventExecutor .start
151169 }
152170 }
153171
@@ -156,14 +174,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
156174 * in any thread.
157175 */
158176 final def removeListener (listener : L ): Unit = {
159- val iter = listenerAndEventExecutors.iterator()
160- var index = 0
161- while (iter.hasNext) {
162- if (iter.next()._1 == listener) {
163- listenerAndEventExecutors.remove(index)
164- return
165- }
166- index = index + 1
177+ for (eventExecutor <- eventGroupToEventExecutors.values().asScala) {
178+ eventExecutor.removeListener(listener)
167179 }
168180 }
169181
@@ -172,10 +184,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
172184 * any of the existing listener
173185 */
174186 def isListenerBusEmpty : Boolean = {
175- val iter = listenerAndEventExecutors.iterator()
176- while (iter.hasNext) {
177- val listenerEvenProcessor = iter.next._2
178- if (! listenerEvenProcessor.isEmpty) {
187+ for (eventExecutor <- eventGroupToEventExecutors.values().asScala) {
188+ if (! eventExecutor.isEmpty) {
179189 return false
180190 }
181191 }
@@ -188,19 +198,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
188198 * the {@link ListenerEventExecutor}.
189199 */
190200 final def postToAll (event : E ): Unit = {
191- // JavaConverters can create a JIterableWrapper if we use asScala.
192- // However, this method will be called frequently. To avoid the wrapper cost, here we use
193- // Java Iterator directly.
194- val iter = listenerAndEventExecutors.iterator()
195- while (iter.hasNext) {
196- val item = iter.next()
197- val listener = item._1
198- val listenerEventProcessor = item._2
201+ for (listenerEventProcessor <- eventGroupToEventExecutors.values().asScala) {
202+ // JavaConverters can create a JIterableWrapper if we use asScala.
203+ // However, this method will be called frequently. To avoid the wrapper cost, here we use
204+ // Java Iterator directly.
205+ val iter = listenerEventProcessor.listeners.iterator()
206+ while (iter.hasNext) {
207+ val listener = iter.next()
199208 listenerEventProcessor.submit(new Runnable {
200209 override def run (): Unit = Utils .tryLogNonFatalError {
201210 doPostEvent(listener, event)
202211 }
203212 })
213+ }
204214 }
205215 }
206216
@@ -210,15 +220,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
210220 * events.
211221 */
212222 final def postToAllSync (event : E ): Unit = {
213- val iter = listenerAndEventExecutors.iterator()
214- while (iter.hasNext) {
215- val item = iter.next()
216- val listener = item._1
217- try {
218- doPostEvent(listener, event)
219- } catch {
220- case NonFatal (e) =>
221- logError(s " Listener ${Utils .getFormattedClassName(listener)} threw an exception " , e)
223+ for (listenerEventProcessor <- eventGroupToEventExecutors.values().asScala) {
224+ // JavaConverters can create a JIterableWrapper if we use asScala.
225+ // However, this method will be called frequently. To avoid the wrapper cost, here we use
226+ // Java Iterator directly.
227+ val iter = listenerEventProcessor.listeners.iterator()
228+ while (iter.hasNext) {
229+ val listener = iter.next()
230+ try {
231+ doPostEvent(listener, event)
232+ } catch {
233+ case NonFatal (e) =>
234+ logError(s " Listener ${Utils .getFormattedClassName(listener)} threw an exception " , e)
235+ }
222236 }
223237 }
224238 }
@@ -231,11 +245,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
231245
232246 private [spark] def findListenersByClass [T <: L : ClassTag ](): Seq [T ] = {
233247 val c = implicitly[ClassTag [T ]].runtimeClass
234- listenerAndEventExecutors.asScala .filter(_._1. getClass == c).map(_._1 .asInstanceOf [T ])
248+ listeners().toSeq .filter(_.getClass == c).map(_.asInstanceOf [T ])
235249 }
236250
237251 private [spark] def listeners (): Seq [L ] = {
238- listenerAndEventExecutors. asScala.map(_._1)
252+ eventGroupToEventExecutors.values. asScala.map(l => l.listeners.asScala).flatten.toSeq
239253 }
240254
241255 /**
@@ -250,9 +264,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
250264 if (! started.compareAndSet(false , true )) {
251265 throw new IllegalStateException (s " already started! " )
252266 }
253- val iter = listenerAndEventExecutors.iterator()
254- while (iter.hasNext) {
255- iter.next()._2.start()
267+ for (eventExecutor <- eventGroupToEventExecutors.values().asScala) {
268+ eventExecutor.start()
256269 }
257270 }
258271
@@ -268,10 +281,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
268281 } else {
269282 // Keep quiet
270283 }
271- val iter = listenerAndEventExecutors .iterator()
284+ val iter = eventGroupToEventExecutors.values() .iterator()
272285 while (iter.hasNext) {
273- iter.next()._2. stop()
286+ iter.next().stop()
274287 }
275288 }
276289}
277290
291+ private [spark] object ListenerEventExecutor {
292+ val DefaultEventListenerGroup = " default-event-listener"
293+ val DefaultUserEventListenerGroup = " default-user-event-listener"
294+ val ExecutorAllocationManagerGroup = " executor-allocation-manager-listener"
295+ val HeartBeatReceiverGroup = " heart-beat-receiver-listener"
296+ val EventLoggingGroup = " event-logging-listener"
297+ // Allows for Context to check whether stop() call is made within listener thread
298+ }
299+
0 commit comments