Skip to content

Commit b092a81

Browse files
committed
Add a pool to cache the hostname
1 parent 5d603df commit b092a81

File tree

1 file changed

+20
-0
lines changed

1 file changed

+20
-0
lines changed

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717

1818
package org.apache.spark.executor
1919

20+
import java.io.{IOException, ObjectInputStream}
21+
import java.util.concurrent.ConcurrentHashMap
22+
2023
import scala.collection.mutable.ArrayBuffer
2124

2225
import org.apache.spark.annotation.DeveloperApi
2326
import org.apache.spark.executor.DataReadMethod.DataReadMethod
2427
import org.apache.spark.storage.{BlockId, BlockStatus}
28+
import org.apache.spark.util.Utils
2529

2630
/**
2731
* :: DeveloperApi ::
@@ -210,10 +214,26 @@ class TaskMetrics extends Serializable {
210214
/**
 * Refreshes the bytes-read counters of any registered input metrics.
 * Synchronized so concurrent readers/updaters of this TaskMetrics see a
 * consistent snapshot.
 */
private[spark] def updateInputMetrics(): Unit = synchronized {
  for (metrics <- inputMetrics) {
    metrics.updateBytesRead()
  }
}
217+
218+
/**
 * Custom Java-serialization read hook. After the default field
 * deserialization, swaps `_hostname` for the canonical cached String held in
 * the companion object: the number of distinct hostnames is on the order of
 * the number of cluster nodes, so interning them sharply reduces duplicate
 * String objects and the resulting GC overhead when many TaskMetrics are
 * deserialized.
 */
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
  in.defaultReadObject()
  _hostname = TaskMetrics.getCachedHostName(_hostname)
}
213226
}
214227

215228
private[spark] object TaskMetrics {
  // Canonical hostname instances shared across deserialized TaskMetrics.
  // Never cleared: the key space is bounded by the number of cluster nodes.
  private val hostNameCache = new ConcurrentHashMap[String, String]()

  /** A fresh TaskMetrics with all counters at their initial values. */
  def empty: TaskMetrics = new TaskMetrics

  /**
   * Returns the canonical cached String equal to `host`, inserting `host`
   * itself when no entry exists yet.
   *
   * Uses the return value of `putIfAbsent` directly (the previous mapping, or
   * `null` when `host` was just inserted) instead of the original
   * putIfAbsent-then-get pair, saving a second hash lookup per call while
   * keeping the same thread-safe semantics.
   */
  def getCachedHostName(host: String): String = {
    val canonical = hostNameCache.putIfAbsent(host, host)
    if (canonical == null) host else canonical
  }
}
218238

219239
/**

0 commit comments

Comments
 (0)