diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 83ce14a0a806a..a7368f9f3dfbe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {
 
   case class KillDriver(driverId: String) extends DeployMessage
 
+  // Worker internal
+
+  case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
+
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8a71ddda4cb5e..aaf96fbe4618f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -64,6 +64,12 @@ private[spark] class Worker(
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
+  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
+  // How often worker will clean up old app folders
+  val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
+  // TTL for app folders/data; after TTL expires it will be cleaned up
+  val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+
   // Index into masterUrls that we're currently trying to register with.
   var masterIndex = 0
 
@@ -179,12 +185,28 @@ private[spark] class Worker(
       registered = true
       changeMaster(masterUrl, masterWebUiUrl)
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+      if (CLEANUP_ENABLED) {
+        context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
+          CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
+      }
 
     case SendHeartbeat =>
       masterLock.synchronized {
         if (connected) { master ! Heartbeat(workerId) }
       }
 
+    case WorkDirCleanup =>
+      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+      val cleanupFuture = concurrent.future {
+        logInfo("Cleaning up oldest application directories in " + workDir + " ...")
+        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
+          .foreach(Utils.deleteRecursively)
+      }
+      cleanupFuture onFailure {
+        case e: Throwable =>
+          logError("App dir cleanup failed: " + e.getMessage, e)
+      }
+
     case MasterChanged(masterUrl, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterUrl)
       changeMaster(masterUrl, masterWebUiUrl)
@@ -331,7 +353,6 @@ private[spark] class Worker(
 }
 
 private[spark] object Worker {
-
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
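A note on the `WorkDirCleanup` handler above: the cleanup pass does blocking filesystem work (a directory scan plus recursive deletes), so the patch wraps it in a Scala future rather than running it on the Worker actor's own thread, and only logs failures instead of letting them crash the actor. Below is a minimal, self-contained sketch of that pattern; the directory path, retention value, and `println` logging are illustrative stand-ins, not the worker's actual fields.

```scala
import java.io.File

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object WorkDirCleanupSketch extends App {
  // Hypothetical stand-ins: the real patch reads workDir and
  // APP_DATA_RETENTION_SECS from the Worker instance.
  val workDir = new File("/tmp/spark-work")
  val appDataRetentionSecs = 7L * 24 * 3600

  // Run the blocking scan on a pool thread so the caller (the Worker
  // actor in the patch) keeps processing messages in the meantime.
  val cleanupFuture = Future {
    val cutoff = System.currentTimeMillis - appDataRetentionSecs * 1000
    Option(workDir.listFiles).getOrElse(Array.empty[File])
      .filter(_.lastModified < cutoff)
      .foreach(dir => println(s"would delete: $dir")) // deletion stubbed out
  }

  // Failures are logged rather than propagated into the actor.
  cleanupFuture.onComplete {
    case Success(_) => println("cleanup pass finished")
    case Failure(e) => println("App dir cleanup failed: " + e.getMessage)
  }

  Thread.sleep(2000) // crude wait so this demo JVM doesn't exit early
}
```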
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 62ee704d580c2..78fb9be212c23 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -536,6 +536,21 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Finds all the files in a directory whose last modified time is older than cutoff seconds.
+   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+   * @param cutoff measured in seconds. Files older than this are returned.
+   */
+  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+    val currentTimeMillis = System.currentTimeMillis
+    if (dir.isDirectory) {
+      val files = listFilesSafely(dir)
+      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+    } else {
+      throw new IllegalArgumentException(dir + " is not a directory!")
+    }
+  }
+
   /**
    * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
    */
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 616214fb5e3a6..eb7fb6318262b 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import scala.util.Random
 
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
 import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
 
@@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
     val iterator = Iterator.range(0, 5)
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
+
+  test("findOldFiles") {
+    // create some temporary directories and files
+    val parent: File = Utils.createTempDir()
+    val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
+    val child2: File = Utils.createTempDir(parent.getCanonicalPath)
+    // set the last modified time of child1 to 10 secs old
+    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+
+    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
+    assert(result.size.equals(1))
+    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+  }
 }
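For context on how the new helper composes with existing utilities: `findOldFiles` does a shallow scan (direct children of the directory only, via `listFilesSafely`) and compares millisecond `lastModified` timestamps against a cutoff given in seconds, hence the `cutoff * 1000` conversion; the worker then hands each expired folder to the existing `Utils.deleteRecursively`. A hypothetical one-shot driver showing that two-step pattern follows; `CleanupOnce` and the `/tmp/spark-work` path are inventions for illustration, and it must live in a subpackage of `org.apache.spark` because `Utils` is `private[spark]`.

```scala
package org.apache.spark.demo

import java.io.File

import org.apache.spark.util.Utils

// Hypothetical one-shot cleanup driver, mirroring what the Worker actor
// does on each WorkDirCleanup tick (not part of the patch itself).
object CleanupOnce {
  def main(args: Array[String]): Unit = {
    val workDir = new File("/tmp/spark-work") // assumed worker work dir
    // Direct children untouched for more than 7 days...
    val expired = Utils.findOldFiles(workDir, 7 * 24 * 3600)
    // ...are each deleted wholesale, subdirectories and all.
    expired.foreach(Utils.deleteRecursively)
  }
}
```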
diff --git a/docs/configuration.md b/docs/configuration.md
index 1ff0150567255..c0bee954bf144 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -161,13 +161,13 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.ui.view.acls</code> for more details.
     Also note this requires the user to be known, if the user comes across as null no checks
     are done. Filters can be used to authenticate and set the user.
   </td>
 </tr>
 <tr>
   <td>spark.broadcast.blockSize</td>
   <td>4096</td>
   <td>
-    Size of each piece of a block in kilobytes for TorrentBroadcastFactory.
+    Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is
     too small, <code>BlockManager</code> might take a performance hit.
   </td>
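Although the configuration.md hunk above only touches neighboring rows, the patch itself introduces three new worker-side properties. A hedged sketch of their names, defaults, and units follows, using the values from Worker.scala; the `SparkConf` form here is only to document them, since a conf built inside an application does not configure a standalone worker daemon. In practice they would be supplied to the worker JVM, for example as `-Dspark.worker.cleanup.enabled=true` system properties.

```scala
import org.apache.spark.SparkConf

object CleanupConfSketch extends App {
  // The three knobs added by this patch, shown with their defaults.
  val conf = new SparkConf()
    .set("spark.worker.cleanup.enabled", "true")                       // cleanup on by default
    .set("spark.worker.cleanup.interval", (60 * 30).toString)          // seconds between sweeps
    .set("spark.worker.cleanup.appDataTtl", (7L * 24 * 3600).toString) // retention, in seconds

  println(conf.get("spark.worker.cleanup.appDataTtl")) // 604800
}
```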