Conversation

@loneknightpy
Contributor

What changes were proposed in this pull request?

This PR adds support for executor log compression.

How was this patch tested?

Unit tests

cc: @yhuai @tdas @mengxr

@mengxr
Contributor

mengxr commented Sep 29, 2016

ok to test

@mengxr
Contributor

mengxr commented Sep 29, 2016

add to whitelist

@SparkQA

SparkQA commented Sep 29, 2016

Test build #66104 has finished for PR 15285 at commit 15034e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

import RollingFileAppender._

private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false)
Contributor

Should we enable this by default?

Contributor Author

I don't want to change the existing behavior.
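For reference, a minimal sketch (not part of the PR) of how a user opts in, using the property keys defined later in this review (RETAINED_FILES_PROPERTY and ENABLE_COMPRESSION); the values are illustrative:

```scala
import org.apache.spark.SparkConf

// Keys taken from the code under review; compression stays off unless set explicitly.
val conf = new SparkConf()
  .set("spark.executor.logs.rolling.maxRetainedFiles", "5")
  .set("spark.executor.logs.rolling.enableCompression", "true")
```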

activeFile.delete()
} finally {
IOUtils.closeQuietly(inputStream)
IOUtils.closeQuietly(gzOutputStream)
Contributor

What kinds of errors do we expect? If there is an exception, we will lose log data, right? It seems we should at least have a log.

Contributor Author

It may throw some kind of IOException, which will be logged in the rollover() method.

var altRolloverFile: File = null
do {
altRolloverFile = new File(activeFile.getParent,
s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
Contributor

Want to double check: if we enable compression, the suffix will still be gz even if we use the alternative file name, right?

Contributor Author

Yes, it will be whatever we have before + .gz.

val effectiveStart = math.max(0, start)
val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
val stream = new FileInputStream(file)
val stream = if (path.endsWith(".gz")) {
Contributor

Should we use GZIP_LOG_SUFFIX?

Contributor Author

yes

Contributor Author

Oh, Utils.scala doesn't depend on FileAppender. It does use string literals in other places. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L463
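As an aside, a minimal sketch of the read path under discussion, assuming a GZIP_LOG_SUFFIX constant equal to ".gz"; the object and method names here are illustrative, not the PR's actual code:

```scala
import java.io.{File, FileInputStream, InputStream}
import java.util.zip.GZIPInputStream

object LogStreamSketch {
  // Assumed value; the PR keeps this constant next to the file appender code,
  // which is why Utils.scala falls back to the string literal.
  val GZIP_LOG_SUFFIX = ".gz"

  /** Open a rolled log file, transparently decompressing it when it carries the gzip suffix. */
  def openLogStream(file: File): InputStream = {
    val raw = new FileInputStream(file)
    if (file.getName.endsWith(GZIP_LOG_SUFFIX)) new GZIPInputStream(raw) else raw
  }
}
```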

logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
assert(generatedFiles.size > 1)
if (isCompressed) {
assert(generatedFiles.filter(_.getName.endsWith(".gz")).size > 0)
Contributor

Should we use GZIP_LOG_SUFFIX?

}
val allText = generatedFiles.map { file =>
Files.toString(file, StandardCharsets.UTF_8)
if (file.getName.endsWith(".gz")) {

@yhuai
Contributor

yhuai commented Sep 29, 2016

@tdas will also take a look

@SparkQA

SparkQA commented Sep 29, 2016

Test build #66116 has finished for PR 15285 at commit 1df96e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@loneknightpy
Contributor Author

ping @tdas

Contributor

@tdas tdas left a comment

There are major concerns with this patch. Reading the log files to show in the UI may be completely broken if the files are compressed. Fixing this needs some brainstorming and adding a lot of new tests.

inputStream = new FileInputStream(activeFile)
gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
IOUtils.copy(inputStream, gzOutputStream)
activeFile.delete()
Contributor

Are you sure it is a good idea to delete the activeFile before closing the inputStream? I am not sure this is the right thing to do.

Contributor

In fact, the docs of IOUtils.closeQuietly say that it should not be used as a replacement for normal closing.
See https://commons.apache.org/proper/commons-io/javadocs/api-2.5/org/apache/commons/io/IOUtils.html#closeQuietly(java.io.Closeable...)

So this is not right.
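A minimal sketch of the ordering being asked for here: finish writing, close both streams explicitly so failures surface, and delete the active file only once the compressed copy is known to be complete. The helper name and structure are illustrative, not the PR's actual rollover code:

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.GZIPOutputStream
import org.apache.commons.io.IOUtils

object RolloverSketch {
  def compressAndDelete(activeFile: File, gzFile: File): Unit = {
    val in = new FileInputStream(activeFile)
    try {
      val out = new GZIPOutputStream(new FileOutputStream(gzFile))
      try {
        IOUtils.copy(in, out)
      } finally {
        out.close() // writes the gzip trailer; a failure here is not silently swallowed
      }
    } finally {
      in.close()
    }
    // Only now is the compressed copy known to be complete, so delete last.
    activeFile.delete()
  }
}
```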

val SIZE_DEFAULT = (1024 * 1024).toString
val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
val DEFAULT_BUFFER_SIZE = 8192
val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
Contributor

Shouldn't we document this in the Spark docs?

new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
sparkConf, 10)

testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis, true)
Contributor

nit: isCompressed = true

val files = testRolling(appender, testOutputStream, textToAppend, 0, true)
files.foreach { file =>
logInfo(file.toString + ": " + file.length + " bytes")
assert(file.length <= rolloverSize)
Contributor

Maybe we should check that it is indeed gzipped by checking file.length < rolloverSize.

val appender = new RollingFileAppender(testInputStream, testFile,
new SizeBasedRollingPolicy(rolloverSize, false), sparkConf, 99)

val files = testRolling(appender, testOutputStream, textToAppend, 0, true)
Contributor

isCompressed = true

// verify whether all the data written to rolled over files is same as expected
val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
testFile.getParentFile.toString, testFile.getName)
logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
Contributor

nit: Can you change this to "Generate files: \n"? The current message is incorrect and misleading.

@tdas
Contributor

tdas commented Oct 11, 2016

ping. any updates?

@SparkQA

SparkQA commented Oct 11, 2016

Test build #66709 has finished for PR 15285 at commit c9dd1b0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 11, 2016

Test build #66711 has finished for PR 15285 at commit ae08495.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@loneknightpy
Contributor Author

@tdas Addressed your comments

@SparkQA

SparkQA commented Oct 11, 2016

Test build #66716 has finished for PR 15285 at commit ef4f2b9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 11, 2016

Test build #66718 has finished for PR 15285 at commit e5676a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 11, 2016

Test build #66725 has finished for PR 15285 at commit 7cc6935.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@loneknightpy
Contributor Author

@tdas Added fileUncompressedLengthCacheSize to the executor conf.

@SparkQA

SparkQA commented Oct 14, 2016

Test build #66942 has finished for PR 15285 at commit a6fbebe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 14, 2016

Test build #66977 has finished for PR 15285 at commit 440fcfc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tdas tdas left a comment

Overall functionality and tests are good, but I would like to see a bit more code refactoring to make the design cleaner.

.maximumSize(fileUncompressedLengthCacheSize)
.build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
override def load(path: String): java.lang.Long = {
Utils.getFileLength(new File(path))
Contributor

I just learned that CacheLoader.load has issues with obfuscating exceptions. So I think what may happen is that if Utils.getFileLength throws any exception, it will be wrapped and rethrown by Guava with a different stack trace, making it very confusing. I strongly suggest adding a try-catch here to log the error (as a warning) before rethrowing the same exception.
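A sketch of the suggested pattern, with an illustrative logger and a stand-in for the real length computation; the hard-coded cache size is for brevity only:

```scala
import java.io.File
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.slf4j.LoggerFactory

object FileLengthCacheSketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Stand-in for the PR's Utils.getFileLength.
  private def uncompressedLength(file: File): Long = file.length()

  val cache: LoadingCache[String, java.lang.Long] = CacheBuilder.newBuilder()
    .maximumSize(100) // illustrative; the PR reads this from configuration
    .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
      override def load(path: String): java.lang.Long = {
        try {
          uncompressedLength(new File(path))
        } catch {
          case e: Throwable =>
            // Guava wraps loader exceptions before rethrowing, which obscures the
            // original stack trace; log here so the real failure stays visible.
            log.warn(s"Failed to get uncompressed length of $path", e)
            throw e
        }
      }
    })
}
```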

val DEFAULT_BUFFER_SIZE = 8192
val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
val FILE_UNCOMPRESSED_LENGTH_CACHE_SIZE =
"spark.executor.logs.rolling.fileUncompressedLengthCacheSize"
Contributor

This is not a configuration inside the executor. It's inside the worker. So why is it named "spark.executor"?

It has nothing to do with the executor. The worker process (which manages executors) runs this code, and is independent of the application-specific configuration in the executor.

Spark worker configurations are named "spark.worker.*". See http://spark.apache.org/docs/latest/spark-standalone.html

So how about renaming it to "spark.worker.ui.fileUncompressedLengthCacheSize"?

CallSite(shortForm, longForm)
}

def getFileLength(file: File): Long = {
Contributor

Add docs noting that this can handle both non-compressed and gzip-compressed files.

UIUtils.basicSparkPage(content, logType + " log page for " + pageName)
}

private val fileUncompressedLengthCacheSize = parent.worker.conf.getInt(
Contributor

I thought about the organization of the code, and I don't like this. LogPage should not be aware of the details of compressing and uncompressing when Utils is doing the heavy lifting of handling gzip files in a special manner. It spreads the gzip support for log viewing across two classes unnecessarily. A cleaner approach is:

  • Utils has the cache, and the cache loader calls a private util function to get the gzip file size.
  • LogPage calls a public method Utils.getFileSize(file, conf), which transparently handles compressed and non-compressed files.

In Utils, the cache should be called compressedLogFileLengthCache. It is initialized when Utils.getFileSize(file, conf) is called for the first time, using the configurations in conf. A sketch of this organization follows.
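A hedged sketch of that organization; the config key matches the one quoted later in this review, while the other names and the default cache size are assumptions:

```scala
import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.spark.SparkConf

object LogFileLengthSketch {
  private val GZIP_LOG_SUFFIX = ".gz"

  // Lazily created on first use, sized from the worker's configuration.
  @volatile private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null

  private def getCache(conf: SparkConf): LoadingCache[String, java.lang.Long] = synchronized {
    if (compressedLogFileLengthCache == null) {
      val size = conf.getInt("spark.worker.ui.compressedLogFileLengthCacheSize", 100)
      compressedLogFileLengthCache = CacheBuilder.newBuilder()
        .maximumSize(size.toLong)
        .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
          override def load(path: String): java.lang.Long = getCompressedFileLength(new File(path))
        })
    }
    compressedLogFileLengthCache
  }

  /** The only entry point LogPage needs: handles plain and gzip files transparently. */
  def getFileLength(file: File, workerConf: SparkConf): Long = {
    if (file.getName.endsWith(GZIP_LOG_SUFFIX)) {
      getCache(workerConf).get(file.getAbsolutePath)
    } else {
      file.length()
    }
  }

  // Decompresses the file once to count the uncompressed bytes.
  private def getCompressedFileLength(file: File): Long = {
    val in = new GZIPInputStream(new FileInputStream(file))
    try {
      var total = 0L
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {
        total += n
        n = in.read(buf)
      }
      total
    } finally {
      in.close()
    }
  }
}
```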

val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix))
writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
Contributor

Could you test mixed compressed and uncompressed files? I think that case arises when we have compressed rolled files and an uncompressed active file.

Just having another file(3) which is always uncompressed should be fine; see the sketch below.
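A sketch of such a mixed setup; the helper and file names here are illustrative and are not the suite's existing writeLogFile:

```scala
import java.io.{File, FileOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream

object MixedLogFilesSketch {
  // Three gzip-compressed "rolled" files plus one plain "active" file, so the
  // read path is exercised on both formats in the same directory.
  def writeMixedLogFiles(tmpDir: File): Seq[File] = {
    def write(file: File, text: String, compress: Boolean): Unit = {
      val fileOut = new FileOutputStream(file)
      val out = if (compress) new GZIPOutputStream(fileOut) else fileOut
      try out.write(text.getBytes(StandardCharsets.UTF_8)) finally out.close()
    }
    val rolled = (1 to 3).map { i =>
      val f = new File(tmpDir, s"$i.gz")
      write(f, s"rolled file $i contents\n", compress = true)
      f
    }
    val active = new File(tmpDir, "4")
    write(active, "active file contents\n", compress = false)
    rolled :+ active
  }
}
```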

@SparkQA

SparkQA commented Oct 18, 2016

Test build #67099 has finished for PR 15285 at commit 81465ca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 18, 2016

Test build #67106 has finished for PR 15285 at commit 82d4575.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@loneknightpy
Contributor Author

@tdas Addressed your comments, please take a look.

Contributor

@tdas tdas left a comment

It's looking better, but it still needs some work to improve test coverage. Currently the tests do not seem to exercise the code path of loading from the cache, as the tests call Utils.getFileLength(), which bypasses the cache.

CallSite(shortForm, longForm)
}

val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = "spark.worker.ui.compressedLogFileLengthCacheSize"
Contributor

private

Contributor

Also, this is not the cache size; it is the cache size conf key.

}

val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = "spark.worker.ui.compressedLogFileLengthCacheSize"
val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100
Contributor

private

}
} catch {
case e: Throwable =>
logWarning(s"Cannot get file length of ${file}", e)
Contributor

Actually, this is a critical error. Better to make this logError

Files.write("abcdefghij", files(1), StandardCharsets.UTF_8)
Files.write("ABCDEFGHIJ", files(2), StandardCharsets.UTF_8)
val suffix = getSuffix(isCompressed)
val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix)) ++
Contributor

nit: ++ Seq(item) can probably be replaced by :+ item

compressedLogFileLengthCache
}

def getFileLength(file: File, sparkConf: SparkConf): Long = {
Contributor

Add docs, and specify why the conf is needed.
Rename sparkConf to the more specific workerConf.

val suffix = getSuffix(isCompressed)
val f1Path = tmpDir2 + "/f1" + suffix
writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
val f1Length = Utils.getFileLength(new File(f1Path))
Contributor

Nothing in these tests is testing the cache usage. How about making the internal getFileLength completely private (not private[util]) and having it deal only with compressed files (rename it to getCompressedFileLength)? Then the only publicly visible function would be Utils.getFileLength(), which is used by both LogPage and the test code.

This would ensure that we expose only one new public interface in Utils, which gets tested thoroughly in the tests.

* Return the file length, if the file is compressed it returns the uncompressed file length.
* It also caches the uncompressed file size to avoid repeated decompression. The cache size is
* read from workerConf.
* */
Contributor

nit: doc comment style is incorrect.

fileSize
} catch {
case e: Throwable =>
logWarning(s"Cannot get file length of ${file}", e)
Contributor

logError

@SparkQA

SparkQA commented Oct 18, 2016

Test build #67133 has finished for PR 15285 at commit b3041b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Oct 18, 2016

LGTM. Merging to master and 2.0

asfgit pushed a commit that referenced this pull request Oct 18, 2016
## What changes were proposed in this pull request?

This PR adds support for executor log compression.

## How was this patch tested?

Unit tests

cc: yhuai tdas mengxr

Author: Yu Peng <[email protected]>

Closes #15285 from loneknightpy/compress-executor-log.

(cherry picked from commit 231f39e)
Signed-off-by: Tathagata Das <[email protected]>
@asfgit asfgit closed this in 231f39e Oct 18, 2016
@SparkQA

SparkQA commented Oct 18, 2016

Test build #67135 has finished for PR 15285 at commit 1e3302d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 18, 2016

Test build #67134 has finished for PR 15285 at commit 4ead779.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Oct 18, 2016

robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017