Skip to content

Commit eed3390

Browse files
author
Andrew Or
committed
Merge branch 'master' of github.com:apache/spark into streaming-closure-cleaner
Conflicts: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
2 parents 67eeff4 + 49549d5 commit eed3390

File tree

23 files changed

+1207
-131
lines changed

23 files changed

+1207
-131
lines changed

core/src/main/scala/org/apache/spark/Aggregator.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,7 @@ case class Aggregator[K, V, C] (
8888
combiners.iterator
8989
} else {
9090
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
91-
while (iter.hasNext) {
92-
val pair = iter.next()
93-
combiners.insert(pair._1, pair._2)
94-
}
91+
combiners.insertAll(iter)
9592
// Update task metrics if context is not null
9693
// TODO: Make context non-optional in a future release
9794
Option(context).foreach { c =>

core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala

Lines changed: 250 additions & 55 deletions
Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/util/SizeEstimator.scala

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ private[spark] object SizeEstimator extends Logging {
4747
private val FLOAT_SIZE = 4
4848
private val DOUBLE_SIZE = 8
4949

50+
// Fields can be primitive types, sizes are: 1, 2, 4, 8. Or fields can be pointers. The size of
51+
// a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and UseCompressedOops flag.
52+
// The sizes should be in descending order, as we will use that information for fields placement.
53+
private val fieldSizes = List(8, 4, 2, 1)
54+
5055
// Alignment boundary for objects
5156
// TODO: Is this arch dependent ?
5257
private val ALIGN_SIZE = 8
@@ -171,7 +176,7 @@ private[spark] object SizeEstimator extends Logging {
171176
// general all ClassLoaders and Classes will be shared between objects anyway.
172177
} else {
173178
val classInfo = getClassInfo(cls)
174-
state.size += classInfo.shellSize
179+
state.size += alignSize(classInfo.shellSize)
175180
for (field <- classInfo.pointerFields) {
176181
state.enqueue(field.get(obj))
177182
}
@@ -237,8 +242,8 @@ private[spark] object SizeEstimator extends Logging {
237242
}
238243
size
239244
}
240-
241-
private def primitiveSize(cls: Class[_]): Long = {
245+
246+
private def primitiveSize(cls: Class[_]): Int = {
242247
if (cls == classOf[Byte]) {
243248
BYTE_SIZE
244249
} else if (cls == classOf[Boolean]) {
@@ -274,30 +279,66 @@ private[spark] object SizeEstimator extends Logging {
274279
val parent = getClassInfo(cls.getSuperclass)
275280
var shellSize = parent.shellSize
276281
var pointerFields = parent.pointerFields
282+
val sizeCount = Array.fill(fieldSizes.max + 1)(0)
277283

284+
// iterate through the fields of this class and gather information.
278285
for (field <- cls.getDeclaredFields) {
279286
if (!Modifier.isStatic(field.getModifiers)) {
280287
val fieldClass = field.getType
281288
if (fieldClass.isPrimitive) {
282-
shellSize += primitiveSize(fieldClass)
289+
sizeCount(primitiveSize(fieldClass)) += 1
283290
} else {
284291
field.setAccessible(true) // Enable future get()'s on this field
285-
shellSize += pointerSize
292+
sizeCount(pointerSize) += 1
286293
pointerFields = field :: pointerFields
287294
}
288295
}
289296
}
290297

291-
shellSize = alignSize(shellSize)
298+
// Based on the simulated field layout code in Aleksey Shipilev's report:
299+
// http://cr.openjdk.java.net/~shade/papers/2013-shipilev-fieldlayout-latest.pdf
300+
// The code is in Figure 9.
301+
// The simplified idea of field layout consists of 4 parts (see more details in the report):
302+
//
303+
// 1. field alignment: HotSpot lays out the fields aligned by their size.
304+
// 2. object alignment: HotSpot rounds instance size up to 8 bytes
305+
// 3. consistent fields layouts throughout the hierarchy: This means we should layout
306+
// superclass first. And we can use superclass's shellSize as a starting point to layout the
307+
// other fields in this class.
308+
// 4. class alignment: HotSpot rounds field blocks up to HeapOopSize, not 4 bytes, confirmed
309+
// with Aleksey. see https://bugs.openjdk.java.net/browse/CODETOOLS-7901322
310+
//
311+
// The real world field layout is much more complicated. There are three kinds of fields
312+
// order in Java 8. And we don't consider the @contended annotation introduced by Java 8.
313+
// see the HotSpot classloader code, layout_fields method for more details.
314+
// hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/classfile/classFileParser.cpp
315+
var alignedSize = shellSize
316+
for (size <- fieldSizes if sizeCount(size) > 0) {
317+
val count = sizeCount(size)
318+
// If there are internal gaps, smaller field can fit in.
319+
alignedSize = math.max(alignedSize, alignSizeUp(shellSize, size) + size * count)
320+
shellSize += size * count
321+
}
322+
323+
// Choose the larger size as the new shellSize (clearly alignedSize >= shellSize), and
324+
// round up the instance field blocks
325+
shellSize = alignSizeUp(alignedSize, pointerSize)
292326

293327
// Create and cache a new ClassInfo
294328
val newInfo = new ClassInfo(shellSize, pointerFields)
295329
classInfos.put(cls, newInfo)
296330
newInfo
297331
}
298332

299-
private def alignSize(size: Long): Long = {
300-
val rem = size % ALIGN_SIZE
301-
if (rem == 0) size else (size + ALIGN_SIZE - rem)
302-
}
333+
private def alignSize(size: Long): Long = alignSizeUp(size, ALIGN_SIZE)
334+
335+
/**
336+
* Compute aligned size. The alignSize must be 2^n, otherwise the result will be wrong.
337+
* When alignSize = 2^n, alignSize - 1 = 2^n - 1. The binary representation of (alignSize - 1)
338+
* will only have n trailing 1s (0b00...001..1). ~(alignSize - 1) will be 0b11..110..0. Hence,
339+
* (size + alignSize - 1) & ~(alignSize - 1) will set the last n bits to zeros, which leads to
340+
* multiple of alignSize.
341+
*/
342+
private def alignSizeUp(size: Long, alignSize: Int): Long =
343+
(size + alignSize - 1) & ~(alignSize - 1)
303344
}

core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ClosureCleanerSuite extends FunSuite {
5050
val obj = new TestClassWithNesting(1)
5151
assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
5252
}
53-
53+
5454
test("toplevel return statements in closures are identified at cleaning time") {
5555
val ex = intercept[SparkException] {
5656
TestObjectWithBogusReturns.run()
@@ -61,7 +61,7 @@ class ClosureCleanerSuite extends FunSuite {
6161

6262
test("return statements from named functions nested in closures don't raise exceptions") {
6363
val result = TestObjectWithNestedReturns.run()
64-
assert(result == 1)
64+
assert(result === 1)
6565
}
6666

6767
test("should clean only closures") {
@@ -79,7 +79,14 @@ class ClosureCleanerSuite extends FunSuite {
7979

8080
// A non-serializable class we create in closures to make sure that we aren't
8181
// keeping references to unneeded variables from our outer closures.
82-
class NonSerializable {}
82+
class NonSerializable(val id: Int = -1) {
83+
override def equals(other: Any): Boolean = {
84+
other match {
85+
case o: NonSerializable => id == o.id
86+
case _ => false
87+
}
88+
}
89+
}
8390

8491
object TestObject {
8592
def run(): Int = {

0 commit comments

Comments
 (0)