From fc1b6a0169c123a825a253defb021c73aebf1c98 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Sat, 24 Feb 2018 18:13:01 +0800
Subject: [PATCH 01/11] Use TimeStampedHashMap for BlockManagerId in case
 blockManagerIdCache causes OOM

---
 .../apache/spark/storage/BlockManagerId.scala | 16 +++++++++++-----
 .../org/apache/spark/util/JsonProtocol.scala  |  2 +-
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 2c3da0ee85e06..7ef3a301a982a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
 
 /**
  * :: DeveloperApi ::
@@ -123,7 +123,8 @@ private[spark] object BlockManagerId {
       execId: String,
       host: String,
       port: Int,
-      topologyInfo: Option[String] = None): BlockManagerId =
+      topologyInfo: Option[String] = None,
+      clearOldValues: Boolean = false): BlockManagerId =
     getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo))
 
   def apply(in: ObjectInput): BlockManagerId = {
@@ -132,10 +133,15 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
-  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+  val blockManagerIdCache = new TimeStampedHashMap[BlockManagerId, BlockManagerId](true)
 
-  def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
+  def getCachedBlockManagerId(id: BlockManagerId, clearOldValues: Boolean = false): BlockManagerId =
+  {
     blockManagerIdCache.putIfAbsent(id, id)
-    blockManagerIdCache.get(id)
+    val blockManagerId = blockManagerIdCache.get(id)
+    if (clearOldValues) {
+      blockManagerIdCache.clearOldValues(System.currentTimeMillis - Utils.timeStringAsMs("10d"))
+    }
+    blockManagerId.get
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ff83301d631c4..0f54a6895cfc7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -946,7 +946,7 @@ private[spark] object JsonProtocol {
     val executorId = (json \ "Executor ID").extract[String].intern()
     val host = (json \ "Host").extract[String].intern()
     val port = (json \ "Port").extract[Int]
-    BlockManagerId(executorId, host, port)
+    BlockManagerId(executorId, host, port, clearOldValues = true)
   }
 
   private object JOB_RESULT_FORMATTED_CLASS_NAMES {
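PATCH 01 threads a `clearOldValues` flag from `JsonProtocol` down toward the cache, but as the diff stands the flag is never forwarded: the five-argument `apply` calls `getCachedBlockManagerId(new BlockManagerId(...))` without it, so the method always sees its default of `false`. The underlying idea — stamp every entry on insert and periodically purge anything older than ten days — can be sketched as below; `TimestampedCache` and its signatures are illustrative stand-ins for Spark's `TimeStampedHashMap`, not its actual implementation:

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for the TimeStampedHashMap idea in PATCH 01:
// every insert records a timestamp, and clearOldValues(threshTime)
// evicts entries inserted before threshTime.
class TimestampedCache[K, V] {
  private val underlying = new ConcurrentHashMap[K, (V, Long)]()

  def putIfAbsent(key: K, value: V): Unit =
    underlying.putIfAbsent(key, (value, System.currentTimeMillis))

  // Like TimeStampedHashMap, lookups return an Option, which is why the
  // patch ends with `blockManagerId.get`.
  def get(key: K): Option[V] = Option(underlying.get(key)).map(_._1)

  // A full scan, so eviction cost grows with the map; iterator.remove()
  // is safe on ConcurrentHashMap views.
  def clearOldValues(threshTime: Long): Unit = {
    val it = underlying.entrySet().iterator()
    while (it.hasNext) {
      if (it.next().getValue._2 < threshTime) it.remove()
    }
  }
}
```

With this shape, the `Utils.timeStringAsMs("10d")` call in the patch corresponds to `clearOldValues(System.currentTimeMillis - 10L * 24 * 60 * 60 * 1000)`.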
From 59f34e1251af0dab0d4b72365b760e5d2436c861 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Mon, 26 Feb 2018 16:20:51 +0800
Subject: [PATCH 02/11] Revert "Use TimeStampedHashMap for BlockManagerId in
 case blockManagerIdCache causes OOM"

This reverts commit fc1b6a0169c123a825a253defb021c73aebf1c98.
---
 .../apache/spark/storage/BlockManagerId.scala | 16 +++++-----------
 .../org/apache/spark/util/JsonProtocol.scala  |  2 +-
 2 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 7ef3a301a982a..2c3da0ee85e06 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{TimeStampedHashMap, Utils}
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -123,8 +123,7 @@ private[spark] object BlockManagerId {
       execId: String,
       host: String,
       port: Int,
-      topologyInfo: Option[String] = None,
-      clearOldValues: Boolean = false): BlockManagerId =
+      topologyInfo: Option[String] = None): BlockManagerId =
     getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo))
 
   def apply(in: ObjectInput): BlockManagerId = {
@@ -133,15 +132,10 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
-  val blockManagerIdCache = new TimeStampedHashMap[BlockManagerId, BlockManagerId](true)
+  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
 
-  def getCachedBlockManagerId(id: BlockManagerId, clearOldValues: Boolean = false): BlockManagerId =
-  {
+  def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
     blockManagerIdCache.putIfAbsent(id, id)
-    val blockManagerId = blockManagerIdCache.get(id)
-    if (clearOldValues) {
-      blockManagerIdCache.clearOldValues(System.currentTimeMillis - Utils.timeStringAsMs("10d"))
-    }
-    blockManagerId.get
+    blockManagerIdCache.get(id)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 0f54a6895cfc7..ff83301d631c4 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -946,7 +946,7 @@ private[spark] object JsonProtocol {
     val executorId = (json \ "Executor ID").extract[String].intern()
     val host = (json \ "Host").extract[String].intern()
     val port = (json \ "Port").extract[Int]
-    BlockManagerId(executorId, host, port, clearOldValues = true)
+    BlockManagerId(executorId, host, port)
   }
 
   private object JOB_RESULT_FORMATTED_CLASS_NAMES {

From e5e3effed5e65156fd6ef35160a96f08a6c00ec0 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Mon, 26 Feb 2018 16:37:10 +0800
Subject: [PATCH 03/11] Use SoftReference instead

---
 .../apache/spark/storage/BlockManagerId.scala | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 2c3da0ee85e06..8ccd234de1549 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
+import java.lang.ref.{Reference, SoftReference, WeakReference}
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.SparkContext
@@ -132,10 +133,21 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
-  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+  /**
+   * We use SoftReference here because blockManagerIdCache never removes old
+   * values, which may cause an out-of-memory issue (SPARK-23508).
+   */
+  val blockManagerIdCache = new ConcurrentHashMap[SoftReference[BlockManagerId],
+    SoftReference[BlockManagerId]]()
 
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
-    blockManagerIdCache.putIfAbsent(id, id)
-    blockManagerIdCache.get(id)
+    blockManagerIdCache.putIfAbsent(new SoftReference[BlockManagerId](id),
+      new SoftReference[BlockManagerId](id))
+    val blockManagerIdRef = blockManagerIdCache.get(id)
+    if (blockManagerIdRef.get equals null) {
+      id
+    } else {
+      blockManagerIdRef.get
+    }
   }
 }
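The SoftReference attempt above has two independent defects. First, `SoftReference` does not override `equals`/`hashCode`, so a map keyed by `SoftReference[BlockManagerId]` can never be probed with a plain `BlockManagerId`: `blockManagerIdCache.get(id)` always returns `null`, after which `blockManagerIdRef.get` throws a `NullPointerException` — and because two wrappers around equal ids are never equal either, `putIfAbsent` never deduplicates and the map still grows on every call. Second, `blockManagerIdRef.get equals null` is not a null check in Scala: for a non-null receiver it is always `false`, and the idiom would be `== null` or `eq null`. For a reference-based interner that actually works, Guava already ships one; a minimal sketch of that alternative (not what the PR eventually merged):

```scala
import com.google.common.collect.Interners

// Guava's weak interner returns one canonical instance per equivalence
// class and lets GC drop an entry once no caller references it.
val interner = Interners.newWeakInterner[String]()

val a = interner.intern(new String("spark"))
val b = interner.intern(new String("spark"))
assert(a eq b) // same canonical instance, despite two distinct inputs
```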
From c741b58d74f0f5e479982c4756c93d77641c2444 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Mon, 26 Feb 2018 16:38:26 +0800
Subject: [PATCH 04/11] Remove unused import

---
 .../main/scala/org/apache/spark/storage/BlockManagerId.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 8ccd234de1549..47d76c364084c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.lang.ref.{Reference, SoftReference, WeakReference}
+import java.lang.ref.SoftReference
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.SparkContext

From 36d66e1b426d1990ac9f2c3c3260051ed3048920 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Mon, 26 Feb 2018 16:54:35 +0800
Subject: [PATCH 05/11] Revert "Remove unused import"

This reverts commit c741b58d74f0f5e479982c4756c93d77641c2444.
---
 .../main/scala/org/apache/spark/storage/BlockManagerId.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 47d76c364084c..8ccd234de1549 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.lang.ref.SoftReference
+import java.lang.ref.{Reference, SoftReference, WeakReference}
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.SparkContext

From e5b793da28915af57d432338f151c9f48ded4df0 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Mon, 26 Feb 2018 16:54:46 +0800
Subject: [PATCH 06/11] Revert "Use SoftReference instead"

This reverts commit e5e3effed5e65156fd6ef35160a96f08a6c00ec0.
---
 .../apache/spark/storage/BlockManagerId.scala | 18 +++---------------
 1 file changed, 3 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 8ccd234de1549..2c3da0ee85e06 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.lang.ref.{Reference, SoftReference, WeakReference}
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.SparkContext
@@ -133,21 +132,10 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
-  /**
-   * We use SoftReference here because blockManagerIdCache never removes old
-   * values, which may cause an out-of-memory issue (SPARK-23508).
-   */
-  val blockManagerIdCache = new ConcurrentHashMap[SoftReference[BlockManagerId],
-    SoftReference[BlockManagerId]]()
+  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
 
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
-    blockManagerIdCache.putIfAbsent(new SoftReference[BlockManagerId](id),
-      new SoftReference[BlockManagerId](id))
-    val blockManagerIdRef = blockManagerIdCache.get(id)
-    if (blockManagerIdRef.get equals null) {
-      id
-    } else {
-      blockManagerIdRef.get
-    }
+    blockManagerIdCache.putIfAbsent(id, id)
+    blockManagerIdCache.get(id)
   }
 }
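After the reverts, the series settles on Guava's `LoadingCache`, which bounds the map instead of trying to reclaim entries by reference or timestamp. `maximumSize(n)` evicts on an approximately least-recently-used basis, and `CacheLoader.load` runs at most once per missing key, replacing the racy `putIfAbsent` + `get` pair with a single atomic lookup. With a loader that simply returns its key, the cache behaves as a bounded interner — a small demonstration of the pattern PATCH 07 adopts (the `String` keys here are illustrative):

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// A bounded load-on-miss cache whose loader returns the key itself,
// which makes equal keys resolve to one canonical instance.
val interner: LoadingCache[String, String] = CacheBuilder.newBuilder()
  .maximumSize(100) // size-based eviction, approximately least-recently-used
  .build(new CacheLoader[String, String]() {
    override def load(key: String): String = key
  })

val a = interner.get(new String("spark"))
val b = interner.get(new String("spark"))
assert(a eq b) // the second get hits the cache and returns the first instance
```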
From 8f3cd47fc705890d2b69197c4f4e1a4cc9e9a69a Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Mon, 26 Feb 2018 17:12:43 +0800
Subject: [PATCH 07/11] Use guava cache instead

---
 .../apache/spark/internal/config/package.scala    |  5 +++++
 .../org/apache/spark/storage/BlockManagerId.scala | 15 +++++++++++----
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index bbfcfbaa7363c..4e0fc52359cdd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -285,6 +285,11 @@ package object config {
     .doc("Address where to bind network listen sockets on the driver.")
     .fallbackConf(DRIVER_HOST_ADDRESS)
 
+  private[spark] val BLOCK_MANAGER_ID_CACHE_SIZE = ConfigBuilder("spark.blockManagerId.cache.size")
+    .doc("Max size for blockManagerCache of BlockManagerId.")
+    .intConf
+    .createWithDefault(100)
+
   private[spark] val BLOCK_MANAGER_PORT = ConfigBuilder("spark.blockManager.port")
     .doc("Port to use for the block manager when a more specific setting is not provided.")
     .intConf
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 2c3da0ee85e06..d458634c904f5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.SparkContext
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+
+import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -132,10 +134,15 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
-  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+  val blockManagerIdCache = CacheBuilder.newBuilder()
+    .maximumSize(SparkEnv.get.conf.get(BLOCK_MANAGER_ID_CACHE_SIZE))
+    .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
+      override def load(id: BlockManagerId) = {
+        id
+      }
+    })
 
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
-    blockManagerIdCache.putIfAbsent(id, id)
     blockManagerIdCache.get(id)
   }
 }
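PATCH 08 below drops the new config and hardcodes the size. A likely reason is that making the limit configurable via `SparkEnv.get.conf` is fragile: `BlockManagerId` is also created where no `SparkEnv` exists — for instance when `JsonProtocol` replays event logs in the history server — and `SparkEnv.get` returns `null` there, so the `val` initializer above would throw. A null-safe variant is sketched only for illustration; the config key and default come from PATCH 07, while the fallback logic is not part of any patch:

```scala
import org.apache.spark.SparkEnv

// Fall back to a fixed default when no SparkEnv is available, e.g. during
// event-log replay. Illustrative only; the PR hardcodes the size instead.
val cacheSize: Int =
  Option(SparkEnv.get)
    .map(_.conf.getInt("spark.blockManagerId.cache.size", 100))
    .getOrElse(100)
```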
From dc522fc8cb6d5eda3147dbe473b8080379a239ea Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Tue, 27 Feb 2018 13:08:54 +0800
Subject: [PATCH 08/11] Update

---
 .../scala/org/apache/spark/internal/config/package.scala     | 5 -----
 .../main/scala/org/apache/spark/storage/BlockManagerId.scala | 5 ++---
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4e0fc52359cdd..bbfcfbaa7363c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -285,11 +285,6 @@ package object config {
     .doc("Address where to bind network listen sockets on the driver.")
     .fallbackConf(DRIVER_HOST_ADDRESS)
 
-  private[spark] val BLOCK_MANAGER_ID_CACHE_SIZE = ConfigBuilder("spark.blockManagerId.cache.size")
-    .doc("Max size for blockManagerCache of BlockManagerId.")
-    .intConf
-    .createWithDefault(100)
-
   private[spark] val BLOCK_MANAGER_PORT = ConfigBuilder("spark.blockManager.port")
     .doc("Port to use for the block manager when a more specific setting is not provided.")
     .intConf
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index d458634c904f5..42ecd5c57ca3e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -21,9 +21,8 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
 
-import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -135,7 +134,7 @@ private[spark] object BlockManagerId {
   }
 
   val blockManagerIdCache = CacheBuilder.newBuilder()
-    .maximumSize(SparkEnv.get.conf.get(BLOCK_MANAGER_ID_CACHE_SIZE))
+    .maximumSize(500)
     .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
       override def load(id: BlockManagerId) = {
         id

From b28985b9f2d941d6be1beeeacf2ee21452f46b6a Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Tue, 27 Feb 2018 17:41:31 +0800
Subject: [PATCH 09/11] Refine comment

---
 .../main/scala/org/apache/spark/storage/BlockManagerId.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 42ecd5c57ca3e..e55accb2806c6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -136,9 +136,7 @@ private[spark] object BlockManagerId {
   val blockManagerIdCache = CacheBuilder.newBuilder()
     .maximumSize(500)
     .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
-      override def load(id: BlockManagerId) = {
-        id
-      }
+      override def load(id: BlockManagerId) = id
     })
 
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
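Size-based eviction is safe for this use case because evicting an entry only forgets the canonical instance: any `BlockManagerId` a caller already holds stays valid, so correctness is unaffected, and the only cost is that a later lookup may load a fresh (but equal) instance. A deliberately tiny cache makes this visible (the sizes and `String` keys are illustrative):

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

val tiny: LoadingCache[String, String] = CacheBuilder.newBuilder()
  .maximumSize(1) // deliberately tiny so eviction is easy to trigger
  .build(new CacheLoader[String, String]() {
    override def load(key: String): String = key
  })

val first = tiny.get("a")
tiny.get("b")              // may evict "a", since the cache holds one entry
val second = tiny.get("a") // possibly a fresh, but equal, instance
assert(first == second)    // equality always holds; `eq` may not after eviction
```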
From 337989945b0757dfc6a069315c4e7828afe77d00 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Wed, 28 Feb 2018 09:33:11 +0800
Subject: [PATCH 10/11] Update documentation

---
 .../scala/org/apache/spark/storage/BlockManagerId.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index e55accb2806c6..514c4b9c6a673 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -133,8 +133,12 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
+  /**
+   * Here we set max cache size as 10000.Since the size of a BlockManagerId object
+   * is about 48B,so the max memory this cache cost will be about 1MB which is feasible.
+   */
   val blockManagerIdCache = CacheBuilder.newBuilder()
-    .maximumSize(500)
+    .maximumSize(10000)
    .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
       override def load(id: BlockManagerId) = id
     })

From bf79f4d5c83c364c7f1fc05f158753d282409330 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Wed, 28 Feb 2018 11:47:00 +0800
Subject: [PATCH 11/11] Refine comment

---
 .../main/scala/org/apache/spark/storage/BlockManagerId.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 514c4b9c6a673..d4a59c33b974c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -134,8 +134,8 @@ private[spark] object BlockManagerId {
   }
 
   /**
-   * Here we set max cache size as 10000.Since the size of a BlockManagerId object
-   * is about 48B,so the max memory this cache cost will be about 1MB which is feasible.
+   * The max cache size is hardcoded to 10000. Since a BlockManagerId object is
+   * about 48B, the total memory cost of this cache is below 1MB, which is feasible.
   */
   val blockManagerIdCache = CacheBuilder.newBuilder()
     .maximumSize(10000)
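The arithmetic behind the final comment checks out: 10000 entries at roughly 48 B each is about 480 KB of ids, so even with Guava's per-entry bookkeeping the cache stays around the 1 MB the comment promises. A quick sanity check of the resulting behavior — a sketch that assumes code living under `org.apache.spark` (the package below is hypothetical), since the `BlockManagerId` companion is `private[spark]`:

```scala
package org.apache.spark.sanity // hypothetical package inside org.apache.spark

import org.apache.spark.storage.BlockManagerId

object BlockManagerIdCacheCheck {
  def main(args: Array[String]): Unit = {
    // Two structurally equal ids resolve to one cached, canonical instance.
    val id1 = BlockManagerId("exec-1", "host-a", 7077)
    val id2 = BlockManagerId("exec-1", "host-a", 7077)
    assert(id1 eq id2)
  }
}
```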