From 906f716c5bed962ca0eecd2e2114cefb1444d8b7 Mon Sep 17 00:00:00 2001
From: sychen
Date: Tue, 1 Nov 2022 19:17:39 +0800
Subject: [PATCH 01/12] avoid mkdir when remove file

---
 .../org/apache/spark/storage/DiskBlockManager.scala | 10 ++++++++--
 .../scala/org/apache/spark/storage/DiskStore.scala  |  2 +-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index e29f3fc1b8050..d7376fd75046b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -91,6 +91,10 @@ private[spark] class DiskBlockManager(
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
   def getFile(filename: String): File = {
+    getFile(filename, true)
+  }
+
+  def getFile(filename: String, needCreate: Boolean = true): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
@@ -103,7 +107,7 @@ private[spark] class DiskBlockManager(
         old
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists()) {
+        if (!newDir.exists() && needCreate) {
           val path = newDir.toPath
           Files.createDirectory(path)
           if (permissionChangingRequired) {
@@ -115,7 +119,9 @@ private[spark] class DiskBlockManager(
             Files.setPosixFilePermissions(path, currentPerms)
           }
         }
-        subDirs(dirId)(subDirId) = newDir
+        if (newDir.exists()) {
+          subDirs(dirId)(subDirId) = newDir
+        }
         newDir
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index d45947db69343..1c6cecf2afa8a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -128,7 +128,7 @@ private[spark] class DiskStore(
 
   def remove(blockId: BlockId): Boolean = {
     blockSizes.remove(blockId)
-    val file = diskManager.getFile(blockId.name)
+    val file = diskManager.getFile(blockId.name, false)
     if (file.exists()) {
       val ret = file.delete()
       if (!ret) {
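The patch above splits getFile into a lookup half and a creating half: callers that only need to locate a block, such as DiskStore.remove, can pass needCreate = false and skip the Files.createDirectory side effect that every lookup used to pay. A rough, self-contained sketch of the intended behaviour follows; the hash-to-subdirectory formula, the math.abs stand-in for Utils.nonNegativeHash, and the standalone object are illustrative assumptions, not Spark's actual DiskBlockManager:

    import java.io.File
    import java.nio.file.Files

    object GetFileSketch {
      val localDirs: Array[File] = Array(new File("/tmp/spark-local"))
      val subDirsPerLocalDir = 64

      def getFile(filename: String, needCreate: Boolean = true): File = {
        val hash = math.abs(filename.hashCode) // stand-in for Utils.nonNegativeHash
        val dirId = hash % localDirs.length
        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
        val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
        // Only writers pay the mkdir cost; delete/contains lookups skip it.
        if (!subDir.exists() && needCreate) {
          Files.createDirectories(subDir.toPath) // parents may be missing in this sketch
        }
        new File(subDir, filename)
      }
    }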
From fa9c72fd6185d4497ed853872a349468181ccd99 Mon Sep 17 00:00:00 2001
From: sychen
Date: Fri, 4 Nov 2022 11:36:04 +0800
Subject: [PATCH 02/12] avoid multiple calls to exists

---
 .../scala/org/apache/spark/storage/DiskBlockManager.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index d7376fd75046b..57a3568a9cd3f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -107,7 +107,8 @@ private[spark] class DiskBlockManager(
         old
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists() && needCreate) {
+        val alreadyExists = newDir.exists()
+        if (!alreadyExists && needCreate) {
           val path = newDir.toPath
           Files.createDirectory(path)
           if (permissionChangingRequired) {
@@ -119,7 +120,8 @@ private[spark] class DiskBlockManager(
             Files.setPosixFilePermissions(path, currentPerms)
           }
         }
-        if (newDir.exists()) {
+        // If needCreate is set, exception is thrown if directory cannot be created.
+        if (alreadyExists || needCreate) {
           subDirs(dirId)(subDirId) = newDir
         }
         newDir
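The cached flag is sound because Files.createDirectory either succeeds or throws: when needCreate is true and the directory was absent, reaching the condition below means creation succeeded, so alreadyExists || needCreate is equivalent to probing newDir.exists() a second time while saving one filesystem stat per lookup. A minimal sketch of that invariant; the ensureDir helper is hypothetical, not Spark code:

    import java.io.File
    import java.nio.file.Files

    // Returns true iff the directory exists after the call, using a single stat.
    def ensureDir(dir: File, needCreate: Boolean): Boolean = {
      val alreadyExists = dir.exists() // the one and only exists() probe
      if (!alreadyExists && needCreate) {
        Files.createDirectory(dir.toPath) // throws IOException on failure
      }
      alreadyExists || needCreate // no second exists() call needed
    }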
From 1ada64196c331daa2b40881db3327cb68b495545 Mon Sep 17 00:00:00 2001
From: sychen
Date: Mon, 7 Nov 2022 15:02:06 +0800
Subject: [PATCH 03/12] containsBlock/removeDataByMap

---
 .../shuffle/IndexShuffleBlockResolver.scala  | 33 ++++++++++++-------
 .../apache/spark/storage/BlockManager.scala  |  3 +-
 .../spark/storage/DiskBlockManager.scala     | 11 ++++---
 .../org/apache/spark/storage/DiskStore.scala |  2 +-
 4 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index ba54555311e75..6e1435da3ca0d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -62,7 +62,11 @@ private[spark] class IndexShuffleBlockResolver(
   private val remoteShuffleMaxDisk: Option[Long] =
     conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE)
 
-  def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)
+  def getDataFile(shuffleId: Int, mapId: Long): File =
+    getDataFile(shuffleId, mapId, None, true)
+
+  def getDataFile(shuffleId: Int, mapId: Long, needCreate: Boolean): File =
+    getDataFile(shuffleId, mapId, None, needCreate)
 
   /**
    * Get the shuffle files that are stored locally. Used for block migrations.
@@ -95,12 +99,16 @@ private[spark] class IndexShuffleBlockResolver(
    * When the dirs parameter is None then use the disk manager's local directories. Otherwise,
    * read from the specified directories.
    */
-  def getDataFile(shuffleId: Int, mapId: Long, dirs: Option[Array[String]]): File = {
+  def getDataFile(
+      shuffleId: Int,
+      mapId: Long,
+      dirs: Option[Array[String]],
+      needCreate: Boolean): File = {
     val blockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
-      .getOrElse(blockManager.diskBlockManager.getFile(blockId))
+      .getOrElse(blockManager.diskBlockManager.getFile(blockId, needCreate))
   }
 
   /**
@@ -112,12 +120,13 @@ private[spark] class IndexShuffleBlockResolver(
   def getIndexFile(
       shuffleId: Int,
       mapId: Long,
-      dirs: Option[Array[String]] = None): File = {
+      dirs: Option[Array[String]] = None,
+      needCreate: Boolean = true): File = {
     val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
-      .getOrElse(blockManager.diskBlockManager.getFile(blockId))
+      .getOrElse(blockManager.diskBlockManager.getFile(blockId, needCreate))
   }
 
   private def getMergedBlockDataFile(
@@ -154,17 +163,18 @@ private[spark] class IndexShuffleBlockResolver(
    * Remove data file and index file that contain the output data from one map.
    */
  def removeDataByMap(shuffleId: Int, mapId: Long): Unit = {
-    var file = getDataFile(shuffleId, mapId)
+    var file = getDataFile(shuffleId, mapId, needCreate = false)
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting data ${file.getPath()}")
     }
 
-    file = getIndexFile(shuffleId, mapId)
+    file = getIndexFile(shuffleId, mapId, needCreate = false)
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting index ${file.getPath()}")
     }
 
-    file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
+    file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM),
+      needCreate = false)
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting checksum ${file.getPath()}")
     }
@@ -549,13 +559,14 @@ private[spark] class IndexShuffleBlockResolver(
       shuffleId: Int,
       mapId: Long,
       algorithm: String,
-      dirs: Option[Array[String]] = None): File = {
+      dirs: Option[Array[String]] = None,
+      needCreate: Boolean = true): File = {
     val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     val fileName = ShuffleChecksumHelper.getChecksumFileName(blockId.name, algorithm)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, fileName)))
-      .getOrElse(blockManager.diskBlockManager.getFile(fileName))
+      .getOrElse(blockManager.diskBlockManager.getFile(fileName, needCreate))
   }
 
   override def getBlockData(
@@ -594,7 +605,7 @@ private[spark] class IndexShuffleBlockResolver(
       }
       new FileSegmentManagedBuffer(
         transportConf,
-        getDataFile(shuffleId, mapId, dirs),
+        getDataFile(shuffleId, mapId, dirs, true),
         startOffset,
         endOffset - startOffset)
     } finally {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 42a6cddc55f21..f7fd17eeaf28a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -302,7 +302,8 @@ private[spark] class BlockManager(
     val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
     val resolver = shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]
     val checksumFile =
-      resolver.getChecksumFile(shuffleBlock.shuffleId, shuffleBlock.mapId, algorithm)
+      resolver.getChecksumFile(shuffleBlock.shuffleId, shuffleBlock.mapId, algorithm,
+        needCreate = false)
     val reduceId = shuffleBlock.reduceId
     ShuffleChecksumHelper.diagnoseCorruption(
       algorithm, checksumFile, reduceId, resolver.getBlockData(shuffleBlock), checksumByReader)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 57a3568a9cd3f..dcd82d4b540de 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -91,10 +91,10 @@ private[spark] class DiskBlockManager(
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
   def getFile(filename: String): File = {
-    getFile(filename, true)
+    getFile(filename, needCreate = true)
   }
 
-  def getFile(filename: String, needCreate: Boolean = true): File = {
+  def getFile(filename: String, needCreate: Boolean): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
@@ -131,7 +131,10 @@ private[spark] class DiskBlockManager(
     new File(subDir, filename)
   }
 
-  def getFile(blockId: BlockId): File = getFile(blockId.name)
+  def getFile(blockId: BlockId): File = getFile(blockId.name, needCreate = true)
+
+  def getFile(blockId: BlockId, needCreate: Boolean): File =
+    getFile(blockId.name, needCreate)
 
   /**
    * This should be in sync with
@@ -162,7 +165,7 @@ private[spark] class DiskBlockManager(
 
   /** Check if disk block manager has a block. */
   def containsBlock(blockId: BlockId): Boolean = {
-    getFile(blockId.name).exists()
+    getFile(blockId.name, needCreate = false).exists()
   }
 
   /** List all the files currently stored on disk by the disk manager. */
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 1c6cecf2afa8a..b0045e9f73579 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -128,7 +128,7 @@ private[spark] class DiskStore(
 
   def remove(blockId: BlockId): Boolean = {
     blockSizes.remove(blockId)
-    val file = diskManager.getFile(blockId.name, false)
+    val file = diskManager.getFile(blockId.name, needCreate = false)
     if (file.exists()) {
       val ret = file.delete()
       if (!ret) {
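With the flag threaded through IndexShuffleBlockResolver, each call site now states whether it is producing or cleaning up output: writers keep the default needCreate = true, while removeDataByMap and containsBlock pass false so that deleting or probing a block that was never written no longer materializes its %02x subdirectory. A sketch of the resulting cleanup pattern against the overload added in this patch; the helper itself is illustrative, not part of Spark:

    import org.apache.spark.shuffle.IndexShuffleBlockResolver

    // Deleting map output must never create directories as a side effect.
    def deleteMapOutput(resolver: IndexShuffleBlockResolver, shuffleId: Int, mapId: Long): Unit = {
      val data = resolver.getDataFile(shuffleId, mapId, needCreate = false)
      if (data.exists() && !data.delete()) {
        // deletion failed; mirrors the logWarning branches in removeDataByMap above
      }
    }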
From 95ae100b0d147b2bcfecdcb45eaa57e936957fcd Mon Sep 17 00:00:00 2001
From: sychen
Date: Mon, 7 Nov 2022 22:43:32 +0800
Subject: [PATCH 04/12] fix mock getFile UT

---
 .../shuffle/sort/BypassMergeSortShuffleWriterSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index ce2aefa74229a..8a818351d93f5 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -263,9 +263,9 @@ class BypassMergeSortShuffleWriterSuite
     val dataFile = new File(tempDir, dataBlockId.name)
     val indexFile = new File(tempDir, indexBlockId.name)
     reset(diskBlockManager)
-    when(diskBlockManager.getFile(checksumFileName)).thenAnswer(_ => checksumFile)
-    when(diskBlockManager.getFile(dataBlockId)).thenAnswer(_ => dataFile)
-    when(diskBlockManager.getFile(indexBlockId)).thenAnswer(_ => indexFile)
+    when(diskBlockManager.getFile(checksumFileName, true)).thenAnswer(_ => checksumFile)
+    when(diskBlockManager.getFile(dataBlockId, true)).thenAnswer(_ => dataFile)
+    when(diskBlockManager.getFile(indexBlockId, true)).thenAnswer(_ => indexFile)
     when(diskBlockManager.createTempShuffleBlock())
       .thenAnswer { _ =>
         val blockId = new TempShuffleBlockId(UUID.randomUUID)
From e73fa2889a0532f24c41c1788b8be4e14fa47327 Mon Sep 17 00:00:00 2001
From: sychen
Date: Tue, 8 Nov 2022 01:59:36 +0800
Subject: [PATCH 05/12] fix mock getFile UT

---
 .../spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 12 ++++++------
 .../sort/IndexShuffleBlockResolverSuite.scala        |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index e038cd87687d5..5f54e2ed860e5 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -312,10 +312,10 @@ public void writeChecksumFileWithoutSpill() throws Exception {
     File checksumFile = new File(tempDir, checksumFileName);
     File dataFile = new File(tempDir, "data");
     File indexFile = new File(tempDir, "index");
-    when(diskBlockManager.getFile(checksumFileName)).thenReturn(checksumFile);
-    when(diskBlockManager.getFile(new ShuffleDataBlockId(shuffleDep.shuffleId(), 0, 0)))
+    when(diskBlockManager.getFile(checksumFileName, true)).thenReturn(checksumFile);
+    when(diskBlockManager.getFile(new ShuffleDataBlockId(shuffleDep.shuffleId(), 0, 0), true))
       .thenReturn(dataFile);
-    when(diskBlockManager.getFile(new ShuffleIndexBlockId(shuffleDep.shuffleId(), 0, 0)))
+    when(diskBlockManager.getFile(new ShuffleIndexBlockId(shuffleDep.shuffleId(), 0, 0), true))
       .thenReturn(indexFile);
 
     // In this example, each partition should have exactly one record:
@@ -342,10 +342,10 @@ public void writeChecksumFileWithSpill() throws Exception {
     File checksumFile = new File(tempDir, checksumFileName);
     File dataFile = new File(tempDir, "data");
     File indexFile = new File(tempDir, "index");
-    when(diskBlockManager.getFile(checksumFileName)).thenReturn(checksumFile);
-    when(diskBlockManager.getFile(new ShuffleDataBlockId(shuffleDep.shuffleId(), 0, 0)))
+    when(diskBlockManager.getFile(checksumFileName, true)).thenReturn(checksumFile);
+    when(diskBlockManager.getFile(new ShuffleDataBlockId(shuffleDep.shuffleId(), 0, 0), true))
       .thenReturn(dataFile);
-    when(diskBlockManager.getFile(new ShuffleIndexBlockId(shuffleDep.shuffleId(), 0, 0)))
+    when(diskBlockManager.getFile(new ShuffleIndexBlockId(shuffleDep.shuffleId(), 0, 0), true))
      .thenReturn(indexFile);
 
     final UnsafeShuffleWriter writer1 = createWriter(true, blockResolver);
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 31b255cff7284..a89119ebc228c 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -47,9 +47,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite {
     MockitoAnnotations.openMocks(this).close()
 
     when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
-    when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
+    when(diskBlockManager.getFile(any[BlockId], any[Boolean])).thenAnswer(
       (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
-    when(diskBlockManager.getFile(any[String])).thenAnswer(
+    when(diskBlockManager.getFile(any[String], any[Boolean])).thenAnswer(
       (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
     when(diskBlockManager.getMergedShuffleFile(
       any[BlockId], any[Option[Array[String]]])).thenAnswer(
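Both test fixes follow from how Mockito matches stubs: a stub recorded against the old one-argument getFile never matches once the code under test calls the two-argument overload, so the mock falls back to its default (null for object returns) and the test fails downstream. Restubbing with the new arity fixes that, and the any[Boolean] form used in IndexShuffleBlockResolverSuite is the more robust choice since it matches either flag value. A hedged sketch of the shape, with test wiring such as tempDir assumed to exist:

    import java.io.File
    import org.mockito.ArgumentMatchers.any
    import org.mockito.Mockito.when
    import org.mockito.invocation.InvocationOnMock
    import org.apache.spark.storage.{BlockId, DiskBlockManager}

    def stubGetFile(diskBlockManager: DiskBlockManager, tempDir: File): Unit = {
      // Match the two-argument signature regardless of the needCreate value.
      when(diskBlockManager.getFile(any[BlockId], any[Boolean])).thenAnswer(
        (invocation: InvocationOnMock) =>
          new File(tempDir, invocation.getArguments.head.toString))
    }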
From 5545f1593ebd27d5a4e95a42cc75ae71ac50cf88 Mon Sep 17 00:00:00 2001
From: sychen
Date: Thu, 24 Nov 2022 17:40:30 +0800
Subject: [PATCH 06/12] Revert "fix mock getFile UT"

This reverts commit e73fa2889a0532f24c41c1788b8be4e14fa47327.
---
 .../spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 12 ++++++------
 .../sort/IndexShuffleBlockResolverSuite.scala        |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 5f54e2ed860e5..e038cd87687d5 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -312,10 +312,10 @@ public void writeChecksumFileWithoutSpill() throws Exception {
     File checksumFile = new File(tempDir, checksumFileName);
     File dataFile = new File(tempDir, "data");
     File indexFile = new File(tempDir, "index");
-    when(diskBlockManager.getFile(checksumFileName, true)).thenReturn(checksumFile);
-    when(diskBlockManager.getFile(new ShuffleDataBlockId(shuffleDep.shuffleId(), 0, 0), true))
+    when(diskBlockManager.getFile(checksumFileName)).thenReturn(checksumFile);
+    when(diskBlockManager.getFile(new ShuffleDataBlockId(shuffleDep.shuffleId(), 0, 0)))
       .thenReturn(dataFile);
-    when(diskBlockManager.getFile(new ShuffleIndexBlockId(shuffleDep.shuffleId(), 0, 0), true))
+    when(diskBlockManager.getFile(new ShuffleIndexBlockId(shuffleDep.shuffleId(), 0, 0)))
       .thenReturn(indexFile);
 
     // In this example, each partition should have exactly one record:
@@ -342,10 +342,10 @@ public void writeChecksumFileWithSpill() throws Exception {
     File checksumFile = new File(tempDir, checksumFileName);
     File dataFile = new File(tempDir, "data");
     File indexFile = new File(tempDir, "index");
-    when(diskBlockManager.getFile(checksumFileName, true)).thenReturn(checksumFile);
-    when(diskBlockManager.getFile(new ShuffleDataBlockId(shuffleDep.shuffleId(), 0, 0), true))
+    when(diskBlockManager.getFile(checksumFileName)).thenReturn(checksumFile);
+    when(diskBlockManager.getFile(new ShuffleDataBlockId(shuffleDep.shuffleId(), 0, 0)))
       .thenReturn(dataFile);
-    when(diskBlockManager.getFile(new ShuffleIndexBlockId(shuffleDep.shuffleId(), 0, 0), true))
+    when(diskBlockManager.getFile(new ShuffleIndexBlockId(shuffleDep.shuffleId(), 0, 0)))
       .thenReturn(indexFile);
 
     final UnsafeShuffleWriter writer1 = createWriter(true, blockResolver);
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index a89119ebc228c..31b255cff7284 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -47,9 +47,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite {
     MockitoAnnotations.openMocks(this).close()
 
     when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
-    when(diskBlockManager.getFile(any[BlockId], any[Boolean])).thenAnswer(
+    when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
      (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
-    when(diskBlockManager.getFile(any[String], any[Boolean])).thenAnswer(
+    when(diskBlockManager.getFile(any[String])).thenAnswer(
      (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
     when(diskBlockManager.getMergedShuffleFile(
       any[BlockId], any[Option[Array[String]]])).thenAnswer(
From 08d2da6dbf338c859972701b3febee3e11c09fc8 Mon Sep 17 00:00:00 2001
From: sychen
Date: Thu, 24 Nov 2022 17:40:30 +0800
Subject: [PATCH 07/12] Revert "fix mock getFile UT"

This reverts commit 95ae100b0d147b2bcfecdcb45eaa57e936957fcd.
---
 .../shuffle/sort/BypassMergeSortShuffleWriterSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 8a818351d93f5..ce2aefa74229a 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -263,9 +263,9 @@ class BypassMergeSortShuffleWriterSuite
     val dataFile = new File(tempDir, dataBlockId.name)
     val indexFile = new File(tempDir, indexBlockId.name)
     reset(diskBlockManager)
-    when(diskBlockManager.getFile(checksumFileName, true)).thenAnswer(_ => checksumFile)
-    when(diskBlockManager.getFile(dataBlockId, true)).thenAnswer(_ => dataFile)
-    when(diskBlockManager.getFile(indexBlockId, true)).thenAnswer(_ => indexFile)
+    when(diskBlockManager.getFile(checksumFileName)).thenAnswer(_ => checksumFile)
+    when(diskBlockManager.getFile(dataBlockId)).thenAnswer(_ => dataFile)
+    when(diskBlockManager.getFile(indexBlockId)).thenAnswer(_ => indexFile)
     when(diskBlockManager.createTempShuffleBlock())
       .thenAnswer { _ =>
         val blockId = new TempShuffleBlockId(UUID.randomUUID)
From 6addcef51c70f5719fcda43774d852aa5ca22852 Mon Sep 17 00:00:00 2001
From: sychen
Date: Thu, 24 Nov 2022 17:40:31 +0800
Subject: [PATCH 08/12] Revert "containsBlock/removeDataByMap"

This reverts commit 1ada64196c331daa2b40881db3327cb68b495545.
---
 .../shuffle/IndexShuffleBlockResolver.scala  | 33 +++++++------------
 .../apache/spark/storage/BlockManager.scala  |  3 +-
 .../spark/storage/DiskBlockManager.scala     | 11 +++----
 .../org/apache/spark/storage/DiskStore.scala |  2 +-
 4 files changed, 17 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 6e1435da3ca0d..ba54555311e75 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -62,11 +62,7 @@ private[spark] class IndexShuffleBlockResolver(
   private val remoteShuffleMaxDisk: Option[Long] =
     conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE)
 
-  def getDataFile(shuffleId: Int, mapId: Long): File =
-    getDataFile(shuffleId, mapId, None, true)
-
-  def getDataFile(shuffleId: Int, mapId: Long, needCreate: Boolean): File =
-    getDataFile(shuffleId, mapId, None, needCreate)
+  def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)
 
   /**
    * Get the shuffle files that are stored locally. Used for block migrations.
@@ -99,16 +95,12 @@ private[spark] class IndexShuffleBlockResolver(
    * When the dirs parameter is None then use the disk manager's local directories. Otherwise,
    * read from the specified directories.
   */
-  def getDataFile(
-      shuffleId: Int,
-      mapId: Long,
-      dirs: Option[Array[String]],
-      needCreate: Boolean): File = {
+  def getDataFile(shuffleId: Int, mapId: Long, dirs: Option[Array[String]]): File = {
     val blockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
-      .getOrElse(blockManager.diskBlockManager.getFile(blockId, needCreate))
+      .getOrElse(blockManager.diskBlockManager.getFile(blockId))
   }
 
   /**
@@ -120,13 +112,12 @@ private[spark] class IndexShuffleBlockResolver(
   def getIndexFile(
       shuffleId: Int,
       mapId: Long,
-      dirs: Option[Array[String]] = None,
-      needCreate: Boolean = true): File = {
+      dirs: Option[Array[String]] = None): File = {
     val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
-      .getOrElse(blockManager.diskBlockManager.getFile(blockId, needCreate))
+      .getOrElse(blockManager.diskBlockManager.getFile(blockId))
   }
 
   private def getMergedBlockDataFile(
@@ -163,18 +154,17 @@ private[spark] class IndexShuffleBlockResolver(
    * Remove data file and index file that contain the output data from one map.
   */
  def removeDataByMap(shuffleId: Int, mapId: Long): Unit = {
-    var file = getDataFile(shuffleId, mapId, needCreate = false)
+    var file = getDataFile(shuffleId, mapId)
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting data ${file.getPath()}")
     }
 
-    file = getIndexFile(shuffleId, mapId, needCreate = false)
+    file = getIndexFile(shuffleId, mapId)
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting index ${file.getPath()}")
     }
 
-    file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM),
-      needCreate = false)
+    file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting checksum ${file.getPath()}")
     }
@@ -559,14 +549,13 @@ private[spark] class IndexShuffleBlockResolver(
       shuffleId: Int,
       mapId: Long,
       algorithm: String,
-      dirs: Option[Array[String]] = None,
-      needCreate: Boolean = true): File = {
+      dirs: Option[Array[String]] = None): File = {
     val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     val fileName = ShuffleChecksumHelper.getChecksumFileName(blockId.name, algorithm)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, fileName)))
-      .getOrElse(blockManager.diskBlockManager.getFile(fileName, needCreate))
+      .getOrElse(blockManager.diskBlockManager.getFile(fileName))
   }
 
   override def getBlockData(
@@ -605,7 +594,7 @@ private[spark] class IndexShuffleBlockResolver(
       }
       new FileSegmentManagedBuffer(
         transportConf,
-        getDataFile(shuffleId, mapId, dirs, true),
+        getDataFile(shuffleId, mapId, dirs),
         startOffset,
         endOffset - startOffset)
     } finally {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f7fd17eeaf28a..42a6cddc55f21 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -302,8 +302,7 @@ private[spark] class BlockManager(
     val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
     val resolver = shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]
     val checksumFile =
-      resolver.getChecksumFile(shuffleBlock.shuffleId, shuffleBlock.mapId, algorithm,
-        needCreate = false)
+      resolver.getChecksumFile(shuffleBlock.shuffleId, shuffleBlock.mapId, algorithm)
     val reduceId = shuffleBlock.reduceId
     ShuffleChecksumHelper.diagnoseCorruption(
       algorithm, checksumFile, reduceId, resolver.getBlockData(shuffleBlock), checksumByReader)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index dcd82d4b540de..57a3568a9cd3f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -91,10 +91,10 @@ private[spark] class DiskBlockManager(
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
   def getFile(filename: String): File = {
-    getFile(filename, needCreate = true)
+    getFile(filename, true)
   }
 
-  def getFile(filename: String, needCreate: Boolean): File = {
+  def getFile(filename: String, needCreate: Boolean = true): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
@@ -131,10 +131,7 @@ private[spark] class DiskBlockManager(
     new File(subDir, filename)
   }
 
-  def getFile(blockId: BlockId): File = getFile(blockId.name, needCreate = true)
-
-  def getFile(blockId: BlockId, needCreate: Boolean): File =
-    getFile(blockId.name, needCreate)
+  def getFile(blockId: BlockId): File = getFile(blockId.name)
 
   /**
    * This should be in sync with
@@ -165,7 +162,7 @@ private[spark] class DiskBlockManager(
 
   /** Check if disk block manager has a block. */
   def containsBlock(blockId: BlockId): Boolean = {
-    getFile(blockId.name, needCreate = false).exists()
+    getFile(blockId.name).exists()
   }
 
   /** List all the files currently stored on disk by the disk manager. */
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index b0045e9f73579..1c6cecf2afa8a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -128,7 +128,7 @@ private[spark] class DiskStore(
 
   def remove(blockId: BlockId): Boolean = {
     blockSizes.remove(blockId)
-    val file = diskManager.getFile(blockId.name, needCreate = false)
+    val file = diskManager.getFile(blockId.name, false)
     if (file.exists()) {
       val ret = file.delete()
       if (!ret) {
From c87c44e28593b598c7f0ee5dea5f175e86dce483 Mon Sep 17 00:00:00 2001
From: sychen
Date: Thu, 24 Nov 2022 17:40:31 +0800
Subject: [PATCH 09/12] Revert "avoid multiple calls to exists"

This reverts commit fa9c72fd6185d4497ed853872a349468181ccd99.
---
 .../scala/org/apache/spark/storage/DiskBlockManager.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 57a3568a9cd3f..d7376fd75046b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -107,8 +107,7 @@ private[spark] class DiskBlockManager(
         old
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        val alreadyExists = newDir.exists()
-        if (!alreadyExists && needCreate) {
+        if (!newDir.exists() && needCreate) {
           val path = newDir.toPath
           Files.createDirectory(path)
           if (permissionChangingRequired) {
@@ -120,8 +119,7 @@ private[spark] class DiskBlockManager(
             Files.setPosixFilePermissions(path, currentPerms)
           }
         }
-        // If needCreate is set, exception is thrown if directory cannot be created.
-        if (alreadyExists || needCreate) {
+        if (newDir.exists()) {
           subDirs(dirId)(subDirId) = newDir
         }
         newDir
From 3f86a24ac0cd00302a643ec1f2a046d7000b4d92 Mon Sep 17 00:00:00 2001
From: sychen
Date: Thu, 24 Nov 2022 17:40:32 +0800
Subject: [PATCH 10/12] Revert "avoid mkdir when remove file"

This reverts commit 906f716c5bed962ca0eecd2e2114cefb1444d8b7.
---
 .../org/apache/spark/storage/DiskBlockManager.scala | 10 ++--------
 .../scala/org/apache/spark/storage/DiskStore.scala  |  2 +-
 2 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index d7376fd75046b..e29f3fc1b8050 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -91,10 +91,6 @@ private[spark] class DiskBlockManager(
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
   def getFile(filename: String): File = {
-    getFile(filename, true)
-  }
-
-  def getFile(filename: String, needCreate: Boolean = true): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
@@ -107,7 +103,7 @@ private[spark] class DiskBlockManager(
         old
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists() && needCreate) {
+        if (!newDir.exists()) {
           val path = newDir.toPath
           Files.createDirectory(path)
           if (permissionChangingRequired) {
@@ -119,9 +115,7 @@ private[spark] class DiskBlockManager(
             Files.setPosixFilePermissions(path, currentPerms)
           }
         }
-        if (newDir.exists()) {
-          subDirs(dirId)(subDirId) = newDir
-        }
+        subDirs(dirId)(subDirId) = newDir
         newDir
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 1c6cecf2afa8a..d45947db69343 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -128,7 +128,7 @@ private[spark] class DiskStore(
 
   def remove(blockId: BlockId): Boolean = {
     blockSizes.remove(blockId)
-    val file = diskManager.getFile(blockId.name, false)
+    val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
       val ret = file.delete()
       if (!ret) {
From 08015ffbb78d00b76e8a1937e1e857f50c3b552e Mon Sep 17 00:00:00 2001
From: sychen
Date: Thu, 24 Nov 2022 19:51:42 +0800
Subject: [PATCH 11/12] finally remove block

---
 .../apache/spark/storage/BlockManager.scala | 43 +++++++++++--------
 1 file changed, 26 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 42a6cddc55f21..0de37b1373824 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1991,23 +1991,32 @@ private[spark] class BlockManager(
    * lock on the block.
   */
  private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
-    val blockStatus = if (tellMaster) {
-      val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
-      Some(getCurrentBlockStatus(blockId, blockInfo))
-    } else None
-
-    // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-    val removedFromMemory = memoryStore.remove(blockId)
-    val removedFromDisk = diskStore.remove(blockId)
-    if (!removedFromMemory && !removedFromDisk) {
-      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
-    }
-
-    blockInfoManager.removeBlock(blockId)
-    if (tellMaster) {
-      // Only update storage level from the captured block status before deleting, so that
-      // memory size and disk size are being kept for calculating delta.
-      reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+    var hasRemoveBlock = false
+    try {
+      val blockStatus = if (tellMaster) {
+        val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+        Some(getCurrentBlockStatus(blockId, blockInfo))
+      } else None
+
+      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+      val removedFromMemory = memoryStore.remove(blockId)
+      val removedFromDisk = diskStore.remove(blockId)
+      if (!removedFromMemory && !removedFromDisk) {
+        logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+      }
+
+      blockInfoManager.removeBlock(blockId)
+      hasRemoveBlock = true
+      if (tellMaster) {
+        // Only update storage level from the captured block status before deleting, so that
+        // memory size and disk size are being kept for calculating delta.
+        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+      }
+    } finally {
+      if (!hasRemoveBlock) {
+        logWarning(s"Block $blockId could not be removed normally.")
+        blockInfoManager.removeBlock(blockId)
+      }
     }
   }
 
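After the reverts restore the original getFile API, patch 11 attacks the problem from the other side: it no longer matters whether the store removals or the master report throw, because removeBlockInternal now drops the block's metadata entry in a finally block, presumably so a failed removal cannot leave a dangling BlockInfo registration (and the write lock it represents) behind. A minimal sketch of that invariant, with illustrative types rather than Spark's:

    import scala.collection.mutable

    // Whatever the store removal does, the metadata entry is always dropped.
    def removeWithGuarantee(
        id: String,
        metadata: mutable.Map[String, AnyRef],
        removeFromStores: String => Unit): Unit = {
      var removed = false
      try {
        removeFromStores(id) // may throw, e.g. an IOException from a disk store
        metadata.remove(id)
        removed = true
      } finally {
        if (!removed) {
          metadata.remove(id) // fallback path mirrors the patch's finally block
        }
      }
    }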
From 61456a85c3d974113161872ff4a7251238efc232 Mon Sep 17 00:00:00 2001
From: sychen
Date: Fri, 25 Nov 2022 12:14:21 +0800
Subject: [PATCH 12/12] log

---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0de37b1373824..d5fde96b146b4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -2014,7 +2014,7 @@ private[spark] class BlockManager(
       }
     } finally {
       if (!hasRemoveBlock) {
-        logWarning(s"Block $blockId could not be removed normally.")
+        logWarning(s"Block $blockId was not removed normally.")
         blockInfoManager.removeBlock(blockId)
       }
     }