HashShuffleWriter.scala
@@ -65,23 +65,25 @@ private[spark] class HashShuffleWriter[K, V](
   }

   /** Close this writer, passing along whether the map completed */
-  override def stop(success: Boolean): Option[MapStatus] = {
+  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+    var success = initiallySuccess
     try {
       if (stopping) {
         return None
       }
       stopping = true
       if (success) {
         try {
-          return Some(commitWritesAndBuildStatus())
+          Some(commitWritesAndBuildStatus())
         } catch {
           case e: Exception =>
+            success = false
             revertWrites()
             throw e
         }
       } else {
         revertWrites()
-        return None
+        None
       }
     } finally {
       // Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
     var totalBytes = 0L
     var totalTime = 0L
     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-      writer.commit()
-      writer.close()
+      writer.commitAndClose()
       val size = writer.fileSegment().length
       totalBytes += size
       totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        writer.revertPartialWritesAndClose()

Contributor: revert can throw an exception, which will cause the other writers to not revert. We need to wrap it in try/catch, log, and continue.

Contributor Author: Revert actually doesn't throw, per its (updated) comment.

       }
     }
   }
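
For reference, a minimal sketch of the per-writer guard the first comment above proposes -- this is hypothetical, not what the patch does (the patch instead has revertPartialWritesAndClose() catch and log its own exceptions). It assumes HashShuffleWriter mixes in Logging so that logError is available:

```scala
// Reviewer's suggested alternative, for illustration only: guard each revert so
// one failing writer cannot prevent the remaining writers from being reverted.
private def revertWrites(): Unit = {
  if (shuffle != null && shuffle.writers != null) {
    for (writer <- shuffle.writers) {
      try {
        writer.revertPartialWritesAndClose()
      } catch {
        case e: Exception =>
          logError("Exception while reverting writes for block " + writer.blockId, e)
      }
    }
  }
}
```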

SortShuffleWriter.scala
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
         for (elem <- elements) {
           writer.write(elem)
         }
-        writer.commit()
-        writer.close()
+        writer.commitAndClose()
         val segment = writer.fileSegment()
         offsets(id + 1) = segment.offset + segment.length
         lengths(id) = segment.length

BlockObjectWriter.scala
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
   def isOpen: Boolean

   /**
-   * Flush the partial writes and commit them as a single atomic block. Return the
-   * number of bytes written for this commit.
+   * Flush the partial writes and commit them as a single atomic block.
    */
-  def commit(): Long
+  def commitAndClose(): Unit

   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
-   * when there are runtime exceptions.
+   * when there are runtime exceptions. This method will not throw, though it may be
+   * unsuccessful in truncating written data.
    */
-  def revertPartialWrites()
+  def revertPartialWritesAndClose()

   /**
    * Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {

   /**
    * Returns the file segment of committed data that this Writer has written.
+   * This is only valid after commitAndClose() has been called.
    */
   def fileSegment(): FileSegment
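
To make the revised contract concrete, here is a minimal caller-side sketch (illustration only, not part of the patch). `writeAll` is a hypothetical helper; it assumes the writer has already been opened (for example via BlockManager.getDiskWriter, as the callers touched by this PR do) and uses only the methods declared above:

```scala
// Every write path ends in exactly one of commitAndClose() or
// revertPartialWritesAndClose(), and fileSegment() is consulted only afterwards.
def writeAll(writer: BlockObjectWriter, elements: Iterator[Any]): FileSegment = {
  try {
    elements.foreach(elem => writer.write(elem))
    writer.commitAndClose()   // flush, close, and record the final position
    writer.fileSegment()      // only valid after commitAndClose()
  } catch {
    case e: Exception =>
      // Best-effort truncation back to the initial position; documented not to throw.
      writer.revertPartialWritesAndClose()
      throw e
  }
}
```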

@@ -108,15 +109,14 @@ private[spark] class DiskBlockObjectWriter(
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()
-  private var lastValidPosition = initialPosition
+  private var finalPosition: Long = -1
   private var initialized = false
   private var _timeWriting = 0L

   override def open(): BlockObjectWriter = {
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
-    lastValidPosition = initialPosition
     bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
     initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(

   override def isOpen: Boolean = objOut != null

-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {

Contributor: We should remove close() from the interface and make it private to this class, by the way.

Contributor Author: Absolutely -- I did not do that in this patch because ExternalAppendOnlyMap did a close() without a commit, which is a fix outside the scope of this PR, but definitely one that should be made.

Contributor: When I merged the sort patch and modified ExternalAppendOnlyMap, it was simply a matter of replacing close() with commitAndClose(). commitAndClose() should be semantically equivalent to close(), actually. It is not equivalent to commit() -- but we want to remove that :-)

Contributor Author: Removing close() actually now requires a very minor refactor of ExternalSorter for the objectsWritten == 0 case -- I'd rather not risk it in this PR.

(A sketch of this proposed follow-up appears after this file's diff.)

     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }

-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>

Contributor: This catches Exception, but it could catch IOException instead.

+        logError("Uncaught exception while reverting partial writes to file " + file, e)

Contributor: In the use of writers in HashShuffleWriter, it is possible for a closed stream to be reverted (if some other stream's close failed, for example). In the above, that will leave this file with leftover data -- I am not sure what the impact of that would be.

Contributor Author: Closed streams should not inherently throw (since we check initialized before flushing and closing). However, we may be left with leftover data, as you said. I don't see a way to prevent the possibility of that occurring, but it should be possible to recover if users only rely on the returned fileSegment().

Contributor: I meant the former case: close() on a writer fails with an exception while earlier streams succeeded. So now we have some writers which have committed data (which is not removed by a subsequent revert) while others are reverted. On the face of it, I agree, it should not cause issues -- but the expectation from this class is never enforced, so it can silently fail.

Contributor Author: I'm not certain I understand. The situation I am imagining is that we commit to the first Writer, then the second one fails. In HashShuffleWriter, we will then call revertPartialWritesAndClose() on all Writers, causing us to revert all the changes back to initialPosition, which should revert even the committed data.

Contributor: Ah, I did not notice that the "if (initialized)" does not include the truncate call!

     }
   }
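
As a stand-alone illustration of the point settled above (the truncate sits outside the `if (initialized)` check, so even a writer whose stream was already closed still rewinds the file to its recorded starting position), here is a JDK-only sketch; `TruncateDemo` is hypothetical and only mimics the truncation step, with no Spark classes involved:

```scala
import java.io.{File, FileOutputStream}

object TruncateDemo extends App {
  val file = File.createTempFile("revert-demo", ".dat")
  val initialPosition = file.length() // 0 for a fresh file

  // Append some data, analogous to a writer that already flushed and closed.
  val out = new FileOutputStream(file, true)
  out.write("written-then-reverted".getBytes("UTF-8"))
  out.close()

  // The truncation performed by revertPartialWritesAndClose(): it opens its own
  // stream, so it does not depend on the original stream still being open.
  val truncateStream = new FileOutputStream(file, true)
  try {
    truncateStream.getChannel.truncate(initialPosition)
  } finally {
    truncateStream.close()
  }

  assert(file.length() == initialPosition) // everything appended after the start is gone
  file.delete()
}
```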

@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(

   // Only valid if called after commit()
   override def bytesWritten: Long = {
-    lastValidPosition - initialPosition
+    assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
+    finalPosition - initialPosition
   }
 }
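
For the follow-up discussed in the commitAndClose() thread above (removing close() from the public interface), a sketch of what the slimmed-down contract might look like. This is not part of this PR, and members not shown are assumed to stay as they are today:

```scala
// Hypothetical follow-up: close() is no longer public, leaving exactly two ways
// to finish a writer. Implementations such as DiskBlockObjectWriter would keep a
// private close() used only by the two methods below.
private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
  def open(): BlockObjectWriter
  def isOpen: Boolean

  /** Flush the partial writes, commit them as a single atomic block, and close the writer. */
  def commitAndClose(): Unit

  /** Revert any uncommitted writes and close the writer. Does not throw. */
  def revertPartialWritesAndClose(): Unit

  // write(), fileSegment(), bytesWritten, timeWriting(), etc. are unchanged.
}
```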

ShuffleBlockManager.scala
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
       if (consolidateShuffleFiles) {
         if (success) {
           val offsets = writers.map(_.fileSegment().offset)
-          fileGroup.recordMapOutput(mapId, offsets)
+          val lengths = writers.map(_.fileSegment().length)
+          fileGroup.recordMapOutput(mapId, offsets, lengths)
         }
         recycleFileGroup(fileGroup)
       } else {
@@ -247,47 +248,48 @@ object ShuffleBlockManager {
    * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
    */
   private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+    private var numBlocks: Int = 0
+
     /**
      * Stores the absolute index of each mapId in the files of this group. For instance,
      * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
      */
     private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

     /**
-     * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
-     * This ordering allows us to compute block lengths by examining the following block offset.
+     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+     * position in the file.
      * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
      * reducer.
      */
     private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
       new PrimitiveVector[Long]()
     }

-    def numBlocks = mapIdToIndex.size
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }

     def apply(bucketId: Int) = files(bucketId)

-    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+      assert(offsets.length == lengths.length)
       mapIdToIndex(mapId) = numBlocks
+      numBlocks += 1
       for (i <- 0 until offsets.length) {
         blockOffsetsByReducer(i) += offsets(i)
+        blockLengthsByReducer(i) += lengths(i)
       }
     }

     /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
     def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
       val file = files(reducerId)
       val blockOffsets = blockOffsetsByReducer(reducerId)
+      val blockLengths = blockLengthsByReducer(reducerId)
       val index = mapIdToIndex.getOrElse(mapId, -1)
       if (index >= 0) {
         val offset = blockOffsets(index)
-        val length =
-          if (index + 1 < numBlocks) {
-            blockOffsets(index + 1) - offset
-          } else {
-            file.length() - offset
-          }
-        assert(length >= 0)
+        val length = blockLengths(index)
         Some(new FileSegment(file, offset, length))
       } else {
         None
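
The reason for carrying lengths alongside offsets can be seen with a toy example (numbers invented, no Spark classes involved): under the old scheme, the last block's length was derived from the current file length, which goes wrong while another map task is still appending to the same consolidated reducer file -- exactly the situation the new DiskBlockManagerSuite test below exercises.

```scala
object FileGroupLengths extends App {
  // Map outputs already committed to one reducer file: map 0 wrote 100 bytes, map 1 wrote 50.
  val offsets = Vector(0L, 100L)
  val lengths = Vector(100L, 50L)

  // A third map task has appended 30 not-yet-registered bytes to the same file.
  val fileLength = 180L

  // Old scheme: infer the last block's length from the file length -- includes the extra 30 bytes.
  val inferredLastLength = fileLength - offsets.last // 80, should be 50

  // New scheme: return the length that was recorded at commit time.
  val recordedLastLength = lengths.last // 50

  println(s"inferred = $inferredLastLength, recorded = $recordedLastLength")
}
```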

ExternalAppendOnlyMap.scala
@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](

       // Flush the disk writer's contents to disk, and update relevant variables
       def flush() = {
-        writer.commit()
+        writer.commitAndClose()
         val bytesWritten = writer.bytesWritten
         batchSizes.append(bytesWritten)
         _diskBytesSpilled += bytesWritten

ExternalSorter.scala
@@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
     // How many elements we have in each partition
     val elementsPerPartition = new Array[Long](numPartitions)

-    // Flush the disk writer's contents to disk, and update relevant variables
+    // Flush the disk writer's contents to disk, and update relevant variables.
+    // The writer is closed at the end of this process, and cannot be reused.
     def flush() = {
-      writer.commit()
+      writer.commitAndClose()
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
@@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](

       if (objectsWritten == serializerBatchSize) {
         flush()
-        writer.close()
         writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
       }
     }
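
A sketch of the spill loop's shape after this change (not the actual ExternalSorter code; `spillInBatches` and `newWriter` are hypothetical stand-ins for the surrounding method and for blockManager.getDiskWriter): because commitAndClose() also closes the writer, each flushed batch must be followed by opening a fresh writer rather than reusing the old one.

```scala
def spillInBatches(
    elements: Iterator[Any],
    batchSize: Int,
    newWriter: () => BlockObjectWriter): Seq[Long] = {
  val batchSizes = scala.collection.mutable.ArrayBuffer[Long]()
  var writer = newWriter()
  var objectsWritten = 0
  for (elem <- elements) {
    writer.write(elem)
    objectsWritten += 1
    if (objectsWritten == batchSize) {
      writer.commitAndClose()           // also closes; this writer cannot be reused
      batchSizes += writer.bytesWritten // only valid after commitAndClose()
      writer = newWriter()              // fresh writer for the next batch
      objectsWritten = 0
    }
  }
  if (objectsWritten > 0) {
    writer.commitAndClose()
    batchSizes += writer.bytesWritten
  } else {
    writer.revertPartialWritesAndClose() // nothing to keep from an empty final batch
  }
  batchSizes.toSeq
}
```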

DiskBlockManagerSuite.scala
@@ -22,11 +22,14 @@ import java.io.{File, FileWriter}
 import scala.collection.mutable
 import scala.language.reflectiveCalls

+import akka.actor.Props
 import com.google.common.io.Files
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

 import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AkkaUtils, Utils}

 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
   private val testConf = new SparkConf(false)
@@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
     newFile.delete()
   }

+  private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+    assert(segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+    assert(segment1.offset === segment2.offset)
+    assert(segment1.length === segment2.length)
+  }
+
+  test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
+
+    val serializer = new JavaSerializer(testConf)
+    val confCopy = testConf.clone
+    // Reset after EACH object write. This is to ensure that there are bytes appended after
+    // an object is written. So if the codepaths assume writeObject is the end of data, this
+    // should flush those bugs out. This was a common bug in ExternalAppendOnlyMap, etc.
+    confCopy.set("spark.serializer.objectStreamReset", "1")
+
+    val securityManager = new org.apache.spark.SecurityManager(confCopy)
+    // Do not use the shuffleBlockManager above!
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
+      securityManager)
+    val master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
+      confCopy)
+    val store = new BlockManager("<driver>", actorSystem, master, serializer, confCopy,
+      securityManager, null)
+
+    try {
+
+      val shuffleManager = store.shuffleBlockManager
+
+      val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer)
+      for (writer <- shuffle1.writers) {
+        writer.write("test1")
+        writer.write("test2")
+      }
+      for (writer <- shuffle1.writers) {
+        writer.commitAndClose()
+      }
+
+      val shuffle1Segment = shuffle1.writers(0).fileSegment()
+      shuffle1.releaseWriters(success = true)
+
+      val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf))
+
+      for (writer <- shuffle2.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle2.writers) {
+        writer.commitAndClose()
+      }
+      val shuffle2Segment = shuffle2.writers(0).fileSegment()
+      shuffle2.releaseWriters(success = true)
+
+      // Now comes the test:
+      // Write to shuffle 3 and close it, but before registering it, check that the file lengths
+      // for the previous task (shuffle1) are the same as 'segments'. Earlier, we were inferring
+      // the length of a block based on the remaining data in the file, which could mess things up
+      // when there are concurrent reads and writes happening to the same shuffle group.
+      val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf))
+      for (writer <- shuffle3.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle3.writers) {
+        writer.commitAndClose()
+      }
+      // Check before we register.
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+      shuffle3.releaseWriters(success = true)
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+      shuffleManager.removeShuffle(1)
+    } finally {
+      if (store != null) {
+        store.stop()
+      }
+      actorSystem.shutdown()
+      actorSystem.awaitTermination()
+    }
+  }
+
   def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
     val segment = diskBlockManager.getBlockLocation(blockId)
     assert(segment.file.getName === filename)

StoragePerfTester.scala
@@ -61,10 +61,9 @@ object StoragePerfTester {
         for (i <- 1 to recordsPerMap) {
           writers(i % numOutputSplits).write(writeData)
         }
-        writers.map {w =>
-          w.commit()
+        writers.map { w =>
+          w.commitAndClose()
           total.addAndGet(w.fileSegment().length)
-          w.close()
         }

         shuffle.releaseWriters(true)