
Commit 231f39e

loneknightpy authored and tdas committed
[SPARK-17711] Compress rolled executor log
## What changes were proposed in this pull request?

This PR adds support for executor log compression.

## How was this patch tested?

Unit tests

cc: yhuai tdas mengxr

Author: Yu Peng <[email protected]>

Closes #15285 from loneknightpy/compress-executor-log.
1 parent 3768653 commit 231f39e
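
The only user-facing setting introduced by this patch is spark.executor.logs.rolling.enableCompression, read by RollingFileAppender in the diff below. As a rough sketch of how an operator might turn it on together with Spark's pre-existing rolling-log settings (the values here are purely illustrative):

  import org.apache.spark.SparkConf

  // Illustrative values; rolling itself is governed by the pre-existing
  // spark.executor.logs.rolling.* keys, compression only by the new flag.
  val conf = new SparkConf()
    .set("spark.executor.logs.rolling.strategy", "size")
    .set("spark.executor.logs.rolling.maxSize", (1024 * 1024).toString)
    .set("spark.executor.logs.rolling.maxRetainedFiles", "5")
    .set("spark.executor.logs.rolling.enableCompression", "true")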

File tree

8 files changed: +263 -44 lines changed


core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala

Lines changed: 5 additions & 2 deletions
@@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Unparsed}
 
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
@@ -138,7 +140,8 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
       val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
       logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")
 
-      val totalLength = files.map { _.length }.sum
+      val fileLengths: Seq[Long] = files.map(Utils.getFileLength(_, worker.conf))
+      val totalLength = fileLengths.sum
       val offset = offsetOption.getOrElse(totalLength - byteLength)
       val startIndex = {
         if (offset < 0) {
@@ -151,7 +154,7 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
       }
       val endIndex = math.min(startIndex + byteLength, totalLength)
       logDebug(s"Getting log from $startIndex to $endIndex")
-      val logText = Utils.offsetBytes(files, startIndex, endIndex)
+      val logText = Utils.offsetBytes(files, fileLengths, startIndex, endIndex)
       logDebug(s"Got log of length ${logText.length} bytes")
       (logText, startIndex, endIndex, totalLength)
     } catch {

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 71 additions & 9 deletions
@@ -27,6 +27,7 @@ import java.nio.file.{Files, Paths}
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.zip.GZIPInputStream
 import javax.net.ssl.HttpsURLConnection
 
 import scala.annotation.tailrec
@@ -38,8 +39,10 @@ import scala.reflect.ClassTag
 import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}
 
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
+import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -55,6 +58,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
+import org.apache.spark.util.logging.RollingFileAppender
 
 /** CallSite represents a place in user code. It can have a short and a long form. */
 private[spark] case class CallSite(shortForm: String, longForm: String)
@@ -1440,14 +1444,72 @@
     CallSite(shortForm, longForm)
   }
 
+  private val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
+    "spark.worker.ui.compressedLogFileLengthCacheSize"
+  private val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100
+  private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null
+  private def getCompressedLogFileLengthCache(
+      sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = this.synchronized {
+    if (compressedLogFileLengthCache == null) {
+      val compressedLogFileLengthCacheSize = sparkConf.getInt(
+        UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF,
+        DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE)
+      compressedLogFileLengthCache = CacheBuilder.newBuilder()
+        .maximumSize(compressedLogFileLengthCacheSize)
+        .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
+        override def load(path: String): java.lang.Long = {
+          Utils.getCompressedFileLength(new File(path))
+        }
+      })
+    }
+    compressedLogFileLengthCache
+  }
+
+  /**
+   * Return the file length, if the file is compressed it returns the uncompressed file length.
+   * It also caches the uncompressed file size to avoid repeated decompression. The cache size is
+   * read from workerConf.
+   */
+  def getFileLength(file: File, workConf: SparkConf): Long = {
+    if (file.getName.endsWith(".gz")) {
+      getCompressedLogFileLengthCache(workConf).get(file.getAbsolutePath)
+    } else {
+      file.length
+    }
+  }
+
+  /** Return uncompressed file length of a compressed file. */
+  private def getCompressedFileLength(file: File): Long = {
+    try {
+      // Uncompress .gz file to determine file size.
+      var fileSize = 0L
+      val gzInputStream = new GZIPInputStream(new FileInputStream(file))
+      val bufSize = 1024
+      val buf = new Array[Byte](bufSize)
+      var numBytes = IOUtils.read(gzInputStream, buf)
+      while (numBytes > 0) {
+        fileSize += numBytes
+        numBytes = IOUtils.read(gzInputStream, buf)
+      }
+      fileSize
+    } catch {
+      case e: Throwable =>
+        logError(s"Cannot get file length of ${file}", e)
+        throw e
+    }
+  }
+
   /** Return a string containing part of a file from byte 'start' to 'end'. */
-  def offsetBytes(path: String, start: Long, end: Long): String = {
+  def offsetBytes(path: String, length: Long, start: Long, end: Long): String = {
     val file = new File(path)
-    val length = file.length()
     val effectiveEnd = math.min(length, end)
     val effectiveStart = math.max(0, start)
     val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
-    val stream = new FileInputStream(file)
+    val stream = if (path.endsWith(".gz")) {
+      new GZIPInputStream(new FileInputStream(file))
+    } else {
+      new FileInputStream(file)
+    }
 
     try {
       ByteStreams.skipFully(stream, effectiveStart)
@@ -1463,16 +1525,16 @@
    * and `endIndex` is based on the cumulative size of all the files take in
    * the given order. See figure below for more details.
    */
-  def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
-    val fileLengths = files.map { _.length }
+  def offsetBytes(files: Seq[File], fileLengths: Seq[Long], start: Long, end: Long): String = {
+    assert(files.length == fileLengths.length)
     val startIndex = math.max(start, 0)
     val endIndex = math.min(end, fileLengths.sum)
     val fileToLength = files.zip(fileLengths).toMap
     logDebug("Log files: \n" + fileToLength.mkString("\n"))
 
     val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
     var sum = 0L
-    for (file <- files) {
+    files.zip(fileLengths).foreach { case (file, fileLength) =>
       val startIndexOfFile = sum
       val endIndexOfFile = sum + fileToLength(file)
       logDebug(s"Processing file $file, " +
@@ -1491,19 +1553,19 @@
 
       if (startIndex <= startIndexOfFile && endIndex >= endIndexOfFile) {
         // Case C: read the whole file
-        stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file)))
+        stringBuffer.append(offsetBytes(file.getAbsolutePath, fileLength, 0, fileToLength(file)))
       } else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) {
         // Case A and B: read from [start of required range] to [end of file / end of range]
         val effectiveStartIndex = startIndex - startIndexOfFile
         val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file))
         stringBuffer.append(Utils.offsetBytes(
-          file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+          file.getAbsolutePath, fileLength, effectiveStartIndex, effectiveEndIndex))
       } else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
         // Case D: read from [start of file] to [end of require range]
         val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
         val effectiveEndIndex = endIndex - startIndexOfFile
         stringBuffer.append(Utils.offsetBytes(
-          file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+          file.getAbsolutePath, fileLength, effectiveStartIndex, effectiveEndIndex))
       }
       sum += fileToLength(file)
       logDebug(s"After processing file $file, string built is ${stringBuffer.toString}")

core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala

Lines changed: 39 additions & 6 deletions
@@ -17,9 +17,11 @@
 
 package org.apache.spark.util.logging
 
-import java.io.{File, FileFilter, InputStream}
+import java.io._
+import java.util.zip.GZIPOutputStream
 
 import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
 
@@ -45,6 +47,7 @@ private[spark] class RollingFileAppender(
   import RollingFileAppender._
 
   private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
+  private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false)
 
   /** Stop the appender */
   override def stop() {
@@ -76,15 +79,42 @@ private[spark] class RollingFileAppender(
     }
   }
 
+  // Roll the log file and compress if enableCompression is true.
+  private def rotateFile(activeFile: File, rolloverFile: File): Unit = {
+    if (enableCompression) {
+      val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX)
+      var gzOutputStream: GZIPOutputStream = null
+      var inputStream: InputStream = null
+      try {
+        inputStream = new FileInputStream(activeFile)
+        gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
+        IOUtils.copy(inputStream, gzOutputStream)
+        inputStream.close()
+        gzOutputStream.close()
+        activeFile.delete()
+      } finally {
+        IOUtils.closeQuietly(inputStream)
+        IOUtils.closeQuietly(gzOutputStream)
+      }
+    } else {
+      Files.move(activeFile, rolloverFile)
+    }
+  }
+
+  // Check if the rollover file already exists.
+  private def rolloverFileExist(file: File): Boolean = {
+    file.exists || new File(file.getAbsolutePath + GZIP_LOG_SUFFIX).exists
+  }
+
   /** Move the active log file to a new rollover file */
   private def moveFile() {
     val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
     val rolloverFile = new File(
       activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
     logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
     if (activeFile.exists) {
-      if (!rolloverFile.exists) {
-        Files.move(activeFile, rolloverFile)
+      if (!rolloverFileExist(rolloverFile)) {
+        rotateFile(activeFile, rolloverFile)
         logInfo(s"Rolled over $activeFile to $rolloverFile")
       } else {
         // In case the rollover file name clashes, make a unique file name.
@@ -97,11 +127,11 @@ private[spark] class RollingFileAppender(
           altRolloverFile = new File(activeFile.getParent,
             s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
           i += 1
-        } while (i < 10000 && altRolloverFile.exists)
+        } while (i < 10000 && rolloverFileExist(altRolloverFile))
 
         logWarning(s"Rollover file $rolloverFile already exists, " +
           s"rolled over $activeFile to file $altRolloverFile")
-        Files.move(activeFile, altRolloverFile)
+        rotateFile(activeFile, altRolloverFile)
       }
     } else {
       logWarning(s"File $activeFile does not exist")
@@ -142,6 +172,9 @@ private[spark] object RollingFileAppender {
   val SIZE_DEFAULT = (1024 * 1024).toString
   val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
   val DEFAULT_BUFFER_SIZE = 8192
+  val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
+
+  val GZIP_LOG_SUFFIX = ".gz"
 
   /**
    * Get the sorted list of rolled over files. This assumes that the all the rolled
@@ -158,6 +191,6 @@ private[spark] object RollingFileAppender {
       val file = new File(directory, activeFileName).getAbsoluteFile
       if (file.exists) Some(file) else None
     }
-    rolledOverFiles ++ activeFile
+    rolledOverFiles.sortBy(_.getName.stripSuffix(GZIP_LOG_SUFFIX)) ++ activeFile
   }
 }
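
One subtlety in the last hunk above: rolled-over files can now end in .gz, so getSortedRolledOverFiles sorts by the file name with that suffix stripped, keeping compressed and uncompressed rollovers in rollover order. A small illustration with hypothetical file names:

  val GZIP_LOG_SUFFIX = ".gz"  // mirrors RollingFileAppender.GZIP_LOG_SUFFIX

  val names = Seq(
    "stdout--2016-10-18--10-01.gz",
    "stdout--2016-10-18--10-00",
    "stdout--2016-10-18--10-02.gz")
  // Stripping the suffix before comparing preserves the rollover order.
  val sorted = names.sortBy(_.stripSuffix(GZIP_LOG_SUFFIX))
  // sorted == Seq("stdout--2016-10-18--10-00", "stdout--2016-10-18--10-01.gz", "stdout--2016-10-18--10-02.gz")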

core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala

Lines changed: 5 additions & 1 deletion
@@ -22,16 +22,20 @@ import java.io.{File, FileWriter}
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.PrivateMethodTester
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.worker.Worker
 
 class LogPageSuite extends SparkFunSuite with PrivateMethodTester {
 
   test("get logs simple") {
     val webui = mock(classOf[WorkerWebUI])
+    val worker = mock(classOf[Worker])
     val tmpDir = new File(sys.props("java.io.tmpdir"))
     val workDir = new File(tmpDir, "work-dir")
     workDir.mkdir()
     when(webui.workDir).thenReturn(workDir)
+    when(webui.worker).thenReturn(worker)
+    when(worker.conf).thenReturn(new SparkConf())
     val logPage = new LogPage(webui)
 
     // Prepare some fake log files to read later

core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala

Lines changed: 57 additions & 3 deletions
@@ -20,11 +20,13 @@ package org.apache.spark.util
 import java.io._
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.CountDownLatch
+import java.util.zip.GZIPInputStream
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
 import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
 import org.apache.log4j.{Appender, Level, Logger}
 import org.apache.log4j.spi.LoggingEvent
 import org.mockito.ArgumentCaptor
@@ -72,6 +74,25 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis)
   }
 
+  test("rolling file appender - time-based rolling (compressed)") {
+    // setup input stream and appender
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+    val rolloverIntervalMillis = 100
+    val durationMillis = 1000
+    val numRollovers = durationMillis / rolloverIntervalMillis
+    val textToAppend = (1 to numRollovers).map( _.toString * 10 )
+
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
+    val appender = new RollingFileAppender(testInputStream, testFile,
+      new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
+      sparkConf, 10)
+
+    testRolling(
+      appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
+  }
+
   test("rolling file appender - size-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
@@ -89,6 +110,25 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     }
   }
 
+  test("rolling file appender - size-based rolling (compressed)") {
+    // setup input stream and appender
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+    val rolloverSize = 1000
+    val textToAppend = (1 to 3).map( _.toString * 1000 )
+
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
+    val appender = new RollingFileAppender(testInputStream, testFile,
+      new SizeBasedRollingPolicy(rolloverSize, false), sparkConf, 99)
+
+    val files = testRolling(appender, testOutputStream, textToAppend, 0, isCompressed = true)
+    files.foreach { file =>
+      logInfo(file.toString + ": " + file.length + " bytes")
+      assert(file.length < rolloverSize)
+    }
+  }
+
   test("rolling file appender - cleaning") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
@@ -273,7 +313,8 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       appender: FileAppender,
       outputStream: OutputStream,
       textToAppend: Seq[String],
-      sleepTimeBetweenTexts: Long
+      sleepTimeBetweenTexts: Long,
+      isCompressed: Boolean = false
     ): Seq[File] = {
     // send data to appender through the input stream, and wait for the data to be written
     val expectedText = textToAppend.mkString("")
@@ -290,10 +331,23 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     // verify whether all the data written to rolled over files is same as expected
     val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
       testFile.getParentFile.toString, testFile.getName)
-    logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
+    logInfo("Generate files: \n" + generatedFiles.mkString("\n"))
     assert(generatedFiles.size > 1)
+    if (isCompressed) {
+      assert(
+        generatedFiles.filter(_.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)).size > 0)
+    }
     val allText = generatedFiles.map { file =>
-      Files.toString(file, StandardCharsets.UTF_8)
+      if (file.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)) {
+        val inputStream = new GZIPInputStream(new FileInputStream(file))
+        try {
+          IOUtils.toString(inputStream, StandardCharsets.UTF_8)
+        } finally {
+          IOUtils.closeQuietly(inputStream)
+        }
+      } else {
+        Files.toString(file, StandardCharsets.UTF_8)
+      }
     }.mkString("")
     assert(allText === expectedText)
     generatedFiles
