
Commit 186de31

initial commit for off-heap block storage api
1 parent 473552f commit 186de31

2 files changed: +252 -0 lines changed

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.util.control.NonFatal

import org.apache.spark.Logging

trait OffHeapBlockManager {

  /**
   * Description of the implementation.
   */
  def desc(): String = "OffHeap"

  /**
   * Initialize a concrete block manager implementation.
   *
   * @throws java.io.IOException on FS initialization failure.
   */
  def init(blockManager: BlockManager, executorId: String)

  /**
   * Remove the cached block from off-heap storage.
   *
   * @throws java.io.IOException on FS failure while removing the file.
   */
  def removeFile(blockId: BlockId): Boolean

  /**
   * Check whether the block is cached off-heap.
   *
   * @throws java.io.IOException on FS failure while checking block existence.
   */
  def fileExists(blockId: BlockId): Boolean

  /**
   * Save the block to off-heap storage.
   *
   * @throws java.io.IOException on FS failure while putting the block.
   */
  def putBytes(blockId: BlockId, bytes: ByteBuffer)

  /**
   * Retrieve the block from off-heap storage.
   *
   * @throws java.io.IOException on FS failure while getting the block.
   */
  def getBytes(blockId: BlockId): Option[ByteBuffer]

  /**
   * Retrieve the size of the cached block.
   *
   * @throws java.io.IOException on FS failure while getting the block size.
   */
  def getSize(blockId: BlockId): Long

  /**
   * Clean up on shutdown.
   */
  def addShutdownHook()

  final def setup(blockManager: BlockManager, executorId: String): Unit = {
    init(blockManager, executorId)
    addShutdownHook()
  }
}

object OffHeapBlockManager extends Logging {
  val MAX_DIR_CREATION_ATTEMPTS = 10
  val subDirsPerDir = 64

  def create(blockManager: BlockManager,
      executorId: String): Option[OffHeapBlockManager] = {
    val sNames = blockManager.conf.getOption("spark.offHeapStore.blockManager")
    sNames match {
      case Some(name) =>
        try {
          val instance = Class.forName(name)
            .newInstance()
            .asInstanceOf[OffHeapBlockManager]
          instance.setup(blockManager, executorId)
          Some(instance)
        } catch {
          case NonFatal(t) =>
            logError("Cannot initialize offHeap store", t)
            None
        }
      case None => None
    }
  }
}
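
The trait above only defines the contract; this commit does not ship a concrete implementation. As a rough, hypothetical sketch (not part of this commit), an implementation that backs each block with a direct ByteBuffer might look like the following. The class name DirectBufferBlockManager and its placement in org.apache.spark.storage are assumptions made purely for illustration:

package org.apache.spark.storage

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch, not part of this commit: keeps each block in a direct
// (off-JVM-heap) ByteBuffer keyed by the block's name.
class DirectBufferBlockManager extends OffHeapBlockManager {
  private val blocks = new ConcurrentHashMap[String, ByteBuffer]()

  override def desc(): String = "DirectBuffer"

  override def init(blockManager: BlockManager, executorId: String): Unit = {
    // Nothing to set up for an in-process store; a real implementation would
    // connect to its external storage system here.
  }

  override def removeFile(blockId: BlockId): Boolean = blocks.remove(blockId.name) != null

  override def fileExists(blockId: BlockId): Boolean = blocks.containsKey(blockId.name)

  override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
    // Copy the incoming data into a direct buffer so it lives outside the JVM heap.
    val copy = ByteBuffer.allocateDirect(bytes.remaining())
    copy.put(bytes.duplicate())
    copy.flip()
    blocks.put(blockId.name, copy)
  }

  override def getBytes(blockId: BlockId): Option[ByteBuffer] =
    Option(blocks.get(blockId.name)).map(_.duplicate())

  override def getSize(blockId: BlockId): Long =
    Option(blocks.get(blockId.name)).map(_.limit().toLong).getOrElse(0L)

  override def addShutdownHook(): Unit = sys.addShutdownHook { blocks.clear() }
}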
Lines changed: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.util.control.NonFatal

import org.apache.spark.Logging
import org.apache.spark.util.Utils

/**
 * Stores BlockManager blocks off-heap.
 * Any exception thrown by the underlying implementation is caught here and
 * mapped to the failure value the caller expects.
 */
private[spark] class OffHeapStore(blockManager: BlockManager, executorId: String)
  extends BlockStore(blockManager: BlockManager) with Logging {

  lazy val offHeapManager: Option[OffHeapBlockManager] =
    OffHeapBlockManager.create(blockManager, executorId)

  logInfo("OffHeap store started")

  override def getSize(blockId: BlockId): Long = {
    try {
      offHeapManager.map(_.getSize(blockId)).getOrElse(0L)
    } catch {
      case NonFatal(t) =>
        logError(s"Error in getSize for block $blockId", t)
        0L
    }
  }

  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
    putIntoOffHeapStore(blockId, bytes, returnValues = true)
  }

  override def putArray(
      blockId: BlockId,
      values: Array[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    putIterator(blockId, values.toIterator, level, returnValues)
  }

  override def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    logDebug(s"Attempting to write values for block $blockId")
    val bytes = blockManager.dataSerialize(blockId, values)
    putIntoOffHeapStore(blockId, bytes, returnValues)
  }

  private def putIntoOffHeapStore(
      blockId: BlockId,
      bytes: ByteBuffer,
      returnValues: Boolean): PutResult = {

    // Work on a duplicate so the caller's buffer position and limit are not modified.
    // duplicate() shares the underlying data without copying it, so it is inexpensive.
    val byteBuffer = bytes.duplicate()
    byteBuffer.rewind()
    logDebug(s"Attempting to put block $blockId into OffHeap store")
    val startTime = System.currentTimeMillis
    // We should never get here when offHeapManager is None. Handle it anyway for safety.
    try {
      if (offHeapManager.isDefined) {
        offHeapManager.get.putBytes(blockId, byteBuffer)
        val finishTime = System.currentTimeMillis
        logDebug("Block %s stored as %s file in OffHeap store in %d ms".format(
          blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))

        if (returnValues) {
          PutResult(bytes.limit(), Right(bytes.duplicate()))
        } else {
          PutResult(bytes.limit(), null)
        }
      } else {
        logError(s"No OffHeap block manager available, cannot put block $blockId")
        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
      }
    } catch {
      case NonFatal(t) =>
        logError(s"Error in putBytes for block $blockId", t)
        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
    }
  }

  // We assume the block is removed even if an exception is thrown.
  override def remove(blockId: BlockId): Boolean = {
    try {
      offHeapManager.map(_.removeFile(blockId)).getOrElse(true)
    } catch {
      case NonFatal(t) =>
        logError(s"Error removing block $blockId", t)
        true
    }
  }

  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
  }

  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
    try {
      offHeapManager.flatMap(_.getBytes(blockId))
    } catch {
      case NonFatal(t) =>
        logError(s"Error in getBytes for block $blockId", t)
        None
    }
  }

  override def contains(blockId: BlockId): Boolean = {
    try {
      val ret = offHeapManager.map(_.fileExists(blockId)).getOrElse(false)
      if (!ret) {
        // The block is no longer in the OffHeap store, so drop it from the block manager too.
        logInfo(s"Removing block $blockId, which is no longer in the OffHeap store")
        blockManager.removeBlock(blockId, tellMaster = true)
      }
      ret
    } catch {
      case NonFatal(t) =>
        logError(s"Error in contains for block $blockId", t)
        false
    }
  }
}
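
Wiring in a concrete implementation only requires the configuration key that OffHeapBlockManager.create reads. A minimal usage sketch, assuming the hypothetical DirectBufferBlockManager from the earlier example; any class with a public no-argument constructor that implements OffHeapBlockManager would work, since it is loaded via Class.forName(...).newInstance():

import org.apache.spark.SparkConf

// Point the block manager at the implementation class; the class name below is
// the hypothetical example sketched after the first file.
val conf = new SparkConf()
  .set("spark.offHeapStore.blockManager",
    "org.apache.spark.storage.DirectBufferBlockManager")

// If the key is unset, or the class cannot be instantiated, OffHeapBlockManager.create
// returns None and OffHeapStore falls back to its failure values (size 0, empty gets,
// puts reported with BlockStatus.empty).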
