From 0e014b02d03eeda8373cd8892662ed6ce9de664c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 23 Sep 2016 16:57:28 -0700 Subject: [PATCH] [SPARK-17649][CORE] Log how many Spark events got dropped in LiveListenerBus Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size. Jenkins Author: Shixiong Zhu Closes #15220 from zsxwing/SPARK-17649. --- .../spark/util/AsynchronousListenerBus.scala | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 6c1fca71f228..b5455fface34 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -18,7 +18,8 @@ package org.apache.spark.util import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} + import scala.util.DynamicVariable import org.apache.spark.SparkContext @@ -51,6 +52,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri // Indicate if `stop()` is called private val stopped = new AtomicBoolean(false) + /** A counter for dropped events. It will be reset every time we log it. */ + private val droppedEventsCounter = new AtomicLong(0L) + + /** When `droppedEventsCounter` was logged last time in milliseconds. */ + @volatile private var lastReportTimestamp = 0L + // Indicate if we are processing some event // Guarded by `self` private var processingEvent = false @@ -117,6 +124,24 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri eventLock.release() } else { onDropEvent(event) + droppedEventsCounter.incrementAndGet() + } + + val droppedEvents = droppedEventsCounter.get + if (droppedEvents > 0) { + // Don't log too frequently + if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { + // There may be multiple threads trying to decrease droppedEventsCounter. + // Use "compareAndSet" to make sure only one thread can win. + // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and + // then that thread will update it. + if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { + val prevLastReportTimestamp = lastReportTimestamp + lastReportTimestamp = System.currentTimeMillis() + logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + + new java.util.Date(prevLastReportTimestamp)) + } + } } }