Commit b6fd3cd
Merge pull request apache#480 from pwendell/0.9-fixes

Handful of 0.9 fixes

This patch addresses a few fixes for Spark 0.9.0 based on the last release candidate.
@mridulm gets credit for reporting most of the issues here. Many of the fixes here are
based on his work in apache#477 and follow up discussion with him.

(cherry picked from commit 77b986f)
Signed-off-by: Patrick Wendell <[email protected]>

1 parent e5f8917   commit b6fd3cd

File tree: 7 files changed (+65, -24 lines)


core/src/main/scala/org/apache/spark/SparkConf.scala
Lines changed: 9 additions & 1 deletion

@@ -192,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   }
 
   /** Get all akka conf variables set on this SparkConf */
-  def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+  def getAkkaConf: Seq[(String, String)] =
+    /* This is currently undocumented. If we want to make this public we should consider
+     * nesting options under the spark namespace to avoid conflicts with user akka options.
+     * Otherwise users configuring their own akka code via system properties could mess up
+     * spark's akka options.
+     *
+     * E.g. spark.akka.option.x.y.x = "value"
+     */
+    getAll.filter {case (k, v) => k.startsWith("akka.")}
 
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
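
For context, the new body of getAkkaConf still just collects every key whose name starts with "akka." from the conf. A minimal sketch of how that filter behaves, assuming a bare SparkConf; the akka.loglevel key below is only an illustrative example, not a setting this patch documents:

    import org.apache.spark.SparkConf

    // Build a conf without loading system-property defaults.
    val conf = new SparkConf(false)
      .set("akka.loglevel", "DEBUG")        // returned by getAkkaConf: key starts with "akka."
      .set("spark.executor.memory", "2g")   // ignored by getAkkaConf: not an akka.* key

    // Equivalent to the filter inside getAkkaConf in this patch.
    val akkaSettings = conf.getAll.filter { case (k, _) => k.startsWith("akka.") }
    akkaSettings.foreach { case (k, v) => println(s"$k = $v") }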

core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
Lines changed: 2 additions & 2 deletions

@@ -43,8 +43,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
   val numPartitions =
   // listStatus can throw exception if path does not exist.
   if (fs.exists(cpath)) {
-    val dirContents = fs.listStatus(cpath)
-    val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+    val dirContents = fs.listStatus(cpath).map(_.getPath)
+    val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
     val numPart = partitionFiles.size
     if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
         ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
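
The point of the change is that the old filter scanned the whole path string, so any directory with "part-" somewhere in its name would match, while the new code only checks the file name itself. A small illustration using Hadoop's Path directly; the paths are made up:

    import org.apache.hadoop.fs.Path

    val candidates = Seq(
      new Path("hdfs://nn/checkpoints/rdd-12/part-00000"),     // a real partition file
      new Path("hdfs://nn/spark-part-time/rdd-12/_metadata")   // not a partition file, but the path contains "part-"
    )

    // Old behavior: both entries match because the full path string is searched.
    val oldMatches = candidates.map(_.toString).filter(_.contains("part-"))

    // New behavior: only entries whose file name starts with "part-" match.
    val newMatches = candidates.filter(_.getName.startsWith("part-")).map(_.toString)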

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Lines changed: 1 addition & 1 deletion

@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(
 
   /** Check whether a task is currently running an attempt on a given host */
   private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
-    !taskAttempts(taskIndex).exists(_.host == host)
+    taskAttempts(taskIndex).exists(_.host == host)
   }
 
   /**
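
The one-character fix removes an inverted condition: with the leading !, hasAttemptOnHost returned true exactly when no attempt was running on the host, the opposite of its doc comment. A toy sketch of the intended semantics; the Attempt case class is a made-up stand-in for the task attempt records in taskAttempts:

    // Hypothetical stand-in for the per-task attempt info.
    case class Attempt(host: String)

    def hasAttemptOnHost(attempts: Seq[Attempt], host: String): Boolean =
      attempts.exists(_.host == host)   // true iff some attempt ran on this host

    val attempts = Seq(Attempt("host-a"), Attempt("host-b"))
    assert(hasAttemptOnHost(attempts, "host-a"))    // an attempt exists on host-a
    assert(!hasAttemptOnHost(attempts, "host-c"))   // no attempt on host-c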

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
Lines changed: 3 additions & 2 deletions

@@ -138,14 +138,16 @@ private[spark] class DiskBlockObjectWriter(
       fos = null
       ts = null
       objOut = null
+      initialized = false
     }
   }
 
   override def isOpen: Boolean = objOut != null
 
   override def commit(): Long = {
     if (initialized) {
-      // NOTE: Flush the serializer first and then the compressed/buffered output stream
+      // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
+      // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
       val prevPos = lastValidPosition
@@ -175,7 +177,6 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   override def fileSegment(): FileSegment = {
-    val bytesWritten = lastValidPosition - initialPosition
     new FileSegment(file, initialPosition, bytesWritten)
   }
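
The rewritten comment explains why both layers get flushed: the serialization stream may buffer data that never reaches the compressed/buffered stream below it unless it is flushed first. A minimal sketch of that layering with plain java.io streams standing in for objOut and bs:

    import java.io.{BufferedOutputStream, ByteArrayOutputStream, ObjectOutputStream}

    val sink     = new ByteArrayOutputStream()
    val buffered = new BufferedOutputStream(sink)     // plays the role of bs (compressed/buffered stream)
    val objOut   = new ObjectOutputStream(buffered)   // plays the role of the serialization stream

    objOut.writeObject("some record")

    // Flush the higher layer first so its buffered bytes reach the lower layer,
    // then flush the lower layer so they reach the sink before positions are read.
    objOut.flush()
    buffered.flush()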

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
Lines changed: 13 additions & 3 deletions

@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   def conf = blockManager.conf
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,15 @@ class ShuffleBlockManager(blockManager: BlockManager) {
       Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
         val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
         val blockFile = blockManager.diskBlockManager.getFile(blockId)
+        // Because of previous failures, the shuffle file may already exist on this machine.
+        // If so, remove it.
+        if (blockFile.exists) {
+          if (blockFile.delete()) {
+            logInfo(s"Removed existing shuffle file $blockFile")
+          } else {
+            logWarning(s"Failed to remove existing shuffle file $blockFile")
+          }
+        }
         blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
       }
     }
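
The added block is a defensive cleanup: if an earlier failed attempt left a shuffle file behind, it is deleted before a new writer is opened on the same path. The same pattern in isolation, with java.io.File and println standing in for Spark's file handle and logging:

    import java.io.File

    // Remove a leftover file from a previous failed attempt before reusing its path.
    def prepareFreshFile(blockFile: File): Unit = {
      if (blockFile.exists) {
        if (blockFile.delete()) println(s"Removed existing shuffle file $blockFile")
        else println(s"Failed to remove existing shuffle file $blockFile")
      }
    }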

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
Lines changed: 34 additions & 7 deletions

@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
 
 /**
  * An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,33 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
 
-    val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+    /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+     * closes and re-opens serialization and compression streams within each file. This makes some
+     * assumptions about the way that serialization and compression streams work, specifically:
+     *
+     * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+     *
+     * 2) Several compression streams can be opened, written to, and flushed on the write path
+     *    while only one compression input stream is created on the read path
+     *
+     * In practice (1) is only true for Java, so we add a special fix below to make it work for
+     * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+     *
+     * To avoid making these assumptions we should create an intermediate stream that batches
+     * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+     * This is a bit tricky because, within each segment, you'd need to track the total number
+     * of bytes written and then re-wind and write it at the beginning of the segment. This will
+     * most likely require using the file channel API.
+     */
+
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
+    def wrapForCompression(outputStream: OutputStream) = {
+      if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
+    }
+
     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
-      compressStream, syncWrites)
+      wrapForCompression, syncWrites)
 
     var writer = getNewWriter
     var objectsWritten = 0
@@ -168,6 +193,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
       if (objectsWritten == serializerBatchSize) {
         writer.commit()
+        writer.close()
+        _diskBytesSpilled += writer.bytesWritten
         writer = getNewWriter
         objectsWritten = 0
       }
@@ -176,8 +203,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       if (objectsWritten > 0) writer.commit()
     } finally {
       // Partial failures cannot be tolerated; do not revert partial writes
-      _diskBytesSpilled += writer.bytesWritten
       writer.close()
+      _diskBytesSpilled += writer.bytesWritten
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskMapIterator(file, blockId))
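
To make the batching pattern easier to follow: the spill loop now commits, closes, and accounts for a writer after every serializerBatchSize objects, then opens a fresh writer, so each segment gets its own serialization and compression streams. A stripped-down sketch of that loop; the Writer trait is a stand-in for DiskBlockObjectWriter, not Spark's API:

    // Hypothetical minimal writer interface mirroring the calls made in the patch.
    trait Writer {
      def write(obj: Any): Unit
      def commit(): Long
      def close(): Unit
      def bytesWritten: Long
    }

    def spill(records: Iterator[Any], batchSize: Int, newWriter: () => Writer): Long = {
      var diskBytesSpilled = 0L
      var writer = newWriter()
      var objectsWritten = 0
      try {
        for (record <- records) {
          writer.write(record)
          objectsWritten += 1
          if (objectsWritten == batchSize) {
            // Finish this segment: commit it, close its streams, record its size,
            // and start a new segment with fresh serialization/compression streams.
            writer.commit()
            writer.close()
            diskBytesSpilled += writer.bytesWritten
            writer = newWriter()
            objectsWritten = 0
          }
        }
        if (objectsWritten > 0) writer.commit()
      } finally {
        // Close before reading bytesWritten, matching the reordering in this patch.
        writer.close()
        diskBytesSpilled += writer.bytesWritten
      }
      diskBytesSpilled
    }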

docs/configuration.md
Lines changed: 3 additions & 8 deletions

@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>spark.shuffle.spill.compress</td>
   <td>true</td>
   <td>
-    Whether to compress data spilled during shuffles.
+    Whether to compress data spilled during shuffles. If enabled, spill compression
+    always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
+    regardless of the value of `spark.io.compression.codec`.
   </td>
 </tr>
 <tr>
@@ -379,13 +381,6 @@ Apart from these, the following properties are also available, and may be useful
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
-<tr>
-  <td>akka.x.y....</td>
-  <td>value</td>
-  <td>
-    An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
-  </td>
-</tr>
 
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
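
For users, the documented behavior amounts to this: spills are compressed whenever spark.shuffle.spill.compress is true, and in 0.9.0 that compression is always LZF, even if a different codec is configured for other compressed data. A sketch of the relevant settings; the Snappy codec class name is only there to show that it does not affect spills:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Spilled shuffle data is compressed, always with LZF in 0.9.0.
      .set("spark.shuffle.spill.compress", "true")
      // Affects other compressed blocks (e.g. shuffle outputs), but not spill files.
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")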
