From dd25697c490e40f644b544c975afff49e107ace6 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 6 Oct 2014 16:26:29 -0700
Subject: [PATCH 1/3] [SPARK-2546] [1.0 / 1.1 backport] Clone JobConf for each task.

---
 .../org/apache/spark/rdd/HadoopRDD.scala | 25 ++++++-------------
 1 file changed, 8 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index c8623314c98e..fd9c4656041b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -132,24 +132,12 @@ class HadoopRDD[K, V](
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    if (conf.isInstanceOf[JobConf]) {
-      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      conf.asInstanceOf[JobConf]
-    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
-      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-        val newJobConf = new JobConf(conf)
+    HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      val newJobConf = new JobConf(conf)
+      if (!conf.isInstanceOf[JobConf]) {
         initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-        newJobConf
       }
+      newJobConf
     }
   }
 
@@ -257,7 +245,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
-  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
   /**

From b562451f142078321a102fef4f48190acc822e03 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 7 Oct 2014 15:50:54 -0700
Subject: [PATCH 2/3] Remove unused jobConfCacheKey field.

---
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fd9c4656041b..b121f47d16a5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -122,8 +122,6 @@ class HadoopRDD[K, V](
       minPartitions)
   }
 
-  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
-
   protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
 
   // used to build JobTracker ID

From f14f25981f1b922f1a8d07dfd80774a78daec368 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 17 Oct 2014 14:15:08 -0700
Subject: [PATCH 3/3] Add configuration option to control cloning of Hadoop JobConf.

---
 .../org/apache/spark/rdd/HadoopRDD.scala | 44 ++++++++++++++++---
 docs/configuration.md                    |  9 ++++
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index b121f47d16a5..e3d6c5fb98b4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -122,20 +122,54 @@ class HadoopRDD[K, V](
       minPartitions)
   }
 
+  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
   protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
 
   // used to build JobTracker ID
   private val createTime = new Date()
 
+  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-      val newJobConf = new JobConf(conf)
-      if (!conf.isInstanceOf[JobConf]) {
-        initLocalJobConfFuncOpt.map(f => f(newJobConf))
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
+      // somewhat rarely because most jobs treat the configuration as though it's immutable. One
+      // solution, implemented here, is to clone the Configuration object. Unfortunately, this
+      // clone can be very expensive. To avoid unexpected performance regressions for workloads and
+      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+      // disabled by default.
+      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        val newJobConf = new JobConf(conf)
+        if (!conf.isInstanceOf[JobConf]) {
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        }
+        newJobConf
+      }
+    } else {
+      if (conf.isInstanceOf[JobConf]) {
+        logDebug("Re-using user-broadcasted JobConf")
+        conf.asInstanceOf[JobConf]
+      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+        logDebug("Re-using cached JobConf")
+        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+      } else {
+        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+          logDebug("Creating new JobConf and caching it for later re-use")
+          val newJobConf = new JobConf(conf)
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+          newJobConf
+        }
      }
-      newJobConf
     }
   }
 
diff --git a/docs/configuration.md b/docs/configuration.md
index 3b5751a9820e..6cce50818c52 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -590,6 +590,15 @@ Apart from these, the following properties are also available, and may be useful
     output directories. We recommend that users do not disable this except if trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
+<tr>
+  <td><code>spark.hadoop.cloneConf</code></td>
+  <td>false</td>
+  <td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task. This
+    option should be enabled to work around <code>Configuration</code> thread-safety issues (see
+    <a href="https://issues.apache.org/jira/browse/SPARK-2546">SPARK-2546</a> for more details).
+    This is disabled by default in order to avoid unexpected performance regressions for jobs that
+    are not affected by these issues.</td>
+</tr>
 <tr>
   <td><code>spark.executor.heartbeatInterval</code></td>
   <td>10000</td>
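
For reference, a minimal sketch of how the new spark.hadoop.cloneConf option would be enabled from user code once these patches are applied. The application name and input path below are hypothetical; any Hadoop-backed input (e.g. sc.textFile) is read through HadoopRDD.getJobConf(), which is where the cloning behaviour takes effect.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: opt in to per-task cloning of the Hadoop JobConf introduced by this patch
// series. "spark.hadoop.cloneConf" defaults to "false"; setting it to "true" makes
// HadoopRDD.getJobConf() clone the broadcasted Configuration (under
// CONFIGURATION_INSTANTIATION_LOCK) instead of re-using a cached JobConf.
object CloneConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("CloneConfExample")         // hypothetical application name
      .set("spark.hadoop.cloneConf", "true")  // work around Configuration thread-safety issues
    val sc = new SparkContext(conf)

    // Any Hadoop InputFormat read goes through HadoopRDD and therefore through
    // getJobConf(), so the cloned JobConf is used for computing splits and records.
    val lines = sc.textFile("hdfs:///tmp/input.txt")  // hypothetical input path
    println(lines.count())

    sc.stop()
  }
}
```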