
Commit 53faef1

Author: Marcelo Vanzin

Merge branch 'master' into SPARK-4924

Conflicts:
    bin/spark-submit
    bin/spark-submit2.cmd
    yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Parents: a7936ef + 72df5a3

77 files changed: +1505 / -1363 lines


bagel/pom.xml

Lines changed: 0 additions & 11 deletions
@@ -44,11 +44,6 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
@@ -58,11 +53,5 @@
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-      </plugin>
-    </plugins>
   </build>
 </project>

bagel/src/test/resources/log4j.properties

Lines changed: 2 additions & 2 deletions
@@ -15,10 +15,10 @@
 # limitations under the License.
 #
 
-# Set everything to be logged to the file bagel/target/unit-tests.log
+# Set everything to be logged to the file target/unit-tests.log
 log4j.rootCategory=INFO, file
 log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=false
+log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

core/pom.xml

Lines changed: 0 additions & 18 deletions
@@ -281,11 +281,6 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
@@ -331,19 +326,6 @@
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
       <!-- Unzip py4j so we can include its files in the jar -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -229,7 +229,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // An asynchronous listener bus for Spark events
   private[spark] val listenerBus = new LiveListenerBus
 
-  conf.set("spark.executor.id", "driver")
+  conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 
   // Create the Spark execution environment (cache, map output tracker, etc)
   private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
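The constant replaces a bare string literal. A minimal sketch of the kind of check this enables, assuming (as in this era of the codebase) that SparkContext.DRIVER_IDENTIFIER is a private[spark] constant with the value "<driver>", so the example would have to live inside the org.apache.spark package:

package org.apache.spark

// Hypothetical example, not part of the commit: with a shared constant,
// code that must distinguish the driver from real executors compares
// against SparkContext.DRIVER_IDENTIFIER instead of a "driver" literal.
object DriverIdCheck {
  def isDriver(executorId: String): Boolean =
    executorId == SparkContext.DRIVER_IDENTIFIER
}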

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 2 additions & 0 deletions
@@ -160,6 +160,8 @@ object Client {
     val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
+    // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
+    Master.toAkkaUrl(driverArgs.master)
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
     actorSystem.awaitTermination()
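The added toAkkaUrl call is made purely for its side effect: the result is discarded, and a malformed master URL now fails on the launching thread rather than asynchronously inside ClientActor. A self-contained sketch of that fail-fast idea, with parseMasterUrl as a hypothetical stand-in for Master.toAkkaUrl:

object FailFastSketch extends App {
  private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r

  // Stand-in for Master.toAkkaUrl: throws on anything that is not a
  // well-formed spark://host:port URL.
  def parseMasterUrl(sparkUrl: String): String = sparkUrl match {
    case sparkUrlRegex(host, port) => s"akka.tcp://sparkMaster@$host:$port/user/Master"
    case other => throw new IllegalArgumentException(s"Invalid master URL: $other")
  }

  parseMasterUrl("spark://localhost:7077") // ok; result discarded, only validation wanted
  // parseMasterUrl("localhost:7077")      // would throw here, eagerly
}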

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 9 additions & 13 deletions
@@ -26,7 +26,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
@@ -47,6 +47,8 @@ private[spark] class AppClient(
     conf: SparkConf)
   extends Logging {
 
+  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
@@ -75,9 +77,9 @@
     }
 
     def tryRegisterAllMasters() {
-      for (masterUrl <- masterUrls) {
-        logInfo("Connecting to master " + masterUrl + "...")
-        val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+      for (masterAkkaUrl <- masterAkkaUrls) {
+        logInfo("Connecting to master " + masterAkkaUrl + "...")
+        val actor = context.actorSelection(masterAkkaUrl)
         actor ! RegisterApplication(appDescription)
       }
     }
@@ -103,20 +105,14 @@
     }
 
     def changeMaster(url: String) {
+      // activeMasterUrl is a valid Spark url since we receive it from master.
       activeMasterUrl = url
       master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = activeMasterUrl match {
-        case Master.sparkUrlRegex(host, port) =>
-          Address("akka.tcp", Master.systemName, host, port.toInt)
-        case x =>
-          throw new SparkException("Invalid spark URL: " + x)
-      }
+      masterAddress = Master.toAkkaAddress(activeMasterUrl)
     }
 
     private def isPossibleMaster(remoteUrl: Address) = {
-      masterUrls.map(s => Master.toAkkaUrl(s))
-        .map(u => AddressFromURIString(u).hostPort)
-        .contains(remoteUrl.hostPort)
+      masterAkkaUrls.map(AddressFromURIString(_).hostPort).contains(remoteUrl.hostPort)
     }
 
     override def receiveWithLogging = {
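Two things happen here: the master URLs are converted to Akka URLs once, in the constructor, so a malformed URL now fails when AppClient is built instead of on every registration attempt; and the removed pattern match moves behind a new Master.toAkkaAddress helper. The commit shows only the call site of that helper, so the following is a hypothetical reconstruction inferred from the match it replaces in changeMaster:

import akka.actor.Address

object MasterUrlSketch {
  private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
  private val systemName = "sparkMaster" // assumed value of Master.systemName

  // Inferred body: same parse as the removed match above, centralized so
  // every caller fails the same way on a bad URL.
  def toAkkaAddress(sparkUrl: String): Address = sparkUrl match {
    case sparkUrlRegex(host, port) => Address("akka.tcp", systemName, host, port.toInt)
    case other => throw new IllegalArgumentException(s"Invalid master URL: $other")
  }
}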

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 2 additions & 1 deletion
@@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo(
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
-    sparkUser: String)
+    sparkUser: String,
+    completed: Boolean = false)
 
 private[spark] abstract class ApplicationHistoryProvider {
 
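The new field defaults to false, which keeps the change source-compatible: providers written before the flag existed still compile, and their records are conservatively treated as in-progress. A minimal sketch of that point:

object CompletedFlagSketch extends App {
  case class ApplicationHistoryInfo(
      id: String,
      name: String,
      startTime: Long,
      endTime: Long,
      lastUpdated: Long,
      sparkUser: String,
      completed: Boolean = false)

  // A call site that predates the flag still compiles unchanged.
  val legacy = ApplicationHistoryInfo("app-1", "demo", 0L, 100L, 100L, "alice")
  println(legacy.completed) // false
}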
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 20 additions & 18 deletions
@@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val logInfos = statusList
         .filter { entry =>
           try {
-            val isFinishedApplication =
-              if (isLegacyLogDirectory(entry)) {
-                fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
-              } else {
-                !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
-              }
-
-            if (isFinishedApplication) {
-              val modTime = getModificationTime(entry)
-              newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-              modTime >= lastModifiedTime
-            } else {
-              false
-            }
+            val modTime = getModificationTime(entry)
+            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+            modTime >= lastModifiedTime
           } catch {
             case e: AccessControlException =>
               // Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -204,7 +193,7 @@
             None
           }
         }
-        .sortBy { info => -info.endTime }
+        .sortBy { info => (-info.endTime, -info.startTime) }
 
       lastModifiedTime = newLastModifiedTime
 
@@ -261,7 +250,8 @@
         appListener.startTime.getOrElse(-1L),
         appListener.endTime.getOrElse(-1L),
         getModificationTime(eventLog),
-        appListener.sparkUser.getOrElse(NOT_STARTED))
+        appListener.sparkUser.getOrElse(NOT_STARTED),
+        isApplicationCompleted(eventLog))
     } finally {
       logInput.close()
     }
@@ -329,6 +319,17 @@
   /** Returns the system's mononotically increasing time. */
   private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
 
+  /**
+   * Return true when the application has completed.
+   */
+  private def isApplicationCompleted(entry: FileStatus): Boolean = {
+    if (isLegacyLogDirectory(entry)) {
+      fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
+    } else {
+      !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+    }
+  }
+
 }
 
 private object FsHistoryProvider {
@@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo(
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
-    sparkUser: String)
-  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
+    sparkUser: String,
+    completed: Boolean = true)
+  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
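The new sort key relies on Scala's lexicographic tuple ordering: applications are listed most recently finished first, and ties (notably in-progress applications, whose endTime is the -1 placeholder) are broken by most recent start. A runnable sketch with hypothetical data:

object SortKeySketch extends App {
  case class Info(name: String, startTime: Long, endTime: Long)

  val apps = Seq(Info("a", 10, 100), Info("c", 20, -1), Info("b", 30, -1))
  // Tuples compare left to right, so endTime dominates and startTime
  // only decides ties; negation turns "largest first" into an ascending sort.
  println(apps.sortBy(i => (-i.endTime, -i.startTime)))
  // List(Info(a,10,100), Info(b,30,-1), Info(c,20,-1))
}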

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 40 additions & 20 deletions
@@ -31,8 +31,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
   def render(request: HttpServletRequest): Seq[Node] = {
     val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
     val requestedFirst = (requestedPage - 1) * pageSize
+    val requestedIncomplete =
+      Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
 
-    val allApps = parent.getApplicationList()
+    val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
     val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
     val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
 
@@ -65,25 +67,26 @@
 
           <h4>
             Showing {actualFirst + 1}-{last + 1} of {allApps.size}
-            <span style="float: right">
-              {
-                if (actualPage > 1) {
-                  <a href={"/?page=" + (actualPage - 1)}>&lt; </a>
-                  <a href={"/?page=1"}>1</a>
-                }
+            {if (requestedIncomplete) "(Incomplete applications)"}
+            <span style="float: right">
+              {
+                if (actualPage > 1) {
+                  <a href={makePageLink(actualPage - 1, requestedIncomplete)}>&lt; </a>
+                  <a href={makePageLink(1, requestedIncomplete)}>1</a>
                 }
-              {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
-              {leftSideIndices}
-              {actualPage}
-              {rightSideIndices}
-              {if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
-              {
-                if (actualPage < pageCount) {
-                  <a href={"/?page=" + pageCount}>{pageCount}</a>
-                  <a href={"/?page=" + (actualPage + 1)}> &gt;</a>
-                }
+              }
+              {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
+              {leftSideIndices}
+              {actualPage}
+              {rightSideIndices}
+              {if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
+              {
+                if (actualPage < pageCount) {
+                  <a href={makePageLink(pageCount, requestedIncomplete)}>{pageCount}</a>
+                  <a href={makePageLink(actualPage + 1, requestedIncomplete)}> &gt;</a>
                 }
-            </span>
+              }
+            </span>
           </h4> ++
           appTable
         } else {
@@ -96,6 +99,15 @@
             </p>
           }
         }
+        <a href={makePageLink(actualPage, !requestedIncomplete)}>
+          {
+            if (requestedIncomplete) {
+              "Back to completed applications"
+            } else {
+              "Show incomplete applications"
+            }
+          }
+        </a>
       </div>
     </div>
     UIUtils.basicSparkPage(content, "History Server")
@@ -117,8 +129,9 @@
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
     val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
     val startTime = UIUtils.formatDate(info.startTime)
-    val endTime = UIUtils.formatDate(info.endTime)
-    val duration = UIUtils.formatDuration(info.endTime - info.startTime)
+    val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+    val duration =
+      if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
     val lastUpdated = UIUtils.formatDate(info.lastUpdated)
     <tr>
       <td><a href={uiAddress}>{info.id}</a></td>
@@ -130,4 +143,11 @@
       <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
     </tr>
   }
+
+  private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
+    "/?" + Array(
+      "page=" + linkPage,
+      "showIncomplete=" + showIncomplete
+    ).mkString("&")
+  }
 }
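Threading showIncomplete through makePageLink keeps the completed/incomplete view stable while paging; the old links hard-coded "/?page=N" and would have dropped the flag. The helper's output, reimplemented verbatim from the diff above so the snippet runs standalone:

object PageLinkSketch extends App {
  def makePageLink(linkPage: Int, showIncomplete: Boolean): String =
    "/?" + Array("page=" + linkPage, "showIncomplete=" + showIncomplete).mkString("&")

  println(makePageLink(2, showIncomplete = false)) // /?page=2&showIncomplete=false
  println(makePageLink(1, showIncomplete = true))  // /?page=1&showIncomplete=true
}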
