Skip to content

Commit bb870e7

Browse files
jerryshaotdas
authored andcommitted
[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to decrease the memory usage and GC overhead
Hostname in TaskMetrics will be created through deserialization, mostly the number of hostname is only the order of number of cluster node, so adding a cache layer to dedup the object could reduce the memory usage and alleviate GC overhead, especially for long-running and fast job generation applications like Spark Streaming. Author: jerryshao <[email protected]> Author: Saisai Shao <[email protected]> Closes apache#5064 from jerryshao/SPARK-5523 and squashes the following commits: 3e2412a [jerryshao] Address the comments b092a81 [Saisai Shao] Add a pool to cache the hostname
1 parent f957796 commit bb870e7

File tree

1 file changed

+20
-0
lines changed

1 file changed

+20
-0
lines changed

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717

1818
package org.apache.spark.executor
1919

20+
import java.io.{IOException, ObjectInputStream}
21+
import java.util.concurrent.ConcurrentHashMap
22+
2023
import scala.collection.mutable.ArrayBuffer
2124

2225
import org.apache.spark.annotation.DeveloperApi
2326
import org.apache.spark.executor.DataReadMethod.DataReadMethod
2427
import org.apache.spark.storage.{BlockId, BlockStatus}
28+
import org.apache.spark.util.Utils
2529

2630
/**
2731
* :: DeveloperApi ::
@@ -210,10 +214,26 @@ class TaskMetrics extends Serializable {
210214
private[spark] def updateInputMetrics(): Unit = synchronized {
211215
inputMetrics.foreach(_.updateBytesRead())
212216
}
217+
218+
@throws(classOf[IOException])
219+
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
220+
in.defaultReadObject()
221+
// Get the hostname from cached data, since hostname is the order of number of nodes in
222+
// cluster, so using cached hostname will decrease the object number and alleviate the GC
223+
// overhead.
224+
_hostname = TaskMetrics.getCachedHostName(_hostname)
225+
}
213226
}
214227

215228
private[spark] object TaskMetrics {
229+
private val hostNameCache = new ConcurrentHashMap[String, String]()
230+
216231
def empty: TaskMetrics = new TaskMetrics
232+
233+
def getCachedHostName(host: String): String = {
234+
val canonicalHost = hostNameCache.putIfAbsent(host, host)
235+
if (canonicalHost != null) canonicalHost else host
236+
}
217237
}
218238

219239
/**

0 commit comments

Comments
 (0)