@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ShuffleClient}
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, EncryptedManagedBuffer, StorageLevel}
 import org.apache.spark.util.ThreadUtils

 private[spark]
@@ -104,6 +104,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging
           data match {
             case f: FileSegmentManagedBuffer =>
               result.success(f)
+            case e: EncryptedManagedBuffer =>
+              result.success(e)
             case _ =>
               val ret = ByteBuffer.allocate(data.size.toInt)
               ret.put(data.nioByteBuffer())
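Without the new case, an EncryptedManagedBuffer would fall through to the default branch above, which allocates a heap ByteBuffer of data.size bytes and copies the entire block into it; matching on it explicitly hands the file-backed buffer straight to the caller. A minimal sketch of a hypothetical caller (consumeFetchedBlock is not part of this patch) that uses the standard ManagedBuffer.createInputStream API to stream the fetched block instead of materializing it:

    import java.io.InputStream
    import org.apache.spark.network.buffer.ManagedBuffer

    // Hypothetical helper, for illustration only: consumes a fetched block as a
    // stream, so a disk-backed (or encrypted, disk-backed) buffer is never
    // pulled onto the heap in one piece.
    def consumeFetchedBlock(buf: ManagedBuffer): Long = {
      val in: InputStream = buf.createInputStream()
      try {
        var total = 0L
        val chunk = new Array[Byte](8192)
        var n = in.read(chunk)
        while (n != -1) {
          total += n
          n = in.read(chunk)
        }
        total // number of bytes in the block, read incrementally
      } finally {
        in.close()
      }
    }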
@@ -721,7 +721,7 @@ private[spark] class BlockManager(
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
+    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
     // could just use the inputStream on the temp file, rather than reading the file into memory.
     // Until then, replication can cause the process to use too much memory and get killed
     // even though we've read the data to disk.
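The TODO points at the eager conversion in this method: the fetched ManagedBuffer is turned into a ChunkedByteBuffer straight away, so a replicated block ends up fully in memory even when it already sits in a temp file on disk. A rough sketch of the shape of that path (not the actual BlockManager code; fetchFromAnyRemoteLocation stands in for the real retry-over-locations logic, and the types shown are private[spark], so this would only compile inside an org.apache.spark package):

    import org.apache.spark.network.buffer.ManagedBuffer
    import org.apache.spark.storage.BlockId
    import org.apache.spark.util.io.ChunkedByteBuffer

    // Hypothetical stand-in for the loop that tries each known remote location.
    def fetchFromAnyRemoteLocation(blockId: BlockId): Option[ManagedBuffer] = ???

    def getRemoteBytesSketch(blockId: BlockId): Option[ChunkedByteBuffer] = {
      // The conversion below is where the block can be forced onto the heap;
      // returning the ManagedBuffer itself (the SPARK-25905 idea) would let a
      // caller such as getRemoteValues stream from the temp file instead.
      fetchFromAnyRemoteLocation(blockId).map(ChunkedByteBuffer.fromManagedBuffer)
    }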
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -201,7 +201,7 @@ private class DiskBlockData(
   private def open() = new FileInputStream(file).getChannel
 }

-private class EncryptedBlockData(
+private[spark] class EncryptedBlockData(
     file: File,
     blockSize: Long,
     conf: SparkConf,
@@ -263,7 +263,7 @@ private class EncryptedBlockData(
   }
 }

-private class EncryptedManagedBuffer(val blockData: EncryptedBlockData) extends ManagedBuffer {
+private[spark] class EncryptedManagedBuffer(val blockData: EncryptedBlockData) extends ManagedBuffer {

   // This is the size of the decrypted data
   override def size(): Long = blockData.size
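Widening both classes from private (confined to the org.apache.spark.storage package) to private[spark] is what lets BlockTransferService and ChunkedByteBuffer, which live in other packages under org.apache.spark, pattern-match on EncryptedManagedBuffer in the hunks above and below. A minimal illustration of that access-modifier difference, using made-up package and class names rather than Spark code:

    // Two sibling packages under a shared root, in one source file for brevity.
    package org.example.root.storage {
      // Visible anywhere under org.example.root, like private[spark] in this patch.
      private[root] class VisibleAcrossRoot
      // A plain top-level `private class` here would be confined to
      // org.example.root.storage and could not be matched on from a sibling package.
    }

    package org.example.root.network {
      import org.example.root.storage.VisibleAcrossRoot
      object Consumer {
        // Compiles only because the class is private[root] rather than private.
        def isStorageThing(x: AnyRef): Boolean = x.isInstanceOf[VisibleAcrossRoot]
      }
    }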
@@ -29,7 +29,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
-import org.apache.spark.storage.StorageUtils
+import org.apache.spark.storage.{EncryptedManagedBuffer, StorageUtils}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils

@@ -173,11 +173,13 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 private[spark] object ChunkedByteBuffer {


-  // TODO eliminate this method if we switch BlockManager to getting InputStreams
+  // TODO SPARK-25905 eliminate this method if we switch BlockManager to getting InputStreams
   def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
         fromFile(f.getFile, f.getOffset, f.getLength)
+      case e: EncryptedManagedBuffer =>
+        e.blockData.toChunkedByteBuffer(ByteBuffer.allocate _)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
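An EncryptedManagedBuffer cannot go through fromFile, because the bytes on disk are still encrypted, and sending it down the default branch would force the whole decrypted block through nioByteBuffer() into a single ByteBuffer, copying it and running into the single-buffer size limit for very large blocks. The new case therefore asks the underlying EncryptedBlockData to decrypt into chunks directly. A small sketch of the allocator argument that call takes, under the Int => ByteBuffer shape implied by ByteBuffer.allocate _ above:

    import java.nio.ByteBuffer

    // On-heap allocator, as used by the new case in this diff.
    val onHeap: Int => ByteBuffer = ByteBuffer.allocate _

    // A caller that prefers off-heap chunks could pass a direct allocator instead;
    // whether that is appropriate depends on how the resulting chunks are disposed.
    val offHeap: Int => ByteBuffer = ByteBuffer.allocateDirect _

    // e.blockData.toChunkedByteBuffer(onHeap)   // decrypts into heap-backed chunks
    // e.blockData.toChunkedByteBuffer(offHeap)  // decrypts into direct chunks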