Skip to content

Commit f79f42f

Browse files
committed
[SPARK-28869][CORE] Roll over event log files
1 parent 98e1a4c commit f79f42f

File tree

13 files changed

+1665
-395
lines changed

13 files changed

+1665
-395
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 97 additions & 79 deletions
Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,17 @@ package object config {
174174
private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
175175
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
176176

177+
// Switch for rolling over event log files; off unless explicitly enabled.
private[spark] val EVENT_LOG_ENABLE_ROLLING =
  ConfigBuilder("spark.eventLog.rollLog")
    .doc("Whether rolling over event log files is enabled.")
    .booleanConf
    .createWithDefault(false)
181+
182+
// Size threshold for an event log file that is rolled over. The value is parsed with
// ByteUnit.KiB as its unit, and the default is supplied as the string "10m".
private[spark] val EVENT_LOG_ROLLED_EVENT_LOG_MAX_FILE_SIZE =
  ConfigBuilder("spark.eventLog.rollLog.maxFileSize")
    .doc("The max size of event log file to be rolled over, in KiB unless otherwise specified.")
    .bytesConf(ByteUnit.KiB)
    .createWithDefaultString("10m")
187+
177188
private[spark] val EXECUTOR_ID =
178189
ConfigBuilder("spark.executor.id").stringConf.createOptional
179190

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler
19+
20+
import java.io.{BufferedInputStream, InputStream}
21+
import java.util.zip.{ZipEntry, ZipOutputStream}
22+
23+
import scala.collection.mutable.Map
24+
25+
import com.google.common.io.ByteStreams
26+
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
27+
import org.apache.hadoop.hdfs.DFSInputStream
28+
29+
import org.apache.spark.SparkConf
30+
import org.apache.spark.io.CompressionCodec
31+
import org.apache.spark.scheduler.EventLogFileWriter.codecName
32+
import org.apache.spark.util.Utils
33+
34+
/**
 * Base class for readers that expose information about the event log file(s) found at
 * `rootPath` on `fileSystem`.
 */
abstract class EventLogFileReader(
    protected val fileSystem: FileSystem,
    val rootPath: Path) {

  /**
   * Opens `path` and returns the file length reported by the wrapped stream, but only when
   * that stream is a [[DFSInputStream]]. Any other kind of stream yields None.
   */
  protected def fileSizeForDFS(path: Path): Option[Long] = {
    Utils.tryWithResource(fileSystem.open(path)) { stream =>
      Option(stream.getWrappedStream).collect {
        case dfs: DFSInputStream => dfs.getFileLength
      }
    }
  }

  /**
   * Copies the content of `path` into `zipStream` as a new [[ZipEntry]] named `entryName`.
   * The entry is closed once the copy has finished.
   */
  protected def addFileAsZipEntry(
      zipStream: ZipOutputStream,
      path: Path,
      entryName: String): Unit = {
    // Buffer size handed to FileSystem.open when reading the source file.
    val readBufferSize = 1 * 1024 * 1024
    Utils.tryWithResource(fileSystem.open(path, readBufferSize)) { source =>
      zipStream.putNextEntry(new ZipEntry(entryName))
      ByteStreams.copy(source, zipStream)
      zipStream.closeEntry()
    }
  }

  // ================ methods to be overridden ================

  /** The last sequence among the event log files, or None for a single event log file. */
  def lastSequence: Option[Long]

  /**
   * The size of the file holding the last sequence of event log files. For a single event
   * log file, this is the size of that file.
   */
  def fileSizeForLastSequence: Long

  /** Whether the application is completed. */
  def completed: Boolean

  /**
   * The size of the file holding the last sequence of event log files, available only when
   * the underlying input stream is a DFSInputStream. None otherwise.
   */
  def fileSizeForLastSequenceForDFS: Option[Long]

  /** The modification time of the file holding the last sequence of event log files. */
  def modificationTime: Long

  /**
   * Writes the event log file(s) into the given [[ZipOutputStream]]. Each file becomes a new
   * [[ZipEntry]] whose name is the name of the file being compressed.
   */
  def zipEventLogFiles(zipStream: ZipOutputStream): Unit

  /** All available event log files. */
  def listEventLogFiles: Seq[FileStatus]

  /** The short name of the compression codec in use, or None when uncompressed. */
  def compression: Option[String]

  /** The total size of all event log files. */
  def allSize: Long
}
98+
99+
object EventLogFileReader {
  // A cache for compression codecs to avoid creating the same codec many times. This object
  // is a process-wide singleton and `openEventLog` may be invoked from several threads, so a
  // concurrent map is used instead of an unsynchronized mutable map.
  private val codecMap = new java.util.concurrent.ConcurrentHashMap[String, CompressionCodec]()

  /**
   * Creates a reader for `path` based on whether a last sequence is known.
   *
   * @param fs file system the event log lives on
   * @param path root path of the event log (a file, or a directory for rolled logs)
   * @param lastSequence Some(_) selects the rolling reader, None selects the single file reader
   */
  def getEventLogReader(
      fs: FileSystem,
      path: Path,
      lastSequence: Option[Long]): EventLogFileReader = {
    lastSequence match {
      case Some(_) => new RollingEventLogFilesFileReader(fs, path)
      case None => new SingleFileEventLogFileReader(fs, path)
    }
  }

  /**
   * Looks up the status of `path` and creates the matching reader rooted at `path`.
   *
   * @return None when the path is neither a single event log nor a rolling event log directory
   */
  def getEventLogReader(fs: FileSystem, path: Path): Option[EventLogFileReader] = {
    // The reader is rooted at the path given by the caller, not at `status.getPath`.
    readerFor(fs, fs.getFileStatus(path), path)
  }

  /**
   * Creates the matching reader for an already fetched `status`, rooted at `status.getPath`.
   *
   * @return None when the status is neither a single event log nor a rolling event log
   *         directory
   */
  def getEventLogReader(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = {
    readerFor(fs, status, status.getPath)
  }

  /**
   * Opens an event log file and returns an input stream that contains the event data.
   * When a codec name can be derived from the file name, the stream is wrapped with that
   * codec. If setting up the codec fails, the opened file stream is closed before rethrowing.
   *
   * @return input stream that holds one JSON record per line.
   */
  def openEventLog(log: Path, fs: FileSystem): InputStream = {
    val in = new BufferedInputStream(fs.open(log))
    try {
      val codec = codecName(log).map { c =>
        codecMap.computeIfAbsent(c, CompressionCodec.createCodec(new SparkConf, _))
      }
      codec.map(_.compressedContinuousInputStream(in)).getOrElse(in)
    } catch {
      // Intentionally broad: the stream must not leak whatever went wrong, and the original
      // failure is always rethrown.
      case e: Throwable =>
        in.close()
        throw e
    }
  }

  /** Picks the reader implementation for `status`, rooting the reader at `path`. */
  private def readerFor(
      fs: FileSystem,
      status: FileStatus,
      path: Path): Option[EventLogFileReader] = {
    if (isSingleEventLog(status)) {
      Some(new SingleFileEventLogFileReader(fs, path))
    } else if (isRollingEventLogs(status)) {
      Some(new RollingEventLogFilesFileReader(fs, path))
    } else {
      None
    }
  }

  private def isSingleEventLog(status: FileStatus): Boolean = {
    !status.isDirectory &&
      // FsHistoryProvider used to generate a hidden file which can't be read. Accidentally
      // reading a garbage file is safe, but we would log an error which can be scary to
      // the end-user.
      !status.getPath.getName.startsWith(".")
  }

  private def isRollingEventLogs(status: FileStatus): Boolean = {
    status.isDirectory && RollingEventLogFilesWriter.isEventLogDir(status)
  }
}
165+
166+
/** Reader exposing the information of an application logged into one single event log file. */
class SingleFileEventLogFileReader(
    fs: FileSystem,
    path: Path) extends EventLogFileReader(fs, path) {

  // TODO: get stats with constructor and only call if it's needed?
  private lazy val status = fileSystem.getFileStatus(rootPath)

  /** A single event log file has no sequence. */
  override def lastSequence: Option[Long] = None

  override def fileSizeForLastSequence: Long = status.getLen

  /** The log counts as completed when its file name lacks the in-progress suffix. */
  override def completed: Boolean = {
    val stillInProgress = rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS)
    !stillInProgress
  }

  override def fileSizeForLastSequenceForDFS: Option[Long] = {
    fileSizeForDFS(rootPath)
  }

  override def modificationTime: Long = status.getModificationTime

  /** Adds the single log file as one zip entry named after the file. */
  override def zipEventLogFiles(zipStream: ZipOutputStream): Unit = {
    val entryName = rootPath.getName
    addFileAsZipEntry(zipStream, rootPath, entryName)
  }

  override def listEventLogFiles: Seq[FileStatus] = Seq(status)

  override def compression: Option[String] = {
    EventLogFileWriter.codecName(rootPath)
  }

  /** With only one file, the total size equals the size of that file. */
  override def allSize: Long = fileSizeForLastSequence
}
192+
193+
/**
 * The reader which will read the information of rolled multiple event log files.
 *
 * The directory listing is taken once, on first use, and must contain at least one event log
 * file and an appstatus file. Everything else is derived from that cached listing.
 */
class RollingEventLogFilesFileReader(
    fs: FileSystem,
    path: Path) extends EventLogFileReader(fs, path) {
  import RollingEventLogFilesWriter._

  // Snapshot of the log directory; validated once so later accessors can rely on it.
  private lazy val files: Seq[FileStatus] = {
    val ret = fs.listStatus(rootPath).toSeq
    require(ret.exists(isEventLogFile), "Log directory must contain at least one event log file!")
    require(ret.exists(isAppStatusFile), "Log directory must contain an appstatus file!")
    ret
  }

  // Event log files ordered by sequence. Computed once from the cached listing instead of
  // being re-filtered and re-sorted on every accessor call.
  private lazy val eventLogFiles: Seq[FileStatus] =
    files.filter(isEventLogFile).sortBy(status => getSequence(status.getPath.getName))

  // Non-empty is guaranteed by the validation in `files`.
  private def lastEventLogFile: FileStatus = eventLogFiles.last

  /** The highest sequence among the event log files, i.e. the sequence of the last file. */
  override def lastSequence: Option[Long] =
    Some(getSequence(lastEventLogFile.getPath.getName))

  override def fileSizeForLastSequence: Long = lastEventLogFile.getLen

  /** The application is completed when the appstatus file lacks the in-progress suffix. */
  override def completed: Boolean = {
    // `files` already guarantees that an appstatus file is present.
    files.find(isAppStatusFile)
      .exists(!_.getPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS))
  }

  override def fileSizeForLastSequenceForDFS: Option[Long] =
    fileSizeForDFS(lastEventLogFile.getPath)

  override def modificationTime: Long = lastEventLogFile.getModificationTime

  /**
   * Writes a directory entry named after the log directory, followed by one entry per file
   * in the directory listing (not only the event log files).
   */
  override def zipEventLogFiles(zipStream: ZipOutputStream): Unit = {
    val dirEntryName = rootPath.getName + "/"
    zipStream.putNextEntry(new ZipEntry(dirEntryName))
    files.foreach { file =>
      addFileAsZipEntry(zipStream, file.getPath, dirEntryName + file.getPath.getName)
    }
  }

  override def listEventLogFiles: Seq[FileStatus] = eventLogFiles

  /** The codec name is derived from the first event log file. */
  override def compression: Option[String] = EventLogFileWriter.codecName(
    eventLogFiles.head.getPath)

  override def allSize: Long = eventLogFiles.map(_.getLen).sum
}

0 commit comments

Comments
 (0)