@@ -67,7 +67,6 @@ public final class UnsafeShuffleSpillWriter {
   private final BlockManager blockManager;
   private final TaskContext taskContext;
   private final boolean spillingEnabled;
-  private ShuffleWriteMetrics writeMetrics;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSize;
@@ -107,15 +106,11 @@ public UnsafeShuffleSpillWriter(
     openSorter();
   }
 
-  // TODO: metrics tracking + integration with shuffle write metrics
-
   /**
    * Allocates a new sorter. Called when opening the spill writer for the first time and after
    * each spill.
    */
   private void openSorter() throws IOException {
-    this.writeMetrics = new ShuffleWriteMetrics();
-    // TODO: connect write metrics to task metrics?
     // TODO: move this sizing calculation logic into a static method of sorter:
     final long memoryRequested = initialSize * 8L;
     if (spillingEnabled) {
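A note on the sizing line above: each in-memory record costs one 8-byte packed pointer in the sort array, hence `initialSize * 8L`. The sketch below shows how such a request can interact with a `ShuffleMemoryManager`-style acquire/release API; the field names and failure handling are illustrative assumptions, not the exact body of `openSorter()`:

```java
// Illustrative sketch only (assumed fields: shuffleMemoryManager, sorter).
// Each record needs an 8-byte packed pointer slot, so the sorter asks the
// memory manager for initialSize * 8 bytes before allocating its array.
private void openSorter() throws IOException {
  final long memoryRequested = initialSize * 8L;
  if (spillingEnabled) {
    final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
    if (memoryAcquired != memoryRequested) {
      // A partial grant is useless here: give the memory back and fail fast,
      // letting the caller spill and retry.
      shuffleMemoryManager.release(memoryAcquired);
      throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
    }
  }
  this.sorter = new UnsafeShuffleSorter(initialSize);
}
```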
@@ -130,8 +125,8 @@ private void openSorter() throws IOException {
   }
 
   /**
-   * Sorts the in-memory records, writes the sorted records to a spill file, and frees the in-memory
-   * data structures associated with this sort. New data structures are not automatically allocated.
+   * Sorts the in-memory records and writes the sorted records to a spill file.
+   * This method does not free the sort data structures.
    */
   private SpillInfo writeSpillFile() throws IOException {
     // This call performs the actual sort.
@@ -161,7 +156,17 @@ private SpillInfo writeSpillFile() throws IOException {
     // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
     // around this, we pass a dummy no-op serializer.
     final SerializerInstance ser = new DummySerializerInstance();
-    writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics);
+    // TODO: audit the metrics-related code and ensure proper metrics integration:
+    // It's not clear how we should handle shuffle write metrics for spill files; currently, Spark
+    // doesn't report IO time spent writing spill files (see SPARK-7413). This method,
+    // writeSpillFile(), is called both when writing spill files and when writing the single output
+    // file in cases where we didn't spill. As a result, we don't necessarily know whether this
+    // should be reported as bytes spilled or as shuffle bytes written. We could defer updating
+    // these metrics until the end of the shuffle write, but then users wouldn't get useful
+    // metrics updates in the UI from long-running tasks. Given this complexity, I'm deferring
+    // these decisions to a separate follow-up patch.
+    writer =
+      blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, new ShuffleWriteMetrics());
 
     int currentPartition = -1;
     while (sortedRecords.hasNext()) {
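For readers unfamiliar with the `DummySerializerInstance` mentioned above: a no-op `SerializerInstance` of this kind can look roughly like the sketch below. Signatures follow Spark's `SerializerInstance`/`SerializationStream` API of this era; the actual class in this patch may differ in detail.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import scala.reflect.ClassTag;

// Sketch of a no-op serializer: this write path passes raw bytes directly,
// so no real (de)serialization ever happens, but DiskBlockObjectWriter still
// calls flush() and close() on the serialization stream. Only those two
// methods do anything; everything else refuses.
final class DummySerializerInstance extends SerializerInstance {
  @Override
  public SerializationStream serializeStream(final OutputStream s) {
    return new SerializationStream() {
      @Override
      public void flush() {
        // DiskBlockObjectWriter uses this to flush the underlying stream.
        try { s.flush(); } catch (IOException e) { throw new RuntimeException(e); }
      }
      @Override
      public void close() {
        try { s.close(); } catch (IOException e) { throw new RuntimeException(e); }
      }
      @Override
      public <T> SerializationStream writeObject(T t, ClassTag<T> tag) {
        throw new UnsupportedOperationException();
      }
    };
  }
  @Override
  public <T> ByteBuffer serialize(T t, ClassTag<T> tag) {
    throw new UnsupportedOperationException();
  }
  // deserialize(...) and deserializeStream(...) overrides omitted; in this
  // sketch they also just throw UnsupportedOperationException.
}
```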
@@ -175,7 +180,8 @@ private SpillInfo writeSpillFile() throws IOException {
           spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
         }
         currentPartition = partition;
-        writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics);
+        writer =
+          blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, new ShuffleWriteMetrics());
       }
 
       final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
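The `packedRecordPointer` accessed above encodes a partition id and a record address in a single `long`, which lets the sort compare partition ids without dereferencing records. The bit layout below is a simplified assumption for illustration, not the actual `PackedRecordPointer` encoding:

```java
// Hypothetical illustration of the packed-pointer idea: partition id in the
// high bits, record pointer in the low bits, so comparing packed longs sorts
// by partition first. The 40-bit split here is an assumption.
final class PackedPointerExample {
  private static final int PARTITION_SHIFT = 40;
  private static final long POINTER_MASK = (1L << PARTITION_SHIFT) - 1;

  static long pack(int partitionId, long recordPointer) {
    return ((long) partitionId << PARTITION_SHIFT) | (recordPointer & POINTER_MASK);
  }

  static int getPartitionId(long packed) {
    return (int) (packed >>> PARTITION_SHIFT);
  }

  static long getRecordPointer(long packed) {
    return packed & POINTER_MASK;
  }
}
```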
@@ -295,7 +301,6 @@ private void ensureSpaceInDataPage(int requiredSpace) throws IOException {
       currentPage = memoryManager.allocatePage(PAGE_SIZE);
       currentPagePosition = currentPage.getBaseOffset();
       allocatedPages.add(currentPage);
-      logger.info("Acquired new page! " + allocatedPages.size() * PAGE_SIZE);
     }
   }
 
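For context on the hunk above, the enclosing `ensureSpaceInDataPage(int)` roughly follows the allocate-when-full pattern sketched below; the free-space calculation is an assumption about the elided code, and only the three allocation lines shown in the diff are verbatim:

```java
// Sketch of the allocate-when-full pattern: if the current page cannot hold
// requiredSpace more bytes, acquire a fresh page and track it so it can be
// freed later (assumed fields: currentPage, currentPagePosition,
// allocatedPages, memoryManager, PAGE_SIZE).
private void ensureSpaceInDataPage(int requiredSpace) throws IOException {
  if (requiredSpace > PAGE_SIZE) {
    throw new IOException("Record too large for a single page: " + requiredSpace);
  }
  final long spaceInCurrentPage = (currentPage == null)
    ? 0
    : PAGE_SIZE - (currentPagePosition - currentPage.getBaseOffset());
  if (requiredSpace > spaceInCurrentPage) {
    currentPage = memoryManager.allocatePage(PAGE_SIZE);
    currentPagePosition = currentPage.getBaseOffset();
    allocatedPages.add(currentPage);
  }
}
```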