Commit e132d69

Merge branch 'master' of github.com:apache/spark into fix-drop-events

2 parents: 14fa1c3 + 0da07da

27 files changed: +1545 −560 lines

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 49 additions & 20 deletions
@@ -25,7 +25,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import scala.collection.JavaConversions._
 import scala.language.existentials
 import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Try, Success, Failure}

 import net.razorvine.pickle.{Pickler, Unpickler}

@@ -536,25 +536,6 @@ private[spark] object PythonRDD extends Logging {
     file.close()
   }

-  /**
-   * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
-   * It is only used by pyspark.sql.
-   * TODO: Support more Python types.
-   */
-  def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
-    pyRDD.rdd.mapPartitions { iter =>
-      val unpickle = new Unpickler
-      iter.flatMap { row =>
-        unpickle.loads(row) match {
-          // in case of objects are pickled in batch mode
-          case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
-          // not in batch mode
-          case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
-        }
-      }
-    }
-  }
-
   private def getMergedConf(confAsMap: java.util.HashMap[String, String],
       baseConf: Configuration): Configuration = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)

@@ -701,6 +682,54 @@ private[spark] object PythonRDD extends Logging {
     }
   }

+
+  /**
+   * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
+   * This function is outdated, PySpark does not use it anymore
+   */
+  @deprecated
+  def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        unpickle.loads(row) match {
+          // in case of objects are pickled in batch mode
+          case objs: JArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
+          // not in batch mode
+          case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
+        }
+      }
+    }
+  }
+
+  /**
+   * Convert an RDD of serialized Python tuple to Array (no recursive conversions).
+   * It is only used by pyspark.sql.
+   */
+  def pythonToJavaArray(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Array[_]] = {
+
+    def toArray(obj: Any): Array[_] = {
+      obj match {
+        case objs: JArrayList[_] =>
+          objs.toArray
+        case obj if obj.getClass.isArray =>
+          obj.asInstanceOf[Array[_]].toArray
+      }
+    }
+
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]].map(toArray)
+        } else {
+          Seq(toArray(obj))
+        }
+      }
+    }.toJavaRDD()
+  }
+
   /**
    * Convert and RDD of Java objects to and RDD of serialized Python objects, that is usable by
    * PySpark.
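For reference, here is a standalone sketch (not part of this commit) of the batched-vs-unbatched dispatch that the new pythonToJavaArray performs. A plain iterator of already-unpickled rows stands in for the RDD and the Unpickler; the object and method names are illustrative only.

import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._

// Standalone sketch of pythonToJavaArray's dispatch logic; illustrative names only.
object PythonToArraySketch {
  // A pickled Python tuple arrives either as a java.util.ArrayList or a JVM array.
  def toArray(obj: Any): Array[_] = obj match {
    case objs: JArrayList[_]     => objs.toArray
    case o if o.getClass.isArray => o.asInstanceOf[Array[_]]
  }

  // With batching enabled, each deserialized row is itself a list of tuples.
  def convert(rows: Iterator[Any], batched: Boolean): Iterator[Array[_]] =
    rows.flatMap { obj =>
      if (batched) obj.asInstanceOf[JArrayList[_]].asScala.map(toArray)
      else Seq(toArray(obj))
    }
}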

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 11 additions & 0 deletions
@@ -85,6 +85,17 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
    */
   private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
+    if (propertiesFile == null) {
+      sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir =>
+        val sep = File.separator
+        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
+        val file = new File(defaultPath)
+        if (file.exists()) {
+          propertiesFile = file.getAbsolutePath
+        }
+      }
+    }
+
     if (propertiesFile == null) {
       sys.env.get("SPARK_HOME").foreach { sparkHome =>
         val sep = File.separator
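For context, a minimal sketch (not part of this change) of the resolution order the patch establishes: an explicit --properties-file wins, then $SPARK_CONF_DIR/spark-defaults.conf, then $SPARK_HOME/conf/spark-defaults.conf. The helper name resolveDefaultsFile is hypothetical.

import java.io.File

// Sketch of the defaults-file lookup order; not part of SparkSubmitArguments.
object DefaultsFileSketch {
  def resolveDefaultsFile(explicit: Option[String]): Option[String] = {
    val sep = File.separator
    val candidates = Seq(
      explicit,                                                              // --properties-file
      sys.env.get("SPARK_CONF_DIR").map(d => s"$d${sep}spark-defaults.conf"),
      sys.env.get("SPARK_HOME").map(h => s"$h${sep}conf${sep}spark-defaults.conf")
    )
    // First existing candidate wins.
    candidates.flatten.map(new File(_)).find(_.exists()).map(_.getAbsolutePath)
  }
}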

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala

Lines changed: 7 additions & 7 deletions
@@ -65,23 +65,25 @@ private[spark] class HashShuffleWriter[K, V](
   }

   /** Close this writer, passing along whether the map completed */
-  override def stop(success: Boolean): Option[MapStatus] = {
+  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+    var success = initiallySuccess
     try {
       if (stopping) {
         return None
       }
       stopping = true
       if (success) {
         try {
-          return Some(commitWritesAndBuildStatus())
+          Some(commitWritesAndBuildStatus())
         } catch {
           case e: Exception =>
+            success = false
             revertWrites()
             throw e
         }
       } else {
         revertWrites()
-        return None
+        None
       }
     } finally {
       // Release the writers back to the shuffle block manager.

@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
     var totalBytes = 0L
     var totalTime = 0L
     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-      writer.commit()
-      writer.close()
+      writer.commitAndClose()
       val size = writer.fileSegment().length
       totalBytes += size
       totalTime += writer.timeWriting()

@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        writer.revertPartialWritesAndClose()
       }
     }
   }
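The reworked stop() follows a commit-or-revert pattern: the success flag becomes a local var so that a failure while committing is still visible to the finally block that releases the writers. A generic sketch of that pattern, with placeholder commit/revert/release hooks and nothing Spark-specific, assuming this shape of the control flow:

// Generic commit-or-revert sketch; commit, revert and release are placeholders.
object CommitOrRevertSketch {
  def finish(initiallySuccess: Boolean)(
      commit: () => Unit, revert: () => Unit, release: Boolean => Unit): Unit = {
    var success = initiallySuccess
    try {
      if (success) {
        try {
          commit()
        } catch {
          case e: Exception =>
            success = false   // make the failure visible to the finally block
            revert()
            throw e
        }
      } else {
        revert()
      }
    } finally {
      release(success)        // e.g. hand writers back to the shuffle block manager
    }
  }
}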

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 1 addition & 2 deletions
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
       for (elem <- elements) {
         writer.write(elem)
       }
-      writer.commit()
-      writer.close()
+      writer.commitAndClose()
       val segment = writer.fileSegment()
       offsets(id + 1) = segment.offset + segment.length
       lengths(id) = segment.length

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 31 additions & 22 deletions
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
   def isOpen: Boolean

   /**
-   * Flush the partial writes and commit them as a single atomic block. Return the
-   * number of bytes written for this commit.
+   * Flush the partial writes and commit them as a single atomic block.
    */
-  def commit(): Long
+  def commitAndClose(): Unit

   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
-   * when there are runtime exceptions.
+   * when there are runtime exceptions. This method will not throw, though it may be
+   * unsuccessful in truncating written data.
    */
-  def revertPartialWrites()
+  def revertPartialWritesAndClose()

   /**
    * Writes an object.

@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
   /**
    * Returns the file segment of committed data that this Writer has written.
+   * This is only valid after commitAndClose() has been called.
    */
   def fileSegment(): FileSegment

@@ -108,15 +109,14 @@ private[spark] class DiskBlockObjectWriter(
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()
-  private var lastValidPosition = initialPosition
+  private var finalPosition: Long = -1
   private var initialized = false
   private var _timeWriting = 0L

   override def open(): BlockObjectWriter = {
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
-    lastValidPosition = initialPosition
     bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
     initialized = true

@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
   override def isOpen: Boolean = objOut != null

-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }

-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
     }
   }

@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-    lastValidPosition - initialPosition
+    assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
+    finalPosition - initialPosition
   }
 }
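A caller-side sketch of the new writer contract: a writer ends up either committed-and-closed or reverted-and-closed, and fileSegment()/bytesWritten are only meaningful after commitAndClose(). This is illustrative only; BlockObjectWriter is private[spark], so the sketch assumes it sits inside Spark's own storage package, and writePartition is a hypothetical helper, not part of this commit.

// Hypothetical usage sketch; assumes it compiles inside Spark's source tree.
package org.apache.spark.storage

object BlockWriterUsageSketch {
  /** Write all records, then either commit-and-close or revert-and-close. */
  def writePartition(writer: BlockObjectWriter, records: Iterator[Any]): FileSegment = {
    try {
      records.foreach(writer.write)
      writer.commitAndClose()                // flush, close, record the final position
      writer.fileSegment()                   // only valid after commitAndClose()
    } catch {
      case e: Exception =>
        writer.revertPartialWritesAndClose() // best-effort truncate back to initialPosition
        throw e
    }
  }
}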

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 15 additions & 13 deletions
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     if (consolidateShuffleFiles) {
       if (success) {
         val offsets = writers.map(_.fileSegment().offset)
-        fileGroup.recordMapOutput(mapId, offsets)
+        val lengths = writers.map(_.fileSegment().length)
+        fileGroup.recordMapOutput(mapId, offsets, lengths)
       }
       recycleFileGroup(fileGroup)
     } else {

@@ -247,47 +248,48 @@ object ShuffleBlockManager {
    * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
    */
   private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+    private var numBlocks: Int = 0
+
     /**
      * Stores the absolute index of each mapId in the files of this group. For instance,
      * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
      */
     private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

     /**
-     * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
-     * This ordering allows us to compute block lengths by examining the following block offset.
+     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+     * position in the file.
      * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
      * reducer.
      */
     private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
       new PrimitiveVector[Long]()
     }
-
-    def numBlocks = mapIdToIndex.size
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }

     def apply(bucketId: Int) = files(bucketId)

-    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+      assert(offsets.length == lengths.length)
       mapIdToIndex(mapId) = numBlocks
+      numBlocks += 1
       for (i <- 0 until offsets.length) {
         blockOffsetsByReducer(i) += offsets(i)
+        blockLengthsByReducer(i) += lengths(i)
       }
     }

     /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
     def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
       val file = files(reducerId)
       val blockOffsets = blockOffsetsByReducer(reducerId)
+      val blockLengths = blockLengthsByReducer(reducerId)
       val index = mapIdToIndex.getOrElse(mapId, -1)
       if (index >= 0) {
         val offset = blockOffsets(index)
-        val length =
-          if (index + 1 < numBlocks) {
-            blockOffsets(index + 1) - offset
-          } else {
-            file.length() - offset
-          }
-        assert(length >= 0)
+        val length = blockLengths(index)
         Some(new FileSegment(file, offset, length))
       } else {
         None
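A simplified, in-memory sketch (not part of this commit) of the ShuffleFileGroup bookkeeping after the change: each map task records an explicit (offset, length) pair per reducer file instead of deriving a length from the next block's offset or the file size. Class and member names below are illustrative.

import scala.collection.mutable

// Simplified stand-in for ShuffleFileGroup's offset/length index; illustrative only.
class FileGroupIndexSketch(numReducers: Int) {
  private val mapIdToIndex = mutable.Map.empty[Int, Int]
  private var numBlocks = 0
  private val offsetsByReducer = Array.fill(numReducers)(mutable.ArrayBuffer.empty[Long])
  private val lengthsByReducer = Array.fill(numReducers)(mutable.ArrayBuffer.empty[Long])

  def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]): Unit = {
    require(offsets.length == lengths.length)
    mapIdToIndex(mapId) = numBlocks
    numBlocks += 1
    for (i <- offsets.indices) {
      offsetsByReducer(i) += offsets(i)
      lengthsByReducer(i) += lengths(i)
    }
  }

  /** Returns (offset, length) for a map task's block in a reducer file, if recorded. */
  def segmentFor(mapId: Int, reducerId: Int): Option[(Long, Long)] =
    mapIdToIndex.get(mapId).map { idx =>
      (offsetsByReducer(reducerId)(idx), lengthsByReducer(reducerId)(idx))
    }
}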

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 22 additions & 13 deletions
@@ -286,17 +286,23 @@ private[spark] object Utils extends Logging {
       out: OutputStream,
       closeStreams: Boolean = false)
   {
-    val buf = new Array[Byte](8192)
-    var n = 0
-    while (n != -1) {
-      n = in.read(buf)
-      if (n != -1) {
-        out.write(buf, 0, n)
+    try {
+      val buf = new Array[Byte](8192)
+      var n = 0
+      while (n != -1) {
+        n = in.read(buf)
+        if (n != -1) {
+          out.write(buf, 0, n)
+        }
+      }
+    } finally {
+      if (closeStreams) {
+        try {
+          in.close()
+        } finally {
+          out.close()
+        }
       }
-    }
-    if (closeStreams) {
-      in.close()
-      out.close()
     }
   }

@@ -868,9 +874,12 @@ private[spark] object Utils extends Logging {
     val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
     val stream = new FileInputStream(file)

-    stream.skip(effectiveStart)
-    stream.read(buff)
-    stream.close()
+    try {
+      stream.skip(effectiveStart)
+      stream.read(buff)
+    } finally {
+      stream.close()
+    }
     Source.fromBytes(buff).mkString
   }
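For reference, a standalone sketch of the copy-then-close pattern both hunks adopt: the copy loop runs inside try/finally, and the two close() calls are nested in their own try/finally so the output stream is still closed even if closing the input stream throws. CopyStreamSketch is a hypothetical stand-in for Utils.copyStream, not the Spark method itself.

import java.io.{InputStream, OutputStream}

// Sketch of the copy-then-close pattern; hypothetical stand-in for Utils.copyStream.
object CopyStreamSketch {
  def copy(in: InputStream, out: OutputStream, closeStreams: Boolean = false): Unit = {
    try {
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) out.write(buf, 0, n)
      }
    } finally {
      if (closeStreams) {
        // Nested try/finally: out.close() runs even if in.close() throws.
        try in.close() finally out.close()
      }
    }
  }
}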
