From c0e5ea5923b26443e988384b37c325d0a62760c3 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Mon, 15 Dec 2014 14:00:56 -0800
Subject: [PATCH 1/3] [SPARK-4834] [standalone] Clean up application files
 after app finishes.

Commit 7aacb7bfa added support for sharing downloaded files among
multiple executors of the same app. That works great on YARN, since
the app's directory is cleaned up after the app is done.

But Spark standalone mode didn't do that, so the lock/cache files
created by that change were left around and could eventually fill up
the disk hosting /tmp.

To solve that, create app-specific directories under the local dirs
when launching executors. Multiple executors launched by the same
Worker will use the same app directories, so they should be able to
share the downloaded files. When the application finishes, a new
message is sent to all workers telling them the application has
finished; once that message has been received, and all executors
registered for the application have shut down, those directories are
cleaned up by the Worker.

Note 1: Unit testing this is hard (if even possible), since
local-cluster mode doesn't seem to leave the Master/Worker daemons
running long enough after `sc.stop()` is called for the cleanup
protocol to take effect.

Note 2: The code tracking finished apps / app directories in
Master.scala and Worker.scala is not really thread-safe, but then the
code that modifies other shared maps in those classes isn't either,
so this change is not making anything worse.
---
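
Note (placed after the "---" separator, so `git am` ignores it): the
cleanup decision this series implements is spread across several
hunks, so here is a condensed, self-contained sketch of it.
`CleanupSketch`, `runningExecutors` and `deleteRecursively` are
stand-ins invented for the illustration; only `appDirectories`,
`finishedApps` and the "app finished AND no executors left" condition
mirror the actual change.

    import java.io.File
    import scala.collection.mutable

    object CleanupSketch {
      // App id -> local dirs created for that app by this Worker.
      val appDirectories = mutable.HashMap[String, Seq[String]]()
      // Apps for which the Master has sent ApplicationFinished.
      val finishedApps = mutable.HashSet[String]()
      // Executor id -> app id, for executors still alive on this Worker.
      val runningExecutors = mutable.HashMap[String, String]()

      // Mirrors maybeCleanupApplication(): delete the app's dirs only
      // once the app is finished AND its last executor has exited.
      def maybeCleanup(appId: String): Unit = {
        val shouldCleanup = finishedApps.contains(appId) &&
          !runningExecutors.values.exists(_ == appId)
        if (shouldCleanup) {
          finishedApps -= appId
          appDirectories.remove(appId).foreach { dirs =>
            dirs.foreach(dir => deleteRecursively(new File(dir)))
          }
        }
      }

      private def deleteRecursively(f: File): Unit = {
        if (f.isDirectory) {
          Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
        }
        f.delete()
      }
    }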
 .../apache/spark/deploy/DeployMessage.scala   |  5 +++
 .../apache/spark/deploy/master/Master.scala   |  7 +++-
 .../spark/deploy/worker/ExecutorRunner.scala  |  4 ++-
 .../apache/spark/deploy/worker/Worker.scala   | 36 +++++++++++++++++--
 .../scala/org/apache/spark/util/Utils.scala   | 16 +++++++--
 .../spark/deploy/JsonProtocolSuite.scala      |  2 +-
 .../deploy/worker/ExecutorRunnerTest.scala    |  3 +-
 7 files changed, 64 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c46f84de8444..1415d625625b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -175,4 +175,9 @@ private[deploy] object DeployMessages {
   // Liveness checks in various places
 
   case object SendHeartbeat
+
+  // Application finished message, used for cleanup
+
+  case class ApplicationFinished(id: String)
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7b32c505def9..773a9d109e87 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -510,7 +510,7 @@ private[spark] class Master(
     val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
     val numWorkersAlive = shuffledAliveWorkers.size
     var curPos = 0
-    
+
     for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
       // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
       // start from the last worker that was assigned a driver, and continue onwards until we have
@@ -697,6 +697,11 @@ private[spark] class Master(
     }
     persistenceEngine.removeApplication(app)
     schedule()
+
+    // Tell all workers that the application has finished, so they can clean up any app state.
+    workers.foreach { w =>
+      w.actor ! ApplicationFinished(app.id)
+    }
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f4fedc6327ab..acbdf0d8bd7b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -47,6 +47,7 @@ private[spark] class ExecutorRunner(
     val executorDir: File,
     val workerUrl: String,
     val conf: SparkConf,
+    val appLocalDirs: Seq[String],
     var state: ExecutorState.Value)
   extends Logging {
 
@@ -77,7 +78,7 @@ private[spark] class ExecutorRunner(
   /**
    * Kill executor process, wait for exit and notify worker to update resource status.
    *
-   * @param message the exception message which caused the executor's death 
+   * @param message the exception message which caused the executor's death
    */
   private def killProcess(message: Option[String]) {
     var exitCode: Option[Int] = None
@@ -129,6 +130,7 @@ private[spark] class ExecutorRunner(
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
 
       builder.directory(executorDir)
+      builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
       // In case we are running this from within the Spark Shell, avoid creating a "scala"
       // parent process for the executor command
       builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index eb11163538b2..8b560714d374 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import java.util.{UUID, Date}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
@@ -109,6 +109,8 @@ private[spark] class Worker(
   val finishedExecutors = new HashMap[String, ExecutorRunner]
   val drivers = new HashMap[String, DriverRunner]
   val finishedDrivers = new HashMap[String, DriverRunner]
+  val appDirectories = new HashMap[String, Seq[String]]
+  val finishedApps = new HashSet[String]
 
   // The shuffle service is not actually started unless configured.
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
@@ -292,7 +294,7 @@ private[spark] class Worker(
           val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
           dir.isDirectory && !isAppStillRunning &&
           !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
-        }.foreach { dir => 
+        }.foreach { dir =>
           logInfo(s"Removing directory: ${dir.getPath}")
           Utils.deleteRecursively(dir)
         }
@@ -337,8 +339,19 @@ private[spark] class Worker(
           throw new IOException("Failed to create directory " + executorDir)
         }
 
+        // Create local dirs for the executor. These are passed to the executor via the
+        // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+        // application finishes.
+        val appLocalDirs = appDirectories.get(appId).getOrElse {
+          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
+            Utils.createDirectory(dir).getAbsolutePath()
+          }.toSeq
+        }
+        appDirectories(appId) = appLocalDirs
+
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
+          self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
+          ExecutorState.LOADING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_
@@ -375,6 +388,7 @@ private[spark] class Worker(
             message.map(" message " + _).getOrElse("") +
             exitStatus.map(" exitStatus " + _).getOrElse(""))
         }
+        maybeCleanupApplication(appId)
       }
 
     case KillExecutor(masterUrl, appId, execId) =>
@@ -444,6 +458,9 @@ private[spark] class Worker(
     case ReregisterWithMaster =>
       reregisterWithMaster()
 
+    case ApplicationFinished(id) =>
+      finishedApps += id
+      maybeCleanupApplication(id)
   }
 
   private def masterDisconnected() {
@@ -452,6 +469,19 @@ private[spark] class Worker(
     registerWithMaster()
   }
 
+  private def maybeCleanupApplication(id: String): Unit = synchronized {
+    val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
+    if (shouldCleanup) {
+      finishedApps -= id
+      appDirectories.remove(id).foreach {
+        logInfo(s"Cleaning up local directories for application $id")
+        _.foreach { dir =>
+          Utils.deleteRecursively(new File(dir))
+        }
+      }
+    }
+  }
+
   def generateWorkerId(): String = {
     "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9c04e45a5847..5b9b8a6fe485 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -246,8 +246,11 @@ private[spark] object Utils extends Logging {
     retval
   }
 
-  /** Create a temporary directory inside the given parent directory */
-  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+  /**
+   * Create a directory inside the given parent directory. The directory is guaranteed to be
+   * newly created, and is not marked for automatic deletion.
+   */
+  def createDirectory(root: String): File = {
     var attempts = 0
     val maxAttempts = 10
     var dir: File = null
@@ -265,6 +268,15 @@ private[spark] object Utils extends Logging {
       } catch { case e: SecurityException => dir = null; }
     }
 
+    dir
+  }
+
+  /**
+   * Create a temporary directory inside the given parent directory. The directory will be
+   * automatically deleted when the VM shuts down.
+   */
+  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+    val dir = createDirectory(root)
     registerShutdownDeleteDir(dir)
     dir
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 3f1cd0752e76..aa65f7e8915e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
       new File("sparkHome"), new File("workDir"), "akka://worker",
-      new SparkConf, ExecutorState.RUNNING)
+      new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }
 
   def createDriverRunner(): DriverRunner = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 196217062991..6f233d7cf97a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,8 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
+      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
   }

From 50eb4b957a7e7b1cbb6f5445f8a0fbe99478ba4c Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Mon, 22 Dec 2014 15:55:59 -0800
Subject: [PATCH 2/3] Review feedback.

---
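
Note (placed after the "---" separator, so `git am` ignores it):
besides moving ApplicationFinished next to the other deploy messages
and taking a lock around appDirectories, this revision fixes a subtle
problem with the placeholder syntax used in the previous cleanup
block. A reduced, standalone illustration with toy values (not Spark
code):

    object PlaceholderPitfall extends App {
      val dirs: Option[Seq[String]] = None

      // Previous shape: the block evaluates to the function
      // `_.foreach(...)`, and println runs while that argument
      // expression is being evaluated -- that is, on every call,
      // even though the Option is empty and the function is never
      // actually invoked.
      dirs.foreach {
        println("cleaning up")
        _.foreach(dir => println(s"deleting $dir"))
      }

      // This revision's shape: naming the parameter puts println
      // inside the function body, so it only runs when a value is
      // present.
      dirs.foreach { dirList =>
        println("cleaning up")
        dirList.foreach(dir => println(s"deleting $dir"))
      }
    }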
 .../apache/spark/deploy/DeployMessage.scala |  6 ++----
 .../apache/spark/deploy/worker/Worker.scala | 19 +++++++++++--------
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 1415d625625b..243d8edb72ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -88,6 +88,8 @@ private[deploy] object DeployMessages {
 
   case class KillDriver(driverId: String) extends DeployMessage
 
+  case class ApplicationFinished(id: String)
+
   // Worker internal
 
   case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
@@ -176,8 +178,4 @@ private[deploy] object DeployMessages {
 
   case object SendHeartbeat
 
-  // Application finished message, used for cleanup
-
-  case class ApplicationFinished(id: String)
-
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8b560714d374..3a3db0fb1e54 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -342,12 +342,15 @@ private[spark] class Worker(
         // Create local dirs for the executor. These are passed to the executor via the
         // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
         // application finishes.
-        val appLocalDirs = appDirectories.get(appId).getOrElse {
-          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
-            Utils.createDirectory(dir).getAbsolutePath()
-          }.toSeq
+        val appLocalDirs = appDirectories.synchronized {
+          val dirs = appDirectories.get(appId).getOrElse {
+            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
+              Utils.createDirectory(dir).getAbsolutePath()
+            }.toSeq
+          }
+          appDirectories(appId) = dirs
+          dirs
         }
-        appDirectories(appId) = appLocalDirs
 
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
           self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
@@ -469,13 +472,13 @@ private[spark] class Worker(
     registerWithMaster()
   }
 
-  private def maybeCleanupApplication(id: String): Unit = synchronized {
+  private def maybeCleanupApplication(id: String): Unit = appDirectories.synchronized {
     val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
     if (shouldCleanup) {
       finishedApps -= id
-      appDirectories.remove(id).foreach {
+      appDirectories.remove(id).foreach { dirList =>
         logInfo(s"Cleaning up local directories for application $id")
-        _.foreach { dir =>
+        dirList.foreach { dir =>
           Utils.deleteRecursively(new File(dir))
         }
       }

From b430534e5d47fd12ae7b59925eeac925c1e6d2ab Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 23 Dec 2014 10:48:24 -0800
Subject: [PATCH 3/3] Remove seemingly unnecessary synchronization.

---
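
Note (placed after the "---" separator, so `git am` ignores it): the
synchronization dropped here guarded state that, in this series, is
only touched from the Worker actor's message-handling thread, and
Akka processes one message at a time per actor. A toy illustration of
that actor-confinement pattern (made-up `Counter` actor, not the real
Worker; uses the 2014-era Akka API):

    import akka.actor.{Actor, ActorSystem, Props}
    import scala.collection.mutable

    // Mutable state confined to an actor needs no locking: Akka
    // guarantees receive handles a single message at a time, so this
    // HashMap never sees concurrent access.
    class Counter extends Actor {
      private val counts = mutable.HashMap[String, Int]()

      def receive = {
        case key: String =>
          counts(key) = counts.getOrElse(key, 0) + 1
          sender() ! counts(key)
      }
    }

    object CounterDemo extends App {
      val system = ActorSystem("demo")
      val counter = system.actorOf(Props[Counter], "counter")
      counter ! "app-1"
      counter ! "app-1"
      system.shutdown()  // newer Akka versions use terminate()
    }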
 .../org/apache/spark/deploy/worker/Worker.scala | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3a3db0fb1e54..edcf0f4615ae 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -342,15 +342,12 @@ private[spark] class Worker(
         // Create local dirs for the executor. These are passed to the executor via the
         // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
         // application finishes.
-        val appLocalDirs = appDirectories.synchronized {
-          val dirs = appDirectories.get(appId).getOrElse {
-            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
-              Utils.createDirectory(dir).getAbsolutePath()
-            }.toSeq
-          }
-          appDirectories(appId) = dirs
-          dirs
+        val appLocalDirs = appDirectories.get(appId).getOrElse {
+          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
+            Utils.createDirectory(dir).getAbsolutePath()
+          }.toSeq
         }
+        appDirectories(appId) = appLocalDirs
 
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
           self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
@@ -472,7 +469,7 @@ private[spark] class Worker(
     registerWithMaster()
   }
 
-  private def maybeCleanupApplication(id: String): Unit = appDirectories.synchronized {
+  private def maybeCleanupApplication(id: String): Unit = {
     val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
     if (shouldCleanup) {
       finishedApps -= id
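
Note (for readers of the series, not part of any commit): the other
half of the contract set up in patch 1 is the executor side, which
resolves its scratch space from the SPARK_LOCAL_DIRS variable that
ExecutorRunner now exports. A condensed sketch of that resolution
order (illustrative; `localRootDirs` is a made-up name, not the exact
Utils.getOrCreateLocalRootDirs code):

    // The environment variable set by the Worker wins over
    // spark.local.dir, which in turn defaults to java.io.tmpdir.
    def localRootDirs(conf: Map[String, String]): Seq[String] = {
      sys.env.get("SPARK_LOCAL_DIRS")
        .orElse(conf.get("spark.local.dir"))
        .getOrElse(System.getProperty("java.io.tmpdir"))
        .split(",")
        .map(_.trim)
        .toSeq
    }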