
Commit ae32e92

Author: Davies Liu (committed)

Merge branch 'master' of github.com:apache/spark into fix_conf

2 parents: 1fd70df + e41786c

File tree

22 files changed: +595 additions, -139 deletions
core/src/main/java/org/apache/spark/JobExecutionStatus.java (new file, 25 additions, 0 deletions)

@@ -0,0 +1,25 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark;

public enum JobExecutionStatus {
  RUNNING,
  SUCCEEDED,
  FAILED,
  UNKNOWN
}
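Because JobExecutionStatus is a plain Java enum, its constants are stable identifiers that Scala code can match on directly. A minimal sketch (not part of this commit) of deciding whether a status is terminal:

import org.apache.spark.JobExecutionStatus

// Hedged sketch: treat SUCCEEDED and FAILED as terminal states.
def isTerminal(status: JobExecutionStatus): Boolean = status match {
  case JobExecutionStatus.SUCCEEDED | JobExecutionStatus.FAILED => true
  case _ => false  // RUNNING and UNKNOWN are not terminal
}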
core/src/main/java/org/apache/spark/SparkJobInfo.java (new file, 30 additions, 0 deletions)

@@ -0,0 +1,30 @@
/* (Apache License 2.0 header, identical to the one above) */

package org.apache.spark;

/**
 * Exposes information about Spark Jobs.
 *
 * This interface is not designed to be implemented outside of Spark. We may add additional methods
 * which may break binary compatibility with outside implementations.
 */
public interface SparkJobInfo {
  int jobId();
  int[] stageIds();
  JobExecutionStatus status();
}
core/src/main/java/org/apache/spark/SparkStageInfo.java (new file, 34 additions, 0 deletions)

@@ -0,0 +1,34 @@
/* (Apache License 2.0 header, identical to the one above) */

package org.apache.spark;

/**
 * Exposes information about Spark Stages.
 *
 * This interface is not designed to be implemented outside of Spark. We may add additional methods
 * which may break binary compatibility with outside implementations.
 */
public interface SparkStageInfo {
  int stageId();
  int currentAttemptId();
  String name();
  int numTasks();
  int numActiveTasks();
  int numCompletedTasks();
  int numFailedTasks();
}
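The three Java types above form the read-only surface of the status API; callers only ever consume them. A small sketch (a hypothetical helper, not in the commit) of turning a SparkStageInfo into a one-line progress summary, where `info` is assumed to come from the getStageInfo method added further down:

def describeStage(info: SparkStageInfo): String =
  s"stage ${info.stageId} (attempt ${info.currentAttemptId}): " +
    s"${info.numCompletedTasks}/${info.numTasks} tasks complete, " +
    s"${info.numActiveTasks} active, ${info.numFailedTasks} failed"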

core/src/main/scala/org/apache/spark/SparkContext.scala (9 additions, 67 deletions)

@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -51,6 +50,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

@@ -61,7 +61,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
  * this config overrides the default configs as well as system properties.
  */

-class SparkContext(config: SparkConf) extends Logging {
+class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {

   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -224,10 +224,15 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

-  // Initialize the Spark UI, registering all associated listeners
+
+  private[spark] val jobProgressListener = new JobProgressListener(conf)
+  listenerBus.addListener(jobProgressListener)
+
+  // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
-      Some(new SparkUI(this))
+      Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
+        env.securityManager, appName))
     } else {
       // For tests, do not enable the UI
       None
@@ -852,69 +857,6 @@
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION

-  [63 lines removed: the getExecutorMemoryStatus, getRDDStorageInfo, getPersistentRDDs,
-   getExecutorStorageStatus, getAllPools, getPoolForName and getSchedulingMode methods;
-   they reappear verbatim in the new SparkStatusAPI trait below]

   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark
19+
20+
import scala.collection.Map
21+
import scala.collection.JavaConversions._
22+
23+
import org.apache.spark.annotation.DeveloperApi
24+
import org.apache.spark.rdd.RDD
25+
import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
26+
import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}
27+
28+
/**
29+
* Trait that implements Spark's status APIs. This trait is designed to be mixed into
30+
* SparkContext; it allows the status API code to live in its own file.
31+
*/
32+
private[spark] trait SparkStatusAPI { this: SparkContext =>
33+
34+
/**
35+
* Return a map from the slave to the max memory available for caching and the remaining
36+
* memory available for caching.
37+
*/
38+
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
39+
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
40+
(blockManagerId.host + ":" + blockManagerId.port, mem)
41+
}
42+
}
43+
44+
/**
45+
* :: DeveloperApi ::
46+
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
47+
* they take, etc.
48+
*/
49+
@DeveloperApi
50+
def getRDDStorageInfo: Array[RDDInfo] = {
51+
val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
52+
StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
53+
rddInfos.filter(_.isCached)
54+
}
55+
56+
/**
57+
* Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
58+
* Note that this does not necessarily mean the caching or computation was successful.
59+
*/
60+
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
61+
62+
/**
63+
* :: DeveloperApi ::
64+
* Return information about blocks stored in all of the slaves
65+
*/
66+
@DeveloperApi
67+
def getExecutorStorageStatus: Array[StorageStatus] = {
68+
env.blockManager.master.getStorageStatus
69+
}
70+
71+
/**
72+
* :: DeveloperApi ::
73+
* Return pools for fair scheduler
74+
*/
75+
@DeveloperApi
76+
def getAllPools: Seq[Schedulable] = {
77+
// TODO(xiajunluan): We should take nested pools into account
78+
taskScheduler.rootPool.schedulableQueue.toSeq
79+
}
80+
81+
/**
82+
* :: DeveloperApi ::
83+
* Return the pool associated with the given name, if one exists
84+
*/
85+
@DeveloperApi
86+
def getPoolForName(pool: String): Option[Schedulable] = {
87+
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
88+
}
89+
90+
/**
91+
* Return current scheduling mode
92+
*/
93+
def getSchedulingMode: SchedulingMode.SchedulingMode = {
94+
taskScheduler.schedulingMode
95+
}
96+
97+
98+
/**
99+
* Return a list of all known jobs in a particular job group. The returned list may contain
100+
* running, failed, and completed jobs, and may vary across invocations of this method. This
101+
* method does not guarantee the order of the elements in its result.
102+
*/
103+
def getJobIdsForGroup(jobGroup: String): Array[Int] = {
104+
jobProgressListener.synchronized {
105+
val jobData = jobProgressListener.jobIdToData.valuesIterator
106+
jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray
107+
}
108+
}
109+
110+
/**
111+
* Returns job information, or `None` if the job info could not be found or was garbage collected.
112+
*/
113+
def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
114+
jobProgressListener.synchronized {
115+
jobProgressListener.jobIdToData.get(jobId).map { data =>
116+
new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
117+
}
118+
}
119+
}
120+
121+
/**
122+
* Returns stage information, or `None` if the stage info could not be found or was
123+
* garbage collected.
124+
*/
125+
def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
126+
jobProgressListener.synchronized {
127+
for (
128+
info <- jobProgressListener.stageIdToInfo.get(stageId);
129+
data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
130+
) yield {
131+
new SparkStageInfoImpl(
132+
stageId,
133+
info.attemptId,
134+
info.name,
135+
info.numTasks,
136+
data.numActiveTasks,
137+
data.numCompleteTasks,
138+
data.numFailedTasks)
139+
}
140+
}
141+
}
142+
}
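With the trait mixed into SparkContext, an application can poll job and stage progress from a monitoring thread without touching DeveloperApi internals. A hedged usage sketch, assuming `sc` is a live SparkContext; the group id "demo-group" is made up for illustration:

sc.setJobGroup("demo-group", "status API demo")
// ... kick off an action asynchronously, then, from a monitoring thread:
for (
  jobId <- sc.getJobIdsForGroup("demo-group");
  jobInfo <- sc.getJobInfo(jobId);
  stageId <- jobInfo.stageIds();
  stageInfo <- sc.getStageInfo(stageId)
) {
  println(s"job $jobId [${jobInfo.status}] stage $stageId: " +
    s"${stageInfo.numCompletedTasks}/${stageInfo.numTasks} tasks done")
}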
core/src/main/scala/org/apache/spark/StatusAPIImpl.scala (new file, 34 additions, 0 deletions; file name inferred from the classes it defines)

@@ -0,0 +1,34 @@
/* (Apache License 2.0 header, identical to the one above) */

package org.apache.spark

private class SparkJobInfoImpl (
    val jobId: Int,
    val stageIds: Array[Int],
    val status: JobExecutionStatus)
  extends SparkJobInfo

private class SparkStageInfoImpl(
    val stageId: Int,
    val currentAttemptId: Int,
    val name: String,
    val numTasks: Int,
    val numActiveTasks: Int,
    val numCompletedTasks: Int,
    val numFailedTasks: Int)
  extends SparkStageInfo
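These private classes are the only concrete implementations of the Java interfaces; they compile because a Scala constructor val satisfies a parameterless Java interface method (val jobId implements int jobId(), and so on). A minimal illustration of that trick, with hypothetical types:

object ValImplementsDefDemo extends App {
  trait HasId { def id: Int }              // stands in for the Java interfaces above
  class IdImpl(val id: Int) extends HasId  // the constructor val satisfies `def id`
  println((new IdImpl(42): HasId).id)      // prints 42
}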
