2 changes: 2 additions & 0 deletions .rat-excludes
@@ -80,5 +80,7 @@ local-1425081759269/*
local-1426533911241/*
local-1426633911242/*
local-1430917381534/*
local-1430917381535_1
local-1430917381535_2
DESCRIPTION
NAMESPACE
@@ -17,6 +17,9 @@

package org.apache.spark.deploy.history

import java.util.zip.ZipOutputStream

import org.apache.spark.SparkException
import org.apache.spark.ui.SparkUI

private[spark] case class ApplicationAttemptInfo(
@@ -62,4 +65,12 @@ private[history] abstract class ApplicationHistoryProvider {
*/
def getConfig(): Map[String, String] = Map()

  /**
   * Writes out the event logs to the output stream provided. The logs will be compressed into a
   * single zip file and written out.
   * @throws SparkException if the logs for the app id cannot be found.
   */
  @throws(classOf[SparkException])
  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit

Contributor:
Wait, I'm confused. What does zipping have to do with the provided output stream?

Contributor Author:
Sorry, I messed up the doc. If the data is in the form of multiple files for each attempt (like legacy files), they will be zipped into a single file, and the single zipped file is written out.

Contributor:
Isn't that an implementation detail?

Or, if you want to enforce a zip file as the output, shouldn't it be a ZipOutputStream? That actually sounds better, since it then enforces a stronger contract regardless of the underlying provider implementation. Otherwise different providers may decide to return different types of files.

Contributor Author:
Yes, we can pass in a zip stream, but there is no real way to enforce the content formats (since each provider might use a different format). Anyway, with legacy files we'd have multiple files zipped (since for legacy files all attempts are the same files), so I guess it is fine to just write each of the files as is. I will make that change to be consistent.

}
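To make the contract discussed in the thread above concrete, here is a minimal, hypothetical sketch (not part of this diff) of a provider honoring it: each log file becomes one ZipEntry in the caller-supplied ZipOutputStream, so callers always receive a single zip regardless of how a provider stores its logs. The object name, entry names, and in-memory data are all illustrative.

import java.io.FileOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{ZipEntry, ZipOutputStream}

// Hypothetical provider: serves two in-memory "log files" for one application.
object WriteEventLogsSketch {
  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit = {
    val logs = Seq(s"$appId-attempt1" -> "event data 1", s"$appId-attempt2" -> "event data 2")
    logs.foreach { case (name, data) =>
      zipStream.putNextEntry(new ZipEntry(name)) // one entry per log file
      zipStream.write(data.getBytes(UTF_8))
      zipStream.closeEntry()
    }
  }

  def main(args: Array[String]): Unit = {
    val zip = new ZipOutputStream(new FileOutputStream("eventLogs-app-1.zip"))
    try writeEventLogs("app-1", None, zip) finally zip.close()
  }
}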
@@ -17,16 +17,18 @@

package org.apache.spark.deploy.history

import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.mutable

import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
@@ -59,7 +61,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    .map { d => Utils.resolveURI(d).toString }
    .getOrElse(DEFAULT_LOG_DIR)

  private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  private val fs = Utils.getHadoopFileSystem(logDir, hadoopConf)

  // Used by check event thread and clean log thread.
  // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -219,6 +222,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

  override def writeEventLogs(
      appId: String,
      attemptId: Option[String],
      zipStream: ZipOutputStream): Unit = {

    /**
     * Compresses the file passed in and writes the compressed data out into the
     * [[ZipOutputStream]] passed in. The file is written as a new [[ZipEntry]] under the
     * given entry name.
     */
    def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
      val fs = FileSystem.get(hadoopConf)
      val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB buffer
      try {
        outputStream.putNextEntry(new ZipEntry(entryName))
        ByteStreams.copy(inputStream, outputStream)
        outputStream.closeEntry()
      } finally {
        inputStream.close()
      }
    }

Contributor:
nit: unnecessary empty line.
    applications.get(appId) match {
      case Some(appInfo) =>
        try {
          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
          appInfo.attempts.filter { attempt =>
            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
          }.foreach { attempt =>
            val logPath = new Path(logDir, attempt.logPath)
            // If this is a legacy directory, then add the directory to the zipStream and add
            // each file to that directory.
            if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
              val files = fs.listFiles(logPath, false)
              zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
              zipStream.closeEntry()
              while (files.hasNext) {
                val file = files.next().getPath
                zipFileToStream(file, attempt.logPath + Path.SEPARATOR + file.getName, zipStream)
              }
            } else {
              zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
            }
          }
        } finally {
          zipStream.close()
        }
      case None => throw new SparkException(s"Logs for $appId not found.")
    }
  }
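For concreteness: given the test application local-1430917381535 added later in this diff (two single-file attempts), downloading all attempts should yield a zip with one entry per attempt file, roughly:

eventLogs-local-1430917381535.zip
  local-1430917381535_1
  local-1430917381535_2

A legacy attempt instead contributes a directory entry plus one entry per file under that directory; the file names inside depend on the legacy log layout.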


/**
* Replay the log files in the list and merge the list of old applications with new ones
*/
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.history

import java.util.NoSuchElementException
import java.util.zip.ZipOutputStream
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import com.google.common.cache._
@@ -173,6 +174,13 @@ class HistoryServer(
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}

  override def writeEventLogs(
      appId: String,
      attemptId: Option[String],
      zipStream: ZipOutputStream): Unit = {
    provider.writeEventLogs(appId, attemptId, zipStream)
  }

/**
* Returns the provider configuration to show in the listing page.
*
@@ -16,6 +16,7 @@
*/
package org.apache.spark.status.api.v1

import java.util.zip.ZipOutputStream
import javax.servlet.ServletContext
import javax.ws.rs._
import javax.ws.rs.core.{Context, Response}
@@ -164,6 +165,18 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
}
}

@Path("applications/{appId}/logs")
def getEventLogs(
@PathParam("appId") appId: String): EventLogDownloadResource = {
new EventLogDownloadResource(uiRoot, appId, None)
}

@Path("applications/{appId}/{attemptId}/logs")
def getEventLogs(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these need to be indented 2 more spaces

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(same L170)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add a style checker for this in the future. this are also lots of violations in Spark itself.

new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
}
}
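Assuming a history server running on its default port (18080), the new endpoints can be exercised with curl; the application and attempt IDs below come from the test data added in this diff:

curl -o eventLogs-local-1430917381535.zip \
  http://localhost:18080/api/v1/applications/local-1430917381535/logs

curl -o eventLogs-local-1430917381535-1.zip \
  http://localhost:18080/api/v1/applications/local-1430917381535/1/logs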

private[spark] object ApiRootResource {
@@ -193,6 +206,13 @@ private[spark] trait UIRoot {
def getSparkUI(appKey: String): Option[SparkUI]
def getApplicationInfoList: Iterator[ApplicationInfo]

  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit = {
    Response.serverError()
      .entity("Event logs are only available through the history server.")
      .status(Response.Status.SERVICE_UNAVAILABLE)
      .build()
  }

Contributor:
This needs a javadoc since it's technically not the same as the one you defined in ApplicationHistoryProvider. We should expand on why attemptId is an Option (what happens if it's None).
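Note: per the FsHistoryProvider implementation above, a None attemptId means the logs for every attempt of the application are written into the zip, while Some(id) restricts the download to that single attempt.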

/**
* Get the spark UI with the given appID, and apply a function
* to it. If there is no such app, throw an appropriate exception
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status.api.v1

import java.io.OutputStream
import java.util.zip.ZipOutputStream
import javax.ws.rs.{GET, Produces}
import javax.ws.rs.core.{MediaType, Response, StreamingOutput}

import scala.util.control.NonFatal

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil

@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))

Contributor:
This is probably fine, but you could also use "application/zip".

private[v1] class EventLogDownloadResource(
    val uIRoot: UIRoot,
    val appId: String,
    val attemptId: Option[String]) extends Logging {
  val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf)

  @GET
  def getEventLogs(): Response = {
    try {
      val fileName = {
        attemptId match {
          case Some(id) => s"eventLogs-$appId-$id.zip"
          case None => s"eventLogs-$appId.zip"
        }
      }

      val stream = new StreamingOutput {
        override def write(output: OutputStream) = {

Contributor:
minor: need return type : Unit

          val zipStream = new ZipOutputStream(output)
          try {
            uIRoot.writeEventLogs(appId, attemptId, zipStream)
          } finally {
            zipStream.close()
          }
        }
      }

      Response.ok(stream)
        .header("Content-Disposition", s"attachment; filename=$fileName")
        .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
        .build()
    } catch {
      case NonFatal(e) =>
        Response.serverError()

Contributor:
You don't need to do matching here. Just make this case the default implementation in UIRoot (and perhaps make the error message more generic).

          .entity(s"Event logs are not available for app: $appId.")
          .status(Response.Status.SERVICE_UNAVAILABLE)
          .build()
    }
  }
}
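Design note: wrapping the zipping in a StreamingOutput defers all work until JAX-RS writes the response body, so event logs are zipped and streamed to the client incrementally instead of being buffered fully in memory first.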
@@ -7,6 +7,22 @@
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1430917381535",
"name" : "Spark shell",
"attempts" : [ {
"attemptId" : "2",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-05-06T13:03:00.950GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"attemptId" : "1",
"startTime" : "2015-05-06T13:03:00.880GMT",
"endTime" : "2015-05-06T13:03:00.890GMT",
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
@@ -7,6 +7,22 @@
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1430917381535",
"name" : "Spark shell",
"attempts" : [ {
"attemptId" : "2",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-05-06T13:03:00.950GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"attemptId" : "1",
"startTime" : "2015-05-06T13:03:00.880GMT",
"endTime" : "2015-05-06T13:03:00.890GMT",
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
@@ -7,6 +7,22 @@
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1430917381535",
"name" : "Spark shell",
"attempts" : [ {
"attemptId" : "2",
"startTime" : "2015-05-06T13:03:00.893GMT",
"endTime" : "2015-05-06T13:03:00.950GMT",
"sparkUser" : "irashid",
"completed" : true
}, {
"attemptId" : "1",
"startTime" : "2015-05-06T13:03:00.880GMT",
"endTime" : "2015-05-06T13:03:00.890GMT",
"sparkUser" : "irashid",
"completed" : true
} ]
}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
@@ -24,12 +40,14 @@
"completed" : true
} ]
}, {
"id" : "local-1425081759269",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2015-02-28T00:02:38.277GMT",
"endTime" : "2015-02-28T00:02:46.912GMT",
"sparkUser" : "irashid",
"completed" : true
} ]
"id": "local-1425081759269",
"name": "Spark shell",
"attempts": [
{
"startTime": "2015-02-28T00:02:38.277GMT",
"endTime": "2015-02-28T00:02:46.912GMT",
"sparkUser": "irashid",
"completed": true
}
]
} ]
5 changes: 5 additions & 0 deletions core/src/test/resources/spark-events/local-1430917381535_1
@@ -0,0 +1,5 @@
{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917380880}
{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle 
Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380880,"User":"irashid","App Attempt ID":"1"}
{"Event":"SparkListenerApplicationEnd","Timestamp":1430917380890}