From 312b90b4ef9eb52f0dbd28c16054fa33b9488b6f Mon Sep 17 00:00:00 2001 From: eatoncys Date: Wed, 19 Apr 2017 18:13:43 +0800 Subject: [PATCH 01/10] SPARK-20386 --- .../storage/BlockManagerMasterEndpoint.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 467c3e0e6b51..91541797c023 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -497,11 +497,12 @@ private[spark] class BlockManagerInfo( updateLastSeenMs() + var originalMemSize: Long = 0 if (_blocks.containsKey(blockId)) { // The block exists on the slave already. val blockStatus: BlockStatus = _blocks.get(blockId) val originalLevel: StorageLevel = blockStatus.storageLevel - val originalMemSize: Long = blockStatus.memSize + originalMemSize = blockStatus.memSize if (originalLevel.useMemory) { _remainingMem += originalMemSize @@ -520,9 +521,16 @@ private[spark] class BlockManagerInfo( blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0) _blocks.put(blockId, blockStatus) _remainingMem -= memSize - logInfo("Added %s in memory on %s (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), - Utils.bytesToString(_remainingMem))) + if((memSize-originalMemSize) >= 0){ + logInfo("Added %s in memory on %s (size: %s, free: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(memSize-originalMemSize), + Utils.bytesToString(_remainingMem))) + } + else{ + logInfo("Removed %s in memory on %s (size: %s, free: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(originalMemSize-memSize), + Utils.bytesToString(_remainingMem))) + } } if (storageLevel.useDisk) { blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = 
diskSize) From 2d9565212f753cb514950b2b0378e10ccf6d2568 Mon Sep 17 00:00:00 2001 From: eatoncys Date: Wed, 19 Apr 2017 19:04:52 +0800 Subject: [PATCH 02/10] modify the style problems and var problem --- .../spark/storage/BlockManagerMasterEndpoint.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 91541797c023..ca534f0ce23a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -497,12 +497,11 @@ private[spark] class BlockManagerInfo( updateLastSeenMs() - var originalMemSize: Long = 0 if (_blocks.containsKey(blockId)) { // The block exists on the slave already. val blockStatus: BlockStatus = _blocks.get(blockId) val originalLevel: StorageLevel = blockStatus.storageLevel - originalMemSize = blockStatus.memSize + val originalMemSize: Long = blockStatus.memSize if (originalLevel.useMemory) { _remainingMem += originalMemSize @@ -521,14 +520,15 @@ private[spark] class BlockManagerInfo( blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0) _blocks.put(blockId, blockStatus) _remainingMem -= memSize - if((memSize-originalMemSize) >= 0){ + val originalMemSize = if (_blocks.containsKey(blockId)) _blocks.get(blockId).memSize else 0 + if(memSize >= originalMemSize){ logInfo("Added %s in memory on %s (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(memSize-originalMemSize), + blockId, blockManagerId.hostPort, Utils.bytesToString(memSize - originalMemSize), Utils.bytesToString(_remainingMem))) } else{ logInfo("Removed %s in memory on %s (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(originalMemSize-memSize), + blockId, blockManagerId.hostPort, 
Utils.bytesToString(originalMemSize - memSize), Utils.bytesToString(_remainingMem))) } } From 22ef469b6383f411c9f0e5884c2ef3bc5429a37f Mon Sep 17 00:00:00 2001 From: eatoncys Date: Wed, 19 Apr 2017 19:56:38 +0800 Subject: [PATCH 03/10] modify the style problems and var problem --- .../storage/BlockManagerMasterEndpoint.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index ca534f0ce23a..a8a27c1d476b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -520,17 +520,19 @@ private[spark] class BlockManagerInfo( blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0) _blocks.put(blockId, blockStatus) _remainingMem -= memSize - val originalMemSize = if (_blocks.containsKey(blockId)) _blocks.get(blockId).memSize else 0 - if(memSize >= originalMemSize){ - logInfo("Added %s in memory on %s (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(memSize - originalMemSize), - Utils.bytesToString(_remainingMem))) + val originalMemSize = if (_blocks.containsKey(blockId)) { + _blocks.get(blockId).memSize + } else { + 0 } - else{ - logInfo("Removed %s in memory on %s (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(originalMemSize - memSize), - Utils.bytesToString(_remainingMem))) + val (addedOrRemoved, size) = if (memSize >= originalMemSize) { + ("Added", memSize - originalMemSize) + } else { + ("Removed", originalMemSize - memSize) } + logInfo("%s %s in memory on %s (size: %s, free: %s)".format( + addedOrRemoved, blockId, blockManagerId.hostPort, Utils.bytesToString(size), + Utils.bytesToString(_remainingMem))) } if (storageLevel.useDisk) { blockStatus = 
BlockStatus(storageLevel, memSize = 0, diskSize = diskSize) From c14b1ad3a407af3bfa6635a0274b4ae08f25ed30 Mon Sep 17 00:00:00 2001 From: eatoncys Date: Wed, 19 Apr 2017 20:39:05 +0800 Subject: [PATCH 04/10] Change removed to updated --- .../storage/BlockManagerMasterEndpoint.scala | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index a8a27c1d476b..14ae3cced67b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -516,29 +516,24 @@ private[spark] class BlockManagerInfo( * They can be both larger than 0, when a block is dropped from memory to disk. * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */ var blockStatus: BlockStatus = null + val addOrUpdate = if (_blocks.containsKey(blockId)) { + "Added" + } else { + "Updated" + } if (storageLevel.useMemory) { blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0) _blocks.put(blockId, blockStatus) _remainingMem -= memSize - val originalMemSize = if (_blocks.containsKey(blockId)) { - _blocks.get(blockId).memSize - } else { - 0 - } - val (addedOrRemoved, size) = if (memSize >= originalMemSize) { - ("Added", memSize - originalMemSize) - } else { - ("Removed", originalMemSize - memSize) - } logInfo("%s %s in memory on %s (size: %s, free: %s)".format( - addedOrRemoved, blockId, blockManagerId.hostPort, Utils.bytesToString(size), + addOrUpdate, blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), Utils.bytesToString(_remainingMem))) } if (storageLevel.useDisk) { blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize) _blocks.put(blockId, blockStatus) - logInfo("Added %s on disk on %s (size: %s)".format( - blockId, 
blockManagerId.hostPort, Utils.bytesToString(diskSize))) + logInfo("%s %s on disk on %s (size: %s)".format( + addOrUpdate, blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) } if (!blockId.isBroadcast && blockStatus.isCached) { _cachedBlocks += blockId From b769754bb480bb88ee6c85f88172f483f160f84a Mon Sep 17 00:00:00 2001 From: eatoncys Date: Wed, 19 Apr 2017 21:39:10 +0800 Subject: [PATCH 05/10] add 'updated' style log info --- .../storage/BlockManagerMasterEndpoint.scala | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 14ae3cced67b..6199225f4de0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -497,11 +497,17 @@ private[spark] class BlockManagerInfo( updateLastSeenMs() + var blockExists = false + var originalMemSize: Long = 0 + var originalDiskSize: Long = 0 + if (_blocks.containsKey(blockId)) { // The block exists on the slave already. val blockStatus: BlockStatus = _blocks.get(blockId) val originalLevel: StorageLevel = blockStatus.storageLevel - val originalMemSize: Long = blockStatus.memSize + originalMemSize = blockStatus.memSize + originalDiskSize = blockStatus.diskSize + blockExists = true if (originalLevel.useMemory) { _remainingMem += originalMemSize @@ -516,24 +522,33 @@ private[spark] class BlockManagerInfo( * They can be both larger than 0, when a block is dropped from memory to disk. * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. 
*/ var blockStatus: BlockStatus = null - val addOrUpdate = if (_blocks.containsKey(blockId)) { - "Added" - } else { - "Updated" - } + if (storageLevel.useMemory) { blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0) _blocks.put(blockId, blockStatus) _remainingMem -= memSize - logInfo("%s %s in memory on %s (size: %s, free: %s)".format( - addOrUpdate, blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), - Utils.bytesToString(_remainingMem))) + if (blockExists) { + logInfo("Updated %s in memory on %s (current size: %s, original size %s, free: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), + Utils.bytesToString(originalMemSize),Utils.bytesToString(_remainingMem))) + } else { + logInfo("Added %s in memory on %s (size: %s, free: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), + Utils.bytesToString(_remainingMem))) + } + } if (storageLevel.useDisk) { blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize) _blocks.put(blockId, blockStatus) - logInfo("%s %s on disk on %s (size: %s)".format( - addOrUpdate, blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) + if (blockExists) { + logInfo("Updated %s on disk on %s (current size: %s, original size %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize), + Utils.bytesToString(originalDiskSize))) + } else { + logInfo("Added %s on disk on %s (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) + } } if (!blockId.isBroadcast && blockStatus.isCached) { _cachedBlocks += blockId From 81e8e847e0bd55d570c7d01f7741c11c2f2236a4 Mon Sep 17 00:00:00 2001 From: eatoncys Date: Thu, 20 Apr 2017 07:22:15 +0800 Subject: [PATCH 06/10] Delete blank line --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 6199225f4de0..972ff4f10713 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -536,7 +536,6 @@ private[spark] class BlockManagerInfo( blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), Utils.bytesToString(_remainingMem))) } - } if (storageLevel.useDisk) { blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize) From 567a9ae8d119d46355dd7401f5bb08bd2b35f038 Mon Sep 17 00:00:00 2001 From: eatoncys Date: Thu, 20 Apr 2017 16:23:23 +0800 Subject: [PATCH 07/10] Reuse the new values --- .../spark/storage/BlockManagerMasterEndpoint.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 972ff4f10713..2f6ce66a1f59 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -500,11 +500,12 @@ private[spark] class BlockManagerInfo( var blockExists = false var originalMemSize: Long = 0 var originalDiskSize: Long = 0 + var originalLevel: StorageLevel = StorageLevel.NONE if (_blocks.containsKey(blockId)) { // The block exists on the slave already. 
val blockStatus: BlockStatus = _blocks.get(blockId) - val originalLevel: StorageLevel = blockStatus.storageLevel + originalLevel = blockStatus.storageLevel originalMemSize = blockStatus.memSize originalDiskSize = blockStatus.diskSize blockExists = true @@ -552,19 +553,18 @@ private[spark] class BlockManagerInfo( if (!blockId.isBroadcast && blockStatus.isCached) { _cachedBlocks += blockId } - } else if (_blocks.containsKey(blockId)) { + } else if (blockExists) { // If isValid is not true, drop the block. - val blockStatus: BlockStatus = _blocks.get(blockId) _blocks.remove(blockId) _cachedBlocks -= blockId - if (blockStatus.storageLevel.useMemory) { + if (originalLevel.useMemory) { logInfo("Removed %s on %s in memory (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize), + blockId, blockManagerId.hostPort, Utils.bytesToString(originalMemSize), Utils.bytesToString(_remainingMem))) } - if (blockStatus.storageLevel.useDisk) { + if (originalLevel.useDisk) { logInfo("Removed %s on %s on disk (size: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize))) + blockId, blockManagerId.hostPort, Utils.bytesToString(originalDiskSize))) } } } From f7e40b80011a7f8d9e64a5a1e6304f1aa403613e Mon Sep 17 00:00:00 2001 From: eatoncys Date: Thu, 20 Apr 2017 20:43:45 +0800 Subject: [PATCH 08/10] Using string interpolation for log info --- .../storage/BlockManagerMasterEndpoint.scala | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 2f6ce66a1f59..9a382d02b2f1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -529,25 +529,25 @@ private[spark] class BlockManagerInfo( 
_blocks.put(blockId, blockStatus) _remainingMem -= memSize if (blockExists) { - logInfo("Updated %s in memory on %s (current size: %s, original size %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), - Utils.bytesToString(originalMemSize),Utils.bytesToString(_remainingMem))) + logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" + + s" (current size: ${Utils.bytesToString(memSize)}," + + s" original size: ${Utils.bytesToString(originalMemSize)}," + + s" free: ${Utils.bytesToString(_remainingMem)})") } else { - logInfo("Added %s in memory on %s (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), - Utils.bytesToString(_remainingMem))) + logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" + + s" (size: ${Utils.bytesToString(memSize)}, free: ${Utils.bytesToString(_remainingMem)})") } } if (storageLevel.useDisk) { blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize) _blocks.put(blockId, blockStatus) if (blockExists) { - logInfo("Updated %s on disk on %s (current size: %s, original size %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize), - Utils.bytesToString(originalDiskSize))) + logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" + + s" (current size: ${Utils.bytesToString(diskSize)}," + + s" original size: ${Utils.bytesToString(originalDiskSize)})") } else { - logInfo("Added %s on disk on %s (size: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) + logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" + + s" (size: ${Utils.bytesToString(diskSize)})") } } if (!blockId.isBroadcast && blockStatus.isCached) { @@ -558,13 +558,12 @@ private[spark] class BlockManagerInfo( _blocks.remove(blockId) _cachedBlocks -= blockId if (originalLevel.useMemory) { - logInfo("Removed %s on %s in memory (size: %s, free: %s)".format( - blockId, blockManagerId.hostPort, 
Utils.bytesToString(originalMemSize), - Utils.bytesToString(_remainingMem))) + logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" + + s" (size: ${Utils.bytesToString(originalMemSize)}, free: ${Utils.bytesToString(_remainingMem)})") } if (originalLevel.useDisk) { - logInfo("Removed %s on %s on disk (size: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(originalDiskSize))) + logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" + + s" (size: ${Utils.bytesToString(originalDiskSize)})") } } } From 60ca9e0818b526ed47e2b5c3b1f0e1103fe4545a Mon Sep 17 00:00:00 2001 From: eatoncys Date: Thu, 20 Apr 2017 21:07:43 +0800 Subject: [PATCH 09/10] Remove the blank line --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 9a382d02b2f1..3404005d39d5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -523,7 +523,6 @@ private[spark] class BlockManagerInfo( * They can be both larger than 0, when a block is dropped from memory to disk. * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. 
*/ var blockStatus: BlockStatus = null - if (storageLevel.useMemory) { blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0) _blocks.put(blockId, blockStatus) From 664dfb8848c38826886430700bfa926116ad28bf Mon Sep 17 00:00:00 2001 From: eatoncys Date: Fri, 21 Apr 2017 17:42:30 +0800 Subject: [PATCH 10/10] Modify the blockExists to val and reuse it below --- .../spark/storage/BlockManagerMasterEndpoint.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 3404005d39d5..6f85b9e4d6c7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -497,18 +497,17 @@ private[spark] class BlockManagerInfo( updateLastSeenMs() - var blockExists = false + val blockExists = _blocks.containsKey(blockId) var originalMemSize: Long = 0 var originalDiskSize: Long = 0 var originalLevel: StorageLevel = StorageLevel.NONE - if (_blocks.containsKey(blockId)) { + if (blockExists) { // The block exists on the slave already. 
val blockStatus: BlockStatus = _blocks.get(blockId) originalLevel = blockStatus.storageLevel originalMemSize = blockStatus.memSize originalDiskSize = blockStatus.diskSize - blockExists = true if (originalLevel.useMemory) { _remainingMem += originalMemSize @@ -534,7 +533,8 @@ private[spark] class BlockManagerInfo( s" free: ${Utils.bytesToString(_remainingMem)})") } else { logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" + - s" (size: ${Utils.bytesToString(memSize)}, free: ${Utils.bytesToString(_remainingMem)})") + s" (size: ${Utils.bytesToString(memSize)}," + + s" free: ${Utils.bytesToString(_remainingMem)})") } } if (storageLevel.useDisk) { @@ -558,7 +558,8 @@ private[spark] class BlockManagerInfo( _cachedBlocks -= blockId if (originalLevel.useMemory) { logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" + - s" (size: ${Utils.bytesToString(originalMemSize)}, free: ${Utils.bytesToString(_remainingMem)})") + s" (size: ${Utils.bytesToString(originalMemSize)}," + + s" free: ${Utils.bytesToString(_remainingMem)})") } if (originalLevel.useDisk) { logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +