[SPARK-8029] Robust shuffle writer #9610
Changes from all commits: b2a90c6, 9f0d2f9, 55485a9, 6deccff, f0c2a5d, d0b937f, 35bd469, 71b12bf
org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,13 +21,12 @@ import java.io._

import com.google.common.io.ByteStreams

import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils

import IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.{SparkEnv, Logging, SparkConf}

/**
 * Create and maintain the shuffle blocks' mapping between logic block and physical file location.

@@ -40,10 +39,13 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
 */
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
private[spark] class IndexShuffleBlockResolver(
    conf: SparkConf,
    _blockManager: BlockManager = null)
  extends ShuffleBlockResolver
  with Logging {

  private lazy val blockManager = SparkEnv.get.blockManager
  private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)

  private val transportConf = SparkTransportConf.fromSparkConf(conf)
@@ -74,14 +76,69 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
    }
  }

  /**
   * Check whether the given index and data files match each other.
   * If so, return the partition lengths in the data file. Otherwise return null.
   */
  private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
    // the index file should have `block + 1` longs as offset.
    if (index.length() != (blocks + 1) * 8) {
      return null
    }
    val lengths = new Array[Long](blocks)
    // Read the lengths of blocks
    val in = try {
      new DataInputStream(new BufferedInputStream(new FileInputStream(index)))
    } catch {
      case e: IOException =>
        return null
    }
    try {
      // Convert the offsets into lengths of each block
      var offset = in.readLong()
      if (offset != 0L) {
        return null
      }
      var i = 0
      while (i < blocks) {
        val off = in.readLong()
        lengths(i) = off - offset
        offset = off
        i += 1
      }
    } catch {
      case e: IOException =>
        return null
    } finally {
      in.close()
    }

    // the size of data file should match with index file
    if (data.length() == lengths.sum) {
      lengths
    } else {
      null
    }
  }

  /**
   * Write an index file with the offsets of each block, plus a final offset at the end for the
   * end of the output file. This will be used by getBlockData to figure out where each block
   * begins and ends.
   *
   * It will commit the data and index file as an atomic operation, use the existing ones, or
   * replace them with new ones.
   *
   * Note: the `lengths` will be updated to match the existing index file if use the existing ones.
   * */
  def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
  def writeIndexFileAndCommit(
      shuffleId: Int,
      mapId: Int,
      lengths: Array[Long],
      dataTmp: File): Unit = {
    val indexFile = getIndexFile(shuffleId, mapId)
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
    val indexTmp = Utils.tempFileWith(indexFile)
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
    Utils.tryWithSafeFinally {
      // We take in lengths of each block, need to convert it to offsets.
      var offset = 0L

@@ -93,6 +150,37 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
    } {
      out.close()
    }

    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
    // the following check and rename are atomic.
    synchronized {
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
        indexTmp.delete()
      } else {
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        if (indexFile.exists()) {
          indexFile.delete()
        }
        if (dataFile.exists()) {
          dataFile.delete()
        }
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
Contributor: should we check what happens if this fails? Should we throw an exception?
Author: This will slow down the normal path, I think it's not needed.
      }
    }
  }

  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
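The index-file layout that checkIndexAndDataFile and getBlockData depend on can be illustrated with a standalone sketch (example values only, not part of the diff): for `blocks` partitions the index stores `blocks + 1` longs starting at 0, and each partition's length is the difference of adjacent offsets.

// Standalone illustration of the offset/length relationship checked above (made-up values).
val offsets = Array(0L, 10L, 25L, 25L)   // blocks + 1 = 4 longs => 3 partitions
val lengths = (1 until offsets.length).map(i => offsets(i) - offsets(i - 1)).toArray
// lengths == Array(10L, 15L, 0L); a matching data file must be lengths.sum == 25 bytes long,
// and partition i occupies bytes [offsets(i), offsets(i + 1)) of the data file.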
org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -17,6 +17,8 @@

package org.apache.spark.shuffle.hash

import java.io.IOException

import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
@@ -106,6 +108,29 @@ private[spark] class HashShuffleWriter[K, V](
      writer.commitAndClose()
      writer.fileSegment().length
    }
    // rename all shuffle files to final paths
    // Note: there is only one ShuffleBlockResolver in executor
    shuffleBlockResolver.synchronized {
      shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
        val output = blockManager.diskBlockManager.getFile(writer.blockId)
        if (sizes(i) > 0) {
          if (output.exists()) {
            // Use length of existing file and delete our own temporary one
            sizes(i) = output.length()
            writer.file.delete()
          } else {
            // Commit by renaming our temporary file to something the fetcher expects
            if (!writer.file.renameTo(output)) {
              throw new IOException(s"fail to rename ${writer.file} to $output")
            }
          }
        } else {
          if (output.exists()) {
            output.delete()
          }
        }
      }
    }
    MapStatus(blockManager.shuffleServerId, sizes)
  }
org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -20,8 +20,9 @@ package org.apache.spark.shuffle.sort

import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.ExternalSorter

private[spark] class SortShuffleWriter[K, V, C](
@@ -65,11 +66,11 @@ private[spark] class SortShuffleWriter[K, V, C](
    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
Contributor: can you call these
Author: This is only one file
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }
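Both the resolver and the sort writer above obtain their temporary output path from Utils.tempFileWith, which this PR introduces but which is not shown in this excerpt. A hedged sketch of its assumed shape: a sibling file next to the final output with a random suffix, so the later rename stays within one directory.

import java.io.File
import java.util.UUID

// Assumed shape of the helper used above (not shown in this diff).
def tempFileWith(path: File): File = {
  new File(path.getAbsolutePath + "." + UUID.randomUUID())
}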
Review comment: Can you add a big comment explaining what is going on here? Also worth noting that there is only one IndexShuffleBlockResolver per executor.
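To make that behaviour concrete, here is a hypothetical call sequence (resolver, shuffleId, mapId, dataTmpA and dataTmpB are illustrative placeholders, not code from the PR) showing what two attempts of the same map task on one executor would observe:

// Illustrative only: both attempts write to private temp files, then commit
// through the shared, per-executor resolver.
val lengthsA = Array(10L, 15L, 0L)   // partition lengths computed by attempt A
val lengthsB = Array(10L, 15L, 0L)   // attempt B recomputes the same output
resolver.writeIndexFileAndCommit(shuffleId, mapId, lengthsA, dataTmpA)
// First successful attempt: its temp index and data files are renamed to the final paths.
resolver.writeIndexFileAndCommit(shuffleId, mapId, lengthsB, dataTmpB)
// The existing index and data files already match, so lengthsB is overwritten with the
// committed lengths and dataTmpB is deleted; both attempts report the same MapStatus sizes.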