Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class SparkContext(config: SparkConf) extends Logging {
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus
private[spark] val listenerBus = new LiveListenerBus(conf)

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.scheduler

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

import org.apache.spark.Logging
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.util.Utils

/**
Expand All @@ -29,11 +29,11 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
*/
private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus with Logging {

/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
private val EVENT_QUEUE_CAPACITY = conf.getInt("spark.liveListenerBus.eventQueueCapacity", 10000)
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private[spark] object ThreadingTest {
val conf = new SparkConf()
val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus(conf)))),
conf)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
private def testEventLogging(compressionCodec: Option[String] = None) {
val conf = getLoggingConf(logDirPath, compressionCodec)
val eventLogger = new EventLoggingListener("test", conf)
val listenerBus = new LiveListenerBus
val listenerBus = new LiveListenerBus(conf)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
val applicationEnd = SparkListenerApplicationEnd(1000L)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.mutable
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.Matchers

import org.apache.spark.{LocalSparkContext, SparkContext}
import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.executor.TaskMetrics

Expand All @@ -33,7 +33,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

val conf = new SparkConf()
before {
sc = new SparkContext("local", "SparkListenerSuite")
}
Expand All @@ -44,7 +44,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
val bus = new LiveListenerBus(conf)
bus.addListener(counter)

// Listener bus hasn't started yet, so posting events should not increment counter
Expand All @@ -63,14 +63,14 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers

// Listener bus must not be started twice
intercept[IllegalStateException] {
val bus = new LiveListenerBus
val bus = new LiveListenerBus(conf)
bus.start()
bus.start()
}

// ... or stopped before starting
intercept[IllegalStateException] {
val bus = new LiveListenerBus
val bus = new LiveListenerBus(conf)
bus.stop()
}
}
Expand Down Expand Up @@ -98,7 +98,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}
}

val bus = new LiveListenerBus
val bus = new LiveListenerBus(conf)
val blockingListener = new BlockingListener

bus.addListener(blockingListener)
Expand Down Expand Up @@ -338,7 +338,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
val badListener = new BadListener
val jobCounter1 = new BasicJobCounter
val jobCounter2 = new BasicJobCounter
val bus = new LiveListenerBus
val bus = new LiveListenerBus(conf)

// Propagate events to bad listener first
bus.addListener(badListener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
conf.set("spark.driver.port", boundPort.toString)

master = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus(conf)))),
conf)

// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
Expand Down