Conversation

@a-roberts
Contributor

What changes were proposed in this pull request?

Several performance improvements to the SizeEstimator; most of the benefit comes from moving estimation from contended to uncontended execution across multiple threads. There can be a small boost in uncontended scenarios from removing the synchronisation code, but the cost of that synchronisation when not truly contended is low. On the PageRank workload for HiBench we see 10-15% performance improvements (measuring average elapsed times) with both IBM's SDK for Java and OpenJDK 8. For the other workloads on this benchmark I don't see any changes beyond noise.

How was this patch tested?

Existing unit tests, but there are problems to resolve.

I see SizeEstimatorSuite and SizeTrackerSuite failing with at least IBM Java now due to smaller sizes being reported than the test expects (let's see what happens with OpenJDK on the community runs).

In SizeTrackerSuite I think the failures are caused by using ThreadLocalRandom and not Random - because with Random we see these tests passing again. Not sure how robust SizeTrackerSuite is though.

For performance testing I've used HiBench, large profile, with one executor ranging from 10g to 25g, experimenting with fixed and dynamic heaps. The Spark code I've based my results on is from December the 1st (master branch, so 2.1.0 snapshot).

More details on the optimisations (this being phase one and JDK agnostic) at www.spark.tc/improvements-to-the-sizeestimator-class

Several performance improvements to the SizeEstimator; most of the benefit comes from moving estimation from contended to uncontended execution across multiple threads. There can be a small boost in uncontended scenarios from removing the synchronisation code, but the cost of that synchronisation when not truly contended is low. On the PageRank workload for HiBench we see durations reduced from ~49 seconds to ~41 seconds. I don't see any changes for other workloads. Observed with both IBM's SDK for Java and OpenJDK.
}
pointerSize = if (is64bit && !isCompressedOops) 8 else 4
classInfos.clear()
classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
Contributor

We should preserve behavior here.
Apply the same on classInfos.get()

Contributor Author

No need to clear() because we're using ThreadLocal WeakHashMaps, so initialisation occurs once per thread; we put the new value in and assume the size won't change, which holds except when running in some debug modes*. As they're in a ThreadLocal WeakHashMap, the classes can still be unloaded if no longer used.

  • I'm referring to JDK debug modes such as fullspeeddebug where we can change class layouts at runtime. Normal execution modes prohibit changes to the class layout during execution without the class being unloaded and reloaded, which triggers the map entry to be cleared and recreated thanks to its weak keys (so in normal cases the map will always be correct).
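The per-thread caching pattern described above can be sketched in plain Java (a minimal sketch: the `sizeOf` helper and the 16-byte placeholder sizes are hypothetical, standing in for Spark's real ClassInfo layout computation):

```java
import java.util.WeakHashMap;

public class PerThreadCache {
    // One WeakHashMap per thread: lookups never contend, and weak keys let
    // entries for unloaded classes be garbage collected, as described above.
    private static final ThreadLocal<WeakHashMap<Class<?>, Long>> SIZES =
        ThreadLocal.withInitial(() -> {
            WeakHashMap<Class<?>, Long> map = new WeakHashMap<>();
            map.put(Object.class, 16L); // seed entry, as in initialValue()
            return map;
        });

    // Placeholder: a real implementation would compute the object layout here.
    static long sizeOf(Class<?> cls) {
        return SIZES.get().computeIfAbsent(cls, c -> 16L);
    }

    public static void main(String[] args) {
        System.out.println(sizeOf(Object.class)); // cached seed entry
        System.out.println(sizeOf(Thread.class)); // computed once per thread
    }
}
```

The trade-off the thread goes on to debate: no synchronisation on the hot path, but one copy of the cache per thread.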

Contributor

@mridulm mridulm Dec 7, 2016

The map is expected to contain class info from the current execution; not from past runs (which might or might not be relevant - increasing the map size).
We would never have cleared it if it was not required.

override def initialValue(): java.util.WeakHashMap[Class[_], ClassInfo] = {
val toReturn = new WeakHashMap[Class[_], ClassInfo]()
toReturn.put(classOf[Object], new ClassInfo(objectSize, new Array[Int](0)))
return toReturn
Contributor

Why not keep the returned value the same as before?
And move the initialization back into initialize() - so that use of the classInfos map across threads won't happen.

Contributor Author

Built and profiled: averaging 42 second run times with the initial commit, 45 seconds with this change, and 48 seconds with no changes.

My code as a diff (so using a ConcurrentHashMap and var not val so we can initialise it later) provided here:

 import java.lang.management.ManagementFactory
 import java.lang.reflect.{Field, Modifier}
 import java.util.{IdentityHashMap, WeakHashMap}
-import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.{ThreadLocalRandom, ConcurrentMap}

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.util.Unsafe
@@ -88,16 +88,6 @@ object SizeEstimator extends Logging {
   // TODO: Is this arch dependent ?
   private val ALIGN_SIZE = 8

-  // A cache of ClassInfo objects for each class
-  // We use weakKeys to allow GC of dynamically created classes
-  private val classInfos = new ThreadLocal[WeakHashMap[Class[_], ClassInfo]] {
-    override def initialValue(): java.util.WeakHashMap[Class[_], ClassInfo] = {
-      val toReturn = new WeakHashMap[Class[_], ClassInfo]()
-      toReturn.put(classOf[Object], new ClassInfo(objectSize, new Array[Int](0)))
-      return toReturn
-    }
-  }
-
   // Object and pointer sizes are arch dependent
   private var is64bit = false

@@ -109,6 +99,8 @@ object SizeEstimator extends Logging {
   // Minimum size of a java.lang.Object
   private var objectSize = 8

+  private var classInfos: ConcurrentMap[Class[_], ClassInfo] = null
+
   initialize()

   // Sets object size, pointer size based on architecture and CompressedOops settings
@@ -126,6 +118,9 @@ object SizeEstimator extends Logging {
       }
     }
     pointerSize = if (is64bit && !isCompressedOops) 8 else 4
+
+    classInfos = new MapMaker().weakKeys().makeMap[Class[_], ClassInfo]()
+    classInfos.put(classOf[Object], new ClassInfo(objectSize, new Array[Int](0)))
   }

   private def getIsCompressedOops: Boolean = {
@@ -338,7 +333,7 @@ object SizeEstimator extends Logging {
    */
   private def getClassInfo(cls: Class[_]): ClassInfo = {
     // Check whether we've already cached a ClassInfo for this class
-    val info = classInfos.get().get(cls)
+    val info = classInfos.get(cls)
     if (info != null) {
       return info
     }
@@ -371,7 +366,7 @@ object SizeEstimator extends Logging {

     // Create and cache a new ClassInfo
     val newInfo = new ClassInfo(shellSize, fieldOffsets.toArray)
-    classInfos.get().put(cls, newInfo)
+    classInfos.put(cls, newInfo)
     newInfo
   }

Contributor

What I meant was, continue to use ThreadLocal, but maintain the MapMaker's result for threadLocal.get()

And move the initialization to initialize() instead of in initialValue()

Contributor Author

Got it, thanks will give this a try

val s1 = sampleArray(objArray, state, rand, drawn, length)
val s2 = sampleArray(objArray, state, rand, drawn, length)
val size = math.min(s1, s2)

Contributor

Changes to this method are excellent and should speed things up !
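The min-of-two-samples idea in the snippet above can be illustrated standalone (a hedged sketch in plain Java; the element sizes and draw count are made up for illustration, not Spark's ARRAY_SAMPLE_SIZE logic):

```java
import java.util.concurrent.ThreadLocalRandom;

public class MinOfTwoSamples {
    // Sum the sizes of `draws` randomly chosen elements. Two independent
    // samples are taken and the smaller kept, damping unlucky overestimates
    // from repeatedly drawing the occasional very large element.
    static long sampleSum(long[] sizes, int draws) {
        ThreadLocalRandom rand = ThreadLocalRandom.current();
        long sum = 0;
        for (int i = 0; i < draws; i++) {
            sum += sizes[rand.nextInt(sizes.length)];
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] sizes = {16, 24, 16, 400, 16}; // hypothetical element sizes
        long estimate = Math.min(sampleSum(sizes, 3), sampleSum(sizes, 3));
        System.out.println(estimate); // somewhere between 48 and 1200
    }
}
```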

// avoid the use of an iterator derived from the range syntax here for performance
var count = 0
val end = ARRAY_SAMPLE_SIZE
while (count <= end) {
Contributor

< end for until semantics

Contributor Author

Ah yes, it should be just < not <=; I'll add that in the next commit
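The off-by-one being fixed here is easy to demonstrate in isolation (a minimal sketch; names are illustrative):

```java
public class LoopBound {
    // A manual while loop replacing `for (i <- 0 until n)`: Scala's `until`
    // excludes the upper bound, so the condition must be `<`, not `<=`,
    // or the body runs one extra time.
    static int iterations(int n) {
        int count = 0;
        int done = 0;
        while (count < n) {
            done++;
            count++;
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(iterations(100)); // prints 100, matching 0 until 100
    }
}
```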

// Create and cache a new ClassInfo
val newInfo = new ClassInfo(shellSize, pointerFields)
classInfos.put(cls, newInfo)
val newInfo = new ClassInfo(shellSize, fieldOffsets.toArray)
Contributor

We are losing out on padding due to alignment here, which the earlier code was computing. No?

Contributor Author

Will look into this and determine if the padding is needed

@SparkQA

SparkQA commented Dec 7, 2016

Test build #69795 has finished for PR 16196 at commit 50af8fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@srowen srowen left a comment

I'm kind of concerned that this is changing a lot, some of which isn't obviously without problems or risk, for marginal gains. I'd prefer to stick to obviously correct wins

array: Array[AnyRef],
state: SearchState,
rand: Random,
rand: ThreadLocalRandom,
Member

I don't think this has to change

// We use weakKeys to allow GC of dynamically created classes
private val classInfos = new MapMaker().weakKeys().makeMap[Class[_], ClassInfo]()
private val classInfos = new ThreadLocal[WeakHashMap[Class[_], ClassInfo]] {
override def initialValue(): java.util.WeakHashMap[Class[_], ClassInfo] = {
Member

Nit: remove java.util, and 'return' below. "map" is better than "toReturn"

This is going to expand the memory footprint, because redundant copies of this info will be maintained per thread. Is the contention that significant?
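The shared-map alternative implied by this concern can be sketched with the JDK's ConcurrentHashMap (a sketch only: unlike Guava's MapMaker().weakKeys() used in the PR, this stdlib map does not have weak keys, and the 16-byte size is a placeholder):

```java
import java.util.concurrent.ConcurrentHashMap;

public class SharedClassCache {
    // One map shared by all threads: no redundant per-thread copies, so a
    // smaller footprint, at the cost of some contention on concurrent lookups.
    private static final ConcurrentHashMap<Class<?>, Long> SIZES =
        new ConcurrentHashMap<>();

    // Placeholder size; a real implementation would compute the layout.
    static long sizeOf(Class<?> cls) {
        return SIZES.computeIfAbsent(cls, c -> 16L);
    }

    public static void main(String[] args) {
        System.out.println(sizeOf(Object.class));
        System.out.println(SIZES.size()); // one entry cached, shared by all threads
    }
}
```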

val fieldCount = classInfo.fieldOffsets.length
val us = Unsafe.instance
while (index < fieldCount) {
state.enqueue(us.getObject(obj, classInfo.fieldOffsets(index).toLong))
Member

I understand avoiding reflection, but this is a dicier way to access fields of an object. I don't have a specific reason this would fail but the fact that it uses unsafe is riskier. Is this worth it?
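For contrast, the conventional reflective access this comment prefers looks roughly like this (a minimal sketch with a hypothetical Node class; Spark's actual per-class field enumeration and caching are more involved):

```java
import java.lang.reflect.Field;

public class FieldWalk {
    static class Node {
        Object next = "payload";
    }

    // Read a reference field via the supported reflection API: slower than
    // Unsafe.getObject with a raw offset, but it stays within documented
    // behaviour and fails with a clear exception rather than undefined results.
    static Object readField(Object obj, String name) throws Exception {
        Field f = obj.getClass().getDeclaredField(name);
        f.setAccessible(true);
        return f.get(obj);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readField(new Node(), "next")); // prints payload
    }
}
```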

for (i <- 0 until ARRAY_SAMPLE_SIZE) {
// avoid the use of an iterator derived from the range syntax here for performance
var count = 0
val end = ARRAY_SAMPLE_SIZE
Member

end is redundant here

val fieldClass = field.getType
if (fieldClass.isPrimitive) {
sizeCount(primitiveSize(fieldClass)) += 1
if (cls == classOf[Double] || cls == classOf[Long]) {
Member

This and the logic changes below aren't obviously OK. This seems to lose a lot of logic; I think it has to be explained or backed out.

@rxin
Contributor

rxin commented Dec 8, 2016

+1 on @srowen's suggestion. This change is not surgical at all. It is going to be difficult to guarantee no behavior change. If anything I'd favor correctness over performance here.

@srowen
Member

srowen commented Dec 11, 2016

Ping @a-roberts -- I think some sections of this are clearly a win, like near #16196 (comment) but maybe best to back out anything controversial. And touch up the style. Then I think this could be ready.

@a-roberts
Contributor Author

Agreed. I'll be back working on this and answering the queries after the 2.1.0 release vote passes; that's my current priority as we're nearing the Christmas break period.

@srowen
Member

srowen commented Dec 30, 2016

Ping to keep this on the radar; these couple PRs have been open a long time

@a-roberts
Contributor Author

It's on my to-do list; I'll be working on this once I'm back from an end of year break. Can I get a list of concerns here, please? I'll run with them. I think one concern is that we'll underestimate, and this will lead to insufficient memory problems at runtime.

Once we figure this out I can add the scenario(s) above as unit tests to ensure any changes here are entirely correct

@srowen
Member

srowen commented Jan 3, 2017

See the discussion above; I think the request is to back out much of this and leave only the clearly correct improvements like #16196 (comment) There are still a number of small comments and questions outstanding, which you can go back and browse.

@srowen
Member

srowen commented Jan 9, 2017

Ping @a-roberts ; let's close this if not going to proceed, though at least part of it looks like a clear win.

@a-roberts
Contributor Author

We can close it for now and I'll reopen it once the changes are more conservative and I've done plenty of testing/profiling - or anybody else could do so by keeping only the changes we deem safe. Unfortunately I have too much other Spark work on at the moment to give this PR the attention it deserves.

srowen added a commit to srowen/spark that referenced this pull request Feb 2, 2017
@srowen srowen mentioned this pull request Feb 2, 2017
@asfgit asfgit closed this in 20b4ca1 Feb 3, 2017