
Commit 97ea499
Change BlockManager interface to use Arrays
... rather than ArrayBuffer. We only ever iterate through the values anyway, so there is really no reason for them to live in a mutable buffer of any sort. This change is introduced so that we can eventually pass our SizeTrackingAppendOnlyBuffer's underlying array directly to BlockManager, instead of having to awkwardly convert it to an ArrayBuffer first.
1 parent c12f093

File tree

7 files changed: 27 additions, 40 deletions
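
The motivation in miniature: a consumer that only iterates can take an immutable Array, which lets a producer hand over its backing array without first copying into an ArrayBuffer. A hedged sketch of the idea (the names and the two-line put stand-in below are illustrative, not the real API):

import scala.collection.mutable.ArrayBuffer

object Motivation {
  // Stand-in for the new BlockManager.put: it only ever iterates.
  def put(values: Array[Any]): Unit =
    values.iterator.foreach(v => println(s"storing $v"))

  def main(args: Array[String]): Unit = {
    val underlying: Array[Any] = Array(1, 2, 3)  // a producer's backing array

    // Old interface: forced an extra copy into a buffer nobody mutates.
    val copy = new ArrayBuffer[Any]
    copy ++= underlying
    // put(copy)  -- old signature took ArrayBuffer[Any]

    // New interface: the backing array is accepted as-is.
    put(underlying)
  }
}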

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 1 addition & 2 deletions

@@ -142,8 +142,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * to the BlockManager as an iterator and expect to read it back later. This is because
        * we may end up dropping a partition from memory store before getting it back, e.g.
        * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
+      val elements = values.toArray[Any]
       updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
       elements.iterator.asInstanceOf[Iterator[T]]
     }
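
The pattern above matters because the incoming values is an Iterator, which can only be traversed once; materializing it into an array lets the cache hand one traversal to the store and still return a fresh iterator to the caller. A standalone sketch with toy values (no BlockManager involved):

val values: Iterator[Any] = Iterator("a", "b", "c")
val elements = values.toArray[Any]  // single pass; `values` is now consumed
// blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.foreach(println)  // arrays can be re-iterated safely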

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 11 additions & 15 deletions

@@ -37,7 +37,7 @@ import org.apache.spark.util._
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues

 private[spark] class BlockManager(
     executorId: String,
@@ -451,16 +451,12 @@ private[spark] class BlockManager(
       val values = dataDeserialize(blockId, bytes)
       if (level.deserialized) {
         // Cache the values before returning them
-        // TODO: Consider creating a putValues that also takes in a iterator?
-        val valuesBuffer = new ArrayBuffer[Any]
-        valuesBuffer ++= values
-        memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
-        match {
-          case Left(values2) =>
-            return Some(values2)
-          case _ =>
-            throw new SparkException("Memory store did not return an iterator")
-        }
+        memoryStore.putValues(blockId, values, level, returnValues = true).data match {
+          case Left(values2) =>
+            return Some(values2)
+          case _ =>
+            throw new SparkException("Memory store did not return an iterator")
+        }
       } else {
         return Some(values)
       }
@@ -576,11 +572,11 @@ private[spark] class BlockManager(
    */
   def put(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, ArrayBufferValues(values), level, tellMaster)
+    doPut(blockId, ArrayValues(values), level, tellMaster)
   }

   /**
@@ -682,7 +678,7 @@ private[spark] class BlockManager(
     val result = data match {
       case IteratorValues(iterator) =>
         blockStore.putValues(blockId, iterator, level, returnValues)
-      case ArrayBufferValues(array) =>
+      case ArrayValues(array) =>
         blockStore.putValues(blockId, array, level, returnValues)
       case ByteBufferValues(bytes) =>
         bytes.rewind()
@@ -814,7 +810,7 @@ private[spark] class BlockManager(
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
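
For reference, the renamed BlockValues case dispatches in doPut as shown in the third hunk above. A self-contained sketch of that dispatch (the case classes match the diff; the describe helper is illustrative, where the real code routes each case to blockStore.putValues):

import java.nio.ByteBuffer

object BlockValuesSketch {
  sealed trait BlockValues
  case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
  case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
  case class ArrayValues(buffer: Array[Any]) extends BlockValues

  // Hypothetical helper mirroring the match in doPut.
  def describe(data: BlockValues): String = data match {
    case IteratorValues(_)     => "lazy iterator: size unknown until traversed"
    case ArrayValues(arr)      => s"deserialized array of ${arr.length} elements"
    case ByteBufferValues(buf) => s"${buf.remaining()} serialized bytes"
  }

  def main(args: Array[String]): Unit =
    println(describe(ArrayValues(Array[Any](1, 2, 3))))
}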

core/src/main/scala/org/apache/spark/storage/BlockStore.scala

Lines changed: 1 addition & 1 deletion

@@ -45,7 +45,7 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {

   def putValues(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 1 addition & 3 deletions

@@ -21,8 +21,6 @@ import java.io.{FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode

-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
@@ -59,7 +57,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)

   override def putValues(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
     putValues(blockId, values.toIterator, level, returnValues)
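
Note that DiskStore (and TachyonStore below) accepts the new Array overload by immediately delegating to its existing Iterator overload, so each store keeps a single concrete write path. The pattern in miniature (a hypothetical trait, not the real BlockStore):

trait SingleWritePath {
  // The one concrete implementation works on iterators.
  def write(values: Iterator[Any]): Unit

  // The Array overload is a thin adapter; toIterator wraps the
  // array in an iterator without copying the elements.
  def write(values: Array[Any]): Unit = write(values.toIterator)
}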

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 11 additions & 14 deletions

@@ -27,7 +27,7 @@ import org.apache.spark.util.{SizeEstimator, Utils}
 private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)

 /**
- * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
+ * Stores blocks in memory, either as Arrays of deserialized Java objects or as
  * serialized ByteBuffers.
  */
 private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
@@ -55,8 +55,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
+      val elements = values.toArray
       val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
       val putAttempt = tryToPut(blockId, elements, sizeEstimate, deserialized = true)
       PutResult(sizeEstimate, Left(values.toIterator), putAttempt.droppedBlocks)
@@ -68,7 +67,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)

   override def putValues(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
     if (level.deserialized) {
@@ -87,9 +86,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    val valueEntries = new ArrayBuffer[Any]()
-    valueEntries ++= values
-    putValues(blockId, valueEntries, level, returnValues)
+    putValues(blockId, values.toArray, level, returnValues)
   }

   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -99,7 +96,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     if (entry == null) {
       None
     } else if (entry.deserialized) {
-      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
+      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
     } else {
       Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
     }
@@ -112,7 +109,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     if (entry == null) {
       None
     } else if (entry.deserialized) {
-      Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+      Some(entry.value.asInstanceOf[Array[Any]].iterator)
     } else {
       val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
       Some(blockManager.dataDeserialize(blockId, buffer))
@@ -149,8 +146,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)

   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
-   * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
-   * size must also be passed by the caller.
+   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
+   * must also be passed by the caller.
    *
    * Lock on the object putLock to ensure that all the put requests and its associated block
    * dropping is done by only on thread at a time. Otherwise while one thread is dropping
@@ -193,7 +190,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       // Tell the block manager that we couldn't put it in memory so that it can drop it to
       // disk if the block allows disk storage.
       val data = if (deserialized) {
-        Left(value.asInstanceOf[ArrayBuffer[Any]])
+        Left(value.asInstanceOf[Array[Any]])
       } else {
         Right(value.asInstanceOf[ByteBuffer].duplicate())
       }
@@ -227,7 +224,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)

     if (maxMemory - currentMemory < space) {
       val rddToAdd = getRddId(blockIdToAdd)
-      val selectedBlocks = new ArrayBuffer[BlockId]()
+      val selectedBlocks = new ArrayBuffer[BlockId]
       var selectedMemory = 0L

       // This is synchronized to ensure that the set of entries is not changed
@@ -254,7 +251,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       // future safety.
       if (entry != null) {
         val data = if (entry.deserialized) {
-          Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+          Left(entry.value.asInstanceOf[Array[Any]])
         } else {
           Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
         }
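
Taken together, the MemoryStore hunks change what MemoryEntry.value holds for deserialized blocks: a plain Array[Any] goes in, and getValues/getBytes re-expose it through .iterator. A simplified, single-threaded sketch of that round trip (the real store synchronizes on putLock and sizes entries with SizeEstimator):

case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)

val entries = scala.collection.mutable.LinkedHashMap[String, MemoryEntry]()

// put: materialize the values once, store the array itself.
val elements: Array[Any] = Iterator("a", "b", "c").toArray
entries("rdd_0_0") = MemoryEntry(elements, elements.length, deserialized = true)

// get: mirror of getValues above -- cast back and re-iterate.
entries.get("rdd_0_0") match {
  case Some(e) if e.deserialized =>
    e.value.asInstanceOf[Array[Any]].iterator.foreach(println)
  case _ => println("miss")
}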

core/src/main/scala/org/apache/spark/storage/TachyonStore.scala

Lines changed: 1 addition & 3 deletions

@@ -20,8 +20,6 @@ package org.apache.spark.storage
 import java.io.IOException
 import java.nio.ByteBuffer

-import scala.collection.mutable.ArrayBuffer
-
 import tachyon.client.{ReadType, WriteType}

 import org.apache.spark.Logging
@@ -47,7 +45,7 @@ private class TachyonStore(

   override def putValues(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
     putValues(blockId, values.toIterator, level, returnValues)

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 1 addition & 2 deletions

@@ -110,8 +110,7 @@ private[streaming] class ReceiverSupervisorImpl(
     ) {
       val blockId = optionalBlockId.getOrElse(nextBlockId)
       val time = System.currentTimeMillis
-      blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
-        storageLevel, tellMaster = true)
+      blockManager.put(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
       logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
       reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
     }
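
The receiver change also quietly removes an unchecked cast: arrayBuffer holds the receiver's element type, and the old code coerced it with asInstanceOf (which only succeeds because of erasure), while toArray[Any] builds a genuine Array[Any] with one explicit copy. In miniature:

import scala.collection.mutable.ArrayBuffer

val arrayBuffer = ArrayBuffer("event-1", "event-2")  // really an ArrayBuffer[String]

// Old: a compile-time coercion, no copy, relies on erasure.
val coerced: ArrayBuffer[Any] = arrayBuffer.asInstanceOf[ArrayBuffer[Any]]

// New: an honest one-time copy into an Array[Any].
val forPut: Array[Any] = arrayBuffer.toArray[Any]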
