From 2a9a1d404a9e87ea5758dc94f1ea0efa8b434797 Mon Sep 17 00:00:00 2001
From: bxshi
Date: Sat, 28 Jun 2014 11:12:42 -0400
Subject: [PATCH] fix SPARK-2228: change hard-coded EVENT_QUEUE_CAPACITY to
 spark.liveListenerBus.eventQueueCapacity

---
 .../main/scala/org/apache/spark/SparkContext.scala |  2 +-
 .../apache/spark/scheduler/LiveListenerBus.scala   |  6 +++---
 .../org/apache/spark/storage/ThreadingTest.scala   |  2 +-
 .../scheduler/EventLoggingListenerSuite.scala      |  2 +-
 .../spark/scheduler/SparkListenerSuite.scala       | 14 +++++++-------
 .../apache/spark/storage/BlockManagerSuite.scala   |  2 +-
 6 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f9476ff826a6..7fbb73b61a0a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -196,7 +196,7 @@ class SparkContext(config: SparkConf) extends Logging {
   if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus
+  private[spark] val listenerBus = new LiveListenerBus(conf)
 
   // Create the Spark execution environment (cache, map output tracker, etc)
   private[spark] val env = SparkEnv.create(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 36a6e6338faa..fdf3c49b7a9f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.util.Utils
 
 /**
@@ -29,11 +29,11 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
  */
-private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
+private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus with Logging {
 
   /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
    * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000
+  private val EVENT_QUEUE_CAPACITY = conf.getInt("spark.liveListenerBus.eventQueueCapacity", 10000)
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index a107c5182b3b..6ac80ff972f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -97,7 +97,7 @@ private[spark] object ThreadingTest {
     val conf = new SparkConf()
     val serializer = new KryoSerializer(conf)
     val blockManagerMaster = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus(conf)))),
       conf)
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 21e3db34b8b7..4c7dda70fe01 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -226,7 +226,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   private def testEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
-    val listenerBus = new LiveListenerBus
+    val listenerBus = new LiveListenerBus(conf)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
     val applicationEnd = SparkListenerApplicationEnd(1000L)
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6df0a080961b..c780bb9b5da6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 import org.scalatest.Matchers
 
-import org.apache.spark.{LocalSparkContext, SparkContext}
+import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext}
 import org.apache.spark.SparkContext._
 import org.apache.spark.executor.TaskMetrics
@@ -33,7 +33,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
-
+  val conf = new SparkConf()
   before {
     sc = new SparkContext("local", "SparkListenerSuite")
   }
@@ -44,7 +44,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
 
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(conf)
     bus.addListener(counter)
 
     // Listener bus hasn't started yet, so posting events should not increment counter
@@ -63,14 +63,14 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
 
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus
+      val bus = new LiveListenerBus(conf)
       bus.start()
       bus.start()
     }
 
     // ... or stopped before starting
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus
+      val bus = new LiveListenerBus(conf)
       bus.stop()
     }
   }
@@ -98,7 +98,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
       }
     }
 
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(conf)
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
@@ -338,7 +338,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     val badListener = new BadListener
     val jobCounter1 = new BasicJobCounter
     val jobCounter2 = new BasicJobCounter
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(conf)
 
     // Propagate events to bad listener first
     bus.addListener(badListener)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d7dbe5164b7f..0c16bd9f4aec 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -66,7 +66,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     conf.set("spark.driver.port", boundPort.toString)
 
     master = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus(conf)))),
       conf)
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
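
For reviewers, a minimal usage sketch (not part of the patch). It relies only on
what the diff above introduces: the spark.liveListenerBus.eventQueueCapacity key,
read via conf.getInt with a default of 10000. The app name, master URL, and the
capacity value 20000 below are placeholders, not values from the patch.

  import org.apache.spark.{SparkConf, SparkContext}

  object QueueCapacityExample {
    def main(args: Array[String]): Unit = {
      // Raise the listener event queue above the 10000 default; with the old
      // hard-coded constant this setting would have had no effect.
      val conf = new SparkConf()
        .setAppName("QueueCapacityExample")   // placeholder app name
        .setMaster("local[2]")                // placeholder master URL
        .set("spark.liveListenerBus.eventQueueCapacity", "20000")

      // SparkContext now constructs its bus as `new LiveListenerBus(conf)`, so
      // the LinkedBlockingQueue inside the bus is sized from the setting above.
      val sc = new SparkContext(conf)
      sc.parallelize(1 to 1000).count()       // generate some listener events
      sc.stop()
    }
  }

Note that a larger queue only buys headroom: as the comment in LiveListenerBus.scala
explains, the bound exists so that a listener that cannot keep up produces an
explicit queue-full error rather than an eventual OOM.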