@@ -55,29 +55,29 @@ import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
  * - The docker images tagged spark-test-master and spark-test-worker are built from the
  *   docker/ directory. Run 'docker/spark-test/build' to generate these.
  */
-private[spark] object FaultToleranceTest extends App with Logging {
+private object FaultToleranceTest extends App with Logging {

-  val conf = new SparkConf()
-  val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+  private val conf = new SparkConf()
+  private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")

-  val masters = ListBuffer[TestMasterInfo]()
-  val workers = ListBuffer[TestWorkerInfo]()
-  var sc: SparkContext = _
+  private val masters = ListBuffer[TestMasterInfo]()
+  private val workers = ListBuffer[TestWorkerInfo]()
+  private var sc: SparkContext = _

-  val zk = SparkCuratorUtil.newClient(conf)
+  private val zk = SparkCuratorUtil.newClient(conf)

-  var numPassed = 0
-  var numFailed = 0
+  private var numPassed = 0
+  private var numFailed = 0

-  val sparkHome = System.getenv("SPARK_HOME")
+  private val sparkHome = System.getenv("SPARK_HOME")
   assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")

-  val containerSparkHome = "/opt/spark"
-  val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
+  private val containerSparkHome = "/opt/spark"
+  private val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)

   System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip

-  def afterEach() {
+  private def afterEach() {
     if (sc != null) {
       sc.stop()
       sc = null
@@ -179,7 +179,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
     }
   }

-  def test(name: String)(fn: => Unit) {
+  private def test(name: String)(fn: => Unit) {
     try {
       fn
       numPassed += 1
@@ -197,19 +197,19 @@ private[spark] object FaultToleranceTest extends App with Logging {
     afterEach()
   }

-  def addMasters(num: Int) {
+  private def addMasters(num: Int) {
     logInfo(s">>>>> ADD MASTERS $num <<<<<")
     (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
   }

-  def addWorkers(num: Int) {
+  private def addWorkers(num: Int) {
     logInfo(s">>>>> ADD WORKERS $num <<<<<")
     val masterUrls = getMasterUrls(masters)
     (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }

   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
-  def createClient() = {
+  private def createClient() = {
     logInfo(">>>>> CREATE CLIENT <<<<<")
     if (sc != null) { sc.stop() }
     // Counter-hack: Because of a hack in SparkEnv#create() that changes this
@@ -218,27 +218,27 @@ private[spark] object FaultToleranceTest extends App with Logging {
     sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
   }

-  def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
+  private def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
     "spark://" + masters.map(master => master.ip + ":7077").mkString(",")
   }

-  def getLeader: TestMasterInfo = {
+  private def getLeader: TestMasterInfo = {
     val leaders = masters.filter(_.state == RecoveryState.ALIVE)
     assertTrue(leaders.size == 1)
     leaders(0)
   }

-  def killLeader(): Unit = {
+  private def killLeader(): Unit = {
     logInfo(">>>>> KILL LEADER <<<<<")
     masters.foreach(_.readState())
     val leader = getLeader
     masters -= leader
     leader.kill()
   }

-  def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
+  private def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)

-  def terminateCluster() {
+  private def terminateCluster() {
     logInfo(">>>>> TERMINATE CLUSTER <<<<<")
     masters.foreach(_.kill())
     workers.foreach(_.kill())
@@ -247,7 +247,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }

   /** This includes Client retry logic, so it may take a while if the cluster is recovering. */
-  def assertUsable() = {
+  private def assertUsable() = {
     val f = future {
       try {
         val res = sc.parallelize(0 until 10).collect()
@@ -269,7 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
    * Asserts that the cluster is usable and that the expected masters and workers
    * are all alive in a proper configuration (e.g., only one leader).
    */
-  def assertValidClusterState() = {
+  private def assertValidClusterState() = {
     logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
     assertUsable()
     var numAlive = 0
@@ -325,7 +325,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
     }
   }

-  def assertTrue(bool: Boolean, message: String = "") {
+  private def assertTrue(bool: Boolean, message: String = "") {
     if (!bool) {
       throw new IllegalStateException("Assertion failed: " + message)
     }
@@ -335,7 +335,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
     numFailed))
 }

-private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
   extends Logging {

   implicit val formats = org.json4s.DefaultFormats
@@ -377,7 +377,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
       format(ip, dockerId.id, logFile.getAbsolutePath, state)
 }

-private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
   extends Logging {

   implicit val formats = org.json4s.DefaultFormats
@@ -390,7 +390,7 @@ private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val
     "[ip=%s, id=%s, logFile=%s]".format(ip, dockerId, logFile.getAbsolutePath)
 }

-private[spark] object SparkDocker {
+private object SparkDocker {
   def startMaster(mountDir: String): TestMasterInfo = {
     val cmd = Docker.makeRunCmd("spark-test-master", mountDir = mountDir)
     val (ip, id, outFile) = startNode(cmd)
@@ -425,11 +425,11 @@ private[spark] object SparkDocker {
   }
 }

-private[spark] class DockerId(val id: String) {
+private class DockerId(val id: String) {
   override def toString = id
 }

-private[spark] object Docker extends Logging {
+private object Docker extends Logging {
   def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
     val mountCmd = if (mountDir != "") { "-v " + mountDir } else ""

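
Note on the `test(name: String)(fn: => Unit)` signature touched in the hunk starting at line 179: it is the curried, by-name form of Scala's loan pattern, so each test body is handed over unevaluated and the helper decides when to run it and how to record the outcome. Below is a minimal, self-contained sketch of that pattern; the object name, println reporting, and sample tests are illustrative only and are not part of this patch.

// Illustrative sketch of the curried by-name test-helper pattern (not from the patch).
object TestHelperSketch {
  private var numPassed = 0
  private var numFailed = 0

  // `fn` is a by-name parameter: the block supplied at the call site is not
  // evaluated until it is referenced inside the try below.
  private def test(name: String)(fn: => Unit): Unit = {
    try {
      fn
      numPassed += 1
      println(s"PASSED: $name")
    } catch {
      case e: Exception =>
        numFailed += 1
        println(s"FAILED: $name (${e.getMessage})")
    }
  }

  def main(args: Array[String]): Unit = {
    test("arithmetic sanity") { assert(1 + 1 == 2) }
    test("expected failure") { throw new IllegalStateException("boom") }
    println(s"$numPassed passed, $numFailed failed")
  }
}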