@@ -17,16 +17,18 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
 
+import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
@@ -59,7 +61,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     .map { d => Utils.resolveURI(d).toString }
     .getOrElse(DEFAULT_LOG_DIR)
 
-  private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fs = Utils.getHadoopFileSystem(logDir, hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -219,6 +222,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  override def writeEventLogs(
+      appId: String,
+      attemptId: Option[String],
+      zipStream: ZipOutputStream): Unit = {
+
+    /**
+     * This method compresses the files passed in, and writes the compressed data out into the
+     * [[OutputStream]] passed in. Each file is written as a new [[ZipEntry]] with its name being
+     * the name of the file being compressed.
+     */
+    def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
+      val fs = FileSystem.get(hadoopConf)
+      val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
+      try {
+        outputStream.putNextEntry(new ZipEntry(entryName))
+        ByteStreams.copy(inputStream, outputStream)
+        outputStream.closeEntry()
+      } finally {
+        inputStream.close()
+      }
+    }
+
+    applications.get(appId) match {
+      case Some(appInfo) =>
+        try {
+          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+          appInfo.attempts.filter { attempt =>
+            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
+          }.foreach { attempt =>
+            val logPath = new Path(logDir, attempt.logPath)
+            // If this is a legacy directory, then add the directory to the zipStream and add
+            // each file to that directory.
+            if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
+              val files = fs.listStatus(logPath)
+              zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
+              zipStream.closeEntry()
+              files.foreach { file =>
+                val path = file.getPath
+                zipFileToStream(path, attempt.logPath + Path.SEPARATOR + path.getName, zipStream)
+              }
+            } else {
+              zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
+            }
+          }
+        } finally {
+          zipStream.close()
+        }
+      case None => throw new SparkException(s"Logs for $appId not found.")
+    }
+  }
+
+
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
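
For context, a minimal sketch of how the new writeEventLogs API might be driven. This is an illustration, not part of the change: the object name, application ID, log directory, and output path are all hypothetical, the caller sits in org.apache.spark.deploy.history because FsHistoryProvider is private[history], and it assumes the provider has already scanned the log directory (the real History Server does this at startup).

    package org.apache.spark.deploy.history

    import java.io.FileOutputStream
    import java.util.zip.ZipOutputStream

    import org.apache.spark.SparkConf

    // Hypothetical driver that archives one application's event logs.
    object EventLogDownloadSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.history.fs.logDirectory", "hdfs:///spark-history") // assumed path
        val provider = new FsHistoryProvider(conf)

        // None selects every attempt; Some("1") would select only attempt 1.
        val zipStream = new ZipOutputStream(new FileOutputStream("/tmp/event-logs.zip"))
        provider.writeEventLogs("app-20150601120000-0001", None, zipStream)
        // No close() needed here: writeEventLogs closes zipStream in its finally block.
      }
    }

For a current-format log the archive holds a single entry named after the attempt's log file; for a legacy per-application directory it holds a directory entry plus one entry per file beneath it, mirroring the two branches of the if above.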
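
A small follow-up sketch, plain java.util.zip, to inspect the archive produced above (same hypothetical path). It also highlights the design choice in zipFileToStream: ByteStreams.copy streams from a 1MB-buffered fs.open directly into the current zip entry, so even very large event logs are never fully buffered in memory.

    import java.util.zip.ZipFile
    import scala.collection.JavaConverters._

    object ListEventLogZip {
      def main(args: Array[String]): Unit = {
        val zip = new ZipFile("/tmp/event-logs.zip")
        try {
          // Prints one name per attempt log, or "<dir>/" followed by
          // "<dir>/<file>" names for legacy directories.
          zip.entries().asScala.foreach(entry => println(entry.getName))
        } finally {
          zip.close()
        }
      }
    }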