Commit 001cb1d

committed Tue Nov 8 20:09:31 PST 2016
1 parent 4296612 commit 001cb1d

3 files changed: +12 -8 lines

core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 4 additions & 2 deletions
@@ -84,7 +84,8 @@ abstract class FileCommitProtocol {
    * are left to the commit protocol implementation to decide.
    *
    * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir.
+   * if a task is going to write out multiple files to the same dir. The file commit protocol only
+   * guarantees that files written by different tasks will not conflict.
    */
   def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

@@ -93,7 +94,8 @@ abstract class FileCommitProtocol {
    * Depending on the implementation, there may be weaker guarantees around adding files this way.
    *
    * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files at all (even to different directories).
+   * if a task is going to write out multiple files to the same dir. The file commit protocol only
+   * guarantees that files written by different tasks will not conflict.
    */
   def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
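Note (illustrative, not part of the commit): under the revised contract above, a task that writes several files into the same dir must itself make "ext" unique, while the protocol only separates different tasks. A minimal caller-side sketch, assuming `committer` and `taskContext` are supplied by the surrounding write job and using a hypothetical per-file counter suffix:

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: the "-c$i" counter suffix is one way for a task to keep its own
// files distinct; uniqueness across tasks is the commit protocol's job.
def tempFilesForTask(
    committer: FileCommitProtocol,
    taskContext: TaskAttemptContext,
    numFiles: Int): Seq[String] = {
  (0 until numFiles).map { i =>
    committer.newTaskTempFile(taskContext, dir = None, ext = s"-c$i.parquet")
  }
}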

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 7 additions & 2 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.internal.io

-import java.util.Date
+import java.util.{Date, UUID}

 import scala.collection.mutable

@@ -82,7 +82,12 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
     val filename = getFilename(taskContext, ext)
     val absOutputPath = new Path(absoluteDir, filename).toString
-    val tmpOutputPath = new Path(absPathStagingDir, filename).toString
+
+    // Include a UUID here to prevent file collisions for one task writing to different dirs.
+    // In principle we could include hash(absoluteDir) instead but this is simpler.
+    val tmpOutputPath = new Path(
+      absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString
+
     addedAbsPathFiles(tmpOutputPath) = absOutputPath
     tmpOutputPath
   }
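To see the collision this avoids: a single task requesting the same filename under two different absolute dirs previously mapped both requests onto one staging path. A small sketch with illustrative values (the staging dir and filename below are made up, not the exact getFilename output):

import java.util.UUID
import org.apache.hadoop.fs.Path

val absPathStagingDir = new Path("/tmp/.spark-staging-job-0")  // illustrative
val filename = "part-00000-attempt-0.parquet"                  // illustrative

// Old behaviour: every absolute-path request from this task collapsed onto the same temp path.
val oldTmpA = new Path(absPathStagingDir, filename).toString
val oldTmpB = new Path(absPathStagingDir, filename).toString
assert(oldTmpA == oldTmpB)  // collision: the second write clobbers the first

// New behaviour: a random UUID prefix keeps requests from the same task apart.
val newTmpA = new Path(absPathStagingDir, UUID.randomUUID().toString + "-" + filename).toString
val newTmpB = new Path(absPathStagingDir, UUID.randomUUID().toString + "-" + filename).toString
assert(newTmpA != newTmpB)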

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 1 addition & 4 deletions
@@ -321,10 +321,7 @@ object FileFormatWriter extends Logging {
         None
       }
       val path = if (customPath.isDefined) {
-        // We need to include a uuid here since the commit protocol does not guarantee that
-        // temp files requested by the same task for absolute placement do not collide.
-        committer.newTaskTempFileAbsPath(
-          taskAttemptContext, customPath.get, UUID.randomUUID().toString + ext)
+        committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
       } else {
         committer.newTaskTempFile(taskAttemptContext, partDir, ext)
       }
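With the protocol prefixing the UUID itself, the call site above simply passes the plain extension. An equivalent sketch of the simplified branch, assuming the same names (`committer`, `taskAttemptContext`, `customPath`, `partDir`, `ext`) that are in scope in FileFormatWriter:

// Same logic as the updated diff, written as a pattern match for clarity.
val path = customPath match {
  // Absolute placement: HadoopMapReduceCommitProtocol now de-duplicates the staging name.
  case Some(absoluteDir) =>
    committer.newTaskTempFileAbsPath(taskAttemptContext, absoluteDir, ext)
  // Regular placement under the task's partition directory.
  case None =>
    committer.newTaskTempFile(taskAttemptContext, partDir, ext)
}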
