
Commit 3979aad

Author/committer: Marcelo Vanzin

Merge branch 'master' into SPARK-4194

Conflicts:
    core/src/main/scala/org/apache/spark/SparkContext.scala
    core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

2 parents: 8caa8b3 + 512a2f1

85 files changed: +833 −279 lines


core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 7 additions & 4 deletions
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
   def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
     val out = new ByteArrayOutputStream
     val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
-    // Since statuses can be modified in parallel, sync on it
-    statuses.synchronized {
-      objOut.writeObject(statuses)
+    Utils.tryWithSafeFinally {
+      // Since statuses can be modified in parallel, sync on it
+      statuses.synchronized {
+        objOut.writeObject(statuses)
+      }
+    } {
+      objOut.close()
     }
-    objOut.close()
     out.toByteArray
   }

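Most of the hunks in this commit replace a plain try/finally around stream cleanup with Utils.tryWithSafeFinally from org.apache.spark.util.Utils, so that an exception thrown while closing a stream cannot mask the exception that made the body fail in the first place. The sketch below only illustrates the shape such a helper can take, assuming a curried (body)(cleanup) signature; the actual Spark implementation may differ in details such as logging.

// Illustrative sketch, not the Spark implementation.
def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
  var original: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      original = t
      throw t
  } finally {
    try {
      finallyBlock
    } catch {
      // If the body already failed, attach the cleanup failure as a
      // suppressed exception instead of letting it replace the original.
      case t: Throwable if original != null => original.addSuppressed(t)
    }
  }
}

Call sites then read tryWithSafeFinally { objOut.writeObject(statuses) } { objOut.close() }, which is the pattern applied throughout the files below.
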
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 16 additions & 3 deletions
@@ -328,6 +328,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Thread Local variable that can be used by users to pass information down the stack
   private val localProperties = new InheritableThreadLocal[Properties] {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
+    override protected def initialValue(): Properties = new Properties()
   }

   /* ------------------------------------------------------------------------------------- *
@@ -563,9 +564,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Spark fair scheduler pool.
    */
   def setLocalProperty(key: String, value: String) {
-    if (localProperties.get() == null) {
-      localProperties.set(new Properties())
-    }
     if (value == null) {
       localProperties.get.remove(key)
     } else {
@@ -1474,6 +1472,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     if (!stopped) {
       stopped = true
       postApplicationEnd()
+<<<<<<< HEAD
       _ui.foreach(_.stop())
       if (env != null) {
         env.metricsSystem.report()
@@ -1497,6 +1496,20 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
       _progressBar.foreach(_.stop())
       _taskScheduler = null
+=======
+      ui.foreach(_.stop())
+      env.metricsSystem.report()
+      metadataCleaner.cancel()
+      cleaner.foreach(_.stop())
+      executorAllocationManager.foreach(_.stop())
+      dagScheduler.stop()
+      dagScheduler = null
+      listenerBus.stop()
+      eventLogger.foreach(_.stop())
+      env.actorSystem.stop(heartbeatReceiver)
+      progressBar.foreach(_.stop())
+      taskScheduler = null
+>>>>>>> master
       // TODO: Cache.stop()?
       if (_env != null) {
         _env.stop()

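Overriding initialValue() on the localProperties thread-local means get() can never return null, which is why the lazy-initialization check disappears from setLocalProperty in the second hunk. A standalone sketch of the idiom (names here are illustrative, not Spark's):

import java.util.Properties

object LocalPropsDemo extends App {
  val localProps = new InheritableThreadLocal[Properties] {
    // Child threads start from a copy chained to the parent's properties.
    override protected def childValue(parent: Properties): Properties = new Properties(parent)
    // Every thread starts with an empty Properties, so get() is never null.
    override protected def initialValue(): Properties = new Properties()
  }

  // No "if (localProps.get() == null) localProps.set(new Properties())" check needed.
  localProps.get().setProperty("spark.job.description", "example")
  println(localProps.get().getProperty("spark.job.description"))
}

The <<<<<<< HEAD / >>>>>>> master markers added in the last two hunks match the conflicts called out in the commit message for SparkContext.scala.
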
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -661,7 +661,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: V) => f.call(x).asScala
+    def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }

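This file and the next two add explicit result types to small local def helpers (def fn: (V) => Iterable[U] = ... instead of def fn = ...), so the helper's type is pinned rather than inferred from whatever the implicit conversion happens to return. A self-contained illustration of the difference, with illustrative types that are not Spark's:

import scala.collection.JavaConverters._

object ReturnTypeDemo extends App {
  val javaList: java.lang.Iterable[Int] = java.util.Arrays.asList(1, 2, 3)

  // Inferred: the declared type of the helper follows the concrete result of asScala.
  def inferred = javaList.asScala

  // Explicit: callers see exactly Iterable[Int], no matter what asScala returns.
  def explicit: Iterable[Int] = javaList.asScala

  println(explicit.toList)
}
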
core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -192,7 +192,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
    */
   def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x)
+    def fn: (T) => S = (x: T) => f.call(x)
     import com.google.common.collect.Ordering // shadows scala.math.Ordering
     implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
     implicit val ctag: ClassTag[S] = fakeClassTag

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 34 additions & 19 deletions
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, List => JList, Iterator => JIterator}
+import java.{lang => jl}
 import java.lang.{Iterable => JIterable, Long => JLong}
+import java.util.{Comparator, List => JList, Iterator => JIterator}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -93,7 +94,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * of the original partition.
    */
   def mapPartitionsWithIndex[R](
-      f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+      f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
     new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
       preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -109,7 +110,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def cm = implicitly[ClassTag[(K2, V2)]]
+    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
     new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
@@ -119,7 +120,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
     JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -129,8 +130,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x).asScala
-    new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
+    def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+    new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
   }
 
   /**
@@ -139,16 +140,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
-    def fn = (x: T) => f.call(x).asScala
-    def cm = implicitly[ClassTag[(K2, V2)]]
+    def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[U] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -157,7 +160,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[U] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     JavaRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
   }
@@ -166,16 +171,20 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to each partition of this RDD.
    */
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
-    new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
+    def fn: (Iterator[T]) => Iterator[jl.Double] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
+    new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
   }
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
   */
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
 
@@ -184,7 +193,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
       preservesPartitioning: Boolean): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[jl.Double] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
       .map(x => x.doubleValue()))
   }
@@ -194,7 +205,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+    }
     JavaPairRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
   }
@@ -277,8 +290,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
-    def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
-      f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
+    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
+      (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
+        f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
+    }
     JavaRDD.fromRDD(
       rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
   }
@@ -441,8 +456,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
-  def countByValue(): java.util.Map[T, java.lang.Long] =
-    mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
+  def countByValue(): java.util.Map[T, jl.Long] =
+    mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))
 
   /**
    * (Experimental) Approximate version of countByValue().

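Alongside the explicit result types, JavaRDDLike adds the renaming import java.{lang => jl} so boxed Java types can be written as jl.Double or jl.Long without spelling out java.lang or colliding with Scala's own Double and Long. A minimal standalone sketch of the idiom (not Spark code):

import java.{lang => jl}

object BoxedAliasDemo extends App {
  // jl.Double is java.lang.Double, the boxed reference type;
  // plain Double still means Scala's Double.
  val boxed: jl.Double = jl.Double.valueOf(3.14)
  val unboxed: Double = boxed.doubleValue()
  println(s"$boxed -> $unboxed")
}
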
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 4 additions & 4 deletions
@@ -614,9 +614,9 @@ private[spark] object PythonRDD extends Logging {
       try {
         val sock = serverSocket.accept()
         val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
-        try {
+        Utils.tryWithSafeFinally {
           writeIteratorToStream(items, out)
-        } finally {
+        } {
           out.close()
         }
       } catch {
@@ -862,9 +862,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     val file = File.createTempFile("broadcast", "", dir)
     path = file.getAbsolutePath
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       Utils.copyStream(in, out)
-    } finally {
+    } {
       out.close()
     }
   }

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 12 additions & 7 deletions
@@ -165,7 +165,7 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def write(id: Long, value: Any) {
     val file = getFile(id)
     val fileOutputStream = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       val out: OutputStream = {
         if (compress) {
           compressionCodec.compressedOutputStream(fileOutputStream)
@@ -175,10 +175,13 @@ private[broadcast] object HttpBroadcast extends Logging {
       }
       val ser = SparkEnv.get.serializer.newInstance()
       val serOut = ser.serializeStream(out)
-      serOut.writeObject(value)
-      serOut.close()
+      Utils.tryWithSafeFinally {
+        serOut.writeObject(value)
+      } {
+        serOut.close()
+      }
       files += file
-    } finally {
+    } {
       fileOutputStream.close()
     }
   }
@@ -212,9 +215,11 @@ private[broadcast] object HttpBroadcast extends Logging {
     }
     val ser = SparkEnv.get.serializer.newInstance()
     val serIn = ser.deserializeStream(in)
-    val obj = serIn.readObject[T]()
-    serIn.close()
-    obj
+    Utils.tryWithSafeFinally {
+      serIn.readObject[T]()
+    } {
+      serIn.close()
+    }
   }
 
   /**

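HttpBroadcast is the one file here where the helper is nested: the outer block guards the FileOutputStream and the inner block guards the serialization stream wrapped around it, so each resource gets its own cleanup even if the other close fails. A condensed sketch of that shape with plain java.io streams, assuming a tryWithSafeFinally like the one sketched after the MapOutputTracker diff is in scope (Utils.tryWithSafeFinally in Spark itself):

import java.io.{FileOutputStream, ObjectOutputStream}

// Illustrative only: one cleanup block per resource, innermost closed first.
def writeValue(path: String, value: AnyRef): Unit = {
  val fileOut = new FileOutputStream(path)
  tryWithSafeFinally {
    val objOut = new ObjectOutputStream(fileOut)
    tryWithSafeFinally {
      objOut.writeObject(value)
    } {
      objOut.close()   // closing the wrapper also flushes to fileOut
    }
  } {
    fileOut.close()
  }
}
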
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     if (!fs.exists(path)) {
       var msg = s"Log directory specified does not exist: $logDir."
       if (logDir == DEFAULT_LOG_DIR) {
-        msg += " Did you configure the correct one through spark.fs.history.logDirectory?"
+        msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
       }
       throw new IllegalArgumentException(msg)
     }

core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 3 additions & 2 deletions
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 import akka.serialization.Serialization
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 
 /**
@@ -59,9 +60,9 @@ private[master] class FileSystemPersistenceEngine(
     val serializer = serialization.findSerializerFor(value)
     val serialized = serializer.toBinary(value)
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       out.write(serialized)
-    } finally {
+    } {
       out.close()
     }
   }

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 6 additions & 2 deletions
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
 import com.google.common.base.Charsets
 
 import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils
 
 /**
  * A client that submits applications to the standalone Master using a REST protocol.
@@ -148,8 +149,11 @@ private[deploy] class StandaloneRestClient extends Logging {
     conn.setRequestProperty("charset", "utf-8")
     conn.setDoOutput(true)
     val out = new DataOutputStream(conn.getOutputStream)
-    out.write(json.getBytes(Charsets.UTF_8))
-    out.close()
+    Utils.tryWithSafeFinally {
+      out.write(json.getBytes(Charsets.UTF_8))
+    } {
+      out.close()
+    }
     readResponse(conn)
   }