From b92752bb01d088b3e365c192c132b7d7c71fb70a Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Tue, 1 Apr 2014 11:04:33 -0700 Subject: [PATCH 01/11] SPARK-1154: Add a periodic task to clean up app directories This adds two config params: spark.worker.cleanup_interval spark.worker.app_data_ttl --- .../apache/spark/deploy/worker/Worker.scala | 20 +++++++++++++++++++ .../scala/org/apache/spark/util/Utils.scala | 16 +++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8a71ddda4cb5e..3314568dd512d 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -64,6 +64,11 @@ private[spark] class Worker( val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 + // How often worker will clean up old app folders + val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup_interval", 60 * 30) * 1000 + // TTL for app folders/data; after TTL expires it will be cleaned up + val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.app_data_ttl", 15 * 24 * 3600) + // Index into masterUrls that we're currently trying to register with. var masterIndex = 0 @@ -179,12 +184,26 @@ private[spark] class Worker( registered = true changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) + context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, + CLEANUP_INTERVAL_MILLIS millis, self, Worker.AppDirCleanup) case SendHeartbeat => masterLock.synchronized { if (connected) { master ! Heartbeat(workerId) } } + case Worker.AppDirCleanup => + // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor + val cleanupFuture = concurrent.future { + logInfo("Cleaning up oldest application directories in " + workDir + " ...") + Utils.findOldestFiles(workDir, APP_DATA_RETENTION_SECS) + .foreach(Utils.deleteRecursively(_)) + } + cleanupFuture onFailure { + case e: Throwable => + logError("App dir cleanup failed: " + e.getMessage, e) + } + case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) changeMaster(masterUrl, masterWebUiUrl) @@ -331,6 +350,7 @@ private[spark] class Worker( } private[spark] object Worker { + case object AppDirCleanup // Sent to Worker actor periodically for cleaning up app folders def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 62ee704d580c2..3add43cf2cf68 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -536,6 +536,22 @@ private[spark] object Utils extends Logging { } } + /** + * Finds all the files in a directory whose last modified time is older than cutoff seconds. 
+ * @param dir must be the path to a directory, or IllegalArgumentException is thrown + * @param cutoff filter for files is lastModified < (currentTimeMillis/1000 - cutoff) + */ + def findOldestFiles(dir: File, cutoff: Long): Seq[File] = { + if (dir.isDirectory) { + val files = listFilesSafely(dir) + files.filter { file => + file.lastModified < ((System.currentTimeMillis / 1000) - cutoff) + } + } else { + throw new IllegalArgumentException(dir + " is not a directory!") + } + } + /** * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. */ From e3c408ef94c1d9597890b6547f9f165c1e8ed6be Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Tue, 1 Apr 2014 11:31:29 -0700 Subject: [PATCH 02/11] Document the two new settings --- .../apache/spark/deploy/worker/Worker.scala | 2 +- docs/configuration.md | 41 +++++++++++++------ 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 3314568dd512d..aa336db95593a 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -67,7 +67,7 @@ private[spark] class Worker( // How often worker will clean up old app folders val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup_interval", 60 * 30) * 1000 // TTL for app folders/data; after TTL expires it will be cleaned up - val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.app_data_ttl", 15 * 24 * 3600) + val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.app_data_ttl", 7 * 24 * 3600) // Index into masterUrls that we're currently trying to register with. var masterIndex = 0 diff --git a/docs/configuration.md b/docs/configuration.md index 1ff0150567255..a8708165a4863 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -161,13 +161,13 @@ Apart from these, the following properties are also available, and may be useful spark.ui.acls.enable false - Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has + Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has access permissions to view the web ui. See spark.ui.view.acls for more details. Also note this requires the user to be known, if the user comes across as null no checks are done. Filters can be used to authenticate and set the user. - + spark.ui.view.acls Empty @@ -276,10 +276,10 @@ Apart from these, the following properties are also available, and may be useful spark.serializer.objectStreamReset 10000 - When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches - objects to prevent writing redundant data, however that stops garbage collection of those - objects. By calling 'reset' you flush that info from the serializer, and allow old - objects to be collected. To turn off this periodic reset set it to a value of <= 0. + When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches + objects to prevent writing redundant data, however that stops garbage collection of those + objects. By calling 'reset' you flush that info from the serializer, and allow old + objects to be collected. To turn off this periodic reset set it to a value of <= 0. By default it will reset the serializer every 10,000 objects. 
@@ -375,7 +375,7 @@ Apart from these, the following properties are also available, and may be useful spark.akka.heartbeat.interval 1000 - This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. + This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. @@ -430,7 +430,7 @@ Apart from these, the following properties are also available, and may be useful spark.broadcast.blockSize 4096 - Size of each piece of a block in kilobytes for TorrentBroadcastFactory. + Size of each piece of a block in kilobytes for TorrentBroadcastFactory. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, BlockManager might take a performance hit. @@ -555,7 +555,7 @@ Apart from these, the following properties are also available, and may be useful the driver. - + spark.authenticate false @@ -563,7 +563,7 @@ Apart from these, the following properties are also available, and may be useful running on Yarn. - + spark.authenticate.secret None @@ -571,12 +571,12 @@ Apart from these, the following properties are also available, and may be useful not running on Yarn and authentication is enabled. - + spark.core.connection.auth.wait.timeout 30 Number of seconds for the connection to wait for authentication to occur before timing - out and giving up. + out and giving up. @@ -586,6 +586,23 @@ Apart from these, the following properties are also available, and may be useful Number of cores to allocate for each task. + + spark.worker.cleanup_interval + 1800 (30 minutes) + + Controls the interval, in seconds, at which the worker cleans up old application work dirs + on the local machine. + + + + spark.worker.app_data_ttl + 7 * 24 * 3600 (7 days) + + The number of seconds to retain application work directories on each worker. This is a Time To Live + and should depend on the amount of available disk space you have. Application logs and jars are + downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space, + especially if you run jobs very frequently. 
+ ## Viewing Spark Properties From dc1a311dc3747d89bf730d2b93f5d50004f8c51e Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Tue, 1 Apr 2014 13:30:54 -0700 Subject: [PATCH 03/11] Don't recompute current time with every new file --- core/src/main/scala/org/apache/spark/util/Utils.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3add43cf2cf68..652ac7118f9cc 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -542,11 +542,10 @@ private[spark] object Utils extends Logging { * @param cutoff filter for files is lastModified < (currentTimeMillis/1000 - cutoff) */ def findOldestFiles(dir: File, cutoff: Long): Seq[File] = { + val currentTimeSecs = System.currentTimeMillis / 1000 if (dir.isDirectory) { val files = listFilesSafely(dir) - files.filter { file => - file.lastModified < ((System.currentTimeMillis / 1000) - cutoff) - } + files.filter { file => file.lastModified < (currentTimeSecs - cutoff) } } else { throw new IllegalArgumentException(dir + " is not a directory!") } From ad99955dc914777730fad47c715046a8dd5fce3d Mon Sep 17 00:00:00 2001 From: Kelvin Chu Date: Wed, 2 Apr 2014 00:17:44 -0700 Subject: [PATCH 04/11] Add unit test for Utils.findOldestFiles() --- .../scala/org/apache/spark/util/UtilsSuite.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 616214fb5e3a6..dabecd89be9f3 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.util import scala.util.Random -import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream} +import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream} import java.nio.{ByteBuffer, ByteOrder} import com.google.common.base.Charsets @@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite { val iterator = Iterator.range(0, 5) assert(Utils.getIteratorSize(iterator) === 5L) } + + test("findOldestFiles") { + // create some temporary directories and files + val parent: File = Utils.createTempDir() + val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories + val child2: File = Utils.createTempDir(parent.getCanonicalPath) + // set the last modified time of child1 to 10 secs old + child1.setLastModified(System.currentTimeMillis() - (1000 * 10)) + + val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs + assert(result.size.equals(1)) + assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath)) + } } From 72f7d2d44805084472a7f354ee2b2355e92d3350 Mon Sep 17 00:00:00 2001 From: Kelvin Chu Date: Wed, 2 Apr 2014 00:19:49 -0700 Subject: [PATCH 05/11] Fix a bug of Utils.findOldestFiles(). file.lastModified is returned in milliseconds. 
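(For reference: the unit mismatch chased in this patch and the two follow-ups is that `File.lastModified` returns epoch milliseconds while the `cutoff` argument is expressed in seconds. A minimal Scala REPL sketch of the comparison the series eventually settles on — the helper name is invented for illustration and is not part of the patch:)

```scala
import java.io.File

// Illustrative only: put both sides on the millisecond scale before comparing.
// `cutoffSeconds` matches the semantics of the `cutoff` parameter in Utils.
def isOlderThan(file: File, cutoffSeconds: Long): Boolean =
  file.lastModified < (System.currentTimeMillis - cutoffSeconds * 1000L)
```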
--- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 652ac7118f9cc..891caed156776 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -539,13 +539,13 @@ private[spark] object Utils extends Logging { /** * Finds all the files in a directory whose last modified time is older than cutoff seconds. * @param dir must be the path to a directory, or IllegalArgumentException is thrown - * @param cutoff filter for files is lastModified < (currentTimeMillis/1000 - cutoff) + * @param cutoff measured in seconds. Files older than this are returned. */ def findOldestFiles(dir: File, cutoff: Long): Seq[File] = { val currentTimeSecs = System.currentTimeMillis / 1000 if (dir.isDirectory) { val files = listFilesSafely(dir) - files.filter { file => file.lastModified < (currentTimeSecs - cutoff) } + files.filter { file => file.lastModified < (currentTimeSecs - cutoff * 1000) } } else { throw new IllegalArgumentException(dir + " is not a directory!") } From cb52f2bb747960cd4945a3784d2d8a153565bbf8 Mon Sep 17 00:00:00 2001 From: Kelvin Chu Date: Wed, 2 Apr 2014 00:23:24 -0700 Subject: [PATCH 06/11] Change the name of findOldestFiles() to findOldFiles() --- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 2 +- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index aa336db95593a..df78e1e3d1d96 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -196,7 +196,7 @@ private[spark] class Worker( // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor val cleanupFuture = concurrent.future { logInfo("Cleaning up oldest application directories in " + workDir + " ...") - Utils.findOldestFiles(workDir, APP_DATA_RETENTION_SECS) + Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS) .foreach(Utils.deleteRecursively(_)) } cleanupFuture onFailure { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 891caed156776..559cd7f00471b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -541,7 +541,7 @@ private[spark] object Utils extends Logging { * @param dir must be the path to a directory, or IllegalArgumentException is thrown * @param cutoff measured in seconds. Files older than this are returned. 
*/ - def findOldestFiles(dir: File, cutoff: Long): Seq[File] = { + def findOldFiles(dir: File, cutoff: Long): Seq[File] = { val currentTimeSecs = System.currentTimeMillis / 1000 if (dir.isDirectory) { val files = listFilesSafely(dir) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index dabecd89be9f3..eb7fb6318262b 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -155,7 +155,7 @@ class UtilsSuite extends FunSuite { assert(Utils.getIteratorSize(iterator) === 5L) } - test("findOldestFiles") { + test("findOldFiles") { // create some temporary directories and files val parent: File = Utils.createTempDir() val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories From 8dc9cb54d5b46031b648cd81cc8761597e1ef31c Mon Sep 17 00:00:00 2001 From: Kelvin Chu Date: Wed, 2 Apr 2014 01:28:04 -0700 Subject: [PATCH 07/11] Fixed a bug in Utils.findOldFiles() after merge. --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 559cd7f00471b..d3b57d79e8fad 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -542,7 +542,7 @@ private[spark] object Utils extends Logging { * @param cutoff measured in seconds. Files older than this are returned. */ def findOldFiles(dir: File, cutoff: Long): Seq[File] = { - val currentTimeSecs = System.currentTimeMillis / 1000 + val currentTimeSecs = System.currentTimeMillis if (dir.isDirectory) { val files = listFilesSafely(dir) files.filter { file => file.lastModified < (currentTimeSecs - cutoff * 1000) } From 553d8c2eace0aeecbd4ebd9ca8fe6df50700e4e1 Mon Sep 17 00:00:00 2001 From: Kelvin Chu Date: Wed, 2 Apr 2014 10:57:01 -0700 Subject: [PATCH 08/11] change the variable name to currentTimeMillis since it actually tracks in seconds --- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d3b57d79e8fad..78fb9be212c23 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -542,10 +542,10 @@ private[spark] object Utils extends Logging { * @param cutoff measured in seconds. Files older than this are returned. 
*/ def findOldFiles(dir: File, cutoff: Long): Seq[File] = { - val currentTimeSecs = System.currentTimeMillis + val currentTimeMillis = System.currentTimeMillis if (dir.isDirectory) { val files = listFilesSafely(dir) - files.filter { file => file.lastModified < (currentTimeSecs - cutoff * 1000) } + files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) } } else { throw new IllegalArgumentException(dir + " is not a directory!") } From f2f6027467b175d6d3a262ec3829ccc27798c5bf Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Wed, 2 Apr 2014 16:48:24 -0700 Subject: [PATCH 09/11] CR from @andrewor14 --- .../org/apache/spark/deploy/DeployMessage.scala | 4 ++++ .../org/apache/spark/deploy/worker/Worker.scala | 12 +++++------- docs/configuration.md | 4 ++-- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 83ce14a0a806a..a7368f9f3dfbe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -86,6 +86,10 @@ private[deploy] object DeployMessages { case class KillDriver(driverId: String) extends DeployMessage + // Worker internal + + case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders + // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index df78e1e3d1d96..f67b16238b540 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -65,9 +65,9 @@ private[spark] class Worker( val REGISTRATION_RETRIES = 3 // How often worker will clean up old app folders - val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup_interval", 60 * 30) * 1000 + val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanupInterval", 60 * 30) * 1000 // TTL for app folders/data; after TTL expires it will be cleaned up - val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.app_data_ttl", 7 * 24 * 3600) + val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.appDataTTL", 7 * 24 * 3600) // Index into masterUrls that we're currently trying to register with. var masterIndex = 0 @@ -185,19 +185,19 @@ private[spark] class Worker( changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, - CLEANUP_INTERVAL_MILLIS millis, self, Worker.AppDirCleanup) + CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) case SendHeartbeat => masterLock.synchronized { if (connected) { master ! 
Heartbeat(workerId) } } - case Worker.AppDirCleanup => + case WorkDirCleanup => // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor val cleanupFuture = concurrent.future { logInfo("Cleaning up oldest application directories in " + workDir + " ...") Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS) - .foreach(Utils.deleteRecursively(_)) + .foreach(Utils.deleteRecursively) } cleanupFuture onFailure { case e: Throwable => @@ -350,8 +350,6 @@ private[spark] class Worker( } private[spark] object Worker { - case object AppDirCleanup // Sent to Worker actor periodically for cleaning up app folders - def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, diff --git a/docs/configuration.md b/docs/configuration.md index a8708165a4863..3f49e87b8f6ae 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -587,7 +587,7 @@ Apart from these, the following properties are also available, and may be useful - spark.worker.cleanup_interval + spark.worker.cleanupInterval 1800 (30 minutes) Controls the interval, in seconds, at which the worker cleans up old application work dirs @@ -595,7 +595,7 @@ Apart from these, the following properties are also available, and may be useful - spark.worker.app_data_ttl + spark.worker.appDataTTL 7 * 24 * 3600 (7 days) The number of seconds to retain application work directories on each worker. This is a Time To Live From 9f10d9613a27b5c7ad28681b63f6a0951735f439 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Wed, 2 Apr 2014 16:55:04 -0700 Subject: [PATCH 10/11] CR from @pwendell - rename configs and add cleanup.enabled --- .../scala/org/apache/spark/deploy/worker/Worker.scala | 11 +++++++---- docs/configuration.md | 11 +++++++++-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f67b16238b540..aaf96fbe4618f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -64,10 +64,11 @@ private[spark] class Worker( val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 + val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true) // How often worker will clean up old app folders - val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanupInterval", 60 * 30) * 1000 + val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000 // TTL for app folders/data; after TTL expires it will be cleaned up - val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.appDataTTL", 7 * 24 * 3600) + val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600) // Index into masterUrls that we're currently trying to register with. 
var masterIndex = 0 @@ -184,8 +185,10 @@ private[spark] class Worker( registered = true changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) - context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, - CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) + if (CLEANUP_ENABLED) { + context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, + CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) + } case SendHeartbeat => masterLock.synchronized { diff --git a/docs/configuration.md b/docs/configuration.md index 3f49e87b8f6ae..b907cd312d782 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -587,7 +587,14 @@ Apart from these, the following properties are also available, and may be useful - spark.worker.cleanupInterval + spark.worker.cleanup.enabled + true + + Enable periodic cleanup of worker / application directories + + + + spark.worker.cleanup.interval 1800 (30 minutes) Controls the interval, in seconds, at which the worker cleans up old application work dirs @@ -595,7 +602,7 @@ Apart from these, the following properties are also available, and may be useful - spark.worker.appDataTTL + spark.worker.cleanup.appDataTtl 7 * 24 * 3600 (7 days) The number of seconds to retain application work directories on each worker. This is a Time To Live From 06899953d5d542ef5aa39f7dcc7330b74c7e41ae Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Thu, 3 Apr 2014 15:42:36 -0700 Subject: [PATCH 11/11] CR from @aarondav - move config, clarify for standalone mode --- docs/configuration.md | 50 ++++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index b907cd312d782..c0bee954bf144 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -333,6 +333,32 @@ Apart from these, the following properties are also available, and may be useful receives no heartbeats. + + spark.worker.cleanup.enabled + true + + Enable periodic cleanup of worker / application directories. Note that this only affects standalone + mode, as YARN works differently. + + + + spark.worker.cleanup.interval + 1800 (30 minutes) + + Controls the interval, in seconds, at which the worker cleans up old application work dirs + on the local machine. + + + + spark.worker.cleanup.appDataTtl + 7 * 24 * 3600 (7 days) + + The number of seconds to retain application work directories on each worker. This is a Time To Live + and should depend on the amount of available disk space you have. Application logs and jars are + downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space, + especially if you run jobs very frequently. + + spark.akka.frameSize 10 @@ -586,30 +612,6 @@ Apart from these, the following properties are also available, and may be useful Number of cores to allocate for each task. - - spark.worker.cleanup.enabled - true - - Enable periodic cleanup of worker / application directories - - - - spark.worker.cleanup.interval - 1800 (30 minutes) - - Controls the interval, in seconds, at which the worker cleans up old application work dirs - on the local machine. - - - - spark.worker.cleanup.appDataTtl - 7 * 24 * 3600 (7 days) - - The number of seconds to retain application work directories on each worker. This is a Time To Live - and should depend on the amount of available disk space you have. Application logs and jars are - downloaded to each application work dir. 
Over time, the work dirs can quickly fill up disk space, - especially if you run jobs very frequently. - ## Viewing Spark Properties
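Taken together, the series ends with three standalone-mode worker settings — `spark.worker.cleanup.enabled`, `spark.worker.cleanup.interval`, and `spark.worker.cleanup.appDataTtl` — plus a `WorkDirCleanup` message that the worker schedules for itself and handles by deleting old application work directories inside a future. The sketch below condenses that end state into a small self-contained program. It illustrates the TTL filtering only: the object and directory names are made up, and `listFiles` stands in for the real code's `Utils.listFilesSafely`.

```scala
import java.io.File

// Condensed sketch of the behavior this series converges on (not the actual
// Worker/Utils code). `cutoff` is in seconds; File.lastModified is in epoch
// milliseconds, hence the `* 1000` in the comparison.
object WorkDirCleanupSketch {

  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
    val currentTimeMillis = System.currentTimeMillis
    if (dir.isDirectory) {
      // The real code uses Utils.listFilesSafely; Option(...) guards against the
      // null that File.listFiles returns on I/O errors.
      val files = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
    } else {
      throw new IllegalArgumentException(dir + " is not a directory!")
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical work dir; the documented defaults are a 1800 s cleanup
    // interval and a 7-day TTL.
    val workDir = new File("work")
    val appDataTtlSecs = 7L * 24 * 3600
    if (workDir.isDirectory) {
      findOldFiles(workDir, appDataTtlSecs).foreach { old =>
        println("would delete: " + old.getAbsolutePath)
      }
    }
  }
}
```

Because these are daemon-side properties, they have to reach the worker JVM itself; on a standalone cluster that is typically done through the worker's own configuration (for example, `-Dspark.worker.cleanup.enabled=true`-style flags in `SPARK_WORKER_OPTS` in `conf/spark-env.sh`) rather than through an application's `SparkConf`.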