Commit c12f093
Add SizeTrackingAppendOnlyBuffer and tests
This buffer is backed by an underlying PrimitiveVector. It is intended for unrolling partitions, so we can efficiently check the size of the in-memory buffer periodically. Note that the underlying buffer cannot be an existing implementation of a mutable Scala or Java collection, because we need to be notified when the underlying array is resized; otherwise, size estimation may not be accurate.
1 parent 08d0aca commit c12f093
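
For context, a minimal usage sketch, not part of this commit, of how such a size-tracked buffer might be used to unroll a partition under a memory budget. The object name, maxBytes budget, and checkPeriod are illustrative assumptions; only += and estimateSize() come from the classes added below.

package org.apache.spark.util.collection

import scala.reflect.ClassTag

// Hypothetical sketch (not part of this commit): unroll a partition into a
// size-tracked buffer, re-checking its estimated footprint every few elements.
private[spark] object UnrollSketch {
  def unroll[T: ClassTag](
      values: Iterator[T],
      maxBytes: Long,        // illustrative memory budget for this partition
      checkPeriod: Int = 16  // illustrative: how often to re-check the estimate
    ): Option[SizeTrackingAppendOnlyBuffer[T]] = {
    val buffer = new SizeTrackingAppendOnlyBuffer[T]
    var count = 0L
    while (values.hasNext) {
      buffer += values.next()
      count += 1
      // estimateSize() is O(1), so periodic checks stay cheap for large partitions.
      if (count % checkPeriod == 0 && buffer.estimateSize() > maxBytes) {
        return None  // does not fit in memory; the caller can fall back to disk
      }
    }
    Some(buffer)
  }
}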

File tree

7 files changed: +346 lines, -188 lines

core/src/main/scala/org/apache/spark/util/SizeEstimator.scala

Lines changed: 1 addition & 1 deletion
@@ -180,7 +180,7 @@ private[spark] object SizeEstimator extends Logging {
     }
   }
 
-  // Estimat the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
+  // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
   private val ARRAY_SIZE_FOR_SAMPLING = 200
   private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING

core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
     _array(index)
   }
 
-  def +=(value: V) {
+  def +=(value: V): Unit = {
    if (_numElements == _array.length) {
      resize(_array.length * 2)
    }
core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala

Lines changed: 96 additions & 0 deletions

@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.util.SizeEstimator
+
+/**
+ * A general interface for collections that keeps track of its estimated size in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ */
+private[spark] trait SizeTracker {
+
+  import SizeTracker._
+
+  /**
+   * Controls the base of the exponential which governs the rate of sampling.
+   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+   */
+  private val SAMPLE_GROWTH_RATE = 1.1
+
+  /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
+  private val samples = new ArrayBuffer[Sample]
+
+  /** The average number of bytes per update between our last two samples. */
+  private var bytesPerUpdate: Double = _
+
+  /** Total number of insertions and updates into the map since the last resetSamples(). */
+  private var numUpdates: Long = _
+
+  /** The value of 'numUpdates' at which we will take our next sample. */
+  private var nextSampleNum: Long = _
+
+  resetSamples()
+
+  /** Called after the collection undergoes a dramatic change in size. */
+  protected def resetSamples(): Unit = {
+    numUpdates = 1
+    nextSampleNum = 1
+    samples.clear()
+    takeSample()
+  }
+
+  /** Callback to be invoked after an update. */
+  protected def afterUpdate(): Unit = {
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) {
+      takeSample()
+    }
+  }
+
+  /** Takes a new sample of the current collection's size. */
+  private def takeSample(): Unit = {
+    samples += Sample(SizeEstimator.estimate(this), numUpdates)
+    // Only use the last two samples to extrapolate
+    if (samples.size > 2) {
+      samples.remove(0)
+    }
+    val bytesDelta = samples.toSeq.reverse match {
+      case latest :: previous :: tail =>
+        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+      // If fewer than 2 samples, assume no change
+      case _ => 0
+    }
+    bytesPerUpdate = math.max(0, bytesDelta)
+    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+  }
+
+  /** Estimates the current size of the collection in bytes. O(1) time. */
+  def estimateSize(): Long = {
+    assert(samples.nonEmpty)
+    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+    (samples.last.size + extrapolatedDelta).toLong
+  }
+}
+
+private object SizeTracker {
+  case class Sample(size: Long, numUpdates: Long)
+}
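
To make the sampling schedule and the O(1) extrapolation above concrete, here is a small standalone sketch (not part of the commit; the object name and sample numbers are made up for illustration):

// Standalone illustration of the arithmetic behind SizeTracker.
object SizeTrackerArithmetic {
  def main(args: Array[String]): Unit = {
    // Sampling schedule: the next sample is taken at ceil(numUpdates * 1.1),
    // so samples become sparser as the collection grows (slow exponential back-off).
    val growthRate = 1.1
    val schedule = Iterator.iterate(1L)(n => math.ceil(n * growthRate).toLong).take(15).toList
    println(schedule) // roughly 1, 2, 3, ..., 9, 10, 12, 14, 16, 18, 20

    // Extrapolation: given the last two samples (size, numUpdates) = (4000, 100) and (4400, 110),
    // bytesPerUpdate = (4400 - 4000) / (110 - 100) = 40, so at update 115 the
    // O(1) estimate is 4400 + 40 * (115 - 110) = 4600 bytes.
    val bytesPerUpdate = (4400L - 4000L).toDouble / (110L - 100L)
    val estimate = (4400L + bytesPerUpdate * (115L - 110L)).toLong
    println(estimate) // 4600
  }
}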
core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * An append-only buffer that keeps track of its estimated size in bytes.
+ */
+private[spark] class SizeTrackingAppendOnlyBuffer[T: ClassTag]
+  extends PrimitiveVector[T]
+  with SizeTracker {
+
+  override def +=(value: T): Unit = {
+    super.+=(value)
+    super.afterUpdate()
+  }
+
+  override def resize(newLength: Int): PrimitiveVector[T] = {
+    super.resize(newLength)
+    resetSamples()
+    this
+  }
+}

core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala

Lines changed: 5 additions & 66 deletions
@@ -17,85 +17,24 @@
 
 package org.apache.spark.util.collection
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.util.SizeEstimator
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
-
 /**
- * Append-only map that keeps track of its estimated size in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ * An append-only map that keeps track of its estimated size in bytes.
  */
-private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
-
-  /**
-   * Controls the base of the exponential which governs the rate of sampling.
-   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
-   */
-  private val SAMPLE_GROWTH_RATE = 1.1
-
-  /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
-  private val samples = new ArrayBuffer[Sample]()
-
-  /** Total number of insertions and updates into the map since the last resetSamples(). */
-  private var numUpdates: Long = _
-
-  /** The value of 'numUpdates' at which we will take our next sample. */
-  private var nextSampleNum: Long = _
-
-  /** The average number of bytes per update between our last two samples. */
-  private var bytesPerUpdate: Double = _
-
-  resetSamples()
-
-  /** Called after the map grows in size, as this can be a dramatic change for small objects. */
-  def resetSamples() {
-    numUpdates = 1
-    nextSampleNum = 1
-    samples.clear()
-    takeSample()
-  }
+private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker {
 
   override def update(key: K, value: V): Unit = {
     super.update(key, value)
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) { takeSample() }
+    super.afterUpdate()
   }
 
   override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
     val newValue = super.changeValue(key, updateFunc)
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) { takeSample() }
+    super.afterUpdate()
     newValue
   }
 
-  /** Takes a new sample of the current map's size. */
-  def takeSample() {
-    samples += Sample(SizeEstimator.estimate(this), numUpdates)
-    // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
-    bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
-      case latest :: previous :: tail =>
-        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
-      case _ =>
-        0
-    })
-    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
-  }
-
-  override protected def growTable() {
+  override protected def growTable(): Unit = {
     super.growTable()
     resetSamples()
   }
-
-  /** Estimates the current size of the map in bytes. O(1) time. */
-  def estimateSize(): Long = {
-    assert(samples.nonEmpty)
-    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
-    (samples.last.size + extrapolatedDelta).toLong
-  }
-}
-
-private object SizeTrackingAppendOnlyMap {
-  case class Sample(size: Long, numUpdates: Long)
 }
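
A brief usage sketch of the refactored map, not part of this commit; it relies only on the update/changeValue/estimateSize calls visible in this diff, and the object name and key count are illustrative:

package org.apache.spark.util.collection

// Hypothetical sketch showing the map's size-tracking behaviour.
private[spark] object SizeTrackingMapSketch {
  def main(args: Array[String]): Unit = {
    val map = new SizeTrackingAppendOnlyMap[String, Long]
    (1 to 1000).foreach { i =>
      // changeValue inserts or updates in place; both paths call afterUpdate().
      map.changeValue(s"key-$i", (hadValue, oldValue) => if (hadValue) oldValue + 1 else 1L)
    }
    // O(1) estimate extrapolated from the last two SizeEstimator samples.
    println(s"Estimated size: ${map.estimateSize()} bytes")
  }
}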

core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala

Lines changed: 0 additions & 120 deletions
This file was deleted.
