@@ -59,65 +59,67 @@ private[spark] class Master(
5959
6060 import context .dispatcher // to use Akka's scheduler.schedule()
6161
62- val hadoopConf = SparkHadoopUtil .get.newConfiguration(conf)
63-
64- def createDateFormat = new SimpleDateFormat (" yyyyMMddHHmmss" ) // For application IDs
65- val WORKER_TIMEOUT = conf.getLong(" spark.worker.timeout" , 60 ) * 1000
66- val RETAINED_APPLICATIONS = conf.getInt(" spark.deploy.retainedApplications" , 200 )
67- val RETAINED_DRIVERS = conf.getInt(" spark.deploy.retainedDrivers" , 200 )
68- val REAPER_ITERATIONS = conf.getInt(" spark.dead.worker.persistence" , 15 )
69- val RECOVERY_MODE = conf.get(" spark.deploy.recoveryMode" , " NONE" )
70-
71- val workers = new HashSet [WorkerInfo ]
72- val idToWorker = new HashMap [String , WorkerInfo ]
73- val addressToWorker = new HashMap [Address , WorkerInfo ]
74-
75- val apps = new HashSet [ApplicationInfo ]
76- val idToApp = new HashMap [String , ApplicationInfo ]
77- val actorToApp = new HashMap [ActorRef , ApplicationInfo ]
78- val addressToApp = new HashMap [Address , ApplicationInfo ]
79- val waitingApps = new ArrayBuffer [ApplicationInfo ]
80- val completedApps = new ArrayBuffer [ApplicationInfo ]
81- var nextAppNumber = 0
82- val appIdToUI = new HashMap [String , SparkUI ]
83-
84- val drivers = new HashSet [DriverInfo ]
85- val completedDrivers = new ArrayBuffer [DriverInfo ]
86- val waitingDrivers = new ArrayBuffer [DriverInfo ] // Drivers currently spooled for scheduling
87- var nextDriverNumber = 0
62+ private val hadoopConf = SparkHadoopUtil .get.newConfiguration(conf)
63+
64+ private def createDateFormat = new SimpleDateFormat (" yyyyMMddHHmmss" ) // For application IDs
65+
66+ private val WORKER_TIMEOUT = conf.getLong(" spark.worker.timeout" , 60 ) * 1000
67+ private val RETAINED_APPLICATIONS = conf.getInt(" spark.deploy.retainedApplications" , 200 )
68+ private val RETAINED_DRIVERS = conf.getInt(" spark.deploy.retainedDrivers" , 200 )
69+ private val REAPER_ITERATIONS = conf.getInt(" spark.dead.worker.persistence" , 15 )
70+ private val RECOVERY_MODE = conf.get(" spark.deploy.recoveryMode" , " NONE" )
71+
72+ private [master] val workers = new HashSet [WorkerInfo ]
73+ private [master] val idToApp = new HashMap [String , ApplicationInfo ]
74+ private [master] val waitingApps = new ArrayBuffer [ApplicationInfo ]
75+ private [master] val apps = new HashSet [ApplicationInfo ]
76+
77+ private val idToWorker = new HashMap [String , WorkerInfo ]
78+ private val addressToWorker = new HashMap [Address , WorkerInfo ]
79+
80+ private val actorToApp = new HashMap [ActorRef , ApplicationInfo ]
81+ private val addressToApp = new HashMap [Address , ApplicationInfo ]
82+ private val completedApps = new ArrayBuffer [ApplicationInfo ]
83+ private var nextAppNumber = 0
84+ private val appIdToUI = new HashMap [String , SparkUI ]
85+
86+ private val drivers = new HashSet [DriverInfo ]
87+ private val completedDrivers = new ArrayBuffer [DriverInfo ]
88+ private val waitingDrivers = new ArrayBuffer [DriverInfo ] // Drivers currently spooled for scheduling
89+ private var nextDriverNumber = 0
8890
8991 Utils .checkHost(host, " Expected hostname" )
9092
91- val masterMetricsSystem = MetricsSystem .createMetricsSystem(" master" , conf, securityMgr)
92- val applicationMetricsSystem = MetricsSystem .createMetricsSystem(" applications" , conf,
93+ private val masterMetricsSystem = MetricsSystem .createMetricsSystem(" master" , conf, securityMgr)
94+ private val applicationMetricsSystem = MetricsSystem .createMetricsSystem(" applications" , conf,
9395 securityMgr)
94- val masterSource = new MasterSource (this )
96+ private val masterSource = new MasterSource (this )
9597
96- val webUi = new MasterWebUI (this , webUiPort)
98+ private val webUi = new MasterWebUI (this , webUiPort)
9799
98- val masterPublicAddress = {
99- val envVar = conf .getenv(" SPARK_PUBLIC_DNS" )
100+ private val masterPublicAddress = {
101+ val envVar = System .getenv(" SPARK_PUBLIC_DNS" )
100102 if (envVar != null ) envVar else host
101103 }
102104
103- val masterUrl = " spark://" + host + " :" + port
104- var masterWebUiUrl : String = _
105+ private val masterUrl = " spark://" + host + " :" + port
106+ private var masterWebUiUrl : String = _
105107
106- var state = RecoveryState .STANDBY
108+ private var state = RecoveryState .STANDBY
107109
108- var persistenceEngine : PersistenceEngine = _
110+ private var persistenceEngine : PersistenceEngine = _
109111
110- var leaderElectionAgent : LeaderElectionAgent = _
112+ private var leaderElectionAgent : LeaderElectionAgent = _
111113
112114 private var recoveryCompletionTask : Cancellable = _
113115
114116 // As a temporary workaround before better ways of configuring memory, we allow users to set
115117 // a flag that will perform round-robin scheduling across the nodes (spreading out each app
116118 // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
117- val spreadOutApps = conf.getBoolean(" spark.deploy.spreadOut" , true )
119+ private val spreadOutApps = conf.getBoolean(" spark.deploy.spreadOut" , true )
118120
119121 // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
120- val defaultCores = conf.getInt(" spark.deploy.defaultCores" , Int .MaxValue )
122+ private val defaultCores = conf.getInt(" spark.deploy.defaultCores" , Int .MaxValue )
121123 if (defaultCores < 1 ) {
122124 throw new SparkException (" spark.deploy.defaultCores must be positive" )
123125 }
@@ -449,11 +451,11 @@ private[spark] class Master(
449451 }
450452 }
451453
452- def canCompleteRecovery =
454+ private def canCompleteRecovery =
453455 workers.count(_.state == WorkerState .UNKNOWN ) == 0 &&
454456 apps.count(_.state == ApplicationState .UNKNOWN ) == 0
455457
456- def beginRecovery (storedApps : Seq [ApplicationInfo ], storedDrivers : Seq [DriverInfo ],
458+ private def beginRecovery (storedApps : Seq [ApplicationInfo ], storedDrivers : Seq [DriverInfo ],
457459 storedWorkers : Seq [WorkerInfo ]) {
458460 for (app <- storedApps) {
459461 logInfo(" Trying to recover app: " + app.id)
@@ -484,7 +486,7 @@ private[spark] class Master(
484486 }
485487 }
486488
487- def completeRecovery () {
489+ private def completeRecovery () {
488490 // Ensure "only-once" recovery semantics using a short synchronization period.
489491 synchronized {
490492 if (state != RecoveryState .RECOVERING ) { return }
@@ -517,7 +519,7 @@ private[spark] class Master(
517519 * launched an executor for the app on it (right now the standalone backend doesn't like having
518520 * two executors on the same worker).
519521 */
520- def canUse (app : ApplicationInfo , worker : WorkerInfo ): Boolean = {
522+ private def canUse (app : ApplicationInfo , worker : WorkerInfo ): Boolean = {
521523 worker.memoryFree >= app.desc.memoryPerSlave && ! worker.hasExecutor(app)
522524 }
523525
@@ -596,7 +598,7 @@ private[spark] class Master(
596598 }
597599 }
598600
599- def launchExecutor (worker : WorkerInfo , exec : ExecutorDesc ) {
601+ private def launchExecutor (worker : WorkerInfo , exec : ExecutorDesc ) {
600602 logInfo(" Launching executor " + exec.fullId + " on worker " + worker.id)
601603 worker.addExecutor(exec)
602604 worker.actor ! LaunchExecutor (masterUrl,
@@ -605,7 +607,7 @@ private[spark] class Master(
605607 exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
606608 }
607609
608- def registerWorker (worker : WorkerInfo ): Boolean = {
610+ private def registerWorker (worker : WorkerInfo ): Boolean = {
609611 // There may be one or more refs to dead workers on this same node (w/ different ID's),
610612 // remove them.
611613 workers.filter { w =>
@@ -633,7 +635,7 @@ private[spark] class Master(
633635 true
634636 }
635637
636- def removeWorker (worker : WorkerInfo ) {
638+ private def removeWorker (worker : WorkerInfo ) {
637639 logInfo(" Removing worker " + worker.id + " on " + worker.host + " :" + worker.port)
638640 worker.setState(WorkerState .DEAD )
639641 idToWorker -= worker.id
@@ -656,20 +658,20 @@ private[spark] class Master(
656658 persistenceEngine.removeWorker(worker)
657659 }
658660
659- def relaunchDriver (driver : DriverInfo ) {
661+ private def relaunchDriver (driver : DriverInfo ) {
660662 driver.worker = None
661663 driver.state = DriverState .RELAUNCHING
662664 waitingDrivers += driver
663665 schedule()
664666 }
665667
666- def createApplication (desc : ApplicationDescription , driver : ActorRef ): ApplicationInfo = {
668+ private def createApplication (desc : ApplicationDescription , driver : ActorRef ): ApplicationInfo = {
667669 val now = System .currentTimeMillis()
668670 val date = new Date (now)
669671 new ApplicationInfo (now, newApplicationId(date), desc, date, driver, defaultCores)
670672 }
671673
672- def registerApplication (app : ApplicationInfo ): Unit = {
674+ private def registerApplication (app : ApplicationInfo ): Unit = {
673675 val appAddress = app.driver.path.address
674676 if (addressToApp.contains(appAddress)) {
675677 logInfo(" Attempted to re-register application at same address: " + appAddress)
@@ -684,11 +686,11 @@ private[spark] class Master(
684686 waitingApps += app
685687 }
686688
687- def finishApplication (app : ApplicationInfo ) {
689+ private def finishApplication (app : ApplicationInfo ) {
688690 removeApplication(app, ApplicationState .FINISHED )
689691 }
690692
691- def removeApplication (app : ApplicationInfo , state : ApplicationState .Value ) {
693+ private [master] def removeApplication (app : ApplicationInfo , state : ApplicationState .Value ) {
692694 if (apps.contains(app)) {
693695 logInfo(" Removing app " + app.id)
694696 apps -= app
@@ -732,7 +734,7 @@ private[spark] class Master(
732734 * Rebuild a new SparkUI from the given application's event logs.
733735 * Return whether this is successful.
734736 */
735- def rebuildSparkUI (app : ApplicationInfo ): Boolean = {
737+ private def rebuildSparkUI (app : ApplicationInfo ): Boolean = {
736738 val appName = app.desc.name
737739 val notFoundBasePath = HistoryServer .UI_PATH_PREFIX + " /not-found"
738740 try {
@@ -797,14 +799,14 @@ private[spark] class Master(
797799 }
798800
799801 /** Generate a new app ID given a app's submission date */
800- def newApplicationId (submitDate : Date ): String = {
802+ private def newApplicationId (submitDate : Date ): String = {
801803 val appId = " app-%s-%04d" .format(createDateFormat.format(submitDate), nextAppNumber)
802804 nextAppNumber += 1
803805 appId
804806 }
805807
806808 /** Check for, and remove, any timed-out workers */
807- def timeOutDeadWorkers () {
809+ private def timeOutDeadWorkers () {
808810 // Copy the workers into an array so we don't modify the hashset while iterating through it
809811 val currentTime = System .currentTimeMillis()
810812 val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT ).toArray
@@ -821,27 +823,27 @@ private[spark] class Master(
821823 }
822824 }
823825
824- def newDriverId (submitDate : Date ): String = {
826+ private def newDriverId (submitDate : Date ): String = {
825827 val appId = " driver-%s-%04d" .format(createDateFormat.format(submitDate), nextDriverNumber)
826828 nextDriverNumber += 1
827829 appId
828830 }
829831
830- def createDriver (desc : DriverDescription ): DriverInfo = {
832+ private def createDriver (desc : DriverDescription ): DriverInfo = {
831833 val now = System .currentTimeMillis()
832834 val date = new Date (now)
833835 new DriverInfo (now, newDriverId(date), desc, date)
834836 }
835837
836- def launchDriver (worker : WorkerInfo , driver : DriverInfo ) {
838+ private def launchDriver (worker : WorkerInfo , driver : DriverInfo ) {
837839 logInfo(" Launching driver " + driver.id + " on worker " + worker.id)
838840 worker.addDriver(driver)
839841 driver.worker = Some (worker)
840842 worker.actor ! LaunchDriver (driver.id, driver.desc)
841843 driver.state = DriverState .RUNNING
842844 }
843845
844- def removeDriver (driverId : String , finalState : DriverState , exception : Option [Exception ]) {
846+ private def removeDriver (driverId : String , finalState : DriverState , exception : Option [Exception ]) {
845847 drivers.find(d => d.id == driverId) match {
846848 case Some (driver) =>
847849 logInfo(s " Removing driver: $driverId" )
@@ -879,7 +881,7 @@ private[spark] object Master extends Logging {
879881 *
880882 * @throws SparkException if the url is invalid
881883 */
882- def toAkkaUrl (sparkUrl : String , protocol : String ): String = {
884+ private [deploy] def toAkkaUrl (sparkUrl : String , protocol : String ): String = {
883885 val (host, port) = Utils .extractHostPortFromSparkUrl(sparkUrl)
884886 AkkaUtils .address(protocol, systemName, host, port, actorName)
885887 }
@@ -889,7 +891,7 @@ private[spark] object Master extends Logging {
889891 *
890892 * @throws SparkException if the url is invalid
891893 */
892- def toAkkaAddress (sparkUrl : String , protocol : String ): Address = {
894+ private [deploy] def toAkkaAddress (sparkUrl : String , protocol : String ): Address = {
893895 val (host, port) = Utils .extractHostPortFromSparkUrl(sparkUrl)
894896 Address (protocol, systemName, host, port)
895897 }
@@ -901,7 +903,7 @@ private[spark] object Master extends Logging {
901903 * (3) The web UI bound port
902904 * (4) The REST server bound port, if any
903905 */
904- def startSystemAndActor (
906+ private [deploy] def startSystemAndActor (
905907 host : String ,
906908 port : Int ,
907909 webUiPort : Int ,
0 commit comments