47 changes: 28 additions & 19 deletions core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -176,26 +176,31 @@ private[deploy] object RPackageUtils extends Logging {
val file = new File(Utils.resolveURI(jarPath))
if (file.exists()) {
val jar = new JarFile(file)
if (checkManifestForR(jar)) {
print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
val rSource = extractRFolder(jar, printStream, verbose)
if (RUtils.rPackages.isEmpty) {
RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
}
try {
if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
print(s"ERROR: Failed to build R package in $file.", printStream)
print(RJarDoc, printStream)
Utils.tryWithSafeFinally {
Member Author:

Actual change is as below:

Utils.tryWithSafeFinally {
  ...
} {
  jar.close()
}
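As I understand the helper, `Utils.tryWithSafeFinally` runs a body and then a finally block, and when both throw it keeps the body's exception as the primary one instead of letting the finally block's exception replace it. A plain `try/finally` has the opposite behavior, which this hypothetical Java sketch (class name made up for illustration) demonstrates:

```java
public class FinallyMasking {
    static String run() {
        try {
            try {
                throw new RuntimeException("body failure");
            } finally {
                // If this throws while a body exception is in flight, it
                // replaces the body exception entirely.
                throw new IllegalStateException("close failure");
            }
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Only the finally block's exception survives; "body failure" is lost.
        System.out.println(run());  // prints "close failure"
    }
}
```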

if (checkManifestForR(jar)) {
print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
val rSource = extractRFolder(jar, printStream, verbose)
if (RUtils.rPackages.isEmpty) {
RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
}
} finally { // clean up
if (!rSource.delete()) {
logWarning(s"Error deleting ${rSource.getPath()}")
try {
if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
print(s"ERROR: Failed to build R package in $file.", printStream)
print(RJarDoc, printStream)
}
} finally {
// clean up
if (!rSource.delete()) {
logWarning(s"Error deleting ${rSource.getPath()}")
}
}
} else {
if (verbose) {
print(s"$file doesn't contain R source code, skipping...", printStream)
}
}
} else {
if (verbose) {
print(s"$file doesn't contain R source code, skipping...", printStream)
}
} {
jar.close()
}
} else {
print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING)
@@ -231,8 +236,12 @@ private[deploy] object RPackageUtils extends Logging {
val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
try {
filesToBundle.foreach { file =>
// get the relative paths for proper naming in the zip file
val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
// Get the relative paths for proper naming in the ZIP file. Note that
// we convert dir to URI to force / and then remove the trailing / that shows up
// for directories, because the separator should always be / according to the
// ZIP specification. `relPath` here should therefore be, for example,
// "/packageTest/def.R" or "/test.R".
val relPath = file.toURI.toString.replaceFirst(dir.toURI.toString.stripSuffix("/"), "")
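The new expression can be sketched in plain Java (class and paths here are hypothetical; Scala's `stripSuffix` is spelled out by hand), since `toURI` is a `java.io.File` method:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class RelPathDemo {
    // Mirrors the patched Scala expression:
    //   file.toURI.toString.replaceFirst(dir.toURI.toString.stripSuffix("/"), "")
    static String relPath(File dir, File file) {
        String dirUri = dir.toURI().toString();
        // A URI for an existing directory carries a trailing '/'; strip it so
        // the result keeps a leading '/', e.g. "/test.R". URIs use '/' on
        // every OS, which is what makes this safe on Windows.
        String prefix = dirUri.endsWith("/") ? dirUri.substring(0, dirUri.length() - 1) : dirUri;
        return file.toURI().toString().replaceFirst(prefix, "");
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("rpkg").toFile();
        File file = new File(dir, "test.R");
        Files.createFile(file.toPath());
        System.out.println(relPath(dir, file));  // prints "/test.R"
        file.delete();
        dir.delete();
    }
}
```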
Member Author:

It should always be / according to ZIP specification (See 4.4.17 file name: (Variable) in https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT)
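A small round-trip in plain Java (class and entry names hypothetical) suggests how `java.util.zip` stores entry names with `/` separators as the specification requires, regardless of the host OS:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

public class ZipSeparatorDemo {
    // Writes one entry and reads its name back; names are stored with '/'
    // separators per the ZIP spec (APPNOTE 4.4.17), on every platform.
    static String roundTripEntryName(File zipFile) throws IOException {
        try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile))) {
            out.putNextEntry(new ZipEntry("packageTest/def.R"));
            out.write("# R source".getBytes("UTF-8"));
            out.closeEntry();
        }
        try (ZipFile zf = new ZipFile(zipFile)) {
            return zf.entries().nextElement().getName();
        }
    }

    public static void main(String[] args) throws IOException {
        File zipFile = File.createTempFile("demo", ".zip");
        System.out.println(roundTripEntryName(zipFile));  // prints "packageTest/def.R"
        zipFile.delete();
    }
}
```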

Member Author (@HyukjinKwon, Dec 16, 2016):

cc @shivaram, could I please ask you to take a look at this one? This fixes the test; SparkR zipping works properly on Windows in RPackageUtilsSuite.

Contributor:

Yeah I think using / always is good. Could you write a small comment on what the toURI is accomplishing here (as opposed to the getAbsolutePath we were using before)?

Member Author (@HyukjinKwon, Dec 17, 2016):

Oh, I thought you meant writing a comment in the code. :)

It just replaces the \ on Windows with /. The reason for stripSuffix is that the URI seems to have a trailing / when it is known to be a directory.

Contributor:

You can put it in the code as well :) Something like: "We convert dir to URI to force / and then remove trailing / that show up for directories".

Member Author (@HyukjinKwon, Dec 17, 2016):

For example,

Before

  • Windows

    val a = file.getAbsolutePath // "C:\...\tmp\1481863447985-0\test.R"
    val b = dir.getAbsolutePath  // "C:\...\tmp\1481863447985-0"
    a.replaceFirst(b, "")        // java.util.regex.PatternSyntaxException: Unknown character property name {r} near index 4

    Full exception message:

    [info]   java.util.regex.PatternSyntaxException: Unknown character property name {r} near index 4
    [info] C:\projects\spark\target\tmp\1481863447985-0
    [info]     ^
    [info]   at java.util.regex.Pattern.error(Pattern.java:1955)
    [info]   at java.util.regex.Pattern.charPropertyNodeFor(Pattern.java:2781)
    [info]   at java.util.regex.Pattern.family(Pattern.java:2736)
    [info]   at java.util.regex.Pattern.sequence(Pattern.java:2076)
    [info]   at java.util.regex.Pattern.expr(Pattern.java:1996)
    [info]   at java.util.regex.Pattern.compile(Pattern.java:1696)
    [info]   at java.util.regex.Pattern.<init>(Pattern.java:1351)
    [info]   at java.util.regex.Pattern.compile(Pattern.java:1028)
    [info]   at java.lang.String.replaceFirst(String.java:2178)
    [info]   at org.apache.spark.deploy.RPackageUtils$$anonfun$zipRLibraries$2.apply(RPackageUtils.scala:235)
    
  • Linux/Mac

    val a = file.getAbsolutePath // "/var/.../T/1481938681657-0/test.R"
    val b = dir.getAbsolutePath  // "/var/.../T/1481938681657-0"
    a.replaceFirst(b, "")        // "/test.R"

After

  • Windows

    val a = file.toURI.toString              // "file:/C:/.../tmp/1481863447985-0/test.R"
    val b = dir.toURI.toString               // "file:/C:/.../tmp/1481863447985-0/"
    a.replaceFirst(b.stripSuffix("/"),  "")  // "/test.R"
  • Linux/Mac

    val a = file.toURI.toString              // "file:/var/.../T/1481938681657-0/test.R"
    val b = dir.toURI.toString               // "file:/var/.../T/1481938681657-0/"
    a.replaceFirst(b.stripSuffix("/"),  "")  // "/test.R"
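The Windows failure can be reproduced on any OS with literal strings, since it is purely a regex-parsing issue; this standalone Java sketch (class name hypothetical) shows both the crash and why quoting — or the URI rewrite — avoids it:

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class ReplaceFirstRegexPitfall {
    // String.replaceFirst compiles its first argument as a regex, so the
    // backslashes of a Windows path are parsed as escapes (\p begins a
    // character-property construct), which is exactly the crash above.
    static String stripPrefix(String path, String prefix) {
        try {
            return path.replaceFirst(prefix, "");
        } catch (PatternSyntaxException e) {
            return "PatternSyntaxException";
        }
    }

    public static void main(String[] args) {
        String file = "C:\\projects\\spark\\target\\tmp\\1481863447985-0\\test.R";
        String dir = "C:\\projects\\spark\\target\\tmp\\1481863447985-0";
        System.out.println(stripPrefix(file, dir));                 // prints "PatternSyntaxException"
        // Pattern.quote would also avoid the crash by making the prefix a
        // literal; the PR instead normalizes both sides to URIs, which fixes
        // the separator to '/' at the same time.
        System.out.println(stripPrefix(file, Pattern.quote(dir)));  // prints "\test.R"
    }
}
```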

Member Author:

Sure, thanks!

val fis = new FileInputStream(file)
val zipEntry = new ZipEntry(relPath)
zipOutputStream.putNextEntry(zipEntry)
@@ -61,7 +61,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
pw.close()

// Path to tmpFile
tmpFilePath = "file://" + tmpFile.getAbsolutePath
tmpFilePath = tmpFile.toURI.toString
}
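The difference between the two spellings can be sketched in plain Java (the path is hypothetical): the concatenated form simply glues a scheme onto a platform path, which on Windows embeds backslashes and a drive letter and is not a valid URI, while `toURI()` is well-formed everywhere:

```java
import java.io.File;

public class FileUriDemo {
    public static void main(String[] args) {
        File tmpFile = new File("/tmp/metrics-input.txt");  // hypothetical path
        // Manual concatenation bakes in the platform path; on Windows this
        // would yield "file://C:\...\metrics-input.txt", which is malformed.
        System.out.println("file://" + tmpFile.getAbsolutePath());
        // toURI() always yields a well-formed file: URI with '/' separators.
        System.out.println(tmpFile.toURI().toString());
    }
}
```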

after {
@@ -181,7 +181,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
sc.textFile(tmpFilePath, 4)
.map(key => (key, 1))
.reduceByKey(_ + _)
.saveAsTextFile("file://" + tmpFile.getAbsolutePath)
.saveAsTextFile(tmpFile.toURI.toString)

sc.listenerBus.waitUntilEmpty(500)
assert(inputRead == numRecords)
@@ -197,7 +197,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
val numPartitions = 2
val cartVector = 0 to 9
val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt")
val cartFilePath = "file://" + cartFile.getAbsolutePath
val cartFilePath = cartFile.toURI.toString

// write files to disk so we can read them later.
sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
@@ -119,19 +119,20 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}

test("Event log name") {
val baseDirUri = Utils.resolveURI("/base-dir")
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
Utils.resolveURI("/base-dir"), "app1", None))
assert(s"${baseDirUri.toString}/app1" === EventLoggingListener.getLogPath(
baseDirUri, "app1", None))
Member Author:

On Windows, it compares

"file:/C:/base-dir/app1" === "file:/C:/base-dir/app1"

whereas on Linux and Mac,

"file:/base-dir/app1" === "file:/base-dir/app1"

// with compression
assert(s"file:/base-dir/app1.lzf" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
assert(s"${baseDirUri.toString}/app1.lzf" ===
EventLoggingListener.getLogPath(baseDirUri, "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(baseDirUri,
"a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(baseDirUri,
"a fine:mind$dollar{bills}.1", None, Some("lz4")))
}

@@ -289,7 +290,7 @@ object EventLoggingListenerSuite {
val conf = new SparkConf
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.testing", "true")
conf.set("spark.eventLog.dir", logDir.toString)
conf.set("spark.eventLog.dir", logDir.toUri.toString)
compressionCodec.foreach { codec =>
conf.set("spark.eventLog.compress", "true")
conf.set("spark.io.compression.codec", codec)