Commit 1faf4f4

Merge branch 'master' into allocateExecutors
2 parents: 3c464bd + b8d2580

91 files changed: +2241 additions, -1395 deletions


.gitignore

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 sbt/*.jar
 .settings
 .cache
-.mima-excludes
+.generated-mima-excludes
 /build/
 work/
 out/

.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@ target
 .project
 .classpath
 .mima-excludes
+.generated-mima-excludes
 .rat-excludes
 .*md
 derby.log

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
       sc.stop()
       sc = null
     }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
   }
 
   test("halting by voting") {

core/pom.xml

Lines changed: 0 additions & 29 deletions
@@ -258,35 +258,6 @@
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>test</phase>
-            <goals>
-              <goal>run</goal>
-            </goals>
-            <configuration>
-              <exportAntProperties>true</exportAntProperties>
-              <target>
-                <property name="spark.classpath" refid="maven.test.classpath" />
-                <property environment="env" />
-                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
-                  <condition>
-                    <not>
-                      <or>
-                        <isset property="env.SCALA_HOME" />
-                        <isset property="env.SCALA_LIBRARY_PATH" />
-                      </or>
-                    </not>
-                  </condition>
-                </fail>
-              </target>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
       <plugin>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -76,8 +76,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * :: DeveloperApi ::
    * Alternative constructor for setting preferred locations where Spark will create executors.
    *
-   * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Ca
-   * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+   * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
+   * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
    * from a list of input files or InputFormats for the application.
    */
   @DeveloperApi

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 30 additions & 21 deletions
@@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 
   /**
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. Uses the provided
-   * Partitioner to partition the output RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   * @param partitioner partitioner of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, partitioner)
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
+  {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
   }
 
   /**
-   * Return approximate number of distinct values for each key this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
-   * level.
+   * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   * @param numPartitions number of partitions of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD)
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
   }
 
-
   /**
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
-   * output RDD into numPartitions.
    *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
   */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD))
   }
 
   /** Assign a name to this RDD */
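
For context, here is a minimal Scala sketch of how the three overloads above might be exercised through the underlying pair-RDD methods they delegate to (the local SparkConf, sample data, and variable names are illustrative, not part of the change):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // Spark 1.x idiom: brings pair-RDD functions into scope

// Hypothetical local driver, just for the sketch.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hll-by-key-sketch"))
val pairs = sc.parallelize(Seq(("a", 1), ("a", 1), ("a", 2), ("b", 7), ("b", 8)))

// relativeSD trades accuracy for memory: smaller values mean larger HyperLogLog counters.
val withDefaultPartitioning = pairs.countApproxDistinctByKey(0.05)
val withNumPartitions       = pairs.countApproxDistinctByKey(0.05, 4)
val withPartitioner         = pairs.countApproxDistinctByKey(0.05, new HashPartitioner(4))

withDefaultPartitioning.collect().foreach { case (k, n) => println(s"$k has ~$n distinct values") }
sc.stop()

Note that the return type change from JavaRDD[(K, Long)] to JavaPairRDD[K, Long] gives Java callers the pair-RDD operations back on the result.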

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 7 additions & 5 deletions
@@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return approximate number of distinct elements in the RDD.
    *
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05.
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
    */
-  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+  def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)
 
   def name(): String = rdd.name
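
A similarly hedged sketch for the element-level counterpart, comparing an exact count with the approximation at two relativeSD settings (local mode and the sample data are assumptions of the sketch):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hll-sketch"))
val ids = sc.parallelize(1 to 100000).map(_ % 1000)   // exactly 1000 distinct values

val exact = ids.distinct().count()           // 1000, but requires a shuffle
val loose = ids.countApproxDistinct(0.05)    // cheap estimate, roughly 5% relative error
val tight = ids.countApproxDistinct(0.005)   // tighter estimate, larger counter

println(s"exact=$exact loose=$loose tight=$tight")
sc.stop()

Note that the default relativeSD = 0.05 is gone from the Java signature, so callers now pass the accuracy explicitly.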

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 20 additions & 0 deletions
@@ -269,6 +269,26 @@ private object SpecialLengths {
 private[spark] object PythonRDD {
   val UTF8 = Charset.forName("UTF-8")
 
+  /**
+   * Adapter for calling SparkContext#runJob from Python.
+   *
+   * This method will return an iterator of an array that contains all elements in the RDD
+   * (effectively a collect()), but allows you to run on a certain subset of partitions,
+   * or to enable local execution.
+   */
+  def runJob(
+      sc: SparkContext,
+      rdd: JavaRDD[Array[Byte]],
+      partitions: JArrayList[Int],
+      allowLocal: Boolean): Iterator[Array[Byte]] = {
+    type ByteArray = Array[Byte]
+    type UnrolledPartition = Array[ByteArray]
+    val allPartitions: Array[UnrolledPartition] =
+      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
+    val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
+    flattenedPartition.iterator
+  }
+
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
   JavaRDD[Array[Byte]] = {
     val file = new DataInputStream(new FileInputStream(filename))
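
The adapter lives in the private[spark] PythonRDD object and is intended to be invoked from PySpark; a standalone Scala sketch of the pattern it wraps (illustrative names, local mode assumed) runs a job on selected partitions and flattens the per-partition results into one iterator:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[3]").setAppName("runJob-sketch"))
val rdd = sc.parallelize(1 to 9, numSlices = 3)

// Run the job on a subset of partitions only, then flatten the per-partition
// arrays into a single iterator: the same shape of result the adapter returns.
val partitions = Seq(0, 2)
val perPartition: Array[Array[Int]] =
  sc.runJob(rdd, (iter: Iterator[Int]) => iter.toArray, partitions, allowLocal = false)
val flattened: Iterator[Int] = Array.concat(perPartition: _*).iterator

flattened.foreach(println)   // elements from partitions 0 and 2 only
sc.stop()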

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 7 additions & 4 deletions
@@ -381,16 +381,19 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 object SparkSubmitArguments {
   /** Load properties present in the given file. */
   def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file ${file.getName} does not exist")
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
     val inputStream = new FileInputStream(file)
-    val properties = new Properties()
     try {
+      val properties = new Properties()
       properties.load(inputStream)
+      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
     } catch {
       case e: IOException =>
-        val message = s"Failed when loading Spark properties file ${file.getName}"
+        val message = s"Failed when loading Spark properties file $file"
         throw new SparkException(message, e)
+    } finally {
+      inputStream.close()
     }
-    properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
   }
 }
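
The same validate-load-close pattern can be written as a standalone helper; a sketch follows (the helper name, getProperty call, and RuntimeException in place of SparkException are choices of the sketch, not of the patch):

import java.io.{File, FileInputStream, IOException}
import java.util.Properties
import scala.collection.JavaConversions._   // Scala 2.10-era conversion for the Java property-name set

def loadProperties(file: File): Seq[(String, String)] = {
  require(file.exists(), s"Properties file $file does not exist")
  require(file.isFile(), s"Properties file $file is not a normal file")
  val inputStream = new FileInputStream(file)
  try {
    val properties = new Properties()
    properties.load(inputStream)
    // Trim values so trailing whitespace in the file does not leak into configs.
    properties.stringPropertyNames().toSeq.map(k => (k, properties.getProperty(k).trim))
  } catch {
    case e: IOException =>
      throw new RuntimeException(s"Failed when loading properties file $file", e)
  } finally {
    inputStream.close()   // closed on both the success and the error path
  }
}

// e.g. loadProperties(new File("conf/spark-defaults.conf")).foreach { case (k, v) => println(s"$k=$v") }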

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 12 additions & 8 deletions
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = new Thread() {
       override def run() {
-        killProcess()
+        killProcess(Some("Worker shutting down"))
       }
     }
     Runtime.getRuntime.addShutdownHook(shutdownHook)
   }
 
-  private def killProcess() {
+  /**
+   * kill executor process, wait for exit and notify worker to update resource status
+   *
+   * @param message the exception message which caused the executor's death
+   */
+  private def killProcess(message: Option[String]) {
     if (process != null) {
       logInfo("Killing process!")
       process.destroy()
-      process.waitFor()
+      val exitCode = process.waitFor()
+      worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
     }
   }
 
@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
       workerThread.interrupt()
       workerThread = null
       state = ExecutorState.KILLED
-      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -148,14 +153,13 @@ private[spark] class ExecutorRunner(
     } catch {
       case interrupted: InterruptedException => {
         logInfo("Runner thread for executor " + fullId + " interrupted")
-        killProcess()
+        state = ExecutorState.KILLED
+        killProcess(None)
       }
       case e: Exception => {
         logError("Error running executor", e)
-        killProcess()
         state = ExecutorState.FAILED
-        val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
+        killProcess(Some(e.toString))
       }
     }
   }
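
A self-contained sketch of the shutdown-hook pattern this change refines, using only JDK process APIs (the sleep command, object name, and println in place of the worker actor message are stand-ins):

object ShutdownHookSketch {
  @volatile private var process: Process = _

  /** Kill the child process, wait for it to exit, and report the exit code with a reason. */
  private def killProcess(message: Option[String]): Unit = {
    if (process != null) {
      println("Killing process!")
      process.destroy()
      val exitCode = process.waitFor()
      // ExecutorRunner sends this to the worker as ExecutorStateChanged(appId, execId, state,
      // message, Some(exitCode)); the sketch just prints it.
      println(s"child exited with code $exitCode (${message.getOrElse("no reason given")})")
    }
  }

  def main(args: Array[String]): Unit = {
    process = new ProcessBuilder("sleep", "60").start()
    val shutdownHook = new Thread() {
      override def run(): Unit = killProcess(Some("JVM shutting down"))
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)
    // A SIGINT/SIGTERM now triggers the hook, which reports the child's exit code.
  }
}

Because the ExecutorStateChanged send now happens inside killProcess, the worker receives the actual exit code instead of the None that kill() previously reported.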
