+ * @return true if and only if the file or directory is
+ * successfully deleted; false otherwise
+ */
+ boolean delete();
+
+ /**
+ * Opens a channel for writing data to the file. After the channel is closed, the written
+ * data can be read back via {@link DownloadFileWritableChannel#closeAndRead()}.
+ */
+ DownloadFileWritableChannel openForWriting() throws IOException;
+
+ /**
+ * The path of the file, intended only for debug purposes.
+ */
+ String path();
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java
similarity index 75%
rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java
index 552364d274f1..c335a17ae1fe 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java
@@ -17,20 +17,20 @@
package org.apache.spark.network.shuffle;
-import java.io.File;
+import org.apache.spark.network.util.TransportConf;
/**
- * A manager to create temp block files to reduce the memory usage and also clean temp
- * files when they won't be used any more.
+ * A manager to create temp block files used when fetching remote data, in order to reduce
+ * the memory usage. It will clean up the files when they are no longer needed.
*/
-public interface TempFileManager {
+public interface DownloadFileManager {
/** Create a temp block file. */
- File createTempFile();
+ DownloadFile createTempFile(TransportConf transportConf);
/**
* Register a temp file to clean up when it won't be used any more. Return whether the
* file is registered successfully. If `false`, the caller should clean up the file by itself.
*/
- boolean registerTempFileToClean(File file);
+ boolean registerTempFileToClean(DownloadFile file);
}
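For context, an implementation only has to hand out files and honor the registration contract. A hypothetical sketch (ScratchDirDownloadFileManager is not in the patch; it reuses SimpleDownloadFile, which is added later in this diff):

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.spark.network.util.TransportConf;

public class ScratchDirDownloadFileManager implements DownloadFileManager {
  private final File scratchDir;
  private final Set<DownloadFile> toClean = ConcurrentHashMap.newKeySet();

  public ScratchDirDownloadFileManager(File scratchDir) {
    this.scratchDir = scratchDir;
  }

  @Override
  public DownloadFile createTempFile(TransportConf transportConf) {
    try {
      // SimpleDownloadFile is the plain-file implementation added later in this diff.
      return new SimpleDownloadFile(
        File.createTempFile("fetch", ".tmp", scratchDir), transportConf);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public boolean registerTempFileToClean(DownloadFile file) {
    // Returning false would tell the caller to delete the file itself.
    return toClean.add(file);
  }

  /** Deletes every file that was registered for cleanup. */
  public void cleanup() {
    toClean.forEach(DownloadFile::delete);
  }
}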
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java
new file mode 100644
index 000000000000..dbbbac43eb74
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A channel for writing data that is fetched to disk, which allows access to the written data
+ * only after the writer has been closed. Used with DownloadFile and DownloadFileManager.
+ */
+public interface DownloadFileWritableChannel extends WritableByteChannel {
+ ManagedBuffer closeAndRead();
+}
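Since the interface only adds closeAndRead() on top of WritableByteChannel, an implementation does not have to be file-backed. A hypothetical in-memory variant, e.g. for tests (not part of the patch):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;

public class InMemoryDownloadChannel implements DownloadFileWritableChannel {
  private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  private final WritableByteChannel delegate = Channels.newChannel(bytes);

  @Override
  public ManagedBuffer closeAndRead() {
    // Expose everything written so far as an NIO-backed buffer.
    return new NioManagedBuffer(ByteBuffer.wrap(bytes.toByteArray()));
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    return delegate.write(src);
  }

  @Override
  public boolean isOpen() {
    return delegate.isOpen();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}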
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 7ed0b6e93a7a..e49e27ab5aa7 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -76,7 +76,7 @@ protected void checkInit() {
@Override
public void init(String appId) {
this.appId = appId;
- TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
+ TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true, true);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -44,4 +44,4 @@ public abstract class ShuffleClient implements Closeable {
- * @param tempFileManager TempFileManager to create and clean temp files.
+ * @param downloadFileManager DownloadFileManager to create and clean temp files.
 * If it's not <code>null</code>, the remote blocks will be streamed
 * into temp shuffle files to reduce the memory usage, otherwise,
 * they will be kept in memory.
@@ -54,7 +54,7 @@ public abstract void fetchBlocks(
String execId,
String[] blockIds,
BlockFetchingListener listener,
- TempFileManager tempFileManager);
+ DownloadFileManager downloadFileManager);
/**
* Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index 386738ece51a..371149bef397 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -37,14 +37,8 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
size = (int)indexFile.length();
ByteBuffer buffer = ByteBuffer.allocate(size);
offsets = buffer.asLongBuffer();
- DataInputStream dis = null;
- try {
- dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+ try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
dis.readFully(buffer.array());
- } finally {
- if (dis != null) {
- dis.close();
- }
}
}
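The try-with-resources form is not merely shorter: the compiler expands it to roughly the hand-written pattern it replaces, plus suppressed-exception handling the old code lacked. Approximately (a sketch of the desugaring, not literal javac output):

// Inside the constructor, the try-with-resources statement above expands to roughly:
DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
Throwable primary = null;
try {
  dis.readFully(buffer.array());
} catch (Throwable t) {
  primary = t;
  throw t;
} finally {
  if (primary == null) {
    dis.close();
  } else {
    try {
      dis.close();
    } catch (Throwable onClose) {
      primary.addSuppressed(onClose);  // a close() failure no longer masks readFully()'s error
    }
  }
}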
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java
new file mode 100644
index 000000000000..670612fd6f66
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A DownloadFile that does not take any encryption settings into account for reading and
+ * writing data.
+ *
+ * This does *not* mean the data in the file is unencrypted -- it could be that the data was
+ * already encrypted when it was written, and a subsequent layer is responsible for decrypting it.
+ */
+public class SimpleDownloadFile implements DownloadFile {
+
+ private final File file;
+ private final TransportConf transportConf;
+
+ public SimpleDownloadFile(File file, TransportConf transportConf) {
+ this.file = file;
+ this.transportConf = transportConf;
+ }
+
+ @Override
+ public boolean delete() {
+ return file.delete();
+ }
+
+ @Override
+ public DownloadFileWritableChannel openForWriting() throws IOException {
+ return new SimpleDownloadWritableChannel();
+ }
+
+ @Override
+ public String path() {
+ return file.getAbsolutePath();
+ }
+
+ private class SimpleDownloadWritableChannel implements DownloadFileWritableChannel {
+
+ private final WritableByteChannel channel;
+
+ SimpleDownloadWritableChannel() throws FileNotFoundException {
+ channel = Channels.newChannel(new FileOutputStream(file));
+ }
+
+ @Override
+ public ManagedBuffer closeAndRead() {
+ return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ return channel.write(src);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return channel.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+ }
+}
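A hypothetical round trip through the class above (the demo names are illustrative; any TransportConf works here, since SimpleDownloadFile only forwards it to FileSegmentManagedBuffer):

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.TransportConf;

public class SimpleDownloadFileDemo {
  static ManagedBuffer roundTrip(File target, TransportConf conf) throws IOException {
    SimpleDownloadFile file = new SimpleDownloadFile(target, conf);
    DownloadFileWritableChannel out = file.openForWriting();
    ByteBuffer src = ByteBuffer.wrap("block-data".getBytes(StandardCharsets.UTF_8));
    while (src.hasRemaining()) {
      out.write(src);
    }
    // Returns a FileSegmentManagedBuffer spanning everything written to the file.
    return out.closeAndRead();
  }
}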
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index d2072a54fa41..459629c5f05f 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -98,19 +98,19 @@ public void testSortShuffleBlocks() throws IOException {
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo(SORT_MANAGER));
- InputStream block0Stream =
- resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
- String block0 = CharStreams.toString(
- new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
- block0Stream.close();
- assertEquals(sortBlock0, block0);
-
- InputStream block1Stream =
- resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
- String block1 = CharStreams.toString(
- new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
- block1Stream.close();
- assertEquals(sortBlock1, block1);
+ try (InputStream block0Stream = resolver.getBlockData(
+ "app0", "exec0", 0, 0, 0).createInputStream()) {
+ String block0 =
+ CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
+ assertEquals(sortBlock0, block0);
+ }
+
+ try (InputStream block1Stream = resolver.getBlockData(
+ "app0", "exec0", 0, 0, 1).createInputStream()) {
+ String block1 =
+ CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
+ assertEquals(sortBlock1, block1);
+ }
}
@Test
@@ -149,7 +149,7 @@ public void testNormalizeAndInternPathname() {
private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
String normPathname =
- ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
+ ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
assertEquals(expectedPathname, normPathname);
File file = new File(normPathname);
String returnedPath = file.getPath();
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index a6a1b8d0ac3f..526b96b36447 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -133,37 +133,37 @@ private FetchResult fetchBlocks(
final Semaphore requestsRemaining = new Semaphore(0);
- ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000);
- client.init(APP_ID);
- client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
- new BlockFetchingListener() {
- @Override
- public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
- synchronized (this) {
- if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
- data.retain();
- res.successBlocks.add(blockId);
- res.buffers.add(data);
- requestsRemaining.release();
+ try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) {
+ client.init(APP_ID);
+ client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
+ new BlockFetchingListener() {
+ @Override
+ public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+ synchronized (this) {
+ if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
+ data.retain();
+ res.successBlocks.add(blockId);
+ res.buffers.add(data);
+ requestsRemaining.release();
+ }
}
}
- }
-
- @Override
- public void onBlockFetchFailure(String blockId, Throwable exception) {
- synchronized (this) {
- if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
- res.failedBlocks.add(blockId);
- requestsRemaining.release();
+
+ @Override
+ public void onBlockFetchFailure(String blockId, Throwable exception) {
+ synchronized (this) {
+ if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
+ res.failedBlocks.add(blockId);
+ requestsRemaining.release();
+ }
}
}
- }
- }, null);
+ }, null);
- if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
- fail("Timeout getting response from the server");
+ if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
+ fail("Timeout getting response from the server");
+ }
}
- client.close();
return res;
}
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 16bad9f1b319..82caf392b821 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -96,14 +96,16 @@ private void validate(String appId, String secretKey, boolean encrypt)
ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
}
- ExternalShuffleClient client =
- new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000);
- client.init(appId);
- // Registration either succeeds or throws an exception.
- client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
- new ExecutorShuffleInfo(new String[0], 0,
- "org.apache.spark.shuffle.sort.SortShuffleManager"));
- client.close();
+ try (ExternalShuffleClient client =
+ new ExternalShuffleClient(
+ testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) {
+ client.init(appId);
+ // Registration either succeeds or throws an exception.
+ client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
+ new ExecutorShuffleInfo(
+ new String[0], 0, "org.apache.spark.shuffle.sort.SortShuffleManager")
+ );
+ }
}
/** Provides a secret key holder which always returns the given secret key, for a single appId. */
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 564e6583c909..a1cf761d12d8 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
new file mode 100644
+   * <p>Each executor will, during its initialization, invoke this method on each
+   * plugin provided in the spark.executor.plugins configuration.</p>
+   *
+   * <p>Plugins should create threads in their implementation of this method for
+   * any polling, blocking, or intensive computation.</p>
+   */
+  default void init() {}
+
+  /**
+   * Clean up and terminate this plugin.
+   *
+   * <p>This function is called during the executor shutdown phase. The executor
+   * will wait for the plugin to terminate before continuing its own shutdown.</p>
+   */
+  default void shutdown() {}
+}
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 94c5c11b61a5..731f6fc767df 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -103,6 +103,12 @@ public final void onExecutorMetricsUpdate(
     onEvent(executorMetricsUpdate);
   }

+  @Override
+  public final void onStageExecutorMetrics(
+      SparkListenerStageExecutorMetrics executorMetrics) {
+    onEvent(executorMetrics);
+  }
+
   @Override
   public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
     onEvent(executorAdded);
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
index 0cced9e22295..2e18715b600e 100644
--- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -135,62 +135,58 @@ private void readAsync() throws IOException {
     } finally {
       stateChangeLock.unlock();
     }
-    executorService.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        stateChangeLock.lock();
-        try {
-          if (isClosed) {
-            readInProgress = false;
-            return;
-          }
-          // Flip this so that the close method will not close the underlying input stream when we
-          // are reading.
-          isReading = true;
-        } finally {
-          stateChangeLock.unlock();
-        }
+    executorService.execute(() -> {
+      stateChangeLock.lock();
+      try {
+        if (isClosed) {
+          readInProgress = false;
+          return;
+        }
+        // Flip this so that the close method will not close the underlying input stream when we
+        // are reading.
+        isReading = true;
+      } finally {
+        stateChangeLock.unlock();
+      }

-        // Please note that it is safe to release the lock and read into the read ahead buffer
-        // because either of following two conditions will hold - 1. The active buffer has
-        // data available to read so the reader will not read from the read ahead buffer.
-        // 2. This is the first time read is called or the active buffer is exhausted,
-        // in that case the reader waits for this async read to complete.
-        // So there is no race condition in both the situations.
-        int read = 0;
-        int off = 0, len = arr.length;
-        Throwable exception = null;
-        try {
-          // try to fill the read ahead buffer.
-          // if a reader is waiting, possibly return early.
-          do {
-            read = underlyingInputStream.read(arr, off, len);
-            if (read <= 0) break;
-            off += read;
-            len -= read;
-          } while (len > 0 && !isWaiting.get());
-        } catch (Throwable ex) {
-          exception = ex;
-          if (ex instanceof Error) {
-            // `readException` may not be reported to the user. Rethrow Error to make sure at least
-            // The user can see Error in UncaughtExceptionHandler.
-            throw (Error) ex;
-          }
-        } finally {
-          stateChangeLock.lock();
-          readAheadBuffer.limit(off);
-          if (read < 0 || (exception instanceof EOFException)) {
-            endOfStream = true;
-          } else if (exception != null) {
-            readAborted = true;
-            readException = exception;
-          }
-          readInProgress = false;
-          signalAsyncReadComplete();
-          stateChangeLock.unlock();
-          closeUnderlyingInputStreamIfNecessary();
-        }
+      // Please note that it is safe to release the lock and read into the read ahead buffer
+      // because either of following two conditions will hold - 1. The active buffer has
+      // data available to read so the reader will not read from the read ahead buffer.
+      // 2. This is the first time read is called or the active buffer is exhausted,
+      // in that case the reader waits for this async read to complete.
+      // So there is no race condition in both the situations.
+      int read = 0;
+      int off = 0, len = arr.length;
+      Throwable exception = null;
+      try {
+        // try to fill the read ahead buffer.
+        // if a reader is waiting, possibly return early.
+        do {
+          read = underlyingInputStream.read(arr, off, len);
+          if (read <= 0) break;
+          off += read;
+          len -= read;
+        } while (len > 0 && !isWaiting.get());
+      } catch (Throwable ex) {
+        exception = ex;
+        if (ex instanceof Error) {
+          // `readException` may not be reported to the user. Rethrow Error to make sure at least
+          // The user can see Error in UncaughtExceptionHandler.
+          throw (Error) ex;
+        }
+      } finally {
+        stateChangeLock.lock();
+        readAheadBuffer.limit(off);
+        if (read < 0 || (exception instanceof EOFException)) {
+          endOfStream = true;
+        } else if (exception != null) {
+          readAborted = true;
+          readException = exception;
+        }
+        readInProgress = false;
+        signalAsyncReadComplete();
+        stateChangeLock.unlock();
+        closeUnderlyingInputStreamIfNecessary();
       }
     });
   }
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 8651a639c07f..d07faf1da124 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -311,7 +311,7 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
       // this could trigger spilling to free some pages.
       return allocatePage(size, consumer);
     }
-    page.setPageNumber(pageNumber);
+    page.pageNumber = pageNumber;
     pageTable[pageNumber] = page;
     if (logger.isTraceEnabled()) {
       logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
@@ -323,25 +323,25 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
    * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
    */
   public void freePage(MemoryBlock page, MemoryConsumer consumer) {
-    assert (page.getPageNumber() != MemoryBlock.NO_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
       "Called freePage() on memory that wasn't allocated with allocatePage()";
-    assert (page.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
       "Called freePage() on a memory block that has already been freed";
-    assert (page.getPageNumber() != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
       "Called freePage() on a memory block that has already been freed";
-    assert(allocatedPages.get(page.getPageNumber()));
-    pageTable[page.getPageNumber()] = null;
+    assert(allocatedPages.get(page.pageNumber));
+    pageTable[page.pageNumber] = null;
     synchronized (this) {
-      allocatedPages.clear(page.getPageNumber());
+      allocatedPages.clear(page.pageNumber);
     }
     if (logger.isTraceEnabled()) {
-      logger.trace("Freed page number {} ({} bytes)", page.getPageNumber(), page.size());
+      logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
     }
     long pageSize = page.size();
     // Clear the page number before passing the block to the MemoryAllocator's free().
     // Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
     // page has been inappropriately directly freed without calling TMM.freePage().
-    page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
+    page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
     memoryManager.tungstenMemoryAllocator().free(page);
     releaseExecutionMemory(pageSize, consumer);
   }
@@ -363,7 +363,7 @@ public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
       // relative to the page's base offset; this relative offset will fit in 51 bits.
       offsetInPage -= page.getBaseOffset();
     }
-    return encodePageNumberAndOffset(page.getPageNumber(), offsetInPage);
+    return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
   }

   @VisibleForTesting
@@ -434,7 +434,7 @@ public long cleanUpAllAllocatedMemory() {
     for (MemoryBlock page : pageTable) {
       if (page != null) {
         logger.debug("unreleased page: " + page + " in task " + taskAttemptId);
-        page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
+        page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
         memoryManager.tungstenMemoryAllocator().free(page);
       }
     }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index e3bd5496cf5b..b020a6d99247 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -125,7 +125,7 @@ public void write(Iterator