@@ -89,15 +89,17 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
8989 var frameworkUrl : String = _
9090 val master = conf.get(" spark.master" )
9191 val appName = conf.get(" spark.app.name" )
92- val capacity = conf.getInt(" spark.mesos.driver.capacity" , 200 )
92+ val queuedCapacity = conf.getInt(" spark.deploy.mesos.queuedDrivers" , 200 )
93+ val retainedDrivers = conf.getInt(" spark.deploy.retainedDrivers" , 200 )
94+
9395 val stateLock = new Object
9496 val launchedDrivers = new mutable.HashMap [String , ClusterTaskState ]()
9597
96- // TODO: Bound this finished drivers map or make it a array
97- val finishedDrivers = new mutable.HashMap [String , ClusterTaskState ]()
98+ val finishedDrivers = new mutable.ArrayBuffer [ClusterTaskState ](retainedDrivers)
9899 val nextDriverNumber : AtomicLong = new AtomicLong (0 )
99100 var appId : String = _
100- private val queue = new LinkedBlockingQueue [DriverSubmission ](capacity)
101+
102+ private val queue = new LinkedBlockingQueue [DriverSubmission ](queuedCapacity)
101103
102104 def createDateFormat = new SimpleDateFormat (" yyyyMMddHHmmss" ) // For application IDs
103105
@@ -319,7 +321,7 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
319321 ClusterSchedulerState (
320322 queueCopy,
321323 copyDriverStates(launchedDrivers.values),
322- copyDriverStates(finishedDrivers.values ))
324+ copyDriverStates(finishedDrivers))
323325 }
324326 }
325327
@@ -378,11 +380,18 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
378380 val driverState = getDriverState(status.getState)
379381 val state = if (isFinished(status.getState)) {
380382 val launchedState = launchedDrivers.remove(taskId).get
381- finishedDrivers(taskId) = launchedState
383+ if (finishedDrivers.size >= retainedDrivers) {
384+ val toRemove = math.max(retainedDrivers / 10 , 1 )
385+ finishedDrivers.trimStart(toRemove)
386+ }
387+
388+ finishedDrivers += launchedState
389+
382390 launchedState
383391 } else {
384392 launchedDrivers(taskId)
385393 }
394+
386395 state.taskState = Option (status)
387396 state.driverState = driverState
388397 } else {
0 commit comments